65.9K
CodeProject 正在变化。 阅读更多。
Home

ZeroMq #3:套接字选项/标识符和 SendMore

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2014年8月26日

CPOL

6分钟阅读

viewsIcon

25004

ZeroMq #3:套接字选项/标识符和 SendMore

一次我们回顾了 ZeroMq 中不同的套接字类型,我还告诉了您它们在 NetMq(我用于这些帖子的库)中的对应项。

这次,我们将重点介绍 ZeroMq 的三个小型但非常重要的方面,不应被忽视。

这些方面是套接字选项/标识符和 SendMore。

代码在哪里?

所有这些帖子的代码都托管在 GitHub 的一个大型解决方案中

套接字选项

根据您使用的套接字类型或要创建的拓扑,您可能会发现需要设置一些 ZeroMq 选项。在 NetMq 中,这是通过 xxxxSocket.Options 属性完成的。

以下是您可以为 xxxxSocket 设置的可用属性列表。很难确切说明您需要设置哪些值,因为这显然完全取决于您试图实现的目标。我所能做的就是列出选项,让您了解它们。所以,它们在这里:

  • Affinity
  • BackLog
  • CopyMessages
  • DelayAttachOnConnect
  • Endian
  • GetLastEndpoint
  • IPv4Only
  • 身份
  • Linger
  • MaxMsgSize
  • MulticastHops
  • MulticastRate
  • MulticastRecoveryInterval
  • ReceiveHighWaterMark
  • ReceiveMore
  • ReceiveTimeout
  • ReceiveBuffer
  • ReconnectInterval
  • ReconnectIntervalMax
  • SendHighWaterMark
  • SendTimeout
  • SendBuffer
  • TcpAcceptFilter
  • TcpKeepAlive
  • TcpKeepaliveCnt
  • TcpKeepaliveIdle
  • TcpKeepaliveInterval
  • XPubVerbose

要确切了解所有这些选项的含义,您很可能需要参考 ZeroMq 的实际文档,即指南。

身份

我认为使用 ZeroMq 的一个很棒的优点(至少在我看来)是我们仍然可以坚持标准的请求/响应模式(就像我们在第 1 篇帖子中的 hello world 示例中那样),然后我们可以选择切换到异步服务器。这可以通过为服务器使用 RouterSocket 来轻松实现。客户端保持为 RequestSocket

所以现在这是一个有趣的安排,我们有:

  • 同步客户端,得益于标准的 RequestSocket 类型
  • 异步服务器,得益于名为 RouterSocket 的新套接字

RouterSocket 是我个人非常喜欢的,因为它非常易于使用(正如 ZeroMq 的许多套接字一样,一旦您了解了它们的作用),但它能够创建一个可以与数千个客户端无缝通信的服务器,所有这些都是异步的,并且对我们在第 1 部分中看到的代码几乎没有改动。

稍微偏离主题

当您使用 RequestSocket 时,它们会为您做一些巧妙的事情,它们总是提供一个包含以下帧的消息:

  • Frame[0] 地址
  • Frame[1] 空帧
  • Frame[2] 消息负载

即使我们只发送了负载(请参阅第 1 部分中的“Hello World”示例)。

同样,当您使用 ResponseSocket 时,它们也会为我们承担一些繁重的工作,它们总是提供一个包含以下帧的消息:

  • Frame[0] 返回地址
  • Frame[1] 空帧
  • Frame[2] 消息负载

即使我们只发送了负载(请参阅第 1 部分中的“Hello World”示例)。

通过理解标准同步请求/响应套接字的工作原理,现在可以相对容易地使用 RouterSocket 创建一个完全异步的服务器,该服务器知道如何将消息分派回正确的客户端。我们只需要模仿标准 ResponseSocket 的工作方式,即我们自己构建消息帧。我们将使用 RouterSocket 来创建以下帧(从而模仿标准 ResponseSocket 的行为)。

<!--EndFragment-->
  • Frame[0] 返回地址
  • Frame[1] 空帧
  • Frame[2] 消息负载

我认为理解这一点的最佳方法是通过一个示例。该示例的工作原理如下:

  1. 有 4 个客户端,它们是标准的同步 RequestSocket
  2. 有一个异步服务器,它使用 RouterSocket
  3. 如果客户端发送一个带有前缀“_B”的消息,它将收到服务器的特殊消息,所有其他客户端将收到标准的响应消息。

话不多说,这是此示例的完整代码:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;

namespace ZeroMqIdentity
{
    public class Program : IDisposable
    {
        private List<RequestSocket> clients = new List<RequestSocket>();

        public void Run()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateRouterSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    CreateClient(ctx, "A_");
                    CreateClient(ctx, "B_");
                    CreateClient(ctx, "C_");
                    CreateClient(ctx, "D_");

