65.9K
CodeProject 正在变化。 阅读更多。
Home

Rabbit Mq Shovel 示例

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.93/5 (43投票s)

2012年1月3日

CPOL

11分钟阅读

viewsIcon

169833

downloadIcon

1577

了解如何使用 RabbitMq 进行消息路由。

目录

引言

我有一段时间没有写文章了,这并非因为我不忙,恰恰相反,我一直在忙于开发一个我认为人们会喜欢的生产力工具。我想我肯定会用上它,这也是我写它的部分原因。我正在和 CodeProject 的 Pete O'Hanlon 一起开发这个工具,进展顺利。我们希望在新的一年里能有一个初版发布。总之,说了这么多我为什么没写文章的原因,现在回到正题。

圣诞节前,在公司里,我的团队领导 Richard King(《Baboon Converters》的合著者)和我正在研究提供一个基于 MSMQ 队列的系统,在该系统中,我们将部署多个队列,链中的每台机器都可以接收和发送。为了简单起见,我们先不考虑双向通信,只考虑消息在机器间的单向传输。下图大致说明了我们的想法。

本质上,我们想要的是某种从Machine AMachine B的路由。

这一切都可以通过相当标准的 MSMQ 代码来实现,我们可以通过编程转发队列消息,或者甚至可以使用 MSMQ over WCF 或新的 WCF 4.0 RoutingService。我们对所有这些方法都有保留意见。

  • MSMQ 代码:这会产生过多的模板代码,当然我们可以对其进行抽象,最终得到一个相当简洁的方案,但我们想看看在不走这条路的情况下能做什么。
  • MSMQ over WCF:好的,但需要大量的配置,而且我们需要某个地方来托管服务。还需要创建 MSMQ 队列并管理这些队列的访问权限,使其仅限于授权用户。
  • RoutingService:这个功能相当不错,但它仍然依赖于 WCF,因此存在与 MSMQ over WCF 相同的问题。

我们觉得所有这些方法要么涉及过多的配置/设置,要么就是我们不想要的,而且需要我们编写大量代码。我们也觉得这个桥梁肯定已经被别人跨越过,所以我们开始研究市面上可用的消息解决方案(而且有很多);现在我们将简要讨论其中的一些。

注意:这篇文章非常具体,它主要讨论如何解决我们需要的路由安排问题,所以如果您认为这不适合您,没关系,我们理解。但是,如果您有类似的需求,那么这篇文章可能会谈论到一些您可能会用的东西。选择权在您。

可用框架

当你真正开始研究时,市面上有成百上千种消息解决方案。我们详细研究了三种,下面将简要介绍。我们不会过多深入细节,而是列出每个框架供应商声称能使他们的框架脱颖而出的特性。

NServiceBus

网站:http://www.nservicebus.com/

供应商声称

  • 总线架构
  • 发布/订阅
  • 持久化消息
  • Sagas
  • 跨服务器可扩展
  • 事务
  • 序列化
  • IOC 支持

MassTransit

网站:http://masstransit-project.com/

供应商声称

  • 总线架构
  • Sagas
  • 异常管理
  • 事务
  • 序列化
  • Headers
  • 消费者生命周期
  • 基于 Rabbit Mq 构建
  • IOC 支持

RabbitMq

网站:https://rabbitmq.cn/

供应商声称

  • 消息传递,就是能用

选择框架

现在,当涉及到实际选择框架时,我们有以下标准

  1. 易于使用吗?
  2. 需要多长时间才能启动并运行?
  3. 它能完全符合我们的需求吗?
  4. API 是否直观,6个月后我们能理解它吗?
  5. 如果没用过,出了问题我们知道如何修复吗?

基于广泛的概念验证,我们最终选择了 Rabbit Mq(您可能会觉得奇怪,因为它唯一的声称是“消息传递,就是能用”),原因是它比 NServiceBusMass Transit 显得不那么神秘。我们并不是说这两个框架不好,它们都很好,只是对我们的目的而言,我们想要一个非常简单的东西来使用,在进行了测试之后,我们觉得 Rabbit Mq 就是它。它的配置要简单得多(一旦掌握了窍门),而且不涉及那些只有框架设计者才真正理解的晦涩代码。

