ZeroMQ #5:从多个套接字发送





5.00/5 (2投票s)
上次我们探讨了如何使用 Poller 来处理多个套接字并检测它们的就绪状态。这次我们将继续使用我们迄今为止熟悉的请求/响应模型。
上次我们探讨了如何使用 Poller 来处理多个套接字并检测它们的就绪状态。这次我们将继续使用我们迄今为止熟悉的请求/响应模型。不过,我们将对其进行一些改进,并探讨几种可以让多个线程向服务器推送消息并接收响应的方法,这通常是一个常见的需求(至少在我看来是这样)。
代码在哪里?
一如既往,在我们开始之前,礼貌的做法是告诉您代码在哪里,代码和以前一样,都在 GitHub 上
https://github.com/sachabarber/ZeroMqDemos
开始之前的一点说明
您可能已经意识到,ZeroMQ 是一个消息传递框架,因此它提倡无锁消息传递的理念。我也恰好认为这是一个非常好的主意。如果您尝试避免共享数据结构,就可以实现出色的消息吞吐量,并节省大量同步的麻烦。通过这样做,您还可以避免同步对它们的访问所带来的麻烦。因此,总的来说,请尝试以 ZeroMQ 希望的方式来使用它,即通过消息传递,避免锁和共享数据结构。
为本文设定场景
好的,我们马上就可以开始看代码了,但在那之前,让我们再多谈谈本文试图讨论的内容。
在我通常编写的代码中,很多客户端线程同时运行是很常见的,每个线程都可以与服务器通信。如果这听起来像是您曾经处理过的需求,那么您可能会发现这篇文章很有用,因为它正是本文旨在解决的场景。
由于本文的目的是实现异步客户端,因此我们也需要一个异步服务器,所以我们为客户端使用 DealerSocket,为服务器使用 RouterSocket。
就像大多数事情一样,解决办法不止一种,所以我们将探讨几种选择,每种选择都有其优缺点。
选项 1:每个线程都有自己的 DealerSocket
第一个选项确实需要一些 .NET 线程知识,但如果您具备这些知识,那么这个想法很简单。对于每个客户端线程,我们还创建一个专用的 DealerSocket,该套接字*应该*由该线程独占使用。
这是通过 ThreadLocal<T> .NET 类实现的,该类允许我们为每个线程拥有一个 DealerSocket。我们将每个客户端创建的 DealerSocket 添加到一个 Poller 实例中,并监听每个套接字的 ReceieveReady 事件,这允许我们从服务器接收回消息。
这种方法的明显缺点是客户端会创建更多的套接字。优点是实现起来非常简单,而且确实有效。
这是一张展示我们在此试图实现目标的图
这是该场景的代码
using System; using System.Threading; using System.Threading.Tasks; using NetMQ; using NetMQ.Sockets; namespace ManualThreadingDemo { public class Program { public void Run() { //NOTES //1. Use ThreadLocal<DealerSocket> where each thread has // its own client DealerSocket to talk to server //2. Each thread can send using it own socket //3. Each thread socket is added to poller ThreadLocal<DealerSocket> clientSocketPerThread = new ThreadLocal<DealerSocket>(); int delay = 3000; Poller poller = new Poller(); using (NetMQContext ctx = NetMQContext.Create()) { using (var server = ctx.CreateRouterSocket()) { server.Bind("tcp://127.0.0.1:5556"); //start some threads, each with its own DealerSocket //to talk to the server socket. Creates lots of sockets, //but no nasty race conditions no shared state, each //thread has its own socket, happy days for (int i = 0; i < 3; i++) { Task.Factory.StartNew((state) => { DealerSocket client = null; if (!clientSocketPerThread.IsValueCreated) { client = ctx.CreateDealerSocket(); client.Connect("tcp://127.0.0.1:5556"); client.ReceiveReady += Client_ReceiveReady; clientSocketPerThread.Value = client; poller.AddSocket(client); } else { client = clientSocketPerThread.Value; } while (true) { var messageToServer = new NetMQMessage(); messageToServer.AppendEmptyFrame(); messageToServer.Append(state.ToString()); client.SendMessage(messageToServer); Thread.Sleep(delay); } },string.Format("client {0}", i), TaskCreationOptions.LongRunning); } //start the poller Task task = Task.Factory.StartNew(poller.Start); //server loop while (true) { var clientMessage = server.ReceiveMessage(); Console.WriteLine("========================"); Console.WriteLine(" INCOMING CLIENT MESSAGE "); Console.WriteLine("========================"); for (int i = 0; i < clientMessage.FrameCount; i++) { Console.WriteLine("Frame[{0}] = {1}", i, clientMessage[i].ConvertToString()); } if (clientMessage.FrameCount == 3) { var clientAddress = clientMessage[0]; var clientOriginalMessage = clientMessage[2].ConvertToString(); string response = string.Format("{0} back from server {1}", clientOriginalMessage, DateTime.Now.ToLongTimeString()); var messageToClient = new NetMQMessage(); messageToClient.Append(clientAddress); messageToClient.AppendEmptyFrame(); messageToClient.Append(response); server.SendMessage(messageToClient); } } } } } void Client_ReceiveReady(object sender, NetMQSocketEventArgs e) { bool hasmore = false; e.Socket.Receive(out hasmore); if (hasmore) { string result = e.Socket.ReceiveString(out hasmore); Console.WriteLine("REPLY " + result); } } [STAThread] public static void Main(string[] args) { Program p = new Program(); p.Run(); } } }
如果您运行此代码,您会看到类似这样的内容
选项 2:每个线程委托给本地代理
下一个示例保留了发送消息给服务器的独立线程的想法。然而,这次我们将在客户端使用一个代理。其想法是客户端线程会将消息推送到一个共享数据队列。我知道我曾告诉您要避免共享数据结构。问题是,这不是一个共享数据结构,它只是一个线程安全的队列,许多线程都可以向其写入。而共享数据结构可能意味着多个线程都试图更新外汇期权报价价格的当前出价。两者是有区别的。好吧,共享队列将在某个地方进行一些同步以使其线程安全,值得庆幸的是,我们可以依赖 Microsoft PFX 团队的辛勤工作。那些家伙很聪明,我相信 Concurrent collections 命名空间设计得相当好,并且可以信任它具有相当高的效率。
同样,我们需要调用一些 .NET 知识,所以对于集中式队列,我们使用 ConcurrentQueue<T>。所有客户端线程都会在这里将它们的传入消息排队等待发送给服务器。
还将启动另一个线程。这个额外的线程将处理已加入集中队列的消息。当从集中队列中取出一条消息时,它将被发送到服务器。关键是只有读取集中队列的线程才会向服务器发送消息。
由于我们仍然希望消息异步发送,因此我们继续使用 DealerSocket,但由于现在只有一个地方向服务器发送消息,所以我们只需要一个 DealerSocket。
我们将单个 DealerSocket 添加到一个 Poller 实例中,并监听每个套接字的 ReceieveReady 事件,这允许我们从服务器接收回消息。
这比第一个示例更复杂,因为它涉及更多的活动部件,但我们不再创建大量的套接字。只有一个。
和以前一样,这是一张展示我们在此试图实现目标的图
这是该场景的代码
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using NetMQ; namespace ConcurrentQueueDemo { public class Program { public void Run() { //NOTES //1. Use many threads each writing to ConcurrentQueue //2. Extra thread to read from ConcurrentQueue, and this is the one that // will deal with writing to the server ConcurrentQueue<string> messages = new ConcurrentQueue<string>(); int delay = 3000; Poller poller = new Poller(); using (NetMQContext ctx = NetMQContext.Create()) { using (var server = ctx.CreateRouterSocket()) { server.Bind("tcp://127.0.0.1:5556"); //start some threads, where each thread, will use a client side //broker (simple thread that monitors a CooncurrentQueue), where //ONLY the client side broker talks to the server for (int i = 0; i < 3; i++) { Task.Factory.StartNew((state) => { while (true) { messages.Enqueue(state.ToString()); Thread.Sleep(delay); } }, string.Format("client {0}", i), TaskCreationOptions.LongRunning); } //single sender loop Task.Factory.StartNew((state) => { var client = ctx.CreateDealerSocket(); client.Connect("tcp://127.0.0.1:5556"); client.ReceiveReady += Client_ReceiveReady; poller.AddSocket(client); while (true) { string clientMessage = null; if (messages.TryDequeue(out clientMessage)) { var messageToServer = new NetMQMessage(); messageToServer.AppendEmptyFrame(); messageToServer.Append(clientMessage); client.SendMessage(messageToServer); } } }, TaskCreationOptions.LongRunning); //start the poller Task task = Task.Factory.StartNew(poller.Start); //server loop while (true) { var clientMessage = server.ReceiveMessage(); Console.WriteLine("========================"); Console.WriteLine(" INCOMING CLIENT MESSAGE "); Console.WriteLine("========================"); for (int i = 0; i < clientMessage.FrameCount; i++) { Console.WriteLine("Frame[{0}] = {1}", i, clientMessage[i].ConvertToString()); } if (clientMessage.FrameCount == 3) { var clientAddress = clientMessage[0]; var clientOriginalMessage = clientMessage[2].ConvertToString(); string response = string.Format("{0} back from server {1}", clientOriginalMessage, DateTime.Now.ToLongTimeString()); var messageToClient = new NetMQMessage(); messageToClient.Append(clientAddress); messageToClient.AppendEmptyFrame(); messageToClient.Append(response); server.SendMessage(messageToClient); } } } } } void Client_ReceiveReady(object sender, NetMQSocketEventArgs e) { bool hasmore = false; e.Socket.Receive(out hasmore); if (hasmore) { string result = e.Socket.ReceiveString(out hasmore); Console.WriteLine("REPLY " + result); } } [STAThread] public static void Main(string[] args) { Program p = new Program(); p.Run(); } } }
如果您运行此代码,您会看到类似这样的内容
选项 3:使用 NetMQScheduler
最后一个选项是使用 NetMQ 库类:NetMQScheduler。我认为最好从阅读我刚刚提供的链接开始。然后回到这里。
……。
……。
时间流逝
……。
……。
哦,您回来了。好的,现在您知道 NetMQScheduler 为我们提供了一种使用 TPL 来调度工作的方式,并且有一个我们传递给 NetMQScheduler 的 Poller。很酷。
NetMQScheduler 是一个自定义的 TPL 调度程序,它允许我们创建需要完成的任务,并负责它们的线程管理。由于我们已经告诉 NetMQScheduler 我们要使用的 Poller,因此我们可以挂接到 ReceiveReady 事件并使用它从服务器接收回消息。
这里的区别在于,由于我们使用的是 TPL 和 NetMQ,因此每当我们想发送/接收时,都需要使用 TPL 任务和 NetMQScheduler 实例。
说实话,我认为我最不喜欢这种设计,因为它混合了太多的概念,而且 TPL 的东西与 ZeroMQ 的优点混合得有点太多,不符合我的口味。不过,我还是想展示这个例子以求完整。
所以这个例子的代码分为两部分。一个简单的客户端,然后是启动客户端实例并让多个线程使用该客户端实例向服务器发送消息的代码。还有一个基本的服务器循环(我将在“其余部分”标题下显示)。
客户端代码
这是客户端代码,可以看到我们创建了一个 NetMQScheduler,它会接收一个内部使用的新的 Poller 实例。其想法是,任何人都可以通过调用客户端的 SendMessage(..)
方法来发送消息。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using NetMQ; namespace NetMQSchedulerDemo { public class Client : IDisposable { private readonly NetMQContext context; private readonly string address; private Poller poller; private NetMQScheduler scheduler; private NetMQSocket clientSocket; public Client(NetMQContext context, string address) { this.context = context; this.address = address; } public void Start() { poller = new Poller(); clientSocket = context.CreateDealerSocket(); clientSocket.ReceiveReady += clientSocket_ReceiveReady; clientSocket.Connect(address); scheduler = new NetMQScheduler(context, poller); Task.Factory.StartNew(poller.Start, TaskCreationOptions.LongRunning); } void clientSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) { string result = e.Socket.ReceiveString(); Console.WriteLine("REPLY " + result); } public async Task SendMessage(NetMQMessage message) { // instead of creating inproc socket which listen to messages and then send //to the server we just creating task and run a code on // the poller thread which the the thread of the clientSocket Task task = new Task(() => clientSocket.SendMessage(message)); task.Start(scheduler); await task; await ReceiveMessage(); } public async Task ReceiveMessage() { Task task = new Task(() => { var result = clientSocket.ReceiveString(); Console.WriteLine("REPLY " + result); }); task.Start(scheduler); await task; } public void Dispose() { scheduler.Dispose(); clientSocket.Dispose(); poller.Stop(); } } }
其余部分
这是负责启动客户端和额外线程通过客户端(使用上面的 SendMessage(..)
方法)推送消息的其余代码
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.SqlServer.Server; using NetMQ; using NetMQ.Sockets; using NetMQSchedulerDemo; using NUnit.Framework; namespace NetMQSchedulerDemo { public class Program { public void Run() { //NOTES //1. Use NetMQs NetMQScheduler to communicate with the // server. All Send/Receive MUST be done via the // NetMQScheduler and TPL Tasks. See the Client class // for more information on this int delay = 3000; using (NetMQContext ctx = NetMQContext.Create()) { using (var server = ctx.CreateRouterSocket()) { server.Bind("tcp://127.0.0.1:5556"); using (var client = new Client(ctx, "tcp://127.0.0.1:5556")) { client.Start(); //start some theads, each thread will use the //Clients NetMQScheduler to send/receieve messages //to/from the server for (int i = 0; i < 2; i++) { Task.Factory.StartNew(async (state) => { while (true) { var messageToServer = new NetMQMessage(); messageToServer.AppendEmptyFrame(); messageToServer.Append(state.ToString()); await client.SendMessage(messageToServer); Thread.Sleep(delay); } }, string.Format("client {0}", i), TaskCreationOptions.LongRunning); } //server loop while (true) { var clientMessage = server.ReceiveMessage(); Console.WriteLine("========================"); Console.WriteLine(" INCOMING CLIENT MESSAGE "); Console.WriteLine("========================"); for (int i = 0; i < clientMessage.FrameCount; i++) { Console.WriteLine("Frame[{0}] = {1}", i, clientMessage[i].ConvertToString()); } if (clientMessage.FrameCount == 3) { var clientAddress = clientMessage[0]; var clientOriginalMessage = clientMessage[2].ConvertToString(); string response = string.Format("{0} back from server {1}", clientOriginalMessage, DateTime.Now.ToLongTimeString()); var messageToClient = new NetMQMessage(); messageToClient.Append(clientAddress); messageToClient.AppendEmptyFrame(); messageToClient.Append(response); server.SendMessage(messageToClient); } } } } } } [STAThread] public static void Main(string[] args) { Program p = new Program(); p.Run(); } } }
如果您运行此代码,您可能会看到类似这样的内容