65.9K
CodeProject 正在变化。 阅读更多。
Home

MSMQ 发送队列移动器

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2012 年 1 月 25 日

CPOL

4分钟阅读

viewsIcon

31148

downloadIcon

462

如何将 WCF MSMQ 消息从一个出站队列移动到另一个出站队列。

引言

在为一位客户处理 WCF SOA 解决方案时,我们需要通过 MSMQ 端点来服务客户。我们遇到了一个问题,MSMQ 消息卡在出站队列中,因为使用了错误的 IP 地址。我们不想丢失客户的任何数据,所以我们决定编写一个管理工具,将消息从错误的出站队列移动到正确的出站队列。

背景

为了理解该过程的内部工作原理,您需要熟悉以下几点(主要作为参考):

概述

当消息从客户端发送到 WCF 服务时,消息会被序列化。序列化格式取决于所使用的绑定,在本例中,我们使用 NetMsmqBinding。如果您查看 WCF 协议栈,您会注意到该绑定规范是 MS-NMFMB(.NET 消息帧 MSMQ 绑定协议规范),它基于 MS-NMF(.NET 消息帧协议规范)。如果我们查看已发送的原始 MSMQ 消息,我们会发现以下结构:

  • .NET 消息帧头(红色)
  • WCF 通道消息(绿色)
  • 以下字符串的大小(蓝色)

message bytes

.NET 消息帧头

  • 0x00 – 版本记录,后跟 2 字节。
    • 0x01 – 主版本。
    • 0x00 – 次版本。
  • 0x01 – 模式记录,后跟 1 字节。
    • 0x04 – 模式(来自单个源的单个单向消息)。
  • 0x02 – Via 记录,后跟 Via 长度。这是关键部分。
    • 0x27 – ViaLength 0x27 是 39,这是字符串 "net.msmq://192.168.100.154/private/test" 的大小。
  • 0x03 – 已知编码记录,后跟一个字节。
    • 0x07 – 二进制,如 MC-NBFS 中所述。

以下数据将是序列化的 WCF 通道消息数据。WCF 使用二进制 XML 字典来序列化消息;您应该看一下 Carlos Figueria 的帖子。这是原始的 WCF 通道消息 XML:

<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope">
  <s:Header>
    <a:Action s:mustUnderstand="1" 
      xmlns:a="http://www.w3.org/2005/08/addressing">http://tempuri.org/ITestMSMQ/bla</a:Action>
    <a:To s:mustUnderstand="1" 
      xmlns:a="http://www.w3.org/2005/08/addressing">net.msmq://192.168.100.154/private/test</a:To>
  </s:Header>
  <s:Body>
    <bla xmlns="http://tempuri.org/">
      <input xmlns:b="http://schemas.datacontract.org/2004/07/WcfService1" 
                   xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
        <b:a>0</b:a>
      </input>
    </bla>
  </s:Body>
</s:Envelope>

这次我们将使用 MC-NBFS(.NET 二进制格式:SOAP 数据结构)进行 XML 编码,并使用 MC-NBFX(.NET 二进制格式:XML 数据结构)进行字典字符串。查看绿色部分(WCF 通道消息部分)的第四行,忽略前 3 个字节(这是“bla”部分),我们到达字节 0x44。

WCF 通道消息

  • 0x44 – PrefixDictionaryElementA,告诉我们要查找字典中的字符串。
  • 0x0C – To(来自字典字符串)
  • 0x1E – Identifier
  • 0x00 – mustUnderstand
  • 0x82 – OneText 记录
  • 0x99 – Chars8TextWithEndElement

下一个字节是我们感兴趣的。这是后面字符串的大小,该字符串是 MSMQ 队列的地址。现在我们对消息原始字节的含义有了一个大致的了解,我们可以对其进行修改。以下是我们需要的步骤:

  1. 从出站队列加载原始消息。
  2. 将帧和 WCF 通道消息的地址都修改为新队列地址。
  3. 将修改后的消息重新发送到新目标。
  4. 从源队列中移除旧消息。

使用代码

  1. 使用 System.Messaging API 加载消息是一个相当简单的任务。我们首先使用 Peek,这样在我们成功发送重定向的消息之前,消息会一直保留。
  2. private void readMessage()
    {
        try
        {                
            message = sourceQueue.Peek(TimeSpan.FromSeconds(TIMEOUT));
    
            if (message == null)
                throw new Exception("cannot peek message, queue might be empty");
        }
        catch (Exception ex)
        {
            throw new Exception("failed to read message from queue", ex);
        }
    }
  3. 此步骤涉及几个操作:
    • 我们获取源地址,并在其前面加上 "net.msmq://",对目标地址也这样做。
    • 我们还应该记住,每个字符串前面都带有一个表示字符串大小的字节,所以我们会添加这个大小字节。
    • 我们遍历原始消息字节,查找字符串 "net.msmq://source_address",每次出现时(应该有两个:一个用于 .NET 帧,一个用于 WCF 通道消息)。我们将其替换为 "net.msmq://dest_address"。
    • 将修改后的数据放回消息中。
    private void alterMessage()
    {
        byte[] data = new byte[message.BodyStream.Length];
        byte[] source = getHost(PREFIX + sourceName);
        byte[] dest = getHost(PREFIX + destName);
        
        message.BodyStream.Read(data, 0, data.Length);
    
        int offset = find(data, source);
    
        if (offset == NOT_FOUND)
            throw new Exception("cannot find source name in message");
    
        // get the total string size from the prefix byte which holds the size
        byte sourceSize = Convert.ToByte(data[offset - 1]);
        byte destSize = Convert.ToByte(sourceSize - (source.Length - dest.Length));
    
        // prefix the source & dest with a byte that holds the string size
        source = prefixSize(source, sourceSize);
        dest = prefixSize(dest, destSize);
    
        data = replace(data, source, dest); // update framing message address
        data = replace(data, source, dest); // update wcf channels message address
    
        //put the altered data back into the message
        message.BodyStream = new MemoryStream(data);
    }
     
    private byte[] getHost(string name)
    {
        // name should be in format of ip\path, ie: 192.168.100.1\private$\test
        string host = name.Substring(0, name.IndexOf('\\'));
    
        return Encoding.UTF8.GetBytes(host);
    }
    
    private byte[] prefixSize(byte[] data, byte dataSize)
    {
        byte[] newData = new byte[data.Length + 1];
    
        newData[0] = dataSize;
        Buffer.BlockCopy(data, 0, newData, 1, data.Length);
    
        return newData;
    }
  4. 对于重新发送消息,我们将再次使用 System.Messaging API。
  5. private void sendMessage()
    {
        MessageQueue destQueue = new MessageQueue("FormatName:Direct=TCP:" + 
                          destName, QueueAccessMode.Send);
     
        destQueue.Send(message, MessageQueueTransactionType.Single);
            destQueue.Close();
    }
  6. 对于使用 System.Messaging API 移除消息也是如此。我们只需接收它即可将其移除。
  7. private void removeMessage()
    {
        sourceQueue.ReceiveByLookupId(message.LookupId);
    }

补充说明

  • 除了队列地址之外,路径也是可变的,但这在本代码中不包含,可以很容易地添加。
  • 如果出站队列正在进行处理,您应该暂停队列并重启 MSMQ 服务。
© . All rights reserved.