Microsoft 消息队列 – 一个简单的多线程客户端和服务器






4.83/5 (30投票s)
一个简单的消息队列客户端和服务器 - 演示如何创建一个能够在三分钟内处理 500,000 条消息的解决方案
引言
本文展示了如何使用 Microsoft 消息队列创建一个非常简单的客户端-服务器解决方案 – 看起来非常需要一些非常简单的东西,只需要一些基本的东西就可以让你启动并运行一个性能适中的解决方案。
此解决方案可以在三分钟内轻松处理 500,000 条中等复杂的消息。
Microsoft 消息队列 – 使用 Microsoft SQL Server 记录交易信息 在此解决方案的基础上更进一步 – 通过演示一种将传入数据保存到 Microsoft SQL Server 的方法。
Microsoft 消息队列客户端的屏幕截图

背景
在问答部分,有人发布了一个问题,请求帮助解决一个需要能够每分钟处理 1500 条消息的解决方案。 1500 条消息应该不成问题 – 但关于如何实现这一点的简单答案仍然比我想作为问题回复发布的内容复杂一些。我在 CodeProject 上搜索了一下,令我惊讶的是,我没有找到一篇现有的文章来说明我脑海中的方法。
Microsoft 消息队列服务器的屏幕截图

通常,服务器部分将实现为 Windows 服务。
代码简要介绍
Payload
类的对象是我们即将从客户端发送到服务器的对象。该类很简单,但又不是过于简单。
[Serializable]
public class Payload
{
private Guid id;
private string text;
private DateTime timeStamp;
private byte[] buffer;
public Payload()
{
}
public Payload(string text, byte bufferFillValue, int bufferSize )
{
id = Guid.NewGuid();
this.text = text;
timeStamp = DateTime.UtcNow;
buffer = new byte[bufferSize];
for (int i = 0; i < bufferSize; i++)
{
buffer[i] = bufferFillValue;
}
}
// The rest is just properties exposing the fields
}
调用 UI 线程
大多数有趣的事情发生在工作线程中。为了与 UI 线程中的组件进行交互,我们使用熟悉的 InvokeRequired
/Invoke
模式。
private delegate void LogMessageDelegate(string text);
private void LogMessage(string text)
{
if (InvokeRequired)
{
Invoke(new LogMessageDelegate(LogMessage), text);
}
else
{
// Don't do this in a deployment scenario - use something like
// Log4Net. This is here just to illustrate interaction with
// the UI thread.
messageTextBox.AppendText(text + Environment.NewLine);
}
}
在服务器上初始化消息队列
在本示例中,我们使用 BinaryMessageFormatter
,并依赖于 .NET 序列化机制 – 通过直接处理字节数组来滚动您自己的序列化/反序列化代码,您将获得显着的加速。
在服务器上,我喜欢使用异步处理传入的消息。我们将事件处理方法分配给 ReceiveCompleted
事件,并调用 BeginReceive()
来告诉消息队列组件开始处理消息并调用我们的 OnReceiveCompleted
事件处理程序。
private void InitializeQueue()
{
receivedCounter = 0;
string queuePath = Constants.QueueName;
if (!MessageQueue.Exists(queuePath))
{
messageQueue = MessageQueue.Create(queuePath);
}
else
{
messageQueue = new MessageQueue(queuePath);
}
isRunning = true;
messageQueue.Formatter = new BinaryMessageFormatter();
messageQueue.ReceiveCompleted += OnReceiveCompleted;
messageQueue.BeginReceive();
}
接收消息
下面,您将看到我们的 OnReceiveCompleted
方法,该方法处理传入的消息。
它非常简单,除了反序列化之外,没有对传入的消息做任何有用的事情,并且每 10,000 条th 消息记录到 UI 线程。真正重要的一点是调用 BeginReceive()
来告诉消息队列组件我们已准备好接收下一条消息;如果我们忘记了这一点 – 我们的服务器只会处理一条消息,而不是我们想要的。
private void OnReceiveCompleted(Object source,
ReceiveCompletedEventArgs asyncResult)
{
try
{
MessageQueue mq = (MessageQueue)source;
if (mq != null)
{
try
{
System.Messaging.Message message = null;
try
{
message = mq.EndReceive(asyncResult.AsyncResult);
}
catch (Exception ex)
{
LogMessage(ex.Message);
}
if (message != null)
{
Payload payload = message.Body as Payload;
if (payload != null)
{
receivedCounter++;
if ((receivedCounter % 10000) == 0)
{
string messageText =
string.Format("Received {0} messages", receivedCounter);
LogMessage(messageText);
}
}
}
}
finally
{
if (isRunning)
{
mq.BeginReceive();
}
}
}
return;
}
catch (Exception exc)
{
LogMessage(exc.Message);
}
}
在客户端上初始化消息队列
我们的客户端消息队列初始化甚至比服务器端初始化更简单。只需记住在两端使用相同的格式化程序 – 在本例中,BinaryMessageFormatter
。
private void InitializeQueue()
{
string queuePath = Constants.QueueName;
if (!MessageQueue.Exists(queuePath))
{
messageQueue = MessageQueue.Create(queuePath);
}
else
{
messageQueue = new MessageQueue(queuePath);
}
messageQueue.Formatter = new BinaryMessageFormatter();
}
发送消息
如果要使用多个线程发送消息,则需要初始化多个 MessageQueue
组件。在本示例中,我们使用单个 MessageQueue
组件,并禁用和启用“发送消息”按钮,以防止用户同时从多个工作线程排队消息。当发送了所需数量的消息时,从工作线程调用 EnableSend()
以启用“发送消息”按钮。
private delegate void EnableSendDelegate();
private void EnableSend()
{
if (InvokeRequired)
{
Invoke(new EnableSendDelegate(EnableSend));
}
else
{
sendButton.Enabled = true;
}
}
以下是我们实际发送消息的操作,如果您删除用于测量函数性能的代码 – 实际上没有剩下多少。只要对象可以通过 .NET 使用分配的格式化程序进行序列化,我们需要做的就是调用 Send
并传递有效负载对象的实例。没有比这更简单的了。 :)
private void SendMessages(int count)
{
Random random = new Random(count);
string message = string.Format("Sending {0} messages",count);
LogMessage(message);
DateTime start = DateTime.Now;
for (int i = 0; i < count; i++)
{
byte b = Convert.ToByte( random.Next(128) );
int size = random.Next(1024);
string text = string.Format("Message: Fill {0} {1}",b,size);
Payload payload = new Payload(text, b, size);
messageQueue.Send(payload);
}
DateTime end = DateTime.Now;
TimeSpan ts = end - start;
message = string.Format("{0} messages sent in {1}", count, ts);
LogMessage(message);
}
维护响应式 UI
当我们在调度所需数量的消息时,我们的 UI 冻结会使用户不高兴。通过利用 System.Threading.ThreadPool.QueueUserWorkItem
– 我们以一种简单明了的方式保持我们的 UI 响应 – 再次没有真正地努力。 :)
private void AsychSendMessages(object countAsObject)
{
int count = (int)countAsObject;
SendMessages(count);
EnableSend();
}
private void sendButton_Click(object sender, EventArgs e)
{
sendButton.Enabled = false;
int count = Convert.ToInt32( messageCountNumericUpDown.Value );
System.Threading.ThreadPool.QueueUserWorkItem(
new System.Threading.WaitCallback(AsychSendMessages), count);
}
结论
客户端/服务器开发没有比这更容易的了 – 并且通过一些调整,性能可以显着提高。FastSerializer 代表了一种随时可用,高性能的序列化/反序列化解决方案。
因此,希望它可能会证明有用:试玩一下。
Microsoft 消息队列对于 .NET 开发人员来说既高效又非常易于使用。显然,本文只是触及了该技术的表面 – 但鉴于其性能,我非常确定你们中的一些人会发现它非常令人满意。
此致,
Espen Harlinn
历史
- 2011 年 3 月 12 日th - 首次发布