MSMQ 中的高效消息关联





5.00/5 (5投票s)
本文描述了MessageQueue.ReceiveByCorrelationId方法的可扩展实现。
引言
假设我们需要通过MSMQ与第三方系统进行接口,发送消息并接收响应。再假设我们需要在单个执行上下文中完成此操作,这意味着我们希望在本地进行一些工作,然后通过MSMQ向第三方模块发送消息,接收响应,并继续本地处理。这可以通过使用.NET的async
/await
模式与MSMQ的MessageQueue.ReceiveByCorrelationId
方法来实现——发送消息后,我们可以记住其ID,并通过调用ReceiveByCorrelationId
并将该ID作为参数来等待结果消息。
在本文中,我将演示两种不同的方法,通过关联ID从MSMQ检索消息。第一种方法,我称之为“远程同步”,是通过MessageQueue.ReceiveByCorrelationId
方法建议的标准MSMQ API方式。第二种方法,我称之为“本地同步”,是ReceiveByCorrelationId
的自定义实现,它内部使用MSMQ的MessageQueue.Receive
方法并在本地解析消息关联。
在本文中,我不会涵盖MSMQ的基础知识,假设您已经熟悉消息导向设计的主要概念。Code Project上有很多关于该主题的好文章。此外,以下文章描述了如何使用System.Messaging关联请求/响应消息。
为什么?
通常,在设计消息导向系统时,您应该避免使用同步的Send
/ReceiveByCorrelationId
技术,而应优先使用异步的Send
/Receive
。这听起来很简单,但它实际上可以极大地改变整个架构,因为您必须将执行上下文分开,并将发送方和接收方作为独立的单元。这引出了一个基本问题:在应用程序中,当用户点击“计算
”按钮时,您是阻塞当前线程等待响应,还是设置一个在收到响应时触发的回调,并让用户继续使用应用程序?第一种方法很简单,但可伸缩性较差(阻塞UI),而第二种方法实现起来更困难,但对用户更友好,可伸缩性更好(响应式UI)。
众所周知,当有许多并行线程等待响应并且大量消息通过MSMQ推送时,ReceiveByCorrelationId
方法(ReceiveByLookupID
和ReceiveById
也适用)是性能杀手。这是由于该方法内部实现关联解析的性质。在底层,它使用一个Cursor
来Peek
队列中的所有消息,直到找到具有匹配关联ID的消息。当它找到消息时,它会调用Receive
方法来接收它。当有太多线程不断迭代游标时,性能会迅速下降。根据微软的说法:
ReceiveByCorrelationId
使用序列搜索在队列中查找消息。请谨慎使用;当队列中存在大量消息时,它可能会效率低下。在替代方法中,我们将维护一个内部Dictionary
,其中包含我们发送的所有消息ID,对于从队列中取出的每条消息,让接收线程在该Dictionary
中查找匹配的ID。这种方法的缺点是所有正在处理的消息的信息都存储在内存中,如果服务器宕机可能会丢失。
演示应用程序
演示应用程序是一个控制台应用程序,接受两个可能的参数:-l
(本地同步)和-r
(远程同步)。如果未指定任何参数,默认将使用本地同步。
应用程序中硬编码了一些常量,如果需要,您可以更改它们(请参阅Program.cs中的私有常量)
InputQueue
,OutputQueue
:演示输入和输出队列的名称TimeoutSeconds
:接收响应的等待时间(秒)。请注意,在远程同步中,超时时间按定义用于每条消息;在本地同步中,由于消息是批量发送的,超时时间乘以一次发送的消息数量。MaxItems
:演示中使用的消息数量(默认50,000)MaxBuffer
:在本地同步场景中,一次发送到MSQM的消息数量UseLocalSyncByDefault
:指示是否默认使用本地同步演示的标志
根据当前的处理器类型,应用程序将实例化MsmqSyncLocal
(用于本地同步演示)或MsmqSyncRemote
(用于远程同步演示)。它们都具有异步的ProcessAsync
方法,接受string
数据和取消令牌,并返回Task<string>
。此方法旨在将数据发送到输入队列,等待并接收结果。
下一步,应用程序将运行几个(根据CPU核心数)工作线程,模拟我之前提到的第三方处理器。每个Worker
的任务是持续从输入队列检索传入消息,将它们转换为整数,计算平方根,并将结果发送到输出队列。工作方法的代码如下所示:
private static async Task RunWorkerAsync(int workerIndex, CancellationToken cancellationToken)
{
var inputQueue = new MessageQueue(string.Format(@".\private$\{0}", InputQueue),
QueueAccessMode.Receive)
{
Formatter = new ActiveXMessageFormatter(),
MessageReadPropertyFilter = {Id = true, Body = true}
};
var outputQueue = new MessageQueue(string.Format(@".\private$\{0}", OutputQueue),
QueueAccessMode.Send)
{
Formatter = new ActiveXMessageFormatter()
};
while (!cancellationToken.IsCancellationRequested)
{
var message = await inputQueue.ReceiveAsync(cancellationToken);
var data = (string) message.Body;
try
{
// Process Data
var intData = int.Parse(data);
var result = Math.Sqrt(intData).ToString(CultureInfo.InvariantCulture);
outputQueue.Send(new Message(result, new ActiveXMessageFormatter())
{
CorrelationId = message.Id,
Label = string.Format("Worker {0}", workerIndex)
});
}
catch (Exception ex)
{
outputQueue.Send(new Message(ex.ToString(), new ActiveXMessageFormatter())
{
CorrelationId = message.Id,
Label = string.Format("ERROR: Worker {0}", workerIndex)
});
}
}
}
远程同步
如前所述,远程同步处理器使用ReceiveByCorrelationId
方法从输出队列检索消息。这是演示它的代码:
public async Task<string> ProcessAsync(string data, CancellationToken ct)
{
var inputQueue = new MessageQueue(string.Format(@".\private$\{0}", m_inputQueue),
QueueAccessMode.Send)
{
Formatter = ActiveXFormatter
};
var outputQueue = new MessageQueue(string.Format(@".\private$\{0}", m_outputQueue),
QueueAccessMode.Receive)
{
Formatter = ActiveXFormatter,
MessageReadPropertyFilter = {Id = true, CorrelationId = true, Body = true, Label = true}
};
var message = new Message(data, ActiveXFormatter);
inputQueue.Send(message);
var id = message.Id;
try
{
//var resultMessage = outputQueue.ReceiveByCorrelationId(id, m_timeout);
var resultMessage = await outputQueue.ReceiveByCorrelationIdAsync(id, ct);
var label = resultMessage.Label;
var result = (string) resultMessage.Body;
if (m_exceptionHandler != null && !string.IsNullOrEmpty(label) && label.Contains("ERROR"))
throw m_exceptionHandler(data);
return result;
}
catch (Exception)
{
if (!ct.IsCancellationRequested)
throw;
return null;
}
}
请注意,我没有直接使用ReceiveByCorrelationId
,而是通过一个名为ReceiveByCorrelationIdAsync
的扩展方法。这是因为原始的ReceiveByCorrelationId
方法无法正确处理超时,并且不支持取消令牌。
本地同步
在本地同步场景中,ProcessAsync
方法具有相同的签名。然而,这次它是真正的异步:
public async Task<string> ProcessAsync(string data, CancellationToken ct)
{
await m_semaphore.WaitAsync(ct);
var tcs = new TaskCompletionSource<string>();
var message = new Message(data, ActiveXFormatter);
m_inputQueue.Send(message);
var id = message.Id;
m_items.TryAdd(id, tcs);
var tcsForBag = new TaskCompletionSource<bool>();
if (!m_bag.TryAdd(id, tcsForBag))
m_bag[id].TrySetResult(true);
var task = await Task.WhenAny(Task.Delay(m_timeout, ct), tcs.Task);
if (task != tcs.Task)
{
if (m_items.TryRemove(id, out tcs))
{
m_semaphore.Release();
if (ct.IsCancellationRequested)
tcs.TrySetCanceled();
else
tcs.TrySetException(new TimeoutException(string.Format
("Timeout waiting for a message on queue [{0}]", m_outputQueue.QueueName)));
}
}
return await tcs.Task;
}
MsmqSyncLocal
类的一个实例内部维护一个SemaphoreSlim
,它控制我们一次发送到MSMQ的消息数量。这个数量可以通过MaxItems
常量进行配置。
当调用ProcessAsync
时,它创建一个Message
并立即将其发送到输入队列。然后它创建一个TaskCompletionSource
对象,我们将异步地将该对象的Task
属性返回给调用者。这个完成源对象以及我们之前发送的消息ID被添加到名为m_items
的内部ConcurrentDictionary
中。
另一个方法ReceiveMessagesAsync
,从构造函数中执行,运行一个无限循环,从输出队列中检索消息。对于接收到的每条消息,它会在m_items
字典中检查其CorrelationId
。如果找到匹配项,它将通知与之关联的TaskCompletionSource
,如果成功则设置Result,否则设置Cancellation/Error。
private async Task ReceiveMessagesAsync()
{
while (!m_stopToken.IsCancellationRequested)
{
var message = await m_outputQueue.ReceiveAsync(m_stopToken.Token);
var id = message.CorrelationId;
var label = message.Label;
var data = (string) message.Body;
LogErrors(Task.Run(async () =>
{
TaskCompletionSource<string> tcs;
var tcsForBag = new TaskCompletionSource<bool>();
if (m_bag.TryAdd(id, tcsForBag))
await m_bag[id].Task;
m_bag.TryRemove(id, out tcsForBag);
if (m_items.TryRemove(id, out tcs))
{
m_semaphore.Release();
if (m_exceptionHandler != null && !string.IsNullOrEmpty(label) &&
label.Contains("ERROR"))
tcs.TrySetException(m_exceptionHandler(data));
else
tcs.TrySetResult(data);
}
}));
}
}
您可能已经注意到涉及了另一个ConcurrentDictionary
(m_bag
)。它解决了ReceiveMessagesAsync
在消息ID添加到m_items
之前收到消息时可能发生的竞态条件问题。如果发生这种情况,我们将无法在字典中找到预期的关联ID。
实际上,如果我们事先知道Message ID
,我们可以在发送消息之前简单地填充m_items
,从而避免竞态条件。然而,这在MSMQ中是不可能的。
因此,为了协调消息ID插入,ProcessAsync
将尝试将TaskCompletionSource
对象以及该ID插入到m_bag
中。如果成功,我们无需执行任何其他操作,因为该元素将被ReceiveMessagesAsync
删除。否则,如果该ID已存在于m_bag
中,这意味着它已被ReceiveMessagesAsync
添加,我们将发出其完成信号,以便让接收进程知道它可以继续执行。
摘要
我的测试表明,在相同的环境下,远程同步大约需要65秒,而本地同步大约需要5秒。
ReceiveByCorrelationId
方法比其可能的速度慢了10倍以上,因此应尽可能避免使用。如果异步工作流不适合您的应用程序架构且性能是一个问题,我相信局部关联是一个值得考虑的选择。