演示 MSMQ 消息路由的简单示例






2.83/5 (3投票s)
本文演示了如何编写可重用组件,在本例中,是一个消息路由器,用于将消息从一个消息队列转发到另一个消息队列。
摘要
本文演示了如何编写可重用组件,在本例中,是一个消息路由器,用于将消息从一个消息队列转发到另一个消息队列。
引言
断开连接的企业应用程序设计起来很困难,尤其是在数据至关重要的情况下。有几种可用方法,但并非所有方法都适用。一些可能的实现可能包括 Web 服务、消息队列、企业服务总线 (ESB) 体系结构和自定义同步策略。
在我们的一项项目中,我们需要为基于分布式体系结构的应用程序找到最佳解决方案。该项目要求信息通过分布在多个地理区域的多个办事处进行交换。因此,在本文中,我将解释我们采用的方法,以简化它并完成工作。
问题
该设置涉及与各种办事处、特许经营商、第三方独立承包商及其客户的多个集成点,他们都需要访问不同级别的安全性数据。因此,我们需要一种可靠的机制来共享跨地理限制的详细信息。
集成断开连接应用程序的著名方法之一是使用消息传递体系结构。这正是我们选择的。但是,使用此类体系结构有利有弊。好吧,那是另一篇文章的主题。
总之,对于我们的应用程序,我们希望提供可靠的消息传递,并在高度断开连接的基础设施中工作。我们发现消息队列为我们提供了答案;但是,我们仍然需要解决一些令人烦恼的问题,例如我们如何将消息从一个队列转发到另一个队列?如果我们要根据消息的内容进行转发,该怎么办?
因此,我们决定,为什么不通过一些技巧创建一个快速的实用程序来为我们完成这项工作呢?此外,为什么不使其可配置和多线程呢?
因此,基本上,我们想做以下事情
- 将消息从一个消息队列路由到另一个消息队列。
- 将消息从一个消息队列路由到多个消息队列。
- 基于消息中的文本进行路由。
第一个实用程序可以将所有消息从一个公共队列直接发送到另一个公共队列,第二个实用程序可以将消息从一个消息队列发送到多个消息队列,第三个实用程序,即内容感知路由器,可以根据消息中的内容将消息从一个队列分发到另一个队列。
我们还想检查应用程序的活动,因此我们需要记录其工作。我们还希望代码具有适应性。我们希望能够使用可扩展标记语言 (XML) 配置文件从应用程序外部更改消息队列的路径。一个快速的技巧是使用 .NET 框架的内置序列化。
哦,我忘了提我们希望它在我们新的四核服务器上运行?为了实现并行性并利用多线程应用程序的优势,我们需要使我们的应用程序成为多线程且线程安全的,以便可以同时从多个队列传输消息,而不会干扰其他队列的处理。
现在,你可能会问,为什么要做这么多工作,而简单的代码就可以解决问题。好吧,我们还需要监视消息队列活动,以了解发生了什么。执行的操作还需要显示在图形用户界面 (GUI) 上,这将在即将发布的文章中讨论。这些的实现是我们将在后续文章中编写的监视实用程序的基础。
解决方案
首先,我们需要一个配置类来保存我们可以传递给转发器的值。这是一个简单的设计
我们开始创建 Forwarder 实用程序,它只是将消息从一个消息队列转发到另一个消息队列,而不查看消息的内容。为此,我们创建了一个类,该类在构造函数中从 XML 序列化文件中接收消息队列的路径。
StreamReader strRd = new StreamReader(xmlfile);
MessageForwarderConfiguration queue =(MessageForwarderConfiguration)
MessageForwarderConfiguration.Serializer.Deserialize(strRd);
MessageForwardingWorker fwdWorker = new MessageForwardingWorker(queue);
fwdWorker.RunWorkerAsync();
所有处理都在单独的线程上进行,以便应用程序可以同时管理多个队列之间的交互。为了实现这一点,我们继承了 `BackgroundWorker` 类 (`System.ComponentModel.BackgroungWorker`),以便我们可以在多个线程上执行操作。
using System.ComponentModel;
public class MessageForwardingWorker : BackgroundWorker
{
....
}
然后,在 `MessageForwardingWorker` 类中重载了 `BackgroungWorker` 类的 `OnDoWork()` 方法,以创建 `MessageQueue` 类的相应对象——一个用于接收消息,另一个用于发送消息。
// constructor
public MessageForwardingWorker(MessageForwarderConfiguration objForwardQueue)
{
queueInfo_ = objForwardQueue;
}
// overloaded method of BackgroundWorker
protected override void OnDoWork(System.ComponentModel.DoWorkEventArgs e)
{
while (!this.CancellationPending)
{
MessageQueue srcqueue = new MessageQueue(queueInfo_.SourceQueue);
MessageQueue destqueue = new MessageQueue(queueInfo_.DestinationQueue);
...
}
}
然后,我们确定了源队列和目标队列的存在以及它们的读写访问权限。
if (MessageQueue.Exists(queueInfo_.SourceQueue))
{
if (MessageQueue.Exists(queueInfo_.DestinationQueue))
{
if (srcqueue.CanRead)
{
if (destqueue.CanWrite)
{
// do the actual work here
....
}
}
}
}
我们使用 `GetEnumerator2()` 方法开始动态接收源队列中的消息,该方法提供了对我们正在引用的队列的动态链接。从源队列中获取消息后,将消息发送到目标队列。
MessageEnumerator msgEnum = srcqueue.GetMessageEnumerator2();
while (msgEnum.MoveNext())
{
// iterating on each message
msgToSend = msgEnum.Current;
id = msgToSend.Id;
destqueue.Send(msgToSend);
// removing the message after sending to destination queue
srcqueue.ReceiveById(id);
}
注意:为了获得服务质量 (QOS),我们接收消息,并且仅在将消息发送到目标队列后,才从源队列中删除消息。这为我们提供了消息成功传输的保证,否则,如果我们实际上在将消息发送到目标队列之前就将其从队列中删除,则可能导致一致性问题。
我们还想确保事务性目标队列得到正确处理。
while (msgEnum.MoveNext())
{
// iterating on messages and using transactions
msgToSend = msgEnum.Current;
id = msgToSend.Id;
if (isTransactional) destqueue.Send(msgToSend, mqtr);
else destqueue.Send(msgToSend);
srcqueue.ReceiveById(id);
}
if (isTransactional) mqtr.Commit();
此外,我们怎么知道发生了什么?好吧,我们使用 Apache 项目的 LOG4NET 将所有操作记录在文本文件中。
private log4net.ILog log = log4net.LogManager.GetLogger(
System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
public void doLog(string addToLog)
{
...
log.Info(addToLog);
}
编写多线程应用程序时,有一点必须牢记。使线程高效,不要完全占用系统。在我们的实现中,当转发器传输完所有消息后,它将等待 XML 配置文件中指定的预定义时间间隔,当线程从等待期中出来后,它将再次开始执行相同的操作。
Thread.Sleep(queueInfo_.SleepTime);
该系统可以通过实现目标队列列表或一个将来自多个源队列的消息合并到单个目标的消息聚合器来增强,以允许转发到多个目标。
增强消息转发器以实现内容感知消息路由器
第二个实用程序只是这个消息转发器实用程序的细化,它通过查看消息的内容将消息发送到一个以上的目的地队列。从技术上讲,我们可以仅使用此实用程序来实现路由器。
下一步是实现转发器实用程序的升级版本,以实现内容感知消息路由。我们希望我们的应用程序能够根据消息内容在各种队列之间分发消息。
需要一种可靠高效的方法来匹配源队列中的消息,以便将它们发送到适当的目标队列。我们使用与每条消息关联的 `label` 属性来标识消息在存储在源队列中时的身份。因此,我们只需要匹配每条消息的标签,而不是整个消息正文,并且无需解密消息。对于匹配标签,我们倾向于使用正则表达式(也称为 RegEx),因为它们具有强大的匹配功能。
我们根据需要修改了配置类
using System.ComponentModel;
public class MessageRoutingWorker : BackgroundWorker
{
MessageRouterConfiguration queueInfo_;
Dictionary<Regex, List<MessageQueue>> dictRegexMsgQList;
MessageQueue srcQueue;
...
}
为了实现应用程序的并行性,消息将从一个以上的源队列读取,因此它也将是多线程且线程安全的,每个线程独立运行。
为了匹配消息的标签,我们创建了一个 Regex 字典和一个消息队列列表,其中对于每个匹配的 Regex,都有一个需要发送消息的消息队列列表。
Dictionary<Regex, List<MessageQueue>> dictRegexMsgQList;
和以前一样,我们使用 XML 配置初始化我们的类,并使用匹配模式以及相应的源和目标队列路径初始化我们的字典。
// Constructor
public MessageRoutingWorker(MessageRouterConfiguration objRouteQueue)
{
// Set the internal configuration object
queueInfo_ = objRouteQueue;
// Create the source queue
srcQueue = new MessageQueue(queueInfo_.SourceQueue);
// Now call the helper to create destination and the relevant regex
doDictionaryInitializer();
}
private void doDictionaryInitializer()
{
dictRegexMsgQList = new Dictionary<Regex, List<MessageQueue>>();
// We create a simple pair structure for holding two values.
foreach (Pair pair in queueInfo_.DestQueueList)
{
// Create an expression we want to match and associated destination
// queues in a list.
Regex regexpr = new Regex(pair.MatchExpression);
List<MessageQueue> listMQ = new List<MessageQueue>();
foreach (string str in pair.DestQueueList)
{
listMQ.Add(new MessageQueue(str));
}
// Now add these to our internal dictionary object
dictRegexMsgQList.Add(regexpr, listMQ);
}
}
此时,我们每个线程都有以下成员
- 源队列的路径
- 需要将消息发送到的匹配表达式和目标队列的集合
构造函数根据配置初始化所有消息队列和正则表达式。此时,我们启动线程运行并执行其工作。
StreamReader strRd = new StreamReader(xmlfile);
MessageRouterConfiguration queue = (MessageRouterConfiguration)
MessageRouterConfiguration.Serializer.Deserialize(strRd);
MessageRoutingWorker routingWorker = new MessageRoutingWorker(queue);
fwdWorker.RunWorkerAsync();
在每个线程中,都会检查源队列的存在及其读权限,并使用 Apache 项目的 LOG4NET 将其状态记录在文本文件中。
// preparing the dictionary
private void doDictionaryInitializer()
{
dictRegexMsgQList = new Dictionary<Regex, List<MessageQueue>>();
// We create a simple pair structure for holding two values.
foreach (Pair pair in queueInfo_.DestQueueList)
{
Regex regexpr = new Regex(pair.MatchExpression);
List<MessageQueue> listMQ = new List<MessageQueue>();
foreach (string str in pair.DestQueueList)
{
listMQ.Add(new MessageQueue(str));
}
dictRegexMsgQList.Add(regexpr, listMQ);
}
}
为了实际路由消息,我们使用 `GetMessageEnumerator2()` 方法创建 `MessageEnumerator` 类的实例来枚举队列中的消息。现在,遍历每条消息,并将字典中的每个 Regex 与消息的标签进行匹配。找到匹配项后,将消息发送到相应的消息队列列表。在实际发送消息到指定的消息队列之前,会检查这些队列的可用性和写权限,并将其记录在文本文件中。
MessageEnumerator msgEnum = srcQueue.GetMessageEnumerator2();
Message msgToSend = null;
while (msgEnum.MoveNext())
{
// iterating on messages
msgToSend = msgEnum.Current;
string id = msgToSend.Id;
foreach (Regex regex in dictRegexMsgQList.Keys)
{
// matching the regex
if (regex.IsMatch(msgToSend.Label))
{
List<MessageQueue> destinationList = dictRegexMsgQList[regex];
if (doCheckAndSend(msgToSend, destinationList))
// if message is sent then remove it from queue
srcQueue.ReceiveById(id);
}
}
}
日志简单地作为辅助函数实现如下
private log4net.ILog log =log4net.LogManager.GetLogger(
System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
public void doLog(string addToLog)
{
...
log.Info(addToLog);
}
`doCheckAndSend` 方法检查目标队列是否存在以及是否具有写权限,如果一切正常,则将消息(`msgToSend` 中的消息)发送到目标队列列表。
private bool doCheckAndSend(Message msg, List<MessageQueue> listDestQueue)
{
bool sendSucceded = false;
// checking properties of destination queues
foreach (MessageQueue destQueue in listDestQueue)
{
// logging the activities of application
doLog("Checking Existence Of Destination Queue [" + destQueue.Path + "]");
if (MessageQueue.Exists(destQueue.Path))
if (destQueue.CanWrite)
...
else ...
else ...
}
foreach (MessageQueue destQueue in listDestQueue)
{
bool queueTransactional = destQueue.Transactional;
MessageQueueTransaction mqTr = new MessageQueueTransaction();
if (queueTransactional) ...
else ...
if (queueTransactional) mqTr.Begin();
if (queueTransactional) destQueue.Send(msg, mqTr);
else destQueue.Send(msg);
if (queueTransactional) mqTr.Commit();
sendSucceded = true;
}
return sendSucceded;
}
注意:为了获得服务质量 (QOS),我们接收消息,并且仅在将消息发送到目标队列后,才从源队列中删除消息。这为我们提供了消息成功传输的保证,否则,如果我们实际上在将消息发送到目标队列之前就将其从队列中删除,则可能导致一致性问题。
当一个线程传输完所有消息后,它将等待 XML 配置文件中指定的时间。当线程从等待期中出来后,它将再次开始执行路由操作。
Thread.Sleep(queueInfo_.SleepTime);
结论
有了这些实用程序,我们现在就可以将消息从任何队列(源)传输到任何队列(本地或远程目标)。我们可以根据其内容聚合和分发消息。我们有能力在当前以及未来的场景中部署任意数量的消息队列。这还为我们提供了在服务器出现问题或需要对高吞吐量队列进行负载均衡时重新路由消息的灵活性。
最重要的是,我们现在可以在断开连接的体系结构中保持连接。
许可证
以上文章可免费使用。我没有附带任何文件,因为代码来自一个大型专有项目。上述代码已获得 Aquevix Solutions 的许可发布,该公司拥有该代码的版权。