通过 C#: ZeroMQ 简介






4.96/5 (79投票s)
ZeroMQ 简介,一个非常轻量级的消息队列开源软件。然后,我们将使用 C# 以非常简单的方式探索和测试其主要的通信模式。
我关于 ZeroMQ 的文章
引言
ZeroMQ(也拼写为 ØMQ、0MQ 或 ZMQ)是一个非常轻量级的消息队列开源软件。它没有独立的服务器;消息直接从应用程序发送到应用程序。它非常容易学习和实现。它由一个名为 libzmq.dll 的 C++ 编写的单一库组成,可以链接到任何应用程序。要在 .NET 环境中使用它,我们需要一个该库的包装器,称为 clrzmq.dll,它用 C# 编写。
ZeroMQ 可以在 Windows、OS X 和 Linux 上运行。可以使用多种语言来实现使用 ZeroMQ 的应用程序,包括 C、C++、C#、Java、Python……这使得能够与不同平台上的不同应用程序进行通信。
ZeroMq 的核心
ZeroMQ 的主要部分是套接字。它不是传统的套接字,而是一个在传统套接字 API 之上提供抽象层的套接字,这使我们摆脱了应用程序中复杂且重复的任务。ZeroMQ 支持多种类型的套接字(套接字类型在套接字本身中定义为属性值)。发送端和接收端套接字类型的不同组合为我们提供了不同的通信模式,其中一些将在本文中进行探讨。
异步通信
ZeroMQ 的通信以异步方式进行。这意味着我们的应用程序在设置或关闭套接字连接、重新连接和消息传递期间不会被阻塞。这些操作由 ZeroMQ 本身在后台线程中管理,并与我们应用程序执行的常规处理并行进行。它会在需要时自动将消息排队(在发送方或接收方)。它智能地执行此操作,在排队之前尽可能将消息推近接收方。
传输协议
ZeroMQ 支持 4 种传输协议。每种传输协议都由一个地址字符串定义,该字符串由两部分组成:transport://endpoint。传输部分指定要使用的底层传输协议,端点部分根据所使用的协议定义如下
- TCP (tcp://hostname:port):通过网络进行通信
- INROC (inproc://name):同一进程内通信(线程之间)
- IPC (ipc:///tmp/filename):同一主机内的进程间通信
- PGM (pgm://interface;address:port 和 epgm://interface;address:port):通过网络进行组播通信
消息格式
我们可以发送或接收的默认消息类型是 string
和字节数组。ZeroMQ 不强制对套接字之间发送的消息设置任何格式。我们可以自由地编码我们的消息;可以使用 XML、JSON、MessagePack
……在本文中,为简单起见,我们将只使用 string
。
二进制文件
ZeroMq 项目
我们需要的库是两个 DLL。
libzmq.dll
这是一个包含所有 ZeroMQ 内容的 C++ 库。您可以从 http://www.zeromq.org/area:download 获取源代码。要构建它,请遵循以下步骤
- 您需要 Microsoft Visual C++ 2008 或更高版本。
- 解压 .zip 源文件存档。
- 在 Visual C++ 中,打开解决方案 builds\msvc\msvc10.sln。
- 在工具栏的“解决方案配置”中选择“Release”。
- 构建解决方案。
ZeroMQ
库 (libzmq.dll) 将在 bin\win32 子目录中生成。
clrzmq.dll
这是 libzmq.dll 库的 .NET 包装器,用 C# 编写。您可以从 https://github.com/zeromq/clrzmq 获取源代码(获取版本 3.0.0.0 beta)。要构建它,请遵循以下步骤
- Microsoft Visual C# 2010 Express 或更高版本
- 解压 .zip 源文件存档。
- 在 Visual C# 中,打开解决方案 src\clrzmq.sln。
- 在工具栏的“解决方案配置”中选择“Release”。
- 选中
ZeroMQ
项目的“生成”选项卡中的“XML 文档文件”复选框,以生成 XML 文档文件。 - 构建解决方案。
- clrzmq.dll 库将在 src\ZeroMQ\bin\Release 子目录中生成
- libzmq.dll(版本 3.2.1-beta 2)库将在 \lib(\x86)|(\x64) 中。该库已通过 nuGet 包由项目下载。
我已经构建了这个解决方案,并获得了两个 DLL(x86:Clrzmq.dll 和 libzmq.dll),并在本文的解决方案中使用了它们。
注意:您可以从 NuGet 获取 Clrzmq.dll beta 版本的二进制文件(您必须在 NuGet 包管理器窗口中选择“包含预发行版本”),或者从 GitHub 获取。在此包中,libzmq.dll(x86 和 x64)直接捆绑在程序集中,并在应用程序启动时选择性地提取/加载(感谢 clrzmq
的维护者 John 的评论)。
ZeroMQ 捆绑项目
这是我创建的文章解决方案,包含几个小的控制台应用程序,允许我们非常轻松地在不同情况下测试不同的通信模式。它包含我通过构建 clrzmq 解决方案获得的两个库 libzmq.dll 和 clrzmq.dll,如上所述。每个应用程序都有一组命令行参数(感谢 Giacomo Stelluti Scala 的 CommandLine Parser 开源库)。要查看这些参数,您可以键入应用程序名称后跟开关 /?
。我还为每种通信模式包含了一些批处理文件,其中包含运行该模式的必要命令。这些批处理文件可以随时快速运行相同的模式。
基本代码
为了在 Visual C# 项目中使用 ZeroMQ
,我们必须
- 添加对 clrzmq.dll 的引用。
- 将 libzmq.dll 文件添加到项目中(添加现有项...)(因为 clrzmq.dll 依赖于它)。
- 更改 libzmq.dll 文件的属性如下(以便在构建时将其复制到输出目录)
- 构建操作:无
- 复制到输出目录:如果更新则复制
- 在代码中添加 clrzmq.dll 命名空间的一个
using
指令:using ZeroMQ
现在我们可以编写一些代码来发送或接收消息。让我们看看以下代码
using (var context = ZmqContext.Create())
{
using (var socket = context.CreateSocket(SocketType.REQ))
{
socket.Connect("tcp://127.0.0.1:5000");
socket.Send("My Reply", Encoding.UTF8);
var replyMsg = socket.Receive(Encoding.UTF8);
}
}
首先,我们创建一个上下文;从这个上下文中,我们可以创建著名的 ZeroMQ
套接字。套接字类型是在创建时定义的。使用此套接字,我们可以执行以下两项操作之一
- 绑定到一个端点并等待其他套接字建立连接。
- 连接到一个端点。
选择 bind
或 connect
取决于我们使用的通信模式(稍后将在本文中解释)。
最后,我们可以发送或接收消息。正如我们所见,我们只用了很少的几行就建立了通信。这些步骤在通信模式的所有地方都使用。
通信模式
通信模式是一组连接的套接字,用于指定消息流。我们将使用一些形状和符号来演示套接字之间的连接。下图显示了套接字之间的基本连接
矩形是一个包含一个或多个套接字的应用程序。每个套接字都可以绑定或连接到某个端点。绑定到端点的套接字是等待其他套接字连接的套接字。
您会注意到,在发送或接收消息之前,我添加了一个延迟(可以设置为任何毫秒值)。延迟的目的是
- 减慢消息发送速度
- 模拟繁忙状态
- 为套接字连接完成留出一些时间,然后再发送消息,以免丢失它们
请求/回复模式 (REQ/REP)
此模式具有以下特征
- 服务器使用 REP 类型的套接字,客户端使用 REQ 类型的套接字。
- 客户端发送请求并接收回复,而服务器接收请求并发送回复。
- 它允许一个客户端连接到一个或多个服务器。在这种情况下,请求会在所有服务器(Reps)之间进行循环分配(一次发送一个请求给一个服务器,下一个请求发送给下一个服务器,依此类推)。
- 基于状态的模式:客户端必须在发送另一个请求之前接收其请求的回复,服务器必须在接收另一个请求之前发送回复。
让我们通过两种客户端-服务器连接场景来探讨这种模式。
1. 一个客户端 - 一个服务器
在这种情况下,我们有一个客户端(Req
)连接到一个服务器(Rep
)。下图说明了这一点
服务器的 C# 代码是
using (var context = ZmqContext.Create())
{
using (var socket = context.CreateSocket(SocketType.REP))
{
foreach (var bindEndPoint in options.bindEndPoints)
socket.Bind(bindEndPoint);
while (true)
{
Thread.Sleep(options.delay);
var rcvdMsg = socket.Receive(Encoding.UTF8);
Console.WriteLine("Received: " + rcvdMsg);
var replyMsg = options.replyMessage.Replace("#msg#", rcvdMsg);
Console.WriteLine("Sending : " + replyMsg + Environment.NewLine);
socket.Send(replyMsg, Encoding.UTF8);
}
}
}
客户端的 C# 代码是
using (var context = ZmqContext.Create())
{
using (var socket = context.CreateSocket(SocketType.REQ))
{
foreach (var connectEndpoint in options.connectEndPoints)
socket.Connect(connectEndpoint);
long msgCptr = 0;
int msgIndex = 0;
while (true)
{
if (msgCptr == long.MaxValue)
msgCptr = 0;
msgCptr++;
if (options.maxMessage >= 0)
if (msgCptr > options.maxMessage)
break;
if (msgIndex == options.alterMessages.Count())
msgIndex = 0;
var reqMsg = options.alterMessages[msgIndex++]
.Replace("#nb#", msgCptr.ToString("d2"));
Thread.Sleep(options.delay);
Console.WriteLine("Sending : " + reqMsg);
socket.Send(reqMsg, Encoding.UTF8);
var replyMsg = socket.Receive(Encoding.UTF8);
Console.WriteLine("Received: " + replyMsg + Environment.NewLine);
}
}
}
双击 bin 目录下的 ReqRep_Patttern_1.bat 文件。此批处理文件包含以下命令
start "Server (Rep)" cmd /T:8E /k Rep.exe -b tcp://127.0.0.1:5000 -r "#msg# - Reply" -d 0
start "Client (Req)" cmd /T:8F /k Req.exe -c tcp://127.0.0.1:5000 -m "Request #nb#" -x 5 -d 1000
第一个命令将在一个新的彩色(/T:fg
命令)DOS 命令窗口中启动并运行应用程序 Rep.exe。Rep
应用程序将绑定到端点 tcp://127.0.0.1:5000 并等待传入的请求。当请求到达时,它将发送一个由传入请求(#msg# 宏
)和单词“Reply
”组成的回复。发送回复之前的延迟为零毫秒(-d 开关
)。
第二个命令将在一个新的彩色(/T:fg 命令)DOS 命令窗口中启动并运行应用程序 Req.exe。Req 应用程序将连接到端点 tcp://127.0.0.1:5000。然后它将发送 5 条消息(一个单词“Request
”和消息编号(#nb# 宏
)的组合)。它将在发送每个请求之前等待 1000 毫秒(-d 开关
)。
运行上述命令后,我们得到以下结果
2. 一个客户端 - 两个服务器
在这里,我们有一个客户端(Req
)连接到两个服务器(Rep
)。下图代表了这种情况
双击 bin 目录下的 ReqRep_Patttern_2.bat 文件。此批处理文件包含以下命令
start "Server 1 (Rep)" cmd /T:8E /k Rep.exe -b tcp://127.0.0.1:5000 -r "#msg# Reply 1" -d 0
start "Server 2 (Rep)" cmd /T:8E /k Rep.exe -b tcp://127.0.0.1:5001 -r "#msg# Reply 2" -d 0
start "Client (Req)" cmd /T:8F /k Req.exe
-c tcp://127.0.0.1:5000;tcp://127.0.0.1:5001 -m "Request #nb#" -x 5 -d 1000
前两个命令将运行两个 Rep
应用程序实例;每个实例将在不同的端口号(5000 和 5001)上等待连接。最后一个命令将运行 Req
应用程序,该应用程序将连接到两个正在运行的 Rep
。
运行上述命令后,我们得到以下结果
我们注意到,请求在两个服务器(Rep
)之间进行循环分配,一个请求发送给一个服务器,下一个请求发送给另一个服务器。这是 REQ 套接字的出站路由策略。
发布/订阅模式 (PUB/SUB)
此模式具有以下特征
- 发布者使用 PUB 类型的套接字,订阅者使用 SUB 类型的套接字。
- 一个发布者可以有一个或多个订阅者。
- 一个订阅者可以连接到一个或多个发布者。
- 发布者发送消息,订阅者接收它们。
- 订阅者必须通过使用
SubscribeAll
方法订阅所有发布者消息,或者通过使用Subscribe
方法并指定(作为参数)订阅者感兴趣的消息前缀来订阅特定消息。 - 订阅者可以通过使用
UnsubscribeAll
方法取消订阅所有发布者消息,或者通过使用Unsubscribe
方法并指定消息前缀作为方法参数来取消订阅特定消息。 - 消息过滤发生在
- 发布者端(ZeroMQ 3.x,用于 tcp:// 和 ipc:// 协议)。发布者在将消息发送给订阅者之前过滤消息。
- 订阅者端(ZeroMQ 3.x,用于 epgm:// 协议和 ZeroMQ 2.x)。订阅者丢弃从发布者接收到的不需要的消息。
- 如果没有连接的订阅者,则消息将被丢弃。
- 连接到多个发布者的订阅者将均匀地接收消息(公平排队)。
让我们通过三种发布者-订阅者连接场景来探讨这种模式。
1. 一个发布者 - 两个订阅者(订阅所有消息)
在这种情况下,我们有一个发布者(PUB
),有两个已连接的订阅者(SUB
)。下图代表了这种情况
发布者的 C# 代码
using (var ctx = ZmqContext.Create())
{
using (var socket = ctx.CreateSocket(SocketType.PUB))
{
foreach (var endPoint in options.bindEndPoints)
socket.Bind(endPoint);
long msgCptr = 0;
int msgIndex = 0;
while (true)
{
if (msgCptr == long.MaxValue)
msgCptr = 0;
msgCptr++;
if (options.maxMessage >= 0)
if (msgCptr > options.maxMessage)
break;
if (msgIndex == options.altMessages.Count())
msgIndex = 0;
var msg = options.altMessages[msgIndex++].Replace("#nb#", msgCptr.ToString("d2"));
Thread.Sleep(options.delay);
Console.WriteLine("Publishing: " + msg);
socket.Send(msg, Encoding.UTF8);
}
}
}
订阅者的 C# 代码
using(var ctx = ZmqContext.Create())
{
using (var socket = ctx.CreateSocket(SocketType.SUB))
{
if (options.subscriptionPrefixes.Count() == 0)
socket.SubscribeAll();
else
foreach (var subscriptionPrefix in options.subscriptionPrefixes)
socket.Subscribe(Encoding.UTF8.GetBytes(subscriptionPrefix));
foreach (var endPoint in options.connectEndPoints)
socket.Connect(endPoint);
while (true)
{
Thread.Sleep(options.delay);
var msg = socket.Receive(Encoding.UTF8);
Console.WriteLine("Received: " + msg);
}
}
}
双击 bin 目录下的 PubSub_Pattern_1.bat 文件。此批处理文件包含以下命令
start "Subscriber 1" cmd /T:8E /k Sub.exe -c tcp://127.0.0.1:5000 -d 0
start "Subscriber 2" cmd /T:8E /k Sub.exe -c tcp://127.0.0.1:5000 -d 0
start "Publisher" cmd /T:8F /k Pub.exe -b tcp://127.0.0.1:5000
-m "Orange #nb#";"Apple #nb#" -x 5 -d 1000
前两个命令将运行两个订阅者应用程序实例(Sub.exe)。每个订阅者将连接到端点 tcp://127.0.0.1:5000 并订阅所有发布者消息(我们没有定义订阅前缀,这是默认值)。第三个命令将运行发布者(Pub.exe),它将绑定到端点 tcp://127.0.0.1:5000 并等待订阅者连接。然后,它发送 5 条消息(在“Orange
”和“Apple
”之间交替),其中每个单词都与消息编号(#nb# 宏
)连接。消息之间的延迟为 1000 毫秒(-d 开关
)。
运行上述批处理文件后,我们得到以下结果
2. 一个发布者 - 两个订阅者(特定消息订阅)
在这种情况下,我们有两个订阅者(Sub
)连接到一个发布者(Pub
)。第一个订阅者将订阅以“Orange
”或“Apple
”开头的消息,第二个订阅者将订阅以“Kiwi
”开头的消息。下图代表了这种情况
双击 bin 目录下的 PubSub_Pattern_2.bat 文件。此批处理文件包含以下命令
start "Subscriber 1" cmd /T:8E /k Sub.exe -c tcp://127.0.0.1:5000 -s "Orange";"Apple" -d 0
start "Subscriber 2" cmd /T:8E /k Sub.exe -c tcp://127.0.0.1:5000 -s "Kiwi" -d 0
start "Publisher" cmd /T:8F /k Pub.exe -b tcp://127.0.0.1:5000
-m "Orange #nb#";"Apple #nb#";"Kiwi #nb#" -x 7 -d 1000
注意订阅者命令行中的 –s
开关指定了订阅前缀。
运行上述批处理文件后,我们得到以下结果
3. 两个发布者 - 一个订阅者
在这种情况下,我们有一个订阅者(Sub
)连接到两个发布者(Pub
)。下图代表了这种情况
双击 bin 目录下的 PubSub_Pattern_3.bat 文件。此批处理文件包含以下命令
start "Subscriber" cmd /T:8E /k Sub.exe -c tcp://127.0.0.1:5000;tcp://127.0.0.1:5001 -d 0
start "Publisher 1" cmd /T:8F /k Pub.exe -b tcp://127.0.0.1:5000
-m "Orange #nb# (Pub 1)";"Apple #nb# (Pub 1)" -x 5 -d 1000
start "Publisher 2" cmd /T:8F /k Pub.exe -b tcp://127.0.0.1:5001
-m "Orange #nb# (Pub 2)";"Apple #nb# (Pub 2)" -x 5 -d 1000
请注意,订阅者连接到代表两个发布者的两个不同端点。订阅者订阅了所有消息。
运行上述批处理文件后,我们得到以下结果
在此示例中,订阅者从每个连接(发布者)之间均匀地接收消息,一条消息来自一个连接,下一条消息来自另一个连接,依此类推,这是 SUB
套接字的主叫路由策略。
管道模式 (PUSH/PULL)
当需要进行并行数据处理时,通常使用此模式。管道模式的场景如下
- 通常,我们有一个任务分发器,它以循环方式将消息(任务)推送到工作节点(为每个工作节点分发不同的任务)。
- 当工作节点接收到消息时,它将处理该消息,然后将其推送到一种任务收集器,该收集器接收消息(任务)。
- 收集器接收到的消息会在所有已连接的工作节点之间进行公平排队。
此模式具有以下特征
- 任务分发器使用
PUSH
类型的套接字。它绑定到其端点并等待接收来自工作节点的连接。 - 工作节点有两个套接字,一个套接字是
PULL
类型的,连接到任务分发器套接字;另一个套接字是PUSH
类型的,连接到收集器套接字。 - 任务收集器有一个
PULL
类型的套接字。它绑定到其端点并等待接收来自工作节点的连接。
下图代表了这种模式
任务分发器的 C# 代码
using(var ctx = ZmqContext.Create())
{
using (var socket = ctx.CreateSocket(SocketType.PUSH))
{
foreach (var endPoint in options.bindEndPoints)
socket.Bind(endPoint);
long msgCptr = 0;
int msgIndex = 0;
while (true)
{
if (msgCptr == long.MaxValue)
msgCptr = 0;
msgCptr++;
if (options.maxMessage >= 0)
if (msgCptr > options.maxMessage)
break;
if (msgIndex == options.altMessages.Count())
msgIndex = 0;
var msg = options.altMessages[msgIndex++].Replace("#nb#", msgCptr.ToString("d2"));
Thread.Sleep(options.delay);
Console.WriteLine("Pushing: " + msg);
socket.Send(msg, Encoding.UTF8);
}
}
}
工作节点的 C# 代码
using(var ctx = ZmqContext.Create())
{
using (ZmqSocket receiver = ctx.CreateSocket(SocketType.PULL),
sender = ctx.CreateSocket(SocketType.PUSH))
{
receiver.Connect(options.pullEndPoint);
sender.Connect(options.pushEndPoint);
while (true)
{
var rcvdMsg = receiver.Receive(Encoding.UTF8);
Console.WriteLine("Pulled : " + rcvdMsg);
var sndMsg = options.rcvdMessageTag.Replace("#msg#", rcvdMsg);
Thread.Sleep(options.delay);
Console.WriteLine("Pushing: " + sndMsg);
sender.Send(sndMsg, Encoding.UTF8);
}
}
}
任务收集器的 C# 代码
using(var ctx = ZmqContext.Create())
{
using (var socket = ctx.CreateSocket(SocketType.PULL))
{
foreach (var endPoint in options.bindEndPoints)
socket.Bind(endPoint);
while (true)
{
Thread.Sleep(options.delay);
var msg = socket.Receive(Encoding.UTF8);
Console.WriteLine("Received: " + msg);
}
}
}
双击 bin 目录下的 Pipeline_Pattern.bat 文件。此批处理文件包含以下命令
start "Task Distributor (Push)" cmd /T:8F /k Push.exe -b tcp://127.0.0.1:5000
-m "Orange #nb#";"Apple #nb#" -x 5 -d 1000
start "Task Collector (Pull)" cmd /T:8E /k Pull.exe -b tcp://127.0.0.1:5001 -d 0
start "Worker 1" cmd /T:1F /k PullPushWorker.exe -l tcp://127.0.0.1:5000
-s tcp://127.0.0.1:5001 -t "#msg# (Worker 1)" -d 0
start "Worker 2" cmd /T:1F /k PullPushWorker.exe -l tcp://127.0.0.1:5000
-s tcp://127.0.0.1:5001 -t "#msg# (Worker 2)" -d 0
第一个命令将运行任务分发器,它将绑定到端点 tcp://127.0.0.1:5000 并等待连接。第二个命令将运行任务收集器,它将绑定到端点 tcp://127.0.0.1:5001 并等待连接。第三和第四个命令将运行两个工作节点实例。每个工作节点都将连接到任务分发器和收集器。
运行上述批处理文件后,我们得到以下结果
注意到
- 任务分发器已将任务分发给已连接的工作节点,为每个工作节点分发不同的任务(循环),这是
PUSH
套接字的出站路由策略。 - 任务收集器已从工作节点接收到已处理的任务(公平排队),这是
PULL
套接字的主叫路由策略。
使用此模式,我们可以轻松地添加其他工作节点,而无需更改任务分发器和收集器中的任何配置,因为工作节点是连接(而非绑定)到其端点的。我们可以说分发器和收集器是模式的稳定部分,而工作节点是动态部分。
结论
ZeroMQ 是一个非常轻量级的开源库。只需几行简单的代码,我们就可以构建一个通信模式,该模式可以在同一台或多台机器上使用,这些机器具有相同或不同的平台。模式的不同部分可以由相同或不同的语言实现。