65.9K
CodeProject 正在变化。 阅读更多。
Home

ZeroMQ (C#): 多部分消息、JSON 和同步 PUB-SUB 模式

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (15投票s)

2012 年 12 月 25 日

CPOL

9分钟阅读

viewsIcon

73617

在本文中,我们将讨论 ZeroMQ 可以发送或接收的两种消息类型,以及如何使用 JSON 格式化这些消息。我们还将学习轮询机制以及如何使用它。最后,我们将通过一个示例探索同步 PUB-SUB 模式。

我关于 ZeroMQ 的文章

引言

在本文中,我们将讨论 ZeroMQ 可以发送或接收的两种消息类型,以及如何使用 JSON 格式化这些消息。我们还将学习轮询机制以及如何使用它。最后,我们将通过一个示例探索同步 PUB-SUB 模式。

ZeroMQ 消息

ZeroMQ 套接字可以发送或接收单部分消息或多部分消息。

单部分消息

单部分消息是只有一个帧的消息。帧是字节数组。帧的长度可以是零或更多。在 ZeroMQ 参考手册中,帧也称为“消息部分”。

例如,我们在上一篇文章中发送/接收的消息都是单部分消息类型。我们在发送接收操作中使用了 stringstring 是字节数组)。

我们可以使用以下方法(可在 clrzmq.dll 库中找到)发送或接收单部分消息(单帧消息)

  • 发送/接收
  • SendFrame\ReceiveFrame

多部分消息

多部分消息是包含多个帧的消息。ZeroMQ 将此消息作为单个在线消息发送,并保证传递所有消息部分(一个或多个),否则不传递任何部分。

帧具有一个名为 HasMore 的布尔属性(flag),该属性指示消息中是否还有其他帧。此属性在除最后一个帧外的所有帧中都为 true 值,在最后一个帧中为 false 值。

有两种方法可以发送或接收多部分消息:

分别发送或接收每个部分(帧)

以下代码片段显示了一个发送由三个帧组成的多部分消息的示例

socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 01")) 
                     { HasMore = true });
socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 02")) 
                     { HasMore = true });
socket.SendFrame(new Frame(Encoding.UTF8.GetBytes("My Frame 03")));  

因此,要分别发送每个帧:

  1. 创建第一个帧并将其 HasMore 属性设置为 true。此属性将指示 SendFrame 方法还有另一个帧要发送。
  2. 使用上面创建的帧作为参数调用 SendFrame 方法。
  3. 对除最后一个帧以外的其他帧重复上述步骤。
  4. 创建最后一个帧(HasMore 属性的默认值为 false)。此帧的 HasMore 属性将通知 SendFrame 方法没有其他帧了,这是要发送的最后一个帧。

要分别接收每个帧:

  1. 循环调用 ReceiveFrame 方法,直到收到的帧的 HasMore 属性等于 false

以下代码片段显示了一个接收多部分消息的示例

var frames = new List<Frame>();
do
{
    frames.Add(socket.ReceiveFrame());
}
while (frames.Last().HasMore); 

使用包装器发送或接收整个多部分(帧)消息

ZmqMessage 类(可在 clrzmq.dll 库中找到)封装了一个或多个帧,因此它代表单部分消息或多部分消息。此类会在内部管理所有帧的 HasMore 属性值。

以下代码片段显示了一个使用 ZmqMessage 类发送和接收多部分消息的示例

因此,要使用 ZmqMessage 类发送多部分消息:

  1. 创建 ZmqMessage 的实例。
  2. 创建帧并将它们追加到 ZmqMessage 实例。
  3. 使用 SendMessage 方法发送消息。

并使用 ReceiveMessage 方法接收 ZmqMessage 类型的消息。

多部分消息的优点

多部分消息在 PUB-SUB 模式中大放异彩。在此模式中,我们可以将订阅键放在一个单独的帧中(我们称之为“信封”),并将消息数据放在另一个帧中。这确保了订阅键和发布数据之间的分离。PUB 套接字通过比较订阅者发送的订阅前缀与要发送消息的起始字符来过滤消息。通过将订阅键放在单独的帧(第一个帧)中,PUB 套接字将仅使用此帧来过滤消息,而其他帧则不参与过滤过程。

