65.9K
CodeProject 正在变化。 阅读更多。
Home

CQRS 与解耦消息传递 - 第二部分

starIconstarIconstarIconstarIconstarIcon

5.00/5 (5投票s)

2016年3月4日

CPOL

7分钟阅读

viewsIcon

27075

本系列文章的第二篇,展示了 CQRS 架构的实际应用,重点在于将消息传递作为基础设施组件进行解耦。

引言

本文将列出几种可用的消息传递传输机制,介绍 MassTransit 和 nServicebus 等框架如何通过抽象底层(传输层)细节来提供帮助,并展示这些框架如何用作基础组件来提供 IServicebus 接口的具体实现。

我们将使用 MassTransit 和 nServicebus 文档中提供的代码示例,并将这些示例转换为遵循依赖倒置原则。

请注意,本文是系列文章的一部分,系列文章的链接如下。

  1. 引言
  2. 企业服务总线框架的需求和解耦的消息传递及示例(本文)
  3. 库存管理器 - CQRS 应用程序(带有解耦消息传递)- 命令
  4. 库存管理器 - CQRS 应用程序(带有解耦消息传递)- 聚合和事件溯源
  5. 库存管理器 - CQRS 应用程序(带解耦的消息传递) - 读取端

传输机制

消息传输方面,市面上有多种组件。

  • Msmq
  • Rabbitmq
  • Azure Service Bus
  • 以及许多其他

.NET 中存在允许直接使用这些传输机制的库/类。但是,直接使用这些传输机制的代码会是以下这种形式。

以 Azure Service Bus 的此 示例 为例。

发布/发送消息

string connectionString =
    CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

QueueClient Client =
    QueueClient.CreateFromConnectionString(connectionString, "TestQueue");

Client.Send(new BrokeredMessage());

接收消息

// Callback to handle received messages.
Client.OnMessage((message) =>
{
    try
    {
        // Process message from queue.
        Console.WriteLine("Body: " + message.GetBody<string>());
        Console.WriteLine("MessageID: " + message.MessageId);
        Console.WriteLine("Test Property: " +
        message.Properties["TestProperty"]);

        // Remove message from queue.
        message.Complete();
    }
    catch (Exception)
    {
        // Indicates a problem, unlock message in queue.
        message.Abandon();
    }
}, options);

直接在代码中使用这些队列有几个缺点。如上面的示例所示,只能发送特定类型的消息(BrokeredMessage),这会将我们与传输机制耦合,并在将来需要时难以切换到另一种机制。

在实际应用程序中,我们需要为应用程序特定的消息指定处理程序,即从我们的域和应用程序层相应发布的事件和命令。因此,需要使用应用程序特定的消息类型,如下面的 CQRS Journey 代码片段所示。

namespace Registration.Handlers
{
    public class SeatAssignmentsHandler :
        IEventHandler<OrderConfirmed>,
        IEventHandler<OrderPaymentConfirmed>,
        ICommandHandler<UnassignSeat>,
        ICommandHandler<AssignSeat>
    {
    
        // .. Code
     
        public void Handle(OrderPaymentConfirmed @event)
        {
            // .. Code
        }

        public void Handle(OrderConfirmed @event)
        {
            // .. Code
        }

        public void Handle(AssignSeat command)
        {
            // .. Code
        }

        public void Handle(UnassignSeat command)
        {
            // .. Code
        }
    }
}

该类通过实现 IxxxHandler<MessageType> 接口来指定它处理的消息。这是期望的实现,遵循单一职责原则,即不关心消息来自何处,以及如何将消息从 BrokeredMessage 转换为应用程序特定的消息类型等。

但是,当您直接处理传输机制时,这种实现很难做到。复杂性会增加,因为您需要处理消息的发送和接收、处理程序的注册、将其从 BrokeredMessage 转换为应用程序特定的消息类型,以及其他许多事情。CQRS Journey 项目试图做到这一点,因此最终得到了如下所示的复杂类层次结构。

CommandDispatcher 在调用 register 方法时,会填充一个 commandType => Handlers 的字典。MessageProcessor.onMessageReceived 在 IMessageProcessor.Start 中注册为回调。为了清晰起见,上面的图表中省略了 EventProcessor 类层次结构。