                    while (true)
                    {

                        var clientMessage = server.ReceiveMessage();
                        Console.WriteLine("========================");
                        Console.WriteLine(" INCOMING CLIENT MESSAGE ");
                        Console.WriteLine("========================");
                        for (int i = 0; i < clientMessage.FrameCount; i++)
                        {
                            Console.WriteLine("Frame[{0}] = {1}", i, 
                                clientMessage[i].ConvertToString());
                        }

                        var clientAddress = clientMessage[0];
                        var clientOriginalMessage = clientMessage[2].ConvertToString();
                        string response = string.Format("{0} back from server", 
                            clientOriginalMessage);

                        // "B_" client is special
                        if (clientOriginalMessage.StartsWith("B_"))
                        {
                            response = string.Format(
                                "special Message for 'B' back from server");
                        }

                        var messageToClient = new NetMQMessage();
                        messageToClient.Append(clientAddress);
                        messageToClient.AppendEmptyFrame();
                        messageToClient.Append(response);
                        server.SendMessage(messageToClient);
                    }
                }
            }

            Console.ReadLine();
        }

        private static void Main(string[] args)
        {
            Program p = new Program();
            p.Run();
        }

        private void CreateClient(NetMQContext ctx, string prefix)
        {
            Task.Run(() =>
            {
                var client = ctx.CreateRequestSocket();
                clients.Add(client);
                client.Connect("tcp://127.0.0.1:5556");
                client.Send(string.Format("{0}Hello", prefix));

                //read client message
                var echoedServerMessage = client.ReceiveString();
                Console.WriteLine(
                    "\r\nClient Prefix is : '{0}', Server Message : '{1}'",
                    prefix, echoedServerMessage);

            });
        }

        public void Dispose()
        {
            foreach (var client in clients)
            {
                client.Dispose();
            }
        }
    }
}

我认为要完全欣赏这个例子,需要检查输出,其输出应该类似于这样(可能不完全相同,因为 RouterSocket 是完全 async 的,所以它可能会以不同的顺序处理 RequestSocket,这取决于您)。

========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  ???”
Frame[1] =
Frame[2] = A_Hello
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  @??”
Frame[1] =
Frame[2] = D_Hello

Client Prefix is : ‘A_’, Server Message : ‘A_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  A??”
Frame[1] =
Frame[2] = B_Hello

Client Prefix is : ‘D_’, Server Message : ‘D_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] =  B??”
Frame[1] =
Frame[2] = C_Hello

Client Prefix is : ‘B_’, Server Message : ‘special Message for ‘B’ back from ser
ver’

Client Prefix is : ‘C_’, Server Message : ‘C_Hello back from server’

SendMore

ZeroMq 使用消息帧工作。使用 ZeroMq,您可以创建多部分消息,出于多种原因可以使用它们,例如:

  • 包括地址信息(我们上面已经有一个例子了)
  • 为您的最终目的设计协议
  • 发送序列化数据(例如,第一个消息帧可以是项目的类型,下一个消息帧可以是实际的序列化数据)

当您处理多部分消息时,您必须发送/接收您想要处理的所有消息部分。

我认为尝试理解多部分消息的最佳方法可能是通过一个小测试。我坚持使用一个集成的演示,它建立在原始的“Hello World”请求/响应演示的基础上。我们使用 NUnit 来对客户端/服务器之间的数据进行断言。

这是一个小的测试用例,应观察以下几点:

  1. 我们构造第一个消息部分,并使用 xxxxSocket.SendMore() 方法发送第一个消息。
  2. 我们使用 xxxxSocket.Send() 方法构造第二个(也是最后一个)消息部分。
  3. 服务器能够接收第一个消息部分,并分配一个值来确定是否还有更多部分。这是通过使用 xxxxSocket.Receive(..) 的一个重载来完成的,该重载允许我们获取一个“more”的 out 值。
  4. 我们也可以使用实际的 NetMqMessage 并向其追加,然后使用 xxxxSocket.SendMessage 发送,接收套接字将使用 xxxxSocket.ReceieveMessage(..) 并可以检查实际的 NetMqMessage 帧。

无论如何,这是代码

using System;
using System.Threading;
using NetMQ;
using NUnit.Framework;

namespace SendMore
{
    [TestFixture]
    public class SendMoreTests
    {
        [Test]
        public void SendMoreTest()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");

                        //client send message
                        client.SendMore("A");
                        client.Send("Hello");

                        //server receive 1st part
                        bool more;
                        string m = server.ReceiveString(out more);
                        Assert.AreEqual("A", m);
                        Assert.IsTrue(more);

                        //server receive 2nd part
                        string m2 = server.ReceiveString(out more);
                        Assert.AreEqual("Hello", m2);
                        Assert.False(more);

                        //server send message, this time use NetMqMessage
                        //which will be sent as frames if the client calls
                        //ReceieveMessage()
                        var m3 = new NetMQMessage();
                        m3.Append("From");
                        m3.Append("Server");
                        server.SendMessage(m3);

                        //client receive
                        var m4 = client.ReceiveMessage();
                        Assert.AreEqual(2, m4.FrameCount);
                        Assert.AreEqual("From", m4[0].ConvertToString());
                        Assert.AreEqual("Server", m4[1].ConvertToString());
                    }
                }
            }
        }
    }
}

以下是关于使用 SendMore 和多部分消息时,Zero Guide 中的几个非常重要的点。这谈论的是 ZeroMq C++ 核心实现,而不是 NetMq 版本,但使用 NetMq 时,这些要点同样有效。

关于多部分消息的一些知识

  • 当您发送多部分消息时,第一部分(以及所有后续部分)只有在您发送最后一部分时才会实际发送到网络上。
  • 如果您正在使用zmq_poll(),当您接收到消息的第一部分时,其余部分也已到达。
  • 您将接收到消息的所有部分,或者根本不接收任何部分。
  • 消息的每个部分都是一个单独的zmq_msg项。
  • 无论您是否检查 more 属性,您都将接收到消息的所有部分。
  • 发送时,ØMQ 在内存中排队消息帧,直到接收到最后一帧,然后一次性全部发送。
  • 除了关闭套接字外,没有办法取消部分发送的消息。

这就是我在这篇文章中想说的全部内容,下次再见。

© . All rights reserved.