使用计数信号量实现消息队列
在 C# 中实现消息队列。

引言
消息队列是一种有效且高效的进程间或线程间通信技术。微软的 MSMQ 是消息队列技术的一个很好的实现。然而,当您需要一个简单的消息队列数据结构,该结构将在单个应用程序内的多个线程中使用时,MSMQ 显然不是理想的选择。此外,对于实时分布式应用程序,例如套接字读取 UDP 多播消息,需要在尽可能快的时间内由另一个线程处理,MSMQ 速度太慢。在这种情况下,MSMQ 可能不是最佳选择。
本文演示了使用计数信号量实现消息队列数据结构的技术。
背景
建议了解等待句柄、本地信号量和线程的基本概念,以便理解代码。
Using the Code
MessageQueue
类的源代码如下:
using System;
using System.Collections.Generic;
using System.Threading;
public class MessageQueue<T> : IDisposable
{
private readonly int _QUEUE_SIZE;
// a semaphore object is used to notify enqueue happend signal to the dequeue
// subrouting
private Semaphore _semaphore;
// an internal queue data structure holds the messages in a queue
private Queue<T> _internalQueue;
// a private object that is used to acquire a lock on it to ensure synchronous
// execution of some operations in multithreaded environment
private object _syncLock;
/**************************************************************************
* Construct the message queue object with the maximum size limit. *
* If no of messages in the queue meets the maximum size of the queue, any *
* subsequent enqueue will be discareded. i.e. those message will be lost *
* until you dequeue any message i.e. provide a room for new message to *
* enter the queue. *
**************************************************************************/
public MessageQueue(int queueSize)
{
_syncLock = new object();
_QUEUE_SIZE = queueSize;
_internalQueue = new Queue<T>(_QUEUE_SIZE);
_semaphore = new Semaphore(0, _QUEUE_SIZE);
}
/***********************************************************************
* Reset the MessageQueue *
***********************************************************************/
public void Reset()
{
// instantiate the semaphore with initial count 0 i.e. the semaphore is
// entirely woned and there is no room to enter
_semaphore = new Semaphore(0, _QUEUE_SIZE);
// clear all existing messages from the message queue
_internalQueue.Clear();
}
/**********************************************************************
* Enqueue message in to the Message Queue *
**********************************************************************/
public void EnqueueMessage(T message)
{
lock (_syncLock)
{
if (_semaphore != null && message != null)
{
try
{
// try to provide a room in the semaphore so that DequeueMessage
// can enter into it
_semaphore.Release();
// now enqueue the message in to the internal queue data structure
_internalQueue.Enqueue(message);
}
catch { }
}
}
}
/*********************************************************************
* Dequeue message from the Message Queue *
*********************************************************************/
public T DequeueMessage()
{
// try to acquire a room in the semaphore and sleep until the room is available
_semaphore.WaitOne();
// if any room could be acquired, proceed to next step. i.e. dequeue message from
// the internal queue and return it
lock (_syncLock)
{
T message = _internalQueue.Dequeue();
return message;
}
}
/********************************************************************
* Dispose the Message Queue object *
********************************************************************/
public void Dispose()
{
// if the semaphore is not null, close it and set it to null
if (_semaphore != null)
{
_semaphore.Close();
_semaphore = null;
}
// clear the items of the internal queue
_internalQueue.Clear();
}
}
以下程序演示了该类的用法
using System;
using System.Threading;
namespace ConsoleApplication1
{
public class Program
{
static int __receivedCount = 0;
// instantiate the message queue object with maximum size limit 1000. i.e. in
// any case there will not be more than 1000 items in the queue at a time
static MessageQueue<object> __messageQueue = new MessageQueue<object>(1000);
public static void Main()
{
// fork a thread that dequeues messages from the message queue and
// increments the __receivedCount
Thread dequeueThread = new Thread(new ThreadStart(_doDequeue));
dequeueThread.Start();
// we will try to enqueue 10000000 messages in side the queue and will
// excpect to received 100% of them in the dequeue thread.
int messageCount = 10000000;
for (int i = 0; i < messageCount; i++)
{
__messageQueue.EnqueueMessage(new object());
Console.SetCursorPosition(30, 10);
Console.WriteLine((i + 1).ToString().PadLeft(10) + " of " +
messageCount.ToString());
}
Console.WriteLine("Press any key...");
Console.Read();
int receivedPercent = (int)((__receivedCount / (double)messageCount) * 100);
Console.WriteLine(
"{0} out of {1} message received.\nReceived = {2}%\nLost = {3}%",
__receivedCount, messageCount, receivedPercent, (100 - receivedPercent));
Console.Read();
// abort the dequeue thread
try
{
dequeueThread.Abort();
}
catch { }
// dispose the message queue object
__messageQueue.Dispose();
}
static void _doDequeue()
{
while (true)
{
object message = __messageQueue.DequeueMessage();
__receivedCount++;
}
}
}
}
关注点
使用这种 MessageQueue
实现,内部队列大小将永远不会无限增加,从而可能导致 OutOfMemoryException
。 此外,使用信号量会将队列置于休眠状态,并且在没有可用于出队的消息时不会消耗任何 CPU 周期。
历史
- 2009 年 6 月 26 日:初始发布