ZeroMq #3:套接字选项/标识符和 SendMore





5.00/5 (1投票)
ZeroMq #3:套接字选项/标识符和 SendMore
上一次我们回顾了 ZeroMq 中不同的套接字类型,我还告诉了您它们在 NetMq(我用于这些帖子的库)中的对应项。
这次,我们将重点介绍 ZeroMq 的三个小型但非常重要的方面,不应被忽视。
这些方面是套接字选项/标识符和 SendMore。
代码在哪里?
所有这些帖子的代码都托管在 GitHub 的一个大型解决方案中
套接字选项
根据您使用的套接字类型或要创建的拓扑,您可能会发现需要设置一些 ZeroMq 选项。在 NetMq 中,这是通过 xxxxSocket.Options
属性完成的。
以下是您可以为 xxxxSocket
设置的可用属性列表。很难确切说明您需要设置哪些值,因为这显然完全取决于您试图实现的目标。我所能做的就是列出选项,让您了解它们。所以,它们在这里:
Affinity
BackLog
CopyMessages
DelayAttachOnConnect
Endian
GetLastEndpoint
IPv4Only
身份
Linger
MaxMsgSize
MulticastHops
MulticastRate
MulticastRecoveryInterval
ReceiveHighWaterMark
ReceiveMore
ReceiveTimeout
ReceiveBuffer
ReconnectInterval
ReconnectIntervalMax
SendHighWaterMark
SendTimeout
SendBuffer
TcpAcceptFilter
TcpKeepAlive
TcpKeepaliveCnt
TcpKeepaliveIdle
TcpKeepaliveInterval
XPubVerbose
要确切了解所有这些选项的含义,您很可能需要参考 ZeroMq 的实际文档,即指南。
身份
我认为使用 ZeroMq 的一个很棒的优点(至少在我看来)是我们仍然可以坚持标准的请求/响应模式(就像我们在第 1 篇帖子中的 hello world 示例中那样),然后我们可以选择切换到异步服务器。这可以通过为服务器使用 RouterSocket
来轻松实现。客户端保持为 RequestSocket
。
所以现在这是一个有趣的安排,我们有:
- 同步客户端,得益于标准的
RequestSocket
类型 - 异步服务器,得益于名为
RouterSocket
的新套接字
RouterSocket
是我个人非常喜欢的,因为它非常易于使用(正如 ZeroMq 的许多套接字一样,一旦您了解了它们的作用),但它能够创建一个可以与数千个客户端无缝通信的服务器,所有这些都是异步的,并且对我们在第 1 部分中看到的代码几乎没有改动。
稍微偏离主题
当您使用 RequestSocket
时,它们会为您做一些巧妙的事情,它们总是提供一个包含以下帧的消息:
- Frame[0] 地址
- Frame[1] 空帧
- Frame[2] 消息负载
即使我们只发送了负载(请参阅第 1 部分中的“Hello World
”示例)。
同样,当您使用 ResponseSocket
时,它们也会为我们承担一些繁重的工作,它们总是提供一个包含以下帧的消息:
- Frame[0] 返回地址
- Frame[1] 空帧
- Frame[2] 消息负载
即使我们只发送了负载(请参阅第 1 部分中的“Hello World
”示例)。
通过理解标准同步请求/响应套接字的工作原理,现在可以相对容易地使用 RouterSocket
创建一个完全异步的服务器,该服务器知道如何将消息分派回正确的客户端。我们只需要模仿标准 ResponseSocket
的工作方式,即我们自己构建消息帧。我们将使用 RouterSocket
来创建以下帧(从而模仿标准 ResponseSocket
的行为)。
<!--EndFragment-->
- Frame[0] 返回地址
- Frame[1] 空帧
- Frame[2] 消息负载
我认为理解这一点的最佳方法是通过一个示例。该示例的工作原理如下:
- 有 4 个客户端,它们是标准的同步
RequestSocket
。 - 有一个异步服务器,它使用
RouterSocket
。 - 如果客户端发送一个带有前缀“
_B
”的消息,它将收到服务器的特殊消息,所有其他客户端将收到标准的响应消息。
话不多说,这是此示例的完整代码:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.SqlServer.Server;
using NetMQ;
using NetMQ.Sockets;
namespace ZeroMqIdentity
{
public class Program : IDisposable
{
private List<RequestSocket> clients = new List<RequestSocket>();
public void Run()
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateRouterSocket())
{
server.Bind("tcp://127.0.0.1:5556");
CreateClient(ctx, "A_");
CreateClient(ctx, "B_");
CreateClient(ctx, "C_");
CreateClient(ctx, "D_");
while (true)
{
var clientMessage = server.ReceiveMessage();
Console.WriteLine("========================");
Console.WriteLine(" INCOMING CLIENT MESSAGE ");
Console.WriteLine("========================");
for (int i = 0; i < clientMessage.FrameCount; i++)
{
Console.WriteLine("Frame[{0}] = {1}", i,
clientMessage[i].ConvertToString());
}
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server",
clientOriginalMessage);
// "B_" client is special
if (clientOriginalMessage.StartsWith("B_"))
{
response = string.Format(
"special Message for 'B' back from server");
}
var messageToClient = new NetMQMessage();
messageToClient.Append(clientAddress);
messageToClient.AppendEmptyFrame();
messageToClient.Append(response);
server.SendMessage(messageToClient);
}
}
}
Console.ReadLine();
}
private static void Main(string[] args)
{
Program p = new Program();
p.Run();
}
private void CreateClient(NetMQContext ctx, string prefix)
{
Task.Run(() =>
{
var client = ctx.CreateRequestSocket();
clients.Add(client);
client.Connect("tcp://127.0.0.1:5556");
client.Send(string.Format("{0}Hello", prefix));
//read client message
var echoedServerMessage = client.ReceiveString();
Console.WriteLine(
"\r\nClient Prefix is : '{0}', Server Message : '{1}'",
prefix, echoedServerMessage);
});
}
public void Dispose()
{
foreach (var client in clients)
{
client.Dispose();
}
}
}
}
我认为要完全欣赏这个例子,需要检查输出,其输出应该类似于这样(可能不完全相同,因为 RouterSocket
是完全 async
的,所以它可能会以不同的顺序处理 RequestSocket
,这取决于您)。
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] = ???”
Frame[1] =
Frame[2] = A_Hello
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] = @??”
Frame[1] =
Frame[2] = D_Hello
Client Prefix is : ‘A_’, Server Message : ‘A_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] = A??”
Frame[1] =
Frame[2] = B_Hello
Client Prefix is : ‘D_’, Server Message : ‘D_Hello back from server’
========================
INCOMING CLIENT MESSAGE
========================
Frame[0] = B??”
Frame[1] =
Frame[2] = C_Hello
Client Prefix is : ‘B_’, Server Message : ‘special Message for ‘B’ back from ser
ver’
Client Prefix is : ‘C_’, Server Message : ‘C_Hello back from server’
SendMore
ZeroMq 使用消息帧工作。使用 ZeroMq,您可以创建多部分消息,出于多种原因可以使用它们,例如:
- 包括地址信息(我们上面已经有一个例子了)
- 为您的最终目的设计协议
- 发送序列化数据(例如,第一个消息帧可以是项目的类型,下一个消息帧可以是实际的序列化数据)
当您处理多部分消息时,您必须发送/接收您想要处理的所有消息部分。
我认为尝试理解多部分消息的最佳方法可能是通过一个小测试。我坚持使用一个集成的演示,它建立在原始的“Hello World
”请求/响应演示的基础上。我们使用 NUnit 来对客户端/服务器之间的数据进行断言。
这是一个小的测试用例,应观察以下几点:
- 我们构造第一个消息部分,并使用
xxxxSocket.SendMore()
方法发送第一个消息。 - 我们使用
xxxxSocket.Send()
方法构造第二个(也是最后一个)消息部分。 - 服务器能够接收第一个消息部分,并分配一个值来确定是否还有更多部分。这是通过使用
xxxxSocket.Receive(..)
的一个重载来完成的,该重载允许我们获取一个“more”的out
值。 - 我们也可以使用实际的
NetMqMessage
并向其追加,然后使用xxxxSocket.SendMessage
发送,接收套接字将使用xxxxSocket.ReceieveMessage(..)
并可以检查实际的NetMqMessage
帧。
无论如何,这是代码
using System;
using System.Threading;
using NetMQ;
using NUnit.Framework;
namespace SendMore
{
[TestFixture]
public class SendMoreTests
{
[Test]
public void SendMoreTest()
{
using (NetMQContext ctx = NetMQContext.Create())
{
using (var server = ctx.CreateResponseSocket())
{
server.Bind("tcp://127.0.0.1:5556");
using (var client = ctx.CreateRequestSocket())
{
client.Connect("tcp://127.0.0.1:5556");
//client send message
client.SendMore("A");
client.Send("Hello");
//server receive 1st part
bool more;
string m = server.ReceiveString(out more);
Assert.AreEqual("A", m);
Assert.IsTrue(more);
//server receive 2nd part
string m2 = server.ReceiveString(out more);
Assert.AreEqual("Hello", m2);
Assert.False(more);
//server send message, this time use NetMqMessage
//which will be sent as frames if the client calls
//ReceieveMessage()
var m3 = new NetMQMessage();
m3.Append("From");
m3.Append("Server");
server.SendMessage(m3);
//client receive
var m4 = client.ReceiveMessage();
Assert.AreEqual(2, m4.FrameCount);
Assert.AreEqual("From", m4[0].ConvertToString());
Assert.AreEqual("Server", m4[1].ConvertToString());
}
}
}
}
}
}
以下是关于使用 SendMore 和多部分消息时,Zero Guide 中的几个非常重要的点。这谈论的是 ZeroMq C++ 核心实现,而不是 NetMq 版本,但使用 NetMq 时,这些要点同样有效。
关于多部分消息的一些知识
- 当您发送多部分消息时,第一部分(以及所有后续部分)只有在您发送最后一部分时才会实际发送到网络上。
- 如果您正在使用zmq_poll(),当您接收到消息的第一部分时,其余部分也已到达。
- 您将接收到消息的所有部分,或者根本不接收任何部分。
- 消息的每个部分都是一个单独的zmq_msg项。
- 无论您是否检查 more 属性,您都将接收到消息的所有部分。
- 发送时,ØMQ 在内存中排队消息帧,直到接收到最后一帧,然后一次性全部发送。
- 除了关闭套接字外,没有办法取消部分发送的消息。
这就是我在这篇文章中想说的全部内容,下次再见。