多部分消息的其他优点之一是能够将地址(端点)放在帧中发送到目的地。例如,发布者可以通过帧发送一个私有地址,允许订阅者使用此地址向发布者发送回复。

请注意 ZeroMQ 参考手册中提到的注意事项

ZeroMQ 不会立即发送消息(单部分或多部分),而是在稍后的某个不确定时间发送。因此,多部分消息必须适合内存。如果要发送任意大小的文件,则应将其分解成块,并将每个块作为单独的单部分消息发送。

JSON over ZeroMQ

我们已经看到 ZeroMQ 消息由一个或多个帧组成(帧代表字节数组)。因此,我们可以使用 JSON 数据格式(或其他格式,如 MessagepackProtobufs 等)序列化帧。通过帧序列化,我们可以发送/接收具有一个或多个序列化帧的多部分消息。

在本文中,我使用了 ServiceStack.Text 库进行 JSON 序列化(当然,您也可以选择其他库)。以下代码片段显示了一个用于序列化和反序列化帧的包装器

public static class JsonFrame
{
    public static Frame Serialize<T>(T messageObject)
    {
        var message = JsonSerializer.SerializeToString<T>(
                                        messageObject);
        return new Frame(Encoding.UTF8.GetBytes(message));
    }
 
    public static T DeSerialize<T>(Frame frame)
    {
        var messageObject =   
                  JsonSerializer.DeserializeFromString<T>(
                                  Encoding
                                  .UTF8                                              
                                  .GetString(frame.Buffer));
        return messageObject;
    }
}

下面是一个展示如何使用上述类在多部分消息中序列化/反序列化帧的代码片段

在上面的代码中,我们创建了一个 ShoppingBasket 类的实例。然后,我们将该实例序列化并将其放入多部分消息的第二个帧中。第一个帧包含消息标题(文本格式,无序列化)。之后,我们发送了此消息(发布)。在接收端,我们接收了它,并从第一个帧中提取了消息标题,然后反序列化了第二个帧的内容。此反序列化将 ShoppingBasket 实例还原给我们。

轮询

当我们想要接收消息时,我们(直到目前为止)以前是这样实现的:

  • 构建主循环
    • 以阻塞模式调用 Receive\ReceiveFrameReceiveMessage 方法(调用该方法将不会返回,直到消息到达)。
    • 处理消息。

如果我们还有另一个套接字,并且也想从中接收消息,那么我们可以:

  1. 将另一个套接字放在不同的线程中,并循环调用接收方法。
  2. 构建主循环
    • 构建套接字 1 循环
      • 以非阻塞模式调用接收方法
    • 构建套接字 2 循环
      • 以非阻塞模式调用接收方法

在这些解决方案中,管理套接字变得困难,尤其是在有许多套接字的情况下。这时,轮询机制就派上用场了,它是一种通过多个套接字上的触发事件来发送/接收消息的机制。此机制在 Poll 类中实现。下面是一个展示如何使用它的代码片段

var pubSocket = ctx.CreateSocket(SocketType.PUB);
pubSocket.Bind(options.pubEndpoint);
pubSocket.SendReady +=new                 
            EventHandler<SocketEventArgs>(pubSocket_SendReady);
var repSocket = ctx.CreateSocket(SocketType.REP);
repSocket.Bind(options.repEndpoint);
repSocket.SendReady +=new                  
            EventHandler<SocketEventArgs>(repSocket_SendReady);
repSocket.ReceiveReady +=new               
         EventHandler<SocketEventArgs>(repSocket_ReceiveReady);
Poller poller = new Poller(new ZmqSocket[] {pubSocket, 
                           repSocket});
while (true)
{
    poller.Poll();
} 

以及事件的实现

void repSocket_ReceiveReady(object sender, SocketEventArgs e)
{
}
 
void repSocket_SendReady(object sender, SocketEventArgs e)
{
}  
 
