CQRS 与解耦消息传递 - 第二部分





5.00/5 (5投票s)
本系列文章的第二篇,展示了 CQRS 架构的实际应用,重点在于将消息传递作为基础设施组件进行解耦。
引言
本文将列出几种可用的消息传递传输机制,介绍 MassTransit 和 nServicebus 等框架如何通过抽象底层(传输层)细节来提供帮助,并展示这些框架如何用作基础组件来提供 IServicebus 接口的具体实现。
我们将使用 MassTransit 和 nServicebus 文档中提供的代码示例,并将这些示例转换为遵循依赖倒置原则。
请注意,本文是系列文章的一部分,系列文章的链接如下。
- 引言
- 企业服务总线框架的需求和解耦的消息传递及示例(本文)
- 库存管理器 - CQRS 应用程序(带有解耦消息传递)- 命令
- 库存管理器 - CQRS 应用程序(带有解耦消息传递)- 聚合和事件溯源
- 库存管理器 - CQRS 应用程序(带解耦的消息传递) - 读取端
传输机制
消息传输方面,市面上有多种组件。
- Msmq
- Rabbitmq
- Azure Service Bus
- 以及许多其他
.NET 中存在允许直接使用这些传输机制的库/类。但是,直接使用这些传输机制的代码会是以下这种形式。
以 Azure Service Bus 的此 示例 为例。
发布/发送消息
string connectionString =
CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
QueueClient Client =
QueueClient.CreateFromConnectionString(connectionString, "TestQueue");
Client.Send(new BrokeredMessage());
接收消息
// Callback to handle received messages.
Client.OnMessage((message) =>
{
try
{
// Process message from queue.
Console.WriteLine("Body: " + message.GetBody<string>());
Console.WriteLine("MessageID: " + message.MessageId);
Console.WriteLine("Test Property: " +
message.Properties["TestProperty"]);
// Remove message from queue.
message.Complete();
}
catch (Exception)
{
// Indicates a problem, unlock message in queue.
message.Abandon();
}
}, options);
直接在代码中使用这些队列有几个缺点。如上面的示例所示,只能发送特定类型的消息(BrokeredMessage),这会将我们与传输机制耦合,并在将来需要时难以切换到另一种机制。
在实际应用程序中,我们需要为应用程序特定的消息指定处理程序,即从我们的域和应用程序层相应发布的事件和命令。因此,需要使用应用程序特定的消息类型,如下面的 CQRS Journey 代码片段所示。
namespace Registration.Handlers
{
public class SeatAssignmentsHandler :
IEventHandler<OrderConfirmed>,
IEventHandler<OrderPaymentConfirmed>,
ICommandHandler<UnassignSeat>,
ICommandHandler<AssignSeat>
{
// .. Code
public void Handle(OrderPaymentConfirmed @event)
{
// .. Code
}
public void Handle(OrderConfirmed @event)
{
// .. Code
}
public void Handle(AssignSeat command)
{
// .. Code
}
public void Handle(UnassignSeat command)
{
// .. Code
}
}
}
该类通过实现 IxxxHandler<MessageType> 接口来指定它处理的消息。这是期望的实现,遵循单一职责原则,即不关心消息来自何处,以及如何将消息从 BrokeredMessage 转换为应用程序特定的消息类型等。
但是,当您直接处理传输机制时,这种实现很难做到。复杂性会增加,因为您需要处理消息的发送和接收、处理程序的注册、将其从 BrokeredMessage 转换为应用程序特定的消息类型,以及其他许多事情。CQRS Journey 项目试图做到这一点,因此最终得到了如下所示的复杂类层次结构。
CommandDispatcher 在调用 register 方法时,会填充一个 commandType => Handlers 的字典。MessageProcessor.onMessageReceived 在 IMessageProcessor.Start 中注册为回调。为了清晰起见,上面的图表中省略了 EventProcessor 类层次结构。
代码变得难以理解和维护。添加新的处理程序变得非常困难,因为有几个组件需要修改,并且需要在许多不同的地方进行注册。
请注意,CQRS Journey 是一个很棒的示例应用程序,用于学习 CQRS、进程管理器、事件溯源和其他相关概念。我发现自己难以理解它们的底层组件,以便在实际应用程序或甚至示例应用程序中应用这些概念。
ESB 框架
MassTransit 和 nServicebus 等 ESB 框架在解决上述问题方面非常有用。这些框架抽象了传输机制,并为我们提供了干净的 API,我们可以轻松地发布和订阅应用程序特定的消息。
已经创建了几个示例来记录它们的使用。
- MassTransit 示例 - Loosely Coupled Labs。该网站提供了许多分步代码演练,介绍如何使用 MassTransit 服务总线框架进行发布-订阅。点击此处 查看使用 MassTransit 和 Azure Service Bus 的示例,我们将用它来演示消息传递基础组件的解耦。我建议至少阅读该示例中的代码演练,直到实现通过控制台应用程序进行发布-订阅。为了理解本文,您可以忽略示例中的“云方式”部分。此外,MassTransit 的文档网站在此处。
- nServicebus 示例 - nServicebus 的文档网站。点击此处 查看使用 MSMQ 和 nServicebus 的示例。本文将发布将提到的 nServicebus 示例(使用 nServicebus 5 版)转换为解耦消息传递的代码。
MassTransit 与 Azure Service Bus 示例解耦
该示例的初始起点是 Loosely Coupled Labs 的 示例。本文发布的示例试图解耦 Loosely Coupled Labs 上发布的示例中的消息传递。代码演练将侧重于这种解耦。
解耦示例的源代码可在 github 仓库中找到 - 位于 此处
代码演练
public interface IServiceBus : IDisposable
{
void Publish(IEvent eventMessage);
void Send(ICommand commandMessage);
}
- 发布者和订阅者 EXEs 不依赖于 MassTransit 框架的具体实现。它们依赖于 IServiceBus 接口。
- Infrastructure.Common 项目仅包含接口,不包含任何具体实现。因此,它提供了 EXEs 依赖的抽象。
namespace Infrasctructure.MasstransitServiceBus { public class MassTransitServiceBus : IServiceBus { private readonly MassTransit.IServiceBus _massTransitBus; public MassTransitServiceBus(Action<ServiceBusConfigurator> configurator) { Log4NetLogger.Use(); _massTransitBus = ServiceBusFactory.New(sbc => configurator(sbc)); } public void Publish(IEvent eventMessage) { _massTransitBus.Publish(eventMessage, eventMessage.GetType(), x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); }); } public void Send(ICommand commandMessage) { _massTransitBus.Publish(commandMessage, commandMessage.GetType(), x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); }); } public void Dispose() { _massTransitBus.Dispose(); } } }
- 该接口在基础架构项目 Infrastructure.MasstransitServiceBus 中实现。
- MassTransitServicebus 类提供了 MassTransit 库的包装器,以便我们可以自定义 MassTransit 库以满足我们的需求。这些自定义将仅在此基础架构组件中进行,从而遵循 SRP(一起更改的东西在一起)。
namespace TestPublisher.Main { public class Bootstrapper { // .. Code private static void RegisterServiceBus() { var bus = new MassTransitServiceBus(x => new MassTransitWithAzureServiceBusConfigurator (ConfigurationManager.AppSettings.Get("azure-namespace"), "TestPublisher", ConfigurationManager.AppSettings.Get("azure-key"), x)); IoC.RegisterInstance<IServiceBus>(bus); } // .. Code } }
- IServiceBus 接口被依赖注入到 Main 项目的 Bootstrapper 类中。发布者和订阅者 EXEs 都有自己独立的 Main 和 Bootstrapper 类,它们都依赖于这些类。
- 因此,遵循依赖注入原则,高层策略(发布者和订阅者)不再依赖于底层细节(MassTransit 框架),它们(高层策略和底层细节)都依赖于抽象 IServiceBus。
出版社
- TestPublisher EXE 不直接依赖于基础架构组件。
- 正如我们在下一篇文章中将进一步看到的,如果发布者遵循分层架构,除了基础架构层之外,没有任何层会依赖于 MassTransitServiceBus。
- 消息的发布通过 IoC 解析接口 IServiceBus 来实现。
namespace TestPublisher { internal class Program { private static void Main(string[] args) { Bootstrapper.Init(); var serviceBus = IoC.Resolve<IServiceBus>(); // .. Code serviceBus.Publish(message); // .. Code serviceBus.Dispose(); } } }
订阅者
- 在 Bootstrapper 类中,TestSubscriber 在构建 MassTransitServiceBusConfigurator 时注册消息的处理程序,然后将 IServiceBus 注册到 IoC。
namespace TestSubscriber.Main { public class Bootstrapper { // .. Code private static void RegisterServiceBus() { var bus = new MassTransitServiceBus( x => new MassTransitWithAzureServiceBusConfigurator( ConfigurationManager.AppSettings.Get("azure-namespace"), "TestSubscriber", ConfigurationManager.AppSettings.Get("azure-key"), x) .WithHandler<SomethingHappened, SomethingHappenedHandler>()); ; IoC.RegisterInstance<IServiceBus>(bus); } // .. Code } }
- SomethingHappenedHandler 类实现 IHandle<SomethingHappened>。
namespace TestSubscriber.AppService { public class SomethingHappenedHandler : IHandle<SomethingHappened> { public void Handle(SomethingHappened message) { // .. Code } } }
对 MassTransit 库的依赖
- 如上所示,只有以下项目依赖于 MassTransit:
- Infrastructure.MassTransit => 因为它实现了 IServiceBus
- TestPublisher.Main, TestSubscriber.Main => 因为它将 IServiceBus 与具体实现注册/引导
运行应用程序
- 请在 TestPublisher 和 TestSubscriber 的 app.config 文件中提供 Azure Service Bus 命名空间和 Service Bus 密钥的值。
<appSettings> <add key="azure-namespace" value="" /> <add key="azure-key" value="" /> </appSettings>
NserviceBus 解耦示例
遵循依赖倒置原则的一个好处是我们能够用一个库替换另一个库。下一个代码示例演示了这一点,其中 IServiceBus 接口由一个包装 nServiceBus 库的包装器类实现。
此代码示例的初始起点是 nServicebus 文档中的 示例(使用 nServicebus 5 版)。解耦代码遵循与 MassTransitBus 示例类似的概念,因此不需要代码演练。
此示例的源代码可在 github 仓库中找到 - 位于 此处
关于解耦的说明
目标是让基础架构与核心域逻辑解耦,以便在实现业务逻辑时不会干扰。
它还提供了额外的优势,即能够升级和替换第三方库组件,将更改的区域限制在单个项目中,如上面从 MassTransit 切换到 nServiceBus 所示。
系列中的下一篇文章
系列中的下一篇文章将采用消息传递基础架构组件(本文为 MassTransit 和 Azure Service Bus 通信开发的组件),并构建一个简单的 CQRS 应用程序。
有关本系列文章的完整列表,请转到本文的“介绍”部分。
感谢阅读本文,希望它们能提供深刻的见解。