65.9K
CodeProject 正在变化。 阅读更多。
Home

使用计数信号量实现消息队列

starIconstarIcon
emptyStarIcon
starIcon
emptyStarIconemptyStarIcon

2.89/5 (11投票s)

2009年6月26日

CPOL

1分钟阅读

viewsIcon

47500

downloadIcon

600

在 C# 中实现消息队列。

Article_src

引言

消息队列是一种有效且高效的进程间或线程间通信技术。微软的 MSMQ 是消息队列技术的一个很好的实现。然而,当您需要一个简单的消息队列数据结构,该结构将在单个应用程序内的多个线程中使用时,MSMQ 显然不是理想的选择。此外,对于实时分布式应用程序,例如套接字读取 UDP 多播消息,需要在尽可能快的时间内由另一个线程处理,MSMQ 速度太慢。在这种情况下,MSMQ 可能不是最佳选择。

本文演示了使用计数信号量实现消息队列数据结构的技术。

背景

建议了解等待句柄、本地信号量和线程的基本概念,以便理解代码。

Using the Code

MessageQueue 类的源代码如下:

using System;
using System.Collections.Generic;
using System.Threading;
public class MessageQueue<T> : IDisposable
{
    private readonly int _QUEUE_SIZE;
    // a semaphore object is used to notify enqueue happend signal to the dequeue
    // subrouting
    private Semaphore _semaphore;
    // an internal queue data structure holds the messages in a queue
    private Queue<T> _internalQueue;
    // a private object that is used to acquire a lock on it to ensure synchronous
    // execution of some operations in multithreaded environment
    private object _syncLock;
    /**************************************************************************
    * Construct the message queue object with the maximum size limit.         *
    * If no of messages in the queue meets the maximum size of the queue, any *
    * subsequent enqueue will be discareded. i.e. those message will be lost  *
    * until you dequeue any message i.e. provide a room for new message to    *
    * enter the queue.                                                        *
    **************************************************************************/
    public MessageQueue(int queueSize)
    {
        _syncLock = new object();
        _QUEUE_SIZE = queueSize;
        _internalQueue = new Queue<T>(_QUEUE_SIZE);
        _semaphore = new Semaphore(0, _QUEUE_SIZE);
    }
    /***********************************************************************
    * Reset the MessageQueue                                               *
    ***********************************************************************/
    public void Reset()
    {
        // instantiate the semaphore with initial count 0 i.e. the semaphore is
        // entirely woned and there is no room to enter
        _semaphore = new Semaphore(0, _QUEUE_SIZE);
        // clear all existing messages from the message queue
        _internalQueue.Clear();
    }
    /**********************************************************************
    * Enqueue message in to the Message Queue                             *
    **********************************************************************/
    public void EnqueueMessage(T message)
    {
        lock (_syncLock)
        {
            if (_semaphore != null && message != null)
            {
                try
                {
                    // try to provide a room in the semaphore so that DequeueMessage
                    // can enter into it
                    _semaphore.Release();
                    // now enqueue the message in to the internal queue data structure
                    _internalQueue.Enqueue(message);
                }
                catch { }
            }
        }
    }
    /*********************************************************************
    * Dequeue message from the Message Queue                             *
    *********************************************************************/
    public T DequeueMessage()
    {
        // try to acquire a room in the semaphore and sleep until the room is available
        _semaphore.WaitOne();
        // if any room could be acquired, proceed to next step. i.e. dequeue message from 
        // the internal queue and return it
        lock (_syncLock)
        {
            T message = _internalQueue.Dequeue();
            return message;
        }
    }
    
    /********************************************************************
    * Dispose the Message Queue object                                  *
    ********************************************************************/
    public void Dispose()
    {
        // if the semaphore is not null, close it and set it to null
        if (_semaphore != null)
        {
            _semaphore.Close();
            _semaphore = null;
        }
        // clear the items of the internal queue
        _internalQueue.Clear();
    }
}

以下程序演示了该类的用法

using System;
using System.Threading;
namespace ConsoleApplication1 
{
    public class Program
    {        
        static int __receivedCount = 0;
        // instantiate the message queue object with maximum size limit 1000. i.e. in
        // any case there will not be more than 1000 items in the queue at a time        
        static MessageQueue<object> __messageQueue = new MessageQueue<object>(1000);
        public static void Main()
        {
            // fork a thread that dequeues messages from the message queue and
            // increments the __receivedCount
            Thread dequeueThread = new Thread(new ThreadStart(_doDequeue));
            dequeueThread.Start();
            
            // we will try to enqueue 10000000 messages in side the queue and will
            // excpect to received 100% of them in the dequeue thread.
            int messageCount = 10000000;
            for (int i = 0; i < messageCount; i++)
            {                
                __messageQueue.EnqueueMessage(new object());
                Console.SetCursorPosition(30, 10);
                Console.WriteLine((i + 1).ToString().PadLeft(10) + " of " + 
                    messageCount.ToString());
            }
            
            Console.WriteLine("Press any key...");
            Console.Read();
            int receivedPercent = (int)((__receivedCount / (double)messageCount) * 100);
            Console.WriteLine(
                "{0} out of {1} message received.\nReceived = {2}%\nLost = {3}%",
                __receivedCount, messageCount, receivedPercent, (100 - receivedPercent));
            Console.Read();
            // abort the dequeue thread
            try 
            {
               dequeueThread.Abort();
            }
            catch { }
            // dispose the message queue object
            __messageQueue.Dispose();
        }
        static void _doDequeue()
        {
            while (true)
            {
                object message = __messageQueue.DequeueMessage();
                __receivedCount++;
            }
        }
    }
}

关注点

使用这种 MessageQueue 实现,内部队列大小将永远不会无限增加,从而可能导致 OutOfMemoryException。 此外,使用信号量会将队列置于休眠状态,并且在没有可用于出队的消息时不会消耗任何 CPU 周期。

历史

  • 2009 年 6 月 26 日:初始发布
© . All rights reserved.