此外,Rabbit Mq 的文档也比其他框架好很多,至少我们是这么觉得的。

在本文的其余部分,我们将讨论如何配置 Rabbit Mq 以将消息从一台机器发送到另一台机器,这正是我们的需求。如果这听起来对您不那么有趣,或者您看不到它的好处,那么现在就可以结束阅读了。但是,如果您觉得我们试图解决的这种安排可能会对您有所帮助,请继续阅读。

Rabbit Mq

Rabbit Mq 自称为消息代理,典型的设置是有一个 Rabbit Mq 代理(他们称之为 Agents)驻留在某个服务器上,负责处理传入的消息并确保它们被正确分发。一个比较奇怪的方面(至少对于 .NET 开发者来说)是 Rabbit Mq 实际上是用 Erlang 编写的。所以您需要安装它(我们稍后会讲到),但不要因此而却步,市面上有大量的 Rabbit Mq 客户端,.NET 就是其中之一。

现在您可能会想,如果 Rabbit Mq 是一个代理类型的系统,通常是这样的:

它怎么可能实现我们最初要求的那种消息路由呢?

这看起来不像一个有中央代理的代理类型架构。幸运的是,Rabbit Mq 带有一个方便的插件,名为“Shovel”,Rabbit Mq 文档将其描述如下:

rabbitmq_shovel:“RabbitMQ 的一个插件,它将消息从一个代理上的队列“铲”到另一个代理上的交换器。”

突然之间,我们有了两台机器,每台机器都运行一个 Rabbit Mq 代理。嗯,这听起来突然更像我们最初的要求了。太棒了。

有了这些知识,我们就继续进行文章的其余部分,讨论我们需要安装/配置什么才能成功满足我们的需求。

安装

要使 Rabbit Mq 正常运行,您首先需要安装所需的组件。对于我们的需求,因为我们想在消息链中的每台机器上都运行一个 Rabbit Mq 代理,所以所有这些以及后续的安装说明都适用于消息链中的所有机器(因此,对于我们的需求,这将是两台机器)。

现在您应该已经创建了几个文件夹。

Erlang

应该看起来像这样

Rabbit Mq

应该看起来像这样

插件安装

现在我们已经安装了 Erlang 和 Rabbit Mq 服务器,我们需要安装两个插件,如下所述

安装 WebServer 插件

从 Rabbit 安装目录(通常是 C:\Program Files\RabbitMQ Server\rabbitmq_server-2.7.0\sbin),运行以下命令行:

rabbitmq-plugins.bat enable rabbitmq_management

运行此命令行后,我们需要让 Rabbit Mq 服务器识别这些额外的插件,因此我们需要启动和停止 Rabbit 服务器(您需要运行以下命令行,其中命令窗口以管理员权限打开):

  • rabbitmq-service stop
  • rabbitmq-service remove
  • rabbitmq-service install
  • rabbitmq-service start

您可能想将这些打包到一个批处理(.BAT)文件中,因为我们将需要再次使用这个组合。

安装 Shovel 插件

从 Rabbit 安装目录(通常是 C:\Program Files\RabbitMQ Server\rabbitmq_server-2.7.0\sbin),运行以下命令行:

  • rabbitmq-plugins.bat enable rabbitmq_shovel
  • rabbitmq-plugins.bat enable rabbitmq_shovel_management

和以前一样,运行这些命令行后,我们需要让 Rabbit Mq 服务器识别这些额外的插件,因此我们需要启动和停止 Rabbit 服务器(您需要运行以下命令行,其中命令窗口以管理员权限打开):

  • rabbitmq-service stop
  • rabbitmq-service remove
  • rabbitmq-service install
  • rabbitmq-service start

检查 WebServer

接下来检查 Web服务器是否可用,可以通过以下 URL 查看:https://:55672/#/

其中用户名和密码默认为:

  1. 用户名 =“guest
  2. 密码 “guest

点击图片查看放大版本。

可以看到我们有一个正在运行的 Web 服务器,通过它可以监控 Rabbit Mq 服务器的所有组件,例如:

  • 连接
  • 队列
  • Exchanges
  • Shovels(现在还不能工作,因为我们还没有配置它)