static void pubSocket_SendReady(object sender, SocketEventArgs e)
{
} 

使用轮询:

  1. 创建套接字。
  2. 将套接字绑定/连接到它们的端点。
  3. 订阅以下套接字事件(取决于套接字类型,例如,我们无法通过 PUB 套接字发送消息):
    • SendReady
    • ReceiveReady
  4. 创建 Poll 类的实例,并将套接字传递给构造函数。
  5. 循环调用 Poll 实例对象的 poll 方法。

每次套接字准备好发送消息时,都会触发 SendReady 事件,当它准备好接收消息时,则触发 ReceiveReady 事件。

本文稍后将解释的同步 PUB-SUB 模式示例使用了轮询机制。

同步 PUB-SUB 模式

在基本的 PUB-SUB 模式中,订阅者由于连接发布者较晚而可能会丢失一些发布者消息。我们需要一种发布者和其订阅者之间的同步机制。一种简单的解决方案是在发布者中加入一个时间延迟(休眠),然后再开始发送消息,以确保所有订阅者都已连接。但这个解决方案是脆弱的,不适用于实际应用。好的解决方案是使用 REQ-REP 套接字来实现发布者和其订阅者之间的同步。下图显示了同步 PUB-SUB 模式

同步场景是:

  1. 发布者必须提前知道它期望的订阅者数量。
  2. 发布者通过 PUB 套接字开始发布消息,要求订阅者进行同步。然后,它等待所有订阅者连接。
  3. 订阅者:
    • 接收请求同步的消息
    • 连接 REQ 套接字并向发布者发送同步消息
  4. 发布者增加已同步订阅者的计数器。
  5. 发布者通过 REB 套接字回复订阅者。
  6. 当已同步订阅者的计数器达到预期的订阅者数量时,发布者开始发送实际数据。

一个发布者 – 两个订阅者

在此同步 PUB-SUB 模式示例中,我们有一个发布者和两个订阅者。发布者在所有订阅者准备好接收数据之前不会发布其数据。

下图说明了这个示例

发布者的代码是:

var pubSocket = ctx.CreateSocket(SocketType.PUB);
pubSocket.Bind(options.pubEndpoint);
pubSocket.SendReady +=new 
                 EventHandler<SocketEventArgs>(pubSocket_SendReady);
var repSocket = ctx.CreateSocket(SocketType.REP);
repSocket.Bind(options.repEndpoint);
repSocket.SendReady +=new 
                 EventHandler<SocketEventArgs>(repSocket_SendReady);
repSocket.ReceiveReady +=new 
              EventHandler<SocketEventArgs>(repSocket_ReceiveReady);
Poller poller = new Poller(new ZmqSocket[] {pubSocket, repSocket});
while (true)
{
    poller.Poll();
    if (options.maxMessage >= 0)
        if (msgCptr > options.maxMessage)
            Environment.Exit(0);
} 

以及套接字事件:

 #region REP events
static void repSocket_ReceiveReady(object sender, SocketEventArgs e)
{
    var reqMsg = e.Socket.Receive(Encoding.UTF8);
    DisplayRepMsg("REP, received: " + reqMsg);
}
 
static void repSocket_SendReady(object sender, SocketEventArgs e)
{
    DisplayRepMsg("REP, sending: Sync OK");
    e.Socket.Send(Encoding.UTF8.GetBytes("Sync OK"));
    nbSubscribersConnected++;
} 
#endregion
 
#region PUB events       
static void pubSocket_SendReady(object sender, SocketEventArgs e)
{
    var zmqMessage = new ZmqMessage();
    if (nbSubscribersConnected < options.nbExpectedSubscribers)
    {
        zmqMessage.Append(Encoding.UTF8.GetBytes("Sync"));         
        zmqMessage.Append(Encoding.UTF8
                                  .GetBytes(options.repEndpoint));
        Thread.Sleep(options.delay);
        Console.WriteLine("Publishing: Sync");
    }
    else
    {
        zmqMessage.Append(Encoding.UTF8.GetBytes("Data"));
        var data = BuildDataToPublish();
        if (!string.IsNullOrEmpty(data))
        {
            zmqMessage.Append(Encoding.UTF8.GetBytes(data));
            Thread.Sleep(options.delay);
            Console.WriteLine("Publishing (Data): " + data);
        }
    }
    e.Socket.SendMessage(zmqMessage);
} 
#endregion 