代码变得难以理解和维护。添加新的处理程序变得非常困难,因为有几个组件需要修改,并且需要在许多不同的地方进行注册。

请注意,CQRS Journey 是一个很棒的示例应用程序,用于学习 CQRS、进程管理器、事件溯源和其他相关概念。我发现自己难以理解它们的底层组件,以便在实际应用程序或甚至示例应用程序中应用这些概念。

ESB 框架

MassTransit 和 nServicebus 等 ESB 框架在解决上述问题方面非常有用。这些框架抽象了传输机制,并为我们提供了干净的 API,我们可以轻松地发布和订阅应用程序特定的消息。

已经创建了几个示例来记录它们的使用。

  1. MassTransit 示例 - Loosely Coupled Labs。该网站提供了许多分步代码演练,介绍如何使用 MassTransit 服务总线框架进行发布-订阅。点击此处 查看使用 MassTransit 和 Azure Service Bus 的示例,我们将用它来演示消息传递基础组件的解耦。我建议至少阅读该示例中的代码演练,直到实现通过控制台应用程序进行发布-订阅。为了理解本文,您可以忽略示例中的“云方式”部分。此外,MassTransit 的文档网站在此处。
  2. nServicebus 示例 - nServicebus 的文档网站。点击此处 查看使用 MSMQ 和 nServicebus 的示例。本文将发布将提到的 nServicebus 示例(使用 nServicebus 5 版)转换为解耦消息传递的代码。

MassTransit 与 Azure Service Bus 示例解耦

该示例的初始起点是 Loosely Coupled Labs 的 示例。本文发布的示例试图解耦 Loosely Coupled Labs 上发布的示例中的消息传递。代码演练将侧重于这种解耦。

解耦示例的源代码可在 github 仓库中找到 - 位于 此处

代码演练