所以到目前为止一切顺利,现在让我们转向配置 Shovel 插件,好吗?

配置 Shovel

Shovel Rabbit Mq 插件的作用是:

rabbitmq_shovel:“RabbitMQ 的一个插件,它将消息从一个代理上的队列“铲”到另一个代理上的交换器。”

在使用 Shovel 之前,我们需要对其进行配置。

创建环境变量

为了让 Rabbit Mq 能够读取配置文件,我们需要创建一个环境变量来告诉 Rabbit Mq 从哪里获取其配置。应该这样做,其中Variable 的值应该是 Rabbit Mq 配置文件的路径和名称,不包括文件扩展名。

创建 Shovel 配置文件

下一步是创建一个新的 Rabbit Mq 配置文件,该文件将配置 Shovel 插件,该配置文件的示例如下。值得注意的是,这是一个 Erlang 风格的配置文件,Rabbit Mq 使用的就是这种格式。

所以,回到我们想达到的目标

基于此图,我们可以有一个名为 Rabbit.configRabbit Mq 配置文件,存储在 c\:\RabbitConfig 中,其内容如下(末尾的“.”很重要)。

在演示代码中,Machine AMachine B 是我工作中两台名为“C1801”和“C1799”的机器,我们通信的队列名为“Killer”。

您需要根据自己的需求进行更改。

[{rabbitmq_shovel,
  [{shovels,
    [{killer_push,
      [{sources,      [{broker,"amqp://C1801"}]},
       {destinations, [{broker, "amqp://C1799"}]},
       {queue, <<"Killer">>},
       {ack_mode, on_confirm},
       {publish_properties, [{delivery_mode, 2}]},
       {publish_fields, [{exchange, <<"">>},
                         {routing_key, <<"Killer">>}]},
       {reconnect_delay, 5}
      ]}
     ]
   }]
}].

和以前一样,我们需要让 Rabbit Mq 服务器识别这些插件更改,因此我们需要启动和停止 Rabbit 服务器(您需要运行以下命令行,其中命令窗口以管理员权限打开):

  • rabbitmq-service stop
  • rabbitmq-service remove
  • rabbitmq-service install
  • rabbitmq-service start

这就是配置的全部内容了,至少对我们预期的场景是如此。确实需要一点时间来适应 Erlang 风格的配置文件,但这只是它的工作方式。您会习惯的。

这是从我们实际工作 PC 上截取的运行版本,我们在其中使用 Rabbit Mq 完全测试了此场景。

正如您所见,这假设了一个名为“killer_push”的 Rabbit Mq 队列,该名称在上面显示的 Rabbit Mq 配置文件中已配置。

演示代码

我们包含了一个简单的 VS2010 演示解决方案,其中包含两个简单的项目:Sender 和 Receiver,如下所示。它们有意设计得很简单,以便您能看到接收到的消息。您需要根据自己的用途进行更改。

发送者代码

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Send {

    private ConnectionFactory factory = new ConnectionFactory();
    private IConnection connection = null;
    private IModel channel = null;
    private int counter =0;


    public Send()
    {
        factory.HostName = "C1801";
    }

    private void Setup()
    {
        counter = 0;
        connection = connection = factory.CreateConnection();
        channel = connection.CreateModel();
        channel.ModelShutdown += Channel_ModelShutdown;
        connection.CallbackException += Connection_CallbackException;
        connection.ConnectionShutdown += Connection_ConnectionShutdown;

        bool durable = true;
        channel.QueueDeclare("Killer", durable, false, false, null);
    }

    private void Publish()
    {
        IBasicProperties properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2;
        properties.CorrelationId = "sachas message";

        try
        {
            while (true)
            {
                string message = string.Format("This is the message {0}, {1}", 
                                        ++counter, DateTime.Now.ToShortTimeString());
                byte[] body = System.Text.Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "Killer", properties, body);
                Console.ReadLine();
                Console.WriteLine(" [x] Sent1 {0}", message);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("SOMETHING IS WRONG!!!!   " + ex.Message);
        }
        finally
        {
            ReStart();
        }
    }

    private void ReStart()
    {
        CleanUp();
        Setup();
        Publish();
    }

    private void CleanUp()
    {
        if (connection != null)
        {
            connection.Dispose();
            connection.Close();
        }
        if (channel != null)
        {
            channel.Dispose();
            channel.Close();
        }
    }

    public static void Main() 
    {
        Send r = new Send();
        r.Setup();
        r.Publish();
        
    }

    private void Connection_ConnectionShutdown(IConnection connection, ShutdownEventArgs reason)
    {
        Console.WriteLine("connection_ConnectionShutdown " + reason.ToString());
        ReStart();
    }

    private void Connection_CallbackException(object sender, CallbackExceptionEventArgs e)
    {
        Console.WriteLine("connection_CallbackException " + e.Exception.StackTrace);
        ReStart();
    }

    private void Channel_ModelShutdown(IModel model, ShutdownEventArgs reason)
    {
        Console.WriteLine("CHANNEL__MODEL_SHUTDOWN " + reason.ToString());
        ReStart();
    }
}