订阅者的代码是:

// Simulate late arrivals
Thread.Sleep(options.delay);
 
// Create and connect SUB socket
var subSocket = ctx.CreateSocket(SocketType.SUB);
subSocket.Connect(options.subEndpoint);
subSocket.SubscribeAll();
                    
// Receive Sync message
var pubMsg = subSocket.ReceiveMessage();
if (Encoding.UTF8.GetString(pubMsg[0]) == SYNC)
{
    Console.WriteLine("SUB; received: " + 
                      Encoding.UTF8.GetString(pubMsg[0]));
    using (var reqSocket = ctx.CreateSocket(SocketType.REQ))
    {
        reqSocket.Connect(Encoding.UTF8.GetString(pubMsg[1]));
        DisplayReqMsg("REQ; sending : Sync me");
        reqSocket.Send("Sync me", Encoding.UTF8);
        var repMsg = reqSocket.Receive(Encoding.UTF8);
        DisplayReqMsg("REQ; received: " + repMsg);
    }
}
                                     
// Receive published messages
while (true)
{                        
    pubMsg = subSocket.ReceiveMessage();
    if (Encoding.UTF8.GetString(pubMsg[0]) != SYNC)
    {
        Console.WriteLine(
                "SUB; received: " +
                Encoding.UTF8.GetString(pubMsg[1]));
    }
} 

双击 bin 目录下的 SyncPubSub_Pattern_1.bat 文件。此批处理文件包含以下命令:

start "Subscriber 1" cmd /T:4F /k SyncSub.exe -e tcp://127.0.0.1:5000 -d 0

start "Subscriber 2" cmd /T:4F /k SyncSub.exe -e tcp://127.0.0.1:5000 -d 4000

start "Publisher" cmd /T:0A /k SyncPub.exe -e tcp://127.0.0.1:5000 
      -p tcp://127.0.0.1:6000 -n 2 -m "Orange #nb#";"Apple  #nb#" -x 5 -d 1000 

前两条命令将运行两个同步订阅者应用程序(SyncSub.exe)的实例。每个订阅者将连接到端点 tcp://127.0.0.1:5000 并订阅所有发布者消息。

第三条命令将运行同步发布者(SyncPub.exe)。发布者将 PUB 套接字绑定到端点 tcp://127.0.0.1:5000,并将 REP 套接字绑定到端点 tcp://127.0.0.1:6000。然后,它等待订阅者的连接。它期望两个订阅者连接(-n 开关)。

发布者开始发布同步消息。这些消息是多部分消息,由两个帧组成。第一个帧包含标题消息(“Sync”),第二个帧包含 REP 套接字的端点(tcp://127.0.0.1:6000)。

当订阅者收到同步消息时,它会连接到接收到的端点(在同步消息的第二个帧中找到)的 REQ 套接字,并发送一个请求。发布者接收请求,增加订阅者计数器并确认请求。

当同步订阅者的计数器达到预期的订阅者数量时,发布者开始发布数据。它发送五条消息(在“Orange”和“Apple”这两个词之间交替),每个词都与消息编号连接(#nb# 宏)。消息之间的延迟为 1000 毫秒(-d 开关)。

请注意,第二个订阅者有 4000 毫秒的延迟,用于模拟晚到的情况(两个订阅者并非同时连接)。

运行上述命令后,我们得到以下结果:

结论

ZeroMQ 可以发送单部分或多部分消息。每个消息部分都是一个帧,它是一个字节数组。帧可以使用 JSON 数据格式(或其他格式)进行序列化。

轮询是一种通过多个套接字上的触发事件来发送/接收消息的机制。

同步 PUB-SUB 模式可确保在所有预期的订阅者都连接好并准备好接收实际数据之前,不会发布数据。

© . All rights reserved.