65.9K
CodeProject 正在变化。 阅读更多。
Home

ZeroMq #1: 你好,世界

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.86/5 (5投票s)

2014 年 8 月 21 日

CPOL

4分钟阅读

viewsIcon

43010

什么是 ZeroMQ ZeroMq 是一个 C 语言库,包含一系列强大的套接字,它对您通常编写的套接字代码提供了一个非常非常酷的抽象。它通过一套标准的、针对特定场景构建的套接字来提供构建块。

什么是 ZeroMQ

ZeroMq 是一个 C 语言库,包含一系列强大的套接字,它对您通常编写的套接字代码提供了一个非常非常酷的抽象。它通过一套标准的、针对特定场景构建的套接字来提供构建块。

它的开发者在高级消息队列协议 (AMQP) 的编写中发挥了重要作用,并且在消息传递领域非常重要。

有一本非常棒的书,每个人都应该读一读,它是 Pieter Hintjens 写的。

http://www.amazon.co.uk/ZeroMQ-Messaging-Applications-Pieter-Hintjens/dp/1449334067

还有一个包含完整代码示例的 PDF 在线版本,被称为指南。

https://zguide.zeromq.cn/page:all

为什么要使用 ZeroMq/消息传递

如果您曾经编写过任何异步代码,并且不得不处理共享状态,您就会知道这会带来锁/信号量等等。

现在想象一个您不必关心锁、信号量等等的世界,您只需传递一条消息,而无需担心共享状态。欢迎来到消息传递。这就是您如何在没有共享状态管理开销的情况下编写每秒消息吞吐量高的系统。

Zero 的核心是一个消息传递框架。它可以以无代理方式设置,也可以使用代理,甚至点对点。正是套接字使其强大。它们是基本的构建块,您可以使用它们来创建大型分布式架构,或者非常小的架构。

我强烈建议大家阅读这本书或查看指南,它们确实改变了我对某些任务的看法。

在哪里可以获得 ZeroMq 库?

首先,我提到 ZeroMq 是用 C 语言编写的,但有很多语言绑定。实际上,您可以使用的 C# 绑定,即 zmqcli 绑定。但是,这样做的问题是,您得到的错误有时会非常令人困惑,因为它倾向于显示实际的 C 错误代码。

我希望(如果可能)使用 ZeroMq 的完全原生端口,幸运的是,通过 NetMq 项目,确实存在这样的东西。这将是我在所有文章中使用的库。

您可以使用以下 Nuget 包管理器命令行安装 NetMq

Install-Package NetMQ

因此,事不宜迟,让我们来看看一个非常简单的例子。

这个例子做什么

这个例子非常简单明了,我们从客户端向服务器发送一条消息,然后服务器将一条消息发回。这是请求/响应模式的一个(非常简单的)示例,我们将看到更多这样的示例。Zero 还支持发布/订阅,我们也会对其进行研究(尽管不像请求/响应那样详细)。

让我们看看一些代码吧

using System;
using NetMQ;

namespace HelloWorldDemo
{
    class Program
    {
        private static void Main(string[] args)
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");

                        client.Send("Hello");

                        string fromClientMessage = server.ReceiveString();

                        Console.WriteLine("From Client: {0}", fromClientMessage);

                        server.Send("Hi Back");

                        string fromServerMessage = client.ReceiveString();

                        Console.WriteLine("From Server: {0}", fromServerMessage);

                        Console.ReadLine();
                    }
                }
            }
        }
    }
}

信不信由你,这些代码足以实现一个完全正常运行的请求(客户端)/响应(服务器)模式。如果不相信,这里有一些输出可以证明。

image

好的,它确实有效,这太疯狂了。那么它是如何工作的呢?

这里有几个需要注意的地方。

  1. 我们可以通过使用专门针对在请求/响应场景中工作的套接字来创建请求/响应模式。
  2. 我们可以使用 tcp 作为协议 (ZeroMq 还支持其他协议,例如 inproc)
  3. 我们不必在服务器上启动任何额外的线程来处理新连接的客户端套接字,然后继续接受其他客户端套接字。事实上,这段代码几乎可以与数千个客户端通信,而无需进行太多更改(事实上,我将向您展示一个使用单独进程的示例)。
  4. 有这个神奇的 NetMqContext。这是强制性的,无论何时使用 ZeroMq 套接字,都必须使用它。