public interface IServiceBus : IDisposable
{
    void Publish(IEvent eventMessage);
    void Send(ICommand commandMessage);
}
  • 发布者和订阅者 EXEs 不依赖于 MassTransit 框架的具体实现。它们依赖于 IServiceBus 接口。
  • Infrastructure.Common 项目仅包含接口,不包含任何具体实现。因此,它提供了 EXEs 依赖的抽象。
    namespace Infrasctructure.MasstransitServiceBus
    {
        public class MassTransitServiceBus : IServiceBus
        {
            private readonly MassTransit.IServiceBus _massTransitBus;
    
            public MassTransitServiceBus(Action<ServiceBusConfigurator> configurator)
            {
                Log4NetLogger.Use();
                _massTransitBus = ServiceBusFactory.New(sbc => configurator(sbc));
            }
    
            public void Publish(IEvent eventMessage)
            {
                _massTransitBus.Publish(eventMessage, eventMessage.GetType(), 
                x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
            }
    
            public void Send(ICommand commandMessage)
            {
                _massTransitBus.Publish(commandMessage, commandMessage.GetType(), 
                x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
            }
    
            public void Dispose()
            {
                _massTransitBus.Dispose();
            }
        }
    }
  • 该接口在基础架构项目 Infrastructure.MasstransitServiceBus 中实现。
  • MassTransitServicebus 类提供了 MassTransit 库的包装器,以便我们可以自定义 MassTransit 库以满足我们的需求。这些自定义将仅在此基础架构组件中进行,从而遵循 SRP(一起更改的东西在一起)。
    namespace TestPublisher.Main
    {
        public class Bootstrapper
        {
            // .. Code
    
            private static void RegisterServiceBus()
            {
                var bus = new MassTransitServiceBus(x => new MassTransitWithAzureServiceBusConfigurator
    		(ConfigurationManager.AppSettings.Get("azure-namespace"), "TestPublisher", 
    		ConfigurationManager.AppSettings.Get("azure-key"), x));
                IoC.RegisterInstance<IServiceBus>(bus);
            }
    
            // .. Code
        }
    }
  • IServiceBus 接口被依赖注入到 Main 项目的 Bootstrapper 类中。发布者和订阅者 EXEs 都有自己独立的 Main 和 Bootstrapper 类,它们都依赖于这些类。
  • 因此,遵循依赖注入原则,高层策略(发布者和订阅者)不再依赖于底层细节(MassTransit 框架),它们(高层策略和底层细节)都依赖于抽象 IServiceBus。

出版社

  • TestPublisher EXE 不直接依赖于基础架构组件。
  • 正如我们在下一篇文章中将进一步看到的,如果发布者遵循分层架构,除了基础架构层之外,没有任何层会依赖于 MassTransitServiceBus。
  • 消息的发布通过 IoC 解析接口 IServiceBus 来实现。
    namespace TestPublisher
    {
        internal class Program
        {
            private static void Main(string[] args)
            {   
                Bootstrapper.Init();
                var serviceBus = IoC.Resolve<IServiceBus>();
    
                // .. Code
    
                serviceBus.Publish(message);
    
                // .. Code        
                serviceBus.Dispose();
            }
        }
    }

订阅者

  • 在 Bootstrapper 类中,TestSubscriber 在构建 MassTransitServiceBusConfigurator 时注册消息的处理程序,然后将 IServiceBus 注册到 IoC。
    namespace TestSubscriber.Main
    {
        public class Bootstrapper
        {
           // .. Code
    
            private static void RegisterServiceBus()
            {
                var bus = new MassTransitServiceBus(
                    x =>
                        new MassTransitWithAzureServiceBusConfigurator(
                            ConfigurationManager.AppSettings.Get("azure-namespace"), "TestSubscriber",
                            ConfigurationManager.AppSettings.Get("azure-key"), x)
                            .WithHandler<SomethingHappened, SomethingHappenedHandler>());
                                                              
                ;
                IoC.RegisterInstance<IServiceBus>(bus);
            }
    
            // .. Code
        }
    }
  • SomethingHappenedHandler 类实现 IHandle<SomethingHappened>。
    namespace TestSubscriber.AppService
    {
      public class SomethingHappenedHandler : IHandle<SomethingHappened>
      {
          public void Handle(SomethingHappened message)
          {
              // .. Code
          }
      }
    }

对 MassTransit 库的依赖

  • 如上所示,只有以下项目依赖于 MassTransit:
    • Infrastructure.MassTransit => 因为它实现了 IServiceBus
    • TestPublisher.Main, TestSubscriber.Main => 因为它将 IServiceBus 与具体实现注册/引导

运行应用程序

  • 请在 TestPublisher 和 TestSubscriber 的 app.config 文件中提供 Azure Service Bus 命名空间和 Service Bus 密钥的值。
      <appSettings>
        <add key="azure-namespace" value="" />
        <add key="azure-key" value="" />
      </appSettings>

NserviceBus 解耦示例

遵循依赖倒置原则的一个好处是我们能够用一个库替换另一个库。下一个代码示例演示了这一点,其中 IServiceBus 接口由一个包装 nServiceBus 库的包装器类实现。

此代码示例的初始起点是 nServicebus 文档中的 示例(使用 nServicebus 5 版)。解耦代码遵循与 MassTransitBus 示例类似的概念,因此不需要代码演练。

此示例的源代码可在 github 仓库中找到 - 位于 此处

关于解耦的说明

目标是让基础架构与核心域逻辑解耦,以便在实现业务逻辑时不会干扰。

它还提供了额外的优势,即能够升级和替换第三方库组件,将更改的区域限制在单个项目中,如上面从 MassTransit 切换到 nServiceBus 所示。

系列中的下一篇文章

系列中的下一篇文章将采用消息传递基础架构组件(本文为 MassTransit 和 Azure Service Bus 通信开发的组件),并构建一个简单的 CQRS 应用程序。

有关本系列文章的完整列表,请转到本文的“介绍”部分。
感谢阅读本文,希望它们能提供深刻的见解。

参考文献

  1. Uncle Bob 的单一职责原则
  2. Uncle Bob 的依赖倒置原则
  3. Azure Service Bus 示例
  4. CQRS Journey
  5. MassTransit
  6. Loosely Coupled Labs - MassTransit 示例
  7. nServicebus
  8. nServicebus 示例
© . All rights reserved.