MSMQ 发送队列移动器





5.00/5 (3投票s)
如何将 WCF MSMQ 消息从一个出站队列移动到另一个出站队列。
引言
在为一位客户处理 WCF SOA 解决方案时,我们需要通过 MSMQ 端点来服务客户。我们遇到了一个问题,MSMQ 消息卡在出站队列中,因为使用了错误的 IP 地址。我们不想丢失客户的任何数据,所以我们决定编写一个管理工具,将消息从错误的出站队列移动到正确的出站队列。
背景
为了理解该过程的内部工作原理,您需要熟悉以下几点(主要作为参考):
System.Message
的基础知识(处理 MSMQ 的类)。- WCF 协议栈的常规信息
- .NET 消息帧协议规范 (MC-NMF)
- .NET 消息帧 MSMQ 绑定协议规范 (MC-NMFMB)
- .NET 二进制格式:XML 数据结构 (MC-NBFX) - 通用 XML 二进制格式化
- .NET 二进制格式:SOAP 数据结构 (MC-NBFS) - WCF XML 字典
概述
当消息从客户端发送到 WCF 服务时,消息会被序列化。序列化格式取决于所使用的绑定,在本例中,我们使用 NetMsmqBinding
。如果您查看 WCF 协议栈,您会注意到该绑定规范是 MS-NMFMB(.NET 消息帧 MSMQ 绑定协议规范),它基于 MS-NMF(.NET 消息帧协议规范)。如果我们查看已发送的原始 MSMQ 消息,我们会发现以下结构:
- .NET 消息帧头(红色)
- WCF 通道消息(绿色)
- 以下字符串的大小(蓝色)
.NET 消息帧头
- 0x00 – 版本记录,后跟 2 字节。
- 0x01 – 主版本。
- 0x00 – 次版本。
- 0x01 – 模式记录,后跟 1 字节。
- 0x04 – 模式(来自单个源的单个单向消息)。
- 0x02 – Via 记录,后跟 Via 长度。这是关键部分。
- 0x27 – ViaLength 0x27 是 39,这是字符串 "net.msmq://192.168.100.154/private/test" 的大小。
- 0x03 – 已知编码记录,后跟一个字节。
- 0x07 – 二进制,如 MC-NBFS 中所述。
以下数据将是序列化的 WCF 通道消息数据。WCF 使用二进制 XML 字典来序列化消息;您应该看一下 Carlos Figueria 的帖子。这是原始的 WCF 通道消息 XML:
<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope">
<s:Header>
<a:Action s:mustUnderstand="1"
xmlns:a="http://www.w3.org/2005/08/addressing">http://tempuri.org/ITestMSMQ/bla</a:Action>
<a:To s:mustUnderstand="1"
xmlns:a="http://www.w3.org/2005/08/addressing">net.msmq://192.168.100.154/private/test</a:To>
</s:Header>
<s:Body>
<bla xmlns="http://tempuri.org/">
<input xmlns:b="http://schemas.datacontract.org/2004/07/WcfService1"
xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
<b:a>0</b:a>
</input>
</bla>
</s:Body>
</s:Envelope>
这次我们将使用 MC-NBFS(.NET 二进制格式:SOAP 数据结构)进行 XML 编码,并使用 MC-NBFX(.NET 二进制格式:XML 数据结构)进行字典字符串。查看绿色部分(WCF 通道消息部分)的第四行,忽略前 3 个字节(这是“bla”部分),我们到达字节 0x44。
WCF 通道消息
- 0x44 – PrefixDictionaryElementA,告诉我们要查找字典中的字符串。
- 0x0C – To(来自字典字符串)
- 0x1E – Identifier
- 0x00 – mustUnderstand
- 0x82 – OneText 记录
- 0x99 – Chars8TextWithEndElement
下一个字节是我们感兴趣的。这是后面字符串的大小,该字符串是 MSMQ 队列的地址。现在我们对消息原始字节的含义有了一个大致的了解,我们可以对其进行修改。以下是我们需要的步骤:
- 从出站队列加载原始消息。
- 将帧和 WCF 通道消息的地址都修改为新队列地址。
- 将修改后的消息重新发送到新目标。
- 从源队列中移除旧消息。
使用代码
- 使用
System.Messaging
API 加载消息是一个相当简单的任务。我们首先使用Peek
,这样在我们成功发送重定向的消息之前,消息会一直保留。 - 此步骤涉及几个操作:
- 我们获取源地址,并在其前面加上 "net.msmq://",对目标地址也这样做。
- 我们还应该记住,每个字符串前面都带有一个表示字符串大小的字节,所以我们会添加这个大小字节。
- 我们遍历原始消息字节,查找字符串 "net.msmq://source_address",每次出现时(应该有两个:一个用于 .NET 帧,一个用于 WCF 通道消息)。我们将其替换为 "net.msmq://dest_address"。
- 将修改后的数据放回消息中。
- 对于重新发送消息,我们将再次使用
System.Messaging
API。 - 对于使用
System.Messaging
API 移除消息也是如此。我们只需接收它即可将其移除。
private void readMessage()
{
try
{
message = sourceQueue.Peek(TimeSpan.FromSeconds(TIMEOUT));
if (message == null)
throw new Exception("cannot peek message, queue might be empty");
}
catch (Exception ex)
{
throw new Exception("failed to read message from queue", ex);
}
}
private void alterMessage()
{
byte[] data = new byte[message.BodyStream.Length];
byte[] source = getHost(PREFIX + sourceName);
byte[] dest = getHost(PREFIX + destName);
message.BodyStream.Read(data, 0, data.Length);
int offset = find(data, source);
if (offset == NOT_FOUND)
throw new Exception("cannot find source name in message");
// get the total string size from the prefix byte which holds the size
byte sourceSize = Convert.ToByte(data[offset - 1]);
byte destSize = Convert.ToByte(sourceSize - (source.Length - dest.Length));
// prefix the source & dest with a byte that holds the string size
source = prefixSize(source, sourceSize);
dest = prefixSize(dest, destSize);
data = replace(data, source, dest); // update framing message address
data = replace(data, source, dest); // update wcf channels message address
//put the altered data back into the message
message.BodyStream = new MemoryStream(data);
}
private byte[] getHost(string name)
{
// name should be in format of ip\path, ie: 192.168.100.1\private$\test
string host = name.Substring(0, name.IndexOf('\\'));
return Encoding.UTF8.GetBytes(host);
}
private byte[] prefixSize(byte[] data, byte dataSize)
{
byte[] newData = new byte[data.Length + 1];
newData[0] = dataSize;
Buffer.BlockCopy(data, 0, newData, 1, data.Length);
return newData;
}
private void sendMessage()
{
MessageQueue destQueue = new MessageQueue("FormatName:Direct=TCP:" +
destName, QueueAccessMode.Send);
destQueue.Send(message, MessageQueueTransactionType.Single);
destQueue.Close();
}
private void removeMessage()
{
sourceQueue.ReceiveByLookupId(message.LookupId);
}
补充说明
- 除了队列地址之外,路径也是可变的,但这在本代码中不包含,可以很容易地添加。
- 如果出站队列正在进行处理,您应该暂停队列并重启 MSMQ 服务。