在请求/响应中,有一些具体的事情需要讨论,因为这些类型的套接字被认为是 1:1 请求/响应。因此,如果您在客户端的请求套接字上调用 ReceiveString() 两次,而服务器的响应套接字没有发送任何东西,您将会收到一个异常,如下面的屏幕截图所示。

image

在单独的线程中运行

这个演示是为了向您展示如何使用内部进程消息传递系统。显然,您需要在此示例中使用新线程,因为我们需要不阻塞套接字的 receive() 方法。

image

程序

这只是启动一些客户端供服务器处理。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HelloWorldDemoSeparateThreads
{
    public class Program
    {
        public static void Main(string[] args)
        {
            Server server = new Server();
            server.Run();

            foreach (Client client in Enumerable.Range(0, 5).Select(
                x => new Client(string.Format("client {0}", x))))
            {
                client.Run();
            }

            Console.ReadLine();
        }
    }
}

客户端

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;


namespace HelloWorldDemoSeparateThreads
{
    sealed class Client
    {
        private readonly string clientName;

        public Client(string clientName)
        {
            this.clientName = clientName;
        }

        public void Run()
        {
            Task.Run(() =>
            {
                using (NetMQContext ctx = NetMQContext.Create())
                {
                    using (var client = ctx.CreateRequestSocket())
                    {
                        client.Connect("tcp://127.0.0.1:5556");
                        while (true)
                        {
                            client.Send(string.Format("Hello from client {0}", clientName));
                            string fromServerMessage = client.ReceiveString();
                            Console.WriteLine("From Server: {0} running on ThreadId : {1}", 
                                fromServerMessage, Thread.CurrentThread.ManagedThreadId);
                            Thread.Sleep(5000);
                        }
                    }
                }
            });

        }
    }
}

服务器

using System;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace HelloWorldDemoSeparateThreads
{
    sealed class Server
    {
        public void Run()
        {
            Task.Run(() =>
            {
                using (NetMQContext ctx = NetMQContext.Create())
                {
                    using (var server = ctx.CreateResponseSocket())
                    {
                        server.Bind("tcp://127.0.0.1:5556");

                        while (true)
                        {
                            string fromClientMessage = server.ReceiveString();
                            Console.WriteLine("From Client: {0} running on ThreadId : {1}", 
                                fromClientMessage, Thread.CurrentThread.ManagedThreadId);
                            server.Send("Hi Back");
                        }

                    }
                }
            });

        }
    }
}

在单独的进程中运行

客户端

using System;
using System.Threading;
using NetMQ;

namespace HelloWorldSeparateClient
{
    sealed class Client
    {
        private string clientName;

        public static void Main(string[] args)
        {
            Client c = new Client();
            c.clientName = args[0];
            c.Run();
        }

        public void Run()
        {
            
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var client = ctx.CreateRequestSocket())
                {
                    client.Connect("tcp://127.0.0.1:5556");
                    while (true)
                    {
                        client.Send(string.Format("Hello from client {0}", clientName));
                        string fromServerMessage = client.ReceiveString();
                        Console.WriteLine("From Server: {0} running on ThreadId : {1}",
                            fromServerMessage, Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(5000);
                    }
                }
            }
            

        }
    }

}

服务器

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;


namespace HelloWorldSeparateServer
{
    sealed class Server
    {

        public static void Main(string[] args)
        {
            Server s = new Server();
            s.Run();
        }

        public void Run()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                using (var server = ctx.CreateResponseSocket())
                {
                    server.Bind("tcp://127.0.0.1:5556");

                    while (true)
                    {
                        string fromClientMessage = server.ReceiveString();
                        Console.WriteLine("From Client: {0} running on ThreadId : {1}",
                            fromClientMessage, Thread.CurrentThread.ManagedThreadId);
                        server.Send("Hi Back");
                    }

                }
             }
        }
    }
}

这是在单独的进程中运行的代码,可以看到我们根本不必做任何事情来更改服务器代码,我们只是将其移动到新进程中运行,而不是新线程。

image

本文的代码在哪里?

所有这些帖子的代码都托管在 github 中的一个大型解决方案中。

https://github.com/sachabarber/ZeroMqDemos

© . All rights reserved.