接收者代码

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Threading;
using RabbitMQ.Client.Exceptions;
using System;

class Receive {

    private ConnectionFactory factory = new ConnectionFactory();
    
    private IConnection connection = null;
    private IModel channel = null;
    private QueueingBasicConsumer consumer = null;

    public Receive ()
    {
        factory.HostName = "C1799";
    }

    private void Setup()
    {
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        channel.ModelShutdown += Channel_ModelShutdown;
        connection.CallbackException += Connection_CallbackException;
        connection.ConnectionShutdown += Connection_ConnectionShutdown;

        bool isDurable = true;
        bool exclusive = false;
        bool autoDelete = false;
        bool noAck = false;

        channel.QueueDeclare("Killer", isDurable, exclusive, autoDelete, null);

        consumer = new QueueingBasicConsumer(channel);
        channel.BasicConsume("Killer", noAck, consumer);

        System.Console.WriteLine(" [*] Waiting for messages." +
                                 "To exit press CTRL+C");
    }

    private void CleanUp()
    {
        if (connection != null)
        {
            connection.Dispose();
            connection.Close();
        }
        if (channel != null)
        {
            channel.Dispose();
            channel.Close();
        }
    }

    private void Listen()
    {
        try
        {
            while (true)
            {

                if (!channel.IsOpen)
                    throw new Exception("Channel is closed");

                BasicDeliverEventArgs ea =
                    (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                byte[] body = ea.Body;
                string s = ea.BasicProperties.CorrelationId;
                string message = System.Text.Encoding.UTF8.GetString(body);
                channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine(" [x] Received {0}", message);
            }

        }
        catch (Exception ex)
        {
            Console.WriteLine("SOMETHING IS WRONG!!!!   " + ex.Message);
        }
        finally
        {
            CleanUp();
        }
    }

    public static void Main() 
    {
        Receive r = new Receive();
        r.Setup();
        r.Listen();
    }

    private void ReStart()
    {
        connection.CallbackException -= Connection_CallbackException;
        connection.ConnectionShutdown -= Connection_ConnectionShutdown;

        CleanUp();
        Setup();
        Listen();
    }

    private void Connection_ConnectionShutdown(IConnection connection, ShutdownEventArgs reason)
    {
        Console.WriteLine("connection_ConnectionShutdown " + reason.ToString());
        ReStart();
    }

    private void Connection_CallbackException(object sender, CallbackExceptionEventArgs e)
    {
        Console.WriteLine("connection_CallbackException " + e.Exception.StackTrace);
        ReStart();
    }

    private void Channel_ModelShutdown(IModel model, ShutdownEventArgs reason)
    {
        Console.WriteLine("CHANNEL__MODEL_SHUTDOWN " + reason.ToString());
        ReStart();
    }
}

我认为代码本身很清晰,所以不会深入探讨。其中很多内容与 Rabbit Mq 示例中的内容大致相同,只是稍作重构以适应上述结构。

就这样

总之,这便是我现在想说的全部内容了。我意识到这不是我通常风格的文章,而更像是一篇循序渐进的说明型文章(我很少这样做),但我和 Richard 都花了很长时间才正确设置好 Rabbit,所以我们觉得值得与他人分享。如果您喜欢这篇文章或觉得它有用,请花点时间写下评论或投票,都欢迎。谢谢。

© . All rights reserved.