65.9K
CodeProject 正在变化。 阅读更多。
Home

CQRS 与解耦消息传递 - 第四部分

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.33/5 (4投票s)

2016年3月4日

CPOL

4分钟阅读

viewsIcon

18826

本系列文章的第四篇,展示了 CQRS 架构的实际应用,重点在于将消息传递作为基础设施组件进行解耦。

引言

在本篇文章中,我们将研究库存管理器应用程序,重点关注聚合根(Aggregates)和事件溯源(Event Sourcing)的实现。库存管理器用例基于 Greg Young 的“超级简单的 CQRS”示例。在库存管理器中,仅实现了创建库存项目的第一个用例。

库存管理器应用程序的代码位于 GitHub 存储库中 - 在此处可以找到。

请注意,这篇博文是系列文章的一部分,系列文章的链接如下:

  1. 引言
  2. 企业服务总线框架的需求以及带有示例的解耦消息传递
  3. 库存管理器 - CQRS 应用程序(带有解耦消息传递)- 命令
  4. 库存管理器 - CQRS 应用程序(带解耦消息传递)- 聚合根和事件溯源(本文)
  5. 库存管理器 - CQRS 应用程序(带解耦的消息传递) - 读取端

库存管理器 – 聚合根和事件溯源

  • 正如 Greg Young 在视频中所述,聚合根和读取模型 DTOs 具有结构表示,并且它们仅捕获系统的最终状态。然而,真相的来源是事件。
  • 聚合根公开行为供用例消费。此行为会触发事件,这些事件是真相的来源。因此,我们持久化这些事件(行为模型),并通过为这些事件提供处理程序来从事件中派生状态(结构模型)。
  • 因此,聚合根具有其事件的处理程序。
    public class InventoryItem : EventSourced
    {
        private string _name = string.Empty;//Rename functionality will use this field
    
        protected InventoryItem(Guid id)
            : base(id)
        {
            Handles<InventoryItemCreated>(OnInventoryItemCreated); // A. Register handler for event
        }
    
        private void OnInventoryItemCreated(InventoryItemCreated e)
        {
            this._name = e.Name; // C. State is updated in the handler for the event.
            // State is not persisted for the aggregate, 
            // it is the events (source of truth) that get persisted.
        }
    
        public InventoryItem(Guid id, string name):this(id)
        {
            Update(new InventoryItemCreated(id, name)); // B. Some behavior fires off the event
        }
    }

Azure 事件溯源存储库

确保在保存时发布事件

  • EventStore 将事件存储在 Azure 表存储中。
  • AzureEventSourcedRepository 中,我们需要确保在保存事件时,也能发布这些事件。
    public class AzureEventSourcedRepository<T> : IEventSourcedRepository<T> 
                     where T : class, IEventSourced
    {
        // .. Code
        
        public void Save(T eventSourced, string correlationId)
            {
                // TODO: guarantee that only incremental versions of the event are stored
                var events = eventSourced.Events.ToArray();
                var serialized = events.Select
    		(e => _versionedEventSerializer.Serialize(e, typeof (T), correlationId));
    
                _eventStore.Save(eventSourced.Id.ToString(), serialized);
    
                _publisher.Send(eventSourced.Id.ToString(), events.Length);
            }
    }
  • 然而,不可能在 Azure 表和 Azure 服务总线之间进行分布式事务。Azure 表存储仅支持针对唯一分区键的事务。我们将聚合根的标识存储在该分区键列中。因此,为了确保事件被发布,我们在事件存储中保存事件的 2 份副本。
  • 一份副本是该事件的永久记录,另一份副本成为必须在 Windows Azure 服务总线上发布的事件的虚拟队列的一部分。下面的代码示例显示了 EventStore 类中的 Save 方法。“Unpublished”前缀标识了作为未发布事件虚拟队列一部分的事件副本。
    public class EventStore<T> : IEventStore<T>, IPendingEventsQueue<T>
    {
        // .. Code
        public void Save(string sourceId, IEnumerable<EventData> events)
        {
            var table = _tableClient.GetTableReference(_tableName);
            var tableBatchOperation = new TableBatchOperation();
            foreach (var eventData in events)
            {
                if (eventData.SourceId != sourceId)
                    throw new Exception("Events from different aggregate instances found during EventStore 
    				save. Events from only single Aggregate instance can be saved.");
    
                var creationDate = DateTime.UtcNow;
    
                tableBatchOperation.Insert(eventData.ToAzureTableEntry(creationDate));
    
                // Add a duplicate of this event to the Unpublished "queue"
                tableBatchOperation.Insert(eventData.ToUnpublishedAzureTableEntry(creationDate));
            }
    
            try
            {
                table.ExecuteBatch(tableBatchOperation);
            }
            catch (DataServiceRequestException ex)
            {
                var inner = ex.InnerException as DataServiceClientException;
                if (inner != null && inner.StatusCode == (int)HttpStatusCode.Conflict)
                {
                    throw new ConcurrencyException();
                }
    
                throw;
            }
        }
    }
  • 这两条记录是针对同一聚合根的,因此具有相同的分区键。因此,它们将在单个事务中保存。
  • 因此,EventStore 类同时实现了 IEventStoreIPendingEventsQueue 接口。
  • AzureEventSourced Repository 调用 EventStore 的保存方法,然后调用 publisher 来发布聚合根实例的事件。
  • 发布者读取指定聚合根的未发布事件记录,并通过 IServicebus 发布它们。发布后,发布者删除未发布事件的记录。
    public class EventStoreBusPublisher<T> : IEventStoreBusPublisher<T>, IDisposable
    {
        // .. Code
        private void SendAndDeletePending(EventData eventData)
        {
            var ev = new VersionedEventSerializer().Deserialize(eventData);
            _sender.Publish(ev);
            _queue.DeletePending(eventData);
        }
    }

EventPublishProcessor

  • AzureEventSourcedRepository 的保存操作中,有可能在保存事件的 2 份副本后但在发布事件之前发生崩溃。由于这两个语句没有包含在任何(分布式)事务中,如果发生此类崩溃,系统将进入不一致状态。
    public class AzureEventSourcedRepository<T> : IEventSourcedRepository<T> where T : class, IEventSourced
    {
        public void Save(T eventSourced, string correlationId)
        {
            // TODO: guarantee that only incremental versions of the event are stored
            var events = eventSourced.Events.ToArray();
            var serialized = events.Select(e => _versionedEventSerializer.Serialize
    						(e, typeof (T), correlationId));
    
            _eventStore.Save(eventSourced.Id.ToString(), serialized);
            // ...IF CRASH HAPPENS HERE, 
            // SYSTEM WILL END UP WITH EVENTS THAT ARE SAVED BUT NEVER PUBLISHED
            _publisher.Send(eventSourced.Id.ToString(), events.Length);
        }
    }
  • 因此,在发生故障的情况下,系统必须包含一种机制,用于扫描表存储中的所有分区,查找具有未发布事件的聚合根,然后发布这些事件。此过程需要一些时间来运行,但仅在应用程序重新启动时需要运行。
  • EventPublishProcessor 组件被注册,以便在工作角色启动时为此目的运行。它依赖于 IEventStoreBusPublisher
  • EventPublishProcessorBootStrapper 中注册为 IProcessor
    namespace InventoryManager.Worker.Main
    {
        public class Bootstrapper
        {
            //.. Code
    
            private static void RegisterRepository<T>(CloudStorageAccount storageAccount, 
    			string eventStoreTableName) where T: class, IEventSourced  
            {
                // .. Code
                IoC.RegisterAsSingleton<IProcessor, EventPublishProcessor<T>>(name: typeof(T).Name + 
    				"_EventPublisherProcessor");
            }
        }
    }

InventoryManagerProcessor

  • 在工作角色中,InventoryManagerProcessor 检索通过 IoC 注册的所有 IProcessor,然后启动它们。
    class InventoryManagerProcessor : IDisposable
    {
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly List<IProcessor> _processors;
    
        public InventoryManagerProcessor()
        {
            _cancellationTokenSource = new CancellationTokenSource();
            _processors = IoC.ResolveAll<IProcessor>().ToList();
        }
    
        public void Start()
        {
            _processors.ForEach(p => p.Start());
        }
    
        public void Stop()
        {
            _cancellationTokenSource.Cancel();
            _processors.ForEach(p => p.Stop());
        }
    
        // .. Code
    }
  • workerRole 启动时调用 InventoryManagerProcessor
    public class WorkerRole : RoleEntryPoint
    {
        private async Task RunAsync(CancellationToken cancellationToken)
        {
            // .. Code
            using (var processor = new InventoryManagerProcessor())
            {
                processor.Start();
    
                while (!cancellationToken.IsCancellationRequested)
                {   
                    Thread.Sleep(10000);
                }
    
                processor.Stop();
    
                // cause the process to recycle
                return;
            }
        }
    }

注册 EventSourcedRepository

  • EventSourced 存储库有几个依赖项。
  • 因此,注册存储库的方法已按如下方式模板化:
    namespace InventoryManager.Worker.Main
    {
        public class Bootstrapper
        {
            //.. Code
    
            private static void RegisterRepository<T>(CloudStorageAccount storageAccount, 
    			string eventStoreTableName) where T: class, IEventSourced  
            {
                // TODO: See if we can work with registerType (non-singletons) for all of the following
                var eventStore = new EventStore<T>(storageAccount, eventStoreTableName);
                IoC.RegisterInstance<IEventStore<T>>(eventStore);
                IoC.RegisterInstance<IPendingEventsQueue<T>>(eventStore);
                IoC.RegisterAsSingleton<IEventStoreBusPublisher<T>, EventStoreBusPublisher<T>>();
                IoC.RegisterAsSingleton<IEventSourcedRepository<T>, AzureEventSourcedRepository<T>>();
                IoC.RegisterAsSingleton<IProcessor, EventPublishProcessor<T>>
    				(name: typeof(T).Name + "_EventPublisherProcessor");
            }
        }
    }
  • 这简化了为将来的聚合根创建更多存储库。

注册命令和事件的处理程序

  • 这与示例应用程序类似,在工作角色的引导过程中完成。
    namespace InventoryManager.Worker.Main
    {
        public class Bootstrapper
        {
            //.. Code
    
            private static void RegisterServiceBus()
            {
                var bus = new MassTransitServiceBus(
                    x => new MassTransitWithAzureServiceBusConfigurator
    				(ConfigurationManager.AppSettings.Get("azure-namespace"),
                                     "InventoryManager.WriteSide",
                                     ConfigurationManager.AppSettings.Get("azure-key"), x)
                                     .WithHandler<CreateInventoryItem, InventoryItemAppService>()
                                     .WithHandler<InventoryItemCreated, InventoryViewModelGenerator>());
                ;
                IoC.RegisterInstance<IServiceBus>(bus);
            }
        }
    }
  • 注意 ViewModelGenerator 如何订阅“InventoryItemCreated”事件。ViewModelGenerator 是工作角色的组成部分,负责根据生成的事件更新 ReadModel

系列中的下一篇文章

本系列的下一篇文章将重点介绍库存管理器应用程序中的读取端。

有关本系列文章的完整列表,请转到本文的“介绍”部分。
感谢阅读本文,希望它们能提供深刻的见解。

参考文献

  1. Greg Young 的“超级简单 cqrs”示例
  2. Greg Young 关于 CQRS 的视频(约 6 小时)
© . All rights reserved.