CQRS 与解耦消息传递 - 第四部分






4.33/5 (4投票s)
本系列文章的第四篇,展示了 CQRS 架构的实际应用,重点在于将消息传递作为基础设施组件进行解耦。
引言
在本篇文章中,我们将研究库存管理器应用程序,重点关注聚合根(Aggregates)和事件溯源(Event Sourcing)的实现。库存管理器用例基于 Greg Young 的“超级简单的 CQRS”示例。在库存管理器中,仅实现了创建库存项目的第一个用例。
库存管理器应用程序的代码位于 GitHub 存储库中 - 在此处可以找到。
请注意,这篇博文是系列文章的一部分,系列文章的链接如下:
- 引言
- 企业服务总线框架的需求以及带有示例的解耦消息传递
- 库存管理器 - CQRS 应用程序(带有解耦消息传递)- 命令
- 库存管理器 - CQRS 应用程序(带解耦消息传递)- 聚合根和事件溯源(本文)
- 库存管理器 - CQRS 应用程序(带解耦的消息传递) - 读取端
库存管理器 – 聚合根和事件溯源
- 正如 Greg Young 在视频中所述,聚合根和读取模型 DTOs 具有结构表示,并且它们仅捕获系统的最终状态。然而,真相的来源是事件。
- 聚合根公开行为供用例消费。此行为会触发事件,这些事件是真相的来源。因此,我们持久化这些事件(行为模型),并通过为这些事件提供处理程序来从事件中派生状态(结构模型)。
- 因此,聚合根具有其事件的处理程序。
public class InventoryItem : EventSourced { private string _name = string.Empty;//Rename functionality will use this field protected InventoryItem(Guid id) : base(id) { Handles<InventoryItemCreated>(OnInventoryItemCreated); // A. Register handler for event } private void OnInventoryItemCreated(InventoryItemCreated e) { this._name = e.Name; // C. State is updated in the handler for the event. // State is not persisted for the aggregate, // it is the events (source of truth) that get persisted. } public InventoryItem(Guid id, string name):this(id) { Update(new InventoryItemCreated(id, name)); // B. Some behavior fires off the event } }
Azure 事件溯源存储库
确保在保存时发布事件
EventStore
将事件存储在 Azure 表存储中。- 在
AzureEventSourcedRepository
中,我们需要确保在保存事件时,也能发布这些事件。public class AzureEventSourcedRepository<T> : IEventSourcedRepository<T> where T : class, IEventSourced { // .. Code public void Save(T eventSourced, string correlationId) { // TODO: guarantee that only incremental versions of the event are stored var events = eventSourced.Events.ToArray(); var serialized = events.Select (e => _versionedEventSerializer.Serialize(e, typeof (T), correlationId)); _eventStore.Save(eventSourced.Id.ToString(), serialized); _publisher.Send(eventSourced.Id.ToString(), events.Length); } }
- 然而,不可能在 Azure 表和 Azure 服务总线之间进行分布式事务。Azure 表存储仅支持针对唯一分区键的事务。我们将聚合根的标识存储在该分区键列中。因此,为了确保事件被发布,我们在事件存储中保存事件的 2 份副本。
- 一份副本是该事件的永久记录,另一份副本成为必须在 Windows Azure 服务总线上发布的事件的虚拟队列的一部分。下面的代码示例显示了
EventStore
类中的Save
方法。“Unpublished”前缀标识了作为未发布事件虚拟队列一部分的事件副本。public class EventStore<T> : IEventStore<T>, IPendingEventsQueue<T> { // .. Code public void Save(string sourceId, IEnumerable<EventData> events) { var table = _tableClient.GetTableReference(_tableName); var tableBatchOperation = new TableBatchOperation(); foreach (var eventData in events) { if (eventData.SourceId != sourceId) throw new Exception("Events from different aggregate instances found during EventStore save. Events from only single Aggregate instance can be saved."); var creationDate = DateTime.UtcNow; tableBatchOperation.Insert(eventData.ToAzureTableEntry(creationDate)); // Add a duplicate of this event to the Unpublished "queue" tableBatchOperation.Insert(eventData.ToUnpublishedAzureTableEntry(creationDate)); } try { table.ExecuteBatch(tableBatchOperation); } catch (DataServiceRequestException ex) { var inner = ex.InnerException as DataServiceClientException; if (inner != null && inner.StatusCode == (int)HttpStatusCode.Conflict) { throw new ConcurrencyException(); } throw; } } }
- 这两条记录是针对同一聚合根的,因此具有相同的分区键。因此,它们将在单个事务中保存。
- 因此,
EventStore
类同时实现了IEventStore
和IPendingEventsQueue
接口。 AzureEventSourced
Repository 调用EventStore
的保存方法,然后调用publisher
来发布聚合根实例的事件。- 发布者读取指定聚合根的未发布事件记录,并通过
IServicebus
发布它们。发布后,发布者删除未发布事件的记录。public class EventStoreBusPublisher<T> : IEventStoreBusPublisher<T>, IDisposable { // .. Code private void SendAndDeletePending(EventData eventData) { var ev = new VersionedEventSerializer().Deserialize(eventData); _sender.Publish(ev); _queue.DeletePending(eventData); } }
EventPublishProcessor
- 在
AzureEventSourcedRepository
的保存操作中,有可能在保存事件的 2 份副本后但在发布事件之前发生崩溃。由于这两个语句没有包含在任何(分布式)事务中,如果发生此类崩溃,系统将进入不一致状态。public class AzureEventSourcedRepository<T> : IEventSourcedRepository<T> where T : class, IEventSourced { public void Save(T eventSourced, string correlationId) { // TODO: guarantee that only incremental versions of the event are stored var events = eventSourced.Events.ToArray(); var serialized = events.Select(e => _versionedEventSerializer.Serialize (e, typeof (T), correlationId)); _eventStore.Save(eventSourced.Id.ToString(), serialized); // ...IF CRASH HAPPENS HERE, // SYSTEM WILL END UP WITH EVENTS THAT ARE SAVED BUT NEVER PUBLISHED _publisher.Send(eventSourced.Id.ToString(), events.Length); } }
- 因此,在发生故障的情况下,系统必须包含一种机制,用于扫描表存储中的所有分区,查找具有未发布事件的聚合根,然后发布这些事件。此过程需要一些时间来运行,但仅在应用程序重新启动时需要运行。
EventPublishProcessor
组件被注册,以便在工作角色启动时为此目的运行。它依赖于IEventStoreBusPublisher
。EventPublishProcessor
在BootStrapper
中注册为IProcessor
。namespace InventoryManager.Worker.Main { public class Bootstrapper { //.. Code private static void RegisterRepository<T>(CloudStorageAccount storageAccount, string eventStoreTableName) where T: class, IEventSourced { // .. Code IoC.RegisterAsSingleton<IProcessor, EventPublishProcessor<T>>(name: typeof(T).Name + "_EventPublisherProcessor"); } } }
InventoryManagerProcessor
- 在工作角色中,
InventoryManagerProcessor
检索通过IoC
注册的所有IProcessor
,然后启动它们。class InventoryManagerProcessor : IDisposable { private readonly CancellationTokenSource _cancellationTokenSource; private readonly List<IProcessor> _processors; public InventoryManagerProcessor() { _cancellationTokenSource = new CancellationTokenSource(); _processors = IoC.ResolveAll<IProcessor>().ToList(); } public void Start() { _processors.ForEach(p => p.Start()); } public void Stop() { _cancellationTokenSource.Cancel(); _processors.ForEach(p => p.Stop()); } // .. Code }
- 在
workerRole
启动时调用InventoryManagerProcessor
。public class WorkerRole : RoleEntryPoint { private async Task RunAsync(CancellationToken cancellationToken) { // .. Code using (var processor = new InventoryManagerProcessor()) { processor.Start(); while (!cancellationToken.IsCancellationRequested) { Thread.Sleep(10000); } processor.Stop(); // cause the process to recycle return; } } }
注册 EventSourcedRepository
EventSourced
存储库有几个依赖项。- 因此,注册存储库的方法已按如下方式模板化:
namespace InventoryManager.Worker.Main { public class Bootstrapper { //.. Code private static void RegisterRepository<T>(CloudStorageAccount storageAccount, string eventStoreTableName) where T: class, IEventSourced { // TODO: See if we can work with registerType (non-singletons) for all of the following var eventStore = new EventStore<T>(storageAccount, eventStoreTableName); IoC.RegisterInstance<IEventStore<T>>(eventStore); IoC.RegisterInstance<IPendingEventsQueue<T>>(eventStore); IoC.RegisterAsSingleton<IEventStoreBusPublisher<T>, EventStoreBusPublisher<T>>(); IoC.RegisterAsSingleton<IEventSourcedRepository<T>, AzureEventSourcedRepository<T>>(); IoC.RegisterAsSingleton<IProcessor, EventPublishProcessor<T>> (name: typeof(T).Name + "_EventPublisherProcessor"); } } }
- 这简化了为将来的聚合根创建更多存储库。
注册命令和事件的处理程序
- 这与示例应用程序类似,在工作角色的引导过程中完成。
namespace InventoryManager.Worker.Main { public class Bootstrapper { //.. Code private static void RegisterServiceBus() { var bus = new MassTransitServiceBus( x => new MassTransitWithAzureServiceBusConfigurator (ConfigurationManager.AppSettings.Get("azure-namespace"), "InventoryManager.WriteSide", ConfigurationManager.AppSettings.Get("azure-key"), x) .WithHandler<CreateInventoryItem, InventoryItemAppService>() .WithHandler<InventoryItemCreated, InventoryViewModelGenerator>()); ; IoC.RegisterInstance<IServiceBus>(bus); } } }
- 注意
ViewModelGenerator
如何订阅“InventoryItemCreated
”事件。ViewModelGenerator
是工作角色的组成部分,负责根据生成的事件更新ReadModel
。
系列中的下一篇文章
本系列的下一篇文章将重点介绍库存管理器应用程序中的读取端。
有关本系列文章的完整列表,请转到本文的“介绍”部分。
感谢阅读本文,希望它们能提供深刻的见解。
参考文献
- Greg Young 的“超级简单 cqrs”示例
- Greg Young 关于 CQRS 的视频(约 6 小时)