快速轻量级的CQRS和事件溯源解决方案






4.98/5 (40投票s)
介绍使用C#实现的CQRS+ES模式的快速轻量级实现。
引言
如果您正在阅读本文,那么您可能已经对命令查询职责分离(CQRS)和事件溯源(ES)有所了解。网上有一些非常好的资料,如果您正在寻找背景信息。例如:
- CQRS实战
- Martin Fowler 的 CQRS
- 事件溯源模式 (Microservices.io) 或 事件溯源模式 (Microsoft.com)
- 事件溯源:优点、缺点和丑陋
- 关于事件溯源,他们没有告诉你的事
在我看来,CQRS+ES的关键优势在于:
- 数据永远不会被修改或删除。
- 系统的更改/审计日志具有完美的完整性。
- 它支持回滚和其他基于时间的特性,例如对先前状态进行分析。
- 它可以提高应用程序的性能和可伸缩性——至少,理论上如此!
- 它还可以(理论上)提高代码质量和可测试性。
在本文中,我不会比这更详细地解释CQRS+ES背后的原理,也不会深入探讨您可能想要使用它(或避免它)的原因。
本文的目的是介绍使用C#编程语言和.NET Framework实现快速轻量级CQRS+ES模式。
这个实现非常简单,但仍然相对功能齐全,包括对SQL Server命令和事件持久化、计划命令、快照、Saga(即流程管理器)的支持,以及用于多租户定制的即插即用覆盖。
我将描述代码的结构,并通过遵循干净架构原则的示例程序来说明其工作原理。
请注意:我不期望(也不建议)您直接将此源代码集成到您开发的任何软件系统中。这个“实现”只是一个原型,旨在说明理论模式的一种潜在应用;它并非旨在作为可重用库或成品——并且代码不适合集成到任何实时生产系统中。
何必费心?
这几乎肯定是您首先会提出的问题。
如果您正在研究选项并评估替代方案,以便为CQRS+ES解决方案做出购买还是自建的决策,那么有一些现有的商业和开源产品可供考虑。例如:
既然有这些选项,为什么还要自己实现呢?为什么从头开始开发自己的解决方案?
我已经研究CQRS和ES模式多年了。我使用过一些商业和开源解决方案,并用它们构建和/或改进了实际的生产系统。如果您的背景和经验与我相似,那么您可能已经知道这个问题的答案了,但您可能不愿相信这是真的(就像我很久以来都不愿相信一样)。
如果您认真考虑采用CQRS+ES架构,那么您可能别无选择,只能自己构建。
正如Chris Kiehl在他的文章“事件溯源很难”中所说:
...您很可能会从头开始构建核心组件。该领域的框架往往很笨重,过于刻板,并且在技术栈方面缺乏灵活性。如果您想让某些东西启动并运行……那么自己动手才是明智之举(以及建议的方法)。
如果是这样,那么我写这篇文章对您有什么帮助呢?
很简单:这是另一个带源码的示例,因此您可以看到我解决CQRS+ES实现中出现的一些问题的思路。如果您正在开始您的第一个CQRS+ES项目,那么您应该尽可能多地研究您能找到的所有示例。
我的目的是提供这样一个示例,从中您可以为自己的项目获取想法——也许(如果我做得足够好)还能获得一些灵感。
优先级
重要的是首先列出驱动此实现的优先级,因为我做出的许多设计决策都存在显著的权衡。
CQRS+ES的纯粹主义者会反对我的一些决定,并直接谴责其他决定。我能接受。我设计和开发软件已经很长时间了(比我在这里愿意承认的还要长)。我付出了不少血汗和泪水——也掉了一些白头发——所以我非常清楚在权衡面前,糟糕选择的代价。
以下优先级有助于指导和影响这些决策。它们按重要性大致排序,但所有都是要求,所以请给自己倒杯酒,坐好,因为这里的序言会很长……
1. 可读性
代码必须可读。
代码越可读,它的可用性、可维护性和可扩展性就越好。
在我使用过的一些实现中(以及我自己开发的一些实现中),底层CQRS+ES骨干的代码几乎不可能被除了原作者以外的任何人理解。我们在这里不能允许这种情况发生。一个小型开发团队必须能够——并且相对容易地——共享和使用这些代码,并完全理解它是如何工作的以及为何要这样编写。
保持注册命令处理程序和事件处理程序的代码尽可能简单和明确尤其重要。
许多CQRS+ES框架使用反射和依赖注入的组合来自动化处理命令和事件的订阅者注册。虽然这通常非常巧妙,并且通常能减少项目中的总代码行数,但它隐藏了命令(或事件)与其订阅者之间的关系,将这些关系转移到一个不透明的、神奇的黑盒中。许多控制反转(IoC)容器使这很容易实现,因此可以理解其诱惑,但我认为这是一个错误。
需要明确的是:在项目中不使用IoC容器并不是错误。依赖注入是一项优秀的最佳实践,也是一项需要完善的重要技术。然而,依赖注入模式本身并不是发布-订阅模式,将两者混淆可能导致许多痛苦和不幸。在IoC容器库中使用高度专业的特性来自动化那些超出该库预期用途范围的功能,然后将软件架构中最关键的组件与之紧密耦合,这是一个错误(我自己犯过的)。当您的应用程序出现意外行为时,这可能使故障排除和调试变得极其困难和耗时。
因此,作为此可读性目标的一部分,命令处理程序和事件处理程序的注册必须在代码中显式定义,而不是隐式通过约定或自动化。
2. 性能
代码必须快速。
处理命令和事件是任何基于CQRS+ES架构开发的系统的核心,因此吞吐量优化是关键的性能指标。
该实现必须能够处理最大可能的吞吐量,包括并发用户和发出命令并观察已发布事件效果的系统。
在我之前的某些实现中,由于并发违规(当命令发送到大型聚合时,例如具有大量事件流的长期聚合),导致了很多痛苦和挣扎。大多数情况下,根本原因是代码性能不佳。因此,算法优化至关重要。
快照对于满足此要求至关重要,因此必须是解决方案的组成部分。该实现必须内置支持对每个聚合根的自动快照。
内存缓存是运行时优化的另一个重要组成部分,因此也必须是解决方案的组成部分。
3. 可调试性
使用标准的调试器(如Visual Studio IDE调试器)能够轻松跟踪代码并遵循其执行流程。
许多CQRS+ES实现似乎依赖于复杂的算法来进行动态注册、查找和调用方法来处理命令和事件。
同样,其中许多算法都非常巧妙:它们具有强大的功能和灵活性,并且可以显著减少解决方案中的代码行数。
例如,我在过去的一些项目中使用了DynamicInvoker类。这是一个巧妙的代码片段——不到150行——而且效果很好。(我不是作者,所以说它好不是在自夸。)然而,如果您的代码中调用此类代码的方法时出现问题,并且您需要通过调试器进行单步调试,那么您将需要特别擅长进行必要的脑力体操来理解发生了什么。我不是,所以如果使用了任何动态调用,那么它必须极其容易理解代码并通过调试器跟踪其执行线程。
4. 最小依赖
外部依赖项必须保持在绝对最少的状态。
过多的依赖会导致代码比您希望的任何关键系统组件都要慢、笨重且脆弱。最小化依赖有助于确保您的代码更快、更轻、更健壮。
最重要的是,最小化依赖有助于确保解决方案不与任何外部程序集、服务或组件紧密耦合,除非该依赖项至关重要。
如果您的软件的基本架构依赖于某个外部第三方组件,那么您必须准备好应对该组件的更改将来可能对您的项目产生影响。有时这是可接受的风险,有时则不然。
在此特定实现中,对这种风险的容忍度非常非常低。
因此,您会注意到核心的Timeline程序集(实现了我解决方案中的CQRS+ES骨干)只有一个外部依赖项:即.NET Framework中的System
命名空间。
这里只是快速提一下,因为它是一篇说明我观点的有趣文章:在2018年这篇文章发表时,NPM JavaScript包“is-odd”在一周内就有超过280万个安装。这些开发人员没有编写一个简单的函数来返回一个数字是否为奇数,而是选择将is-odd包及其300多个依赖项链集成到他们的解决方案中!
5. 分离命令和事件
许多CQRS+ES框架实现了一个Command
类和一个Event
类,以便两者都继承自一个公共基类。
这样做的理由很明显:将命令和事件都视为通用消息的子类型是很自然的。两者都使用某种形式的“服务总线”进行“发送”,那么为什么不实现共享基类中的通用功能,并编写一个双用途类来路由消息——而不是编写大量重复代码呢?
这是我过去采用的方法,并且有充分的理由支持它。
然而,我现在认为这可能是一个错误。引用Robert C. Martin的话:
软件开发人员常常陷入一个陷阱——这个陷阱在于他们害怕重复。重复在软件中通常是一件坏事。但重复有好几种。有真正的重复,即一个实例的每次更改都必然导致其所有副本的相同更改。然后是虚假的或偶然的重复。如果两段看似重复的代码沿着不同的路径演变——如果它们以不同的速率和不同的原因变化——那么它们就不是真正的重复……当你将用例垂直分离时,你将遇到这个问题,并且你会有将用例耦合在一起的诱惑,因为它们有相似的用户界面、相似的算法或相似的数据库模式。要小心。抵制消除重复的诱惑,以免犯下“条件反射式消除重复”的罪过。确保重复是真实的。
命令和事件的差异足够大,足以使它们各自拥有独立演进和适应系统需求的路径。
我(至今)尚未遇到任何情况,可以通过消除 A)发送/处理命令和 B)发布/处理事件的“重复”代码来提高代码质量、性能或可读性。
因此,命令和事件不得具有任何共享基类,用于发送/发布命令/事件的机制不得是共享队列。
6. 多租户
多租户必须是解决方案的组成部分,而不是事后添加的功能或设施。
如今,我专门构建和维护企业级多租户系统。这意味着我有一个应用程序的单个实例,为多个并发租户和多个并发用户提供服务。
使多租户成为此实现优先事项有几个原因:
- 每个聚合都必须分配给一个租户。这使得数据的所有权清晰明确。
- 当需要扩展时,分片必须易于实现。分片是将聚合分布到多个写节点上,而“租户”是划分聚合最自然的边界。
- 租户特定的自定义必须易于实现。每个应用程序对每个命令和每个事件都有核心的默认行为,但在服务于许多不同组织和/或利益相关者的大型复杂应用程序中,不同的租户肯定会有各种特定的需求。有时差异很小;有时差异很大。这里的解决方案必须允许开发人员用特定租户的自定义功能来覆盖命令和/或事件的默认处理。覆盖必须是明确的,以便于识别和启用或禁用。
7. Saga / 流程管理器
实现流程管理器所需的步骤必须相对较少,并且流程管理器的代码必须相对容易编写。
流程管理器(有时称为Saga)是一个独立的组件,它以跨聚合、最终一致性的方式响应领域事件。流程管理器有时是纯粹响应式的,有时代表一个工作流。
从技术角度来看,流程管理器是由传入事件驱动的状态机,这些事件可能来自多个聚合。每个状态都可以有副作用(例如,发送命令、与外部Web服务通信、发送电子邮件)。
我曾使用过一些完全不支持流程管理器的CQRS+ES框架,也使用过一些支持该概念但不易于理解或配置的框架。
例如,在我过去的一个实现中,事件在事件附加到数据库日志后立即由事件存储发布。它不是由聚合或命令处理程序发布的。这使得即使是最基本的工作流也难以实现:我无法在事件处理程序中向聚合发送同步命令,因为事件存储的
Save
方法在同步锁内执行(以保持线程安全),而发布新事件而不创建死锁。
无论工作流的状态机多么简单或复杂,协调该过程中的事件都需要具有副作用的代码,例如向其他聚合发送命令、向外部Web服务发送请求或发送电子邮件。因此,这里的解决方案必须具有原生的、内置的支持来实现这一点。
8. 计划
命令的计划必须是解决方案的组成部分。
发送带有计时器的命令必须很简单,这样命令才能在计时器到期后执行。这使开发人员能够为任何命令的执行指定特定的日期和时间。
这对于必须按时间依赖性触发的命令很有用。
它也对必须“离线”执行的命令有用,即在正常执行流程之外的后台进程中。这种完全异步的操作非常适合一个预计需要很长时间才能完成的命令。
例如,假设您有一个命令需要调用某个外部第三方Web服务的某个方法,并且该服务通常需要超过800,000毫秒才能响应。这样的命令必须安排在非高峰时段执行,和/或在正常执行线程之外执行。
9. 聚合过期
该解决方案必须具有聚合过期和清理的原生、内置支持。
我需要一个CQRS+ES解决方案,可以轻松地将聚合事件流从在线结构化日志复制到离线存储,并将其从事件存储中清除。
事件溯源的纯粹主义者会立即标记这一点,并说聚合的事件流绝不能被更改或删除。他们会说,事件(以及因此的聚合)根据定义是不可变的。
然而,我遇到过一些情况,这是不可协商的业务需求。
-
第一:当客户不再续订多租户应用程序的订阅时,托管该应用程序的服务提供商通常有合同义务将其客户数据从其系统中删除。
-
第二:当项目团队频繁运行集成测试以确认系统功能正常运行时,输入和输出这些测试的数据定义上就是临时的。永久存储测试聚合的事件流是浪费磁盘空间,没有当前或未来的业务价值;我们需要一种清除机制。
因此,这里的解决方案必须提供一种简便的方法,将聚合从操作系统中移出,进入所谓的“冷存储”。
10. Async/Await 是邪恶的
当然,我是在开玩笑。
但也不是完全开玩笑。
C# 中的async/await模式会产生非常高性能的代码。这一点毫无疑问。在某些情况下,我曾见过它将性能提升了一个数量级或更多。
async/await模式可以在此解决方案的未来迭代中使用,但——尽管它是此列表中的第二个优先级——但在此解决方案中不允许使用,因为它破坏了第一个优先级。
一旦您将async
/await
引入到一个方法中,您就必须转换它的调用者,使它们使用async
/await
(或者您被迫开始将干净的代码包装在脏线程块中),然后您必须转换这些调用者的调用者,使它们使用async
/await
……async
/await
关键字就像一种传染性的僵尸病毒一样在您的整个代码库中传播。由此产生的异步混乱几乎肯定会更快,但同时更难阅读,并且更难调试。
可读性是这里的最高优先级,因此我将避免使用async
/await
,直到它是提高性能的唯一选择(而这种额外提升本身就是一项不可协商的业务要求)。
干净架构
Matthew Renze有一个关于干净架构主题的出色Pluralsight课程。此解决方案的源代码包含五个程序集,并且遵循他提倡的干净架构模式。这对于一个示例应用程序来说显然是过度的,但它有助于建立大型企业级实现需要遵循的模式。
Timeline项目
Timeline程序集实现了CQRS+ES骨干。该程序集没有上游依赖项,因此不针对任何特定应用程序。它可以从示例应用程序中拔出并集成到新解决方案中以开发完全不同的应用程序。
其他四个程序集(Sample.*)使用Timeline程序集实现了控制台应用程序中的各个层,以演示我对CQRS+ES软件系统中常见编程任务的处理方法。
项目依赖关系图在此处说明
示例项目
请注意,Timeline程序集没有任何对任何Sample程序集的引用。
另请注意以领域为中心的方法:领域层不依赖于表示层、应用程序层或持久层。
示例域的实体关系图在此处说明(图2)
在此基本数据模型中:
- 一个
Person
拥有0..N个银行Account
; - 一个
Transfer
从一个账户提取资金并存入另一个账户; - 一个
User
可能是一个没有个人数据的管理员,或者是一个拥有多个租户所有个人数据的用户。
请记住:每个Person
、Account
和Transfer
都是聚合根,因此这些实体中的每一个都具有Tenant
属性。
概述
此解决方案中CQRS+ES的整体方法在此处说明(图3)
请注意,写侧(命令)和读侧(查询)已得到很好的区分。
您还可以看到,事件溯源非常像写侧的一个插件。虽然在此解决方案中未演示,但您可以想象一个*没有*事件溯源的CQRS解决方案可能是什么样子,有时(单独的CQRS)是一种更好的模式,具体取决于您项目的需求。
以下是该架构的关键特征:
- 命令队列将命令(调度所需)保存在结构化日志中。
- 命令订阅者监听命令队列上的命令。
- 命令订阅者负责创建聚合并在执行命令时调用聚合上的方法。
- 命令订阅者将聚合(作为事件流)保存在结构化日志中。
- 命令订阅者将事件发布到事件队列。
- 发布的事件由事件订阅者和流程管理器处理。
- 流程管理器可以向命令队列发送命令,以响应事件。
- 事件订阅者在查询存储中创建和更新投影。
- 查询搜索是一个轻量级的数据访问层,用于读取投影。
入门
在编译和执行源码之前:
- 执行脚本“
Create Database.sql
”以创建本地SQL Server数据库。 - 更新Web.config中的连接字符串。
- 更新Web.config中
OfflineStoragePath
的appSetting
值。
用法
我不会从底层开始描述Timeline程序集的工作原理,而是从顶层开始,演示如何使用它,然后向下穿过应用程序堆栈,直至CQRS+ES骨干的细节。
如果我到目前为止还保持了您的注意力,那么我欠您一些回报,感谢您一直坚持到现在……
场景 A:如何创建和更新联系人
这是最简单的用法。
这里我们创建一个新的联系人,然后执行一次更名操作,模拟Alice
结婚的用例。
public static void Run(ICommandQueue commander)
{
var alice = Guid.NewGuid();
commander.Send(new RegisterPerson(alice, "Alice", "O'Wonderland"));
commander.Send(new RenamePerson(alice, "Alice", "Cooper"));
}
在此运行之后,读侧投影看起来很好,正如预期。
数据流
本场景中系统执行的步骤在下图说明:
场景 B:如何创建聚合快照
快照由Timeline程序集自动完成;默认情况下,它们为所有聚合启用,因此您无需做任何事情即可使其正常工作。
在下一次测试运行中,Timeline程序集配置为每10个事件后创建一个快照。我们注册一个新联系人,然后重命名他20次。这会在事件号20时产生一个快照,即倒数第二次重命名操作。
public static void Run(ICommandQueue commander)
{
var henry = Guid.NewGuid();
commander.Send(new RegisterPerson(henry, "King", "Henry I"));
for (int i = 1; i <= 20; i++)
commander.Send(new RenamePerson(henry, "King", "Henry " + (i+1).ToRoman()));
}
正如预期的那样,我们在版本20时有一个快照,并且在事件号21
之后的当前状态投影。
场景 C:如何将聚合下线
在我的解决方案中,术语“装箱”和“拆箱”用于将聚合下线和上线。
当您发送一个命令来装箱一个聚合时,Timeline程序集会:
- 创建一个快照;然后
- 将该快照和整个聚合事件流复制到一个存储在文件系统目录中的JSON文件中;然后
- 删除SQL Server结构化日志表中的快照和聚合。
这当然是一个具有破坏性的操作,除非是强制性的业务/法律要求,否则绝不应使用。
在下一次测试运行中,我们注册一个新联系人,重命名他7次,然后装箱该聚合。
public static void Run(ICommandQueue commander)
{
var hatter = Guid.NewGuid();
commander.Send(new RegisterPerson(hatter, "Mad", "Hatter One"));
for (int i = 2; i <= 8; i++)
commander.Send(new RenamePerson(hatter, "Mad", "Hatter " + i.ToWords().Titleize()));
commander.Send(new BoxPerson(hatter));
}
正如您所见,该聚合不再存在于事件存储中,并且最终快照(连同整个事件流)的离线副本已在文件系统上创建。
场景 D:如何创建一个具有唯一登录名的用户
开发人员在尝试理解CQRS+ES时在线上遇到的最常见问题是:
“我如何强制实施引用完整性以保证新用户具有唯一的登录名?”
在我早期研究CQRS+ES模式时,我(不止一次)问过自己这个问题。
许多经验丰富的实践者的答案是这样的:
“您的问题表明您不理解CQRS+ES。”
这是真的(我现在意识到),但完全没有帮助,特别是对于那些努力学习的人。
一些答案稍微好一些,提供了一个总结形式的高层建议,但充满了CQRS+ES术语,这并不总是很有帮助。我最喜欢的建议之一是(来自Edument的好人):
“创建一个响应式Saga来拦截并禁用那些仍然使用了重复用户名创建的账户,无论是因为极端的巧合、恶意还是由于客户端故障。”
我第一次读到这句话时,只有模糊的理解,完全不知道如何开始实现这样的建议。
下一次测试运行展示了一种(但不是唯一一种)方法来创建具有唯一名称的用户,并使用真实、可工作的代码作为示例。
在此场景中,诀窍在于认识到您确实需要一个Saga(或流程管理器,如我更倾向于称呼它)。创建新用户账户不是一个单步操作;它是一个过程,因此需要协调。流程图(或您喜欢的状态机)在您的应用程序中可能非常复杂,但即使在最简单的可能情况下,它看起来也像这样:
依赖流程管理器实现此功能的代码如图所示:
public void Run()
{
var login = "jack@example.com";
var password = "Let_Me_In!";
if (RegisterUser(Guid.NewGuid(), login, password)) // succeeds.
System.Console.WriteLine($"User registration for {login} succeeded");
if (!RegisterUser(Guid.NewGuid(), login, password)) // fails; duplicate login.
System.Console.WriteLine($"User registration for {login} failed");
}
private bool RegisterUser(Guid id, string login, string password)
{
bool isComplete(Guid user) { return _querySearch.IsUserRegistrationCompleted(user); }
const int waitTime = 200; // ms
const int maximumRetries = 15; // 15 retries (~3 seconds)
_commander.Send(new StartUserRegistration(id, login, password));
for (var retry = 0; retry < maximumRetries && !isComplete(id); retry++)
Thread.Sleep(waitTime);
if (isComplete(id))
{
var summary = _querySearch.SelectUserSummary(id);
return summary?.UserRegistrationStatus == "Succeeded";
}
else
{
var error = $"Registration for {login} has not completed after
{waitTime * maximumRetries} ms";
throw new IncompleteUserRegistrationException(error);
}
}
请注意,上面的示例中的调用者不假定同步处理StartUserRegistration
命令。相反,它轮询注册状态,等待其完成。
知道Timeline程序集中的代码是同步的,我们可以重构RegisterUser
方法使其更简单。
private bool RegisterUserNoWait(Guid id, string login, string password)
{
bool isComplete(Guid user) { return _querySearch.IsUserRegistrationCompleted(user); }
_commander.Send(new StartUserRegistration(id, login, password));
Debug.Assert(isComplete(id));
return _querySearch.SelectUserSummary(id).UserRegistrationStatus == "Succeeded";
}
流程管理器本身的代码比您想象的要简单:
public class UserRegistrationProcessManager
{
private readonly ICommandQueue _commander;
private readonly IQuerySearch _querySearch;
public UserRegistrationProcessManager
(ICommandQueue commander, IEventQueue publisher, IQuerySearch querySearch)
{
_commander = commander;
_querySearch = querySearch;
publisher.Subscribe<UserRegistrationStarted>(Handle);
publisher.Subscribe<UserRegistrationSucceeded>(Handle);
publisher.Subscribe<UserRegistrationFailed>(Handle);
}
public void Handle(UserRegistrationStarted e)
{
// Registration succeeds only if no other user has the same login name.
var status = _querySearch
.UserExists(u => u.LoginName == e.Name
&& u.UserIdentifier != e.AggregateIdentifier)
? "Failed" : "Succeeded";
_commander.Send(new CompleteUserRegistration(e.AggregateIdentifier, status));
}
public void Handle(UserRegistrationSucceeded e) { }
public void Handle(UserRegistrationFailed e) { }
}
您看到了一个基本的、响应式的Saga,它会禁用因重复用户名创建的账户。然后大家都很高兴。
正如预期的那样,第一次注册成功,第二次失败。
场景 E:如何调度命令
调度命令在未来的日期/时间运行很简单。
public static void Run(ICommandQueue commander)
{
var alice = Guid.NewGuid();
var tomorrow = DateTimeOffset.UtcNow.AddDays(1);
commander.Schedule(new RegisterPerson(alice, "Alice", "O'Wonderland"), tomorrow);
// After the above timer elapses, any call to Ping() executes the scheduled command.
// commander.Ping();
}
请注意,这不会在事件日志中创建任何聚合,并且命令日志现在包含一个计划条目。
场景 F:如何用一个命令更新多个聚合
这是CQRS+ES模式的开发人员在尝试理解如何实现时经常提出的另一个常见问题。这也是我在学习过程中(许多许多次)问过自己的一个问题。
实践者通常会回答:
“你不能。”
这并没有多少帮助。
有些人会提供更多指导,并给出类似这样的陈述:
“您的聚合和命令处理程序的划分将使得这种想法无法在代码中表达。”
第一次阅读这句话时,它看起来很神秘,最终您会发现它对于验证您的实现非常有帮助,但一开始它并没有太多启发性。
更有帮助的是一个包含真实、可工作的代码的示例,该示例实现了激发该问题的功能。
-
假设我有两个银行账户,每个账户都是一个聚合根,我想从一个账户转移资金到另一个账户。如何使用CQRS+ES实现这一点?
下一次测试运行展示了一种(但不是唯一一种)完成此任务的方法。
在此场景中,诀窍在于认识到您需要另一个聚合根——即,一个资金转移,它本身不是一个账户——您还需要一个流程管理器来协调工作流。
最简单的流程图在此图中说明。(一个会计系统显然需要比这更复杂的系统。)
一旦所有部件都就位,依赖流程管理器来实现上述工作流的代码就很简单了。
public void Run()
{
// Start one account with $100.
var bill = Guid.NewGuid();
CreatePerson(bill, "Bill", "Esquire");
var blue = Guid.NewGuid();
StartAccount(bill, blue, "Bill's Blue Account", 100);
// Start another account with $100.
var ted = Guid.NewGuid();
CreatePerson(ted, "Ted", "Logan");
var red = Guid.NewGuid();
StartAccount(ted, red, "Ted's Red Account", 100);
// Create a money transfer for Bill giving money to Ted.
var tx = Guid.NewGuid();
_commander.Send(new StartTransfer(tx, blue, red, 69));
}
private void StartAccount(Guid person, Guid account, string code, decimal deposit)
{
_commander.Send(new OpenAccount(account, person, code));
_commander.Send(new DepositMoney(account, deposit));
}
private void CreatePerson(Guid person, string first, string last)
{
_commander.Send(new RegisterPerson(person, first, last));
}
执行该测试后,Bill的蓝色账户余额为31美元,Ted的红色账户余额为169美元,正如预期。
资金转移流程管理器的代码也不是太难。
public class TransferProcessManager
{
private readonly ICommandQueue _commander;
private readonly IEventRepository _repository;
public TransferProcessManager
(ICommandQueue commander, IEventQueue publisher, IEventRepository repository)
{
_commander = commander;
_repository = repository;
publisher.Subscribe<TransferStarted>(Handle);
publisher.Subscribe<MoneyDeposited>(Handle);
publisher.Subscribe<MoneyWithdrawn>(Handle);
}
public void Handle(TransferStarted e)
{
var withdrawal = new WithdrawMoney(e.FromAccount, e.Amount, e.AggregateIdentifier);
_commander.Send(withdrawal);
}
public void Handle(MoneyWithdrawn e)
{
if (e.Transaction == Guid.Empty)
return;
var status = new UpdateTransfer(e.Transaction, "Debit Succeeded");
_commander.Send(status);
var transfer = (Transfer) _repository.Get<TransferAggregate>(e.Transaction).State;
var deposit = new DepositMoney(transfer.ToAccount, e.Amount, e.Transaction);
_commander.Send(deposit);
}
public void Handle(MoneyDeposited e)
{
if (e.Transaction == Guid.Empty)
return;
var status = new UpdateTransfer(e.Transaction, "Credit Succeeded");
_commander.Send(status);
var complete = new CompleteTransfer(e.Transaction);
_commander.Send(complete);
}
}
场景 G:如何实现自定义事件处理器
在下一个示例中,我演示了如何定义一个自定义事件处理器,该处理器仅供多租户系统中的一个租户使用。
在此场景中,Umbrella Corporation是我们的一个租户,该公司希望获得我们系统中所有现有的核心功能。但是,该公司还希望增加一项自定义功能:
- 当资金从Umbrella账户转出或转入Umbrella账户时,如果金额超过10,000美元,则必须直接向公司所有者发送电子邮件通知。
为了满足此要求,我们为该租户实现了一个流程管理器。依赖于此流程管理器的调用代码与上一场景没有区别。
public void Run()
{
// Start one account with $50,000.
var ada = Guid.NewGuid();
CreatePerson(ada, "Ada", "Wong");
var a = Guid.NewGuid();
StartAccount(ada, a, "Ada's Account", 50000);
// Start another account with $25,000.
var albert = Guid.NewGuid();
CreatePerson(albert, "Albert", "Wesker");
var b = Guid.NewGuid();
StartAccount(albert, b, "Albert's Account", 100);
// Create a money transfer for Ada giving money to Albert.
var tx = Guid.NewGuid();
_commander.Send(new StartTransfer(tx, a, b, 18000));
}
private void StartAccount(Guid person, Guid account, string code, decimal deposit)
{
_commander.Send(new OpenAccount(account, person, code));
_commander.Send(new DepositMoney(account, deposit));
}
private void CreatePerson(Guid person, string first, string last)
{
_commander.Send(new RegisterPerson(person, first, last));
}
这是Visual Studio调试器中的一个快照,查看流程管理器代码,并在发送电子邮件通知的那一行设置了断点。请注意弹窗中消息的正文符合我们的预期。
场景 H:如何覆盖具有自定义处理程序的命令
最后一个示例是前一个示例的一个变体。Umbrella Corporation希望完全禁用一项核心应用程序功能,并用完全自定义的行为取代它。新的业务要求如下:
- 不允许更改我们系统中联系人姓名。永远。
为了满足此要求,我们对流程管理器进行了几处简单的更改。我们在构造函数中添加了一行代码,指定覆盖,然后添加了替换功能。
public class UmbrellaProcessManager
{
private IQuerySearch _querySearch;
public UmbrellaProcessManager
(ICommandQueue commander, IEventQueue publisher, IQuerySearch querySearch)
{
_querySearch = querySearch;
publisher.Subscribe<TransferStarted>(Handle);
commander.Override<RenamePerson>(Handle, Tenants.Umbrella.Identifier);
}
public void Handle(TransferStarted e) { }
public void Handle(RenamePerson c)
{
// Do nothing. Umbrella does not permit renaming people.
// Throw an exception to make the consequences even more severe
// for any attempt to rename a person...
// throw new DisallowRenamePersonException();
}
}
这是一个基本测试运行,以演示这是否按预期工作。
public static class Test08
{
public static void Run(ICommandQueue commander)
{
ProgramSettings.CurrentTenant = Tenants.Umbrella;
var alice = Guid.NewGuid();
commander.Send(new RegisterPerson(alice, "Alice", "Abernathy"));
commander.Send(new RenamePerson(alice, "Alice", "Parks"));
}
}
请注意日志中只有一个事件,并且联系人姓名没有更改。
演示
示例应用程序中的表示层是一个控制台应用程序,仅用于编写和运行测试场景。
这里没有什么值得特别关注的。您会注意到我没有使用第三方组件进行依赖注入;相反,我编写了一个非常基础的内存服务定位器。
这样做只是为了使示例应用程序尽可能小巧和专注。在您自己的表示层中,您将以最适合您和您团队的方式实现依赖注入,使用您喜欢的任何IoC容器。
Application
应用程序层分为两个不同的部分:用于命令的写侧,以及用于查询的读侧。这种划分有助于确保我们不会意外地混合写侧和读侧的功能。
请注意,这里没有对任何外部第三方程序集的引用。
写侧
命令是纯C#对象(POCO)类,因此可以轻松地用作数据传输对象(DTO)以方便序列化。
public class RenamePerson : Command
{
public string FirstName { get; set; }
public string LastName { get; set; }
public RenamePerson(Guid id, string firstName, string lastName)
{
AggregateIdentifier = id;
FirstName = firstName;
LastName = lastName;
}
}
注意:我更喜欢“Packet”而不是“Data Transfer Object”这个术语,我知道许多读者会反对,所以请选择适合您和您的团队的术语。
命令处理程序方法的注册在命令订阅者类的构造函数中是显式的,并且事件在保存到事件存储之后发布。
public class PersonCommandSubscriber
{
private readonly IEventRepository _repository;
private readonly IEventQueue _publisher;
public PersonCommandSubscriber
(ICommandQueue commander, IEventQueue publisher, IEventRepository repository)
{
_repository = repository;
_publisher = publisher;
commander.Subscribe<RegisterPerson>(Handle);
commander.Subscribe<RenamePerson>(Handle);
}
private void Commit(PersonAggregate aggregate)
{
var changes = _repository.Save(aggregate);
foreach (var change in changes)
_publisher.Publish(change);
}
public void Handle(RegisterPerson c)
{
var aggregate = new PersonAggregate { AggregateIdentifier = c.AggregateIdentifier };
aggregate.RegisterPerson(c.FirstName, c.LastName, DateTimeOffset.UtcNow);
Commit(aggregate);
}
public void Handle(RenamePerson c)
{
var aggregate = _repository.Get<PersonAggregate>(c.AggregateIdentifier);
aggregate.RenamePerson(c.FirstName, c.LastName);
Commit(aggregate);
}
}
读侧
查询也是POCO类,这使它们轻量级且易于序列化。
public class PersonSummary
{
public Guid TenantIdentifier { get; set; }
public Guid PersonIdentifier { get; set; }
public string PersonName { get; set; }
public DateTimeOffset PersonRegistered { get; set; }
public int OpenAccountCount { get; set; }
public decimal TotalAccountBalance { get; set; }
}
事件处理程序方法的注册在事件订阅者类的构造函数中也是显式的。
public class PersonEventSubscriber
{
private readonly IQueryStore _store;
public PersonEventSubscriber(IEventQueue queue, IQueryStore store)
{
_store = store;
queue.Subscribe<PersonRegistered>(Handle);
queue.Subscribe<PersonRenamed>(Handle);
}
public void Handle(PersonRegistered c)
{
_store.InsertPerson(c.IdentityTenant, c.AggregateIdentifier,
c.FirstName + " " + c.LastName, c.Registered);
}
public void Handle(PersonRenamed c)
{
_store.UpdatePersonName(c.AggregateIdentifier, c.FirstName + " " + c.LastName);
}
}
定义域
域仅包含聚合和事件。同样,您会看到这里的引用列表尽可能精简。
每个聚合根类都包含一个函数,用于处理它接受的每个命令,以更改其状态。
public class PersonAggregate : AggregateRoot
{
public override AggregateState CreateState() => new Person();
public void RegisterPerson(string firstName, string lastName, DateTimeOffset registered)
{
// 1. Validate command
// Omitted for the sake of brevity.
// 2. Validate domain.
// Omitted for the sake of brevity.
// 3. Apply change to aggregate state.
var e = new PersonRegistered(firstName, lastName, registered);
Apply(e);
}
public void RenamePerson(string firstName, string lastName)
{
var e = new PersonRenamed(firstName, lastName);
Apply(e);
}
}
请注意,聚合状态是在一个与聚合根分开的类中实现的。
这使得序列化和快照更容易管理,并且有助于整体可读性,因为它强制区分了与命令相关的函数和与事件相关的函数。
public class Person : AggregateState
{
public string FirstName { get; set; }
public string LastName { get; set; }
public DateTimeOffset Registered { get; set; }
public void When(PersonRegistered @event)
{
FirstName = @event.FirstName;
LastName = @event.LastName;
Registered = @event.Registered;
}
public void When(PersonRenamed @event)
{
FirstName = @event.FirstName;
LastName = @event.LastName;
}
}
事件,与命令和查询一样,是轻量级的POCO类。
public class PersonRenamed : Event
{
public string FirstName { get; set; }
public string LastName { get; set; }
public PersonRenamed(string first, string last) { FirstName = first; LastName = last; }
}
持久化
在持久化层,我们开始看到对外部第三方组件的大量依赖。例如,这里我们依赖于:
- Json.NET 用于JSON序列化和反序列化;
- System.Data 用于使用ADO.NET在SQL Server中记录命令、事件和快照;以及
- Entity Framework 用于查询投影。
此项目中包含的源代码实现了一个标准的、普通的DataAccess层,这个层不应该有任何新的、特别创新的或令人惊讶的东西——因此它不需要特别的讨论。
CQRS+ES骨干
最后(漫长的)大家,女士们先生们,迎来了您一直期待的晚会部分:真正实现CQRS+ES模式的Timeline程序集,它使以上所有成为可能。
有趣的是……现在我们已经到了细节部分,应该没什么神秘之处了。
您会注意到的第一件事是,Timeline程序集(除了.NET Framework本身之外)没有对任何外部第三方组件的依赖项。
Commands
这里只有几点需要注意。
Command基类包含聚合标识符和版本号属性,正如您所期望的那样。它还包含租户和发送命令的用户身份的属性。
/// <summary>
/// Defines the base class for all commands.
/// </summary>
/// <remarks>
/// A command is a request to change the domain. It is always are named with a verb in
/// the imperative mood, such as Confirm Order. Unlike an event, a command is not a
/// statement of fact; it is only a request, and thus may be refused. Commands are
/// immutable because their expected usage is to be sent directly to the domain model for
/// processing. They do not need to change during their projected lifetime.
/// </remarks>
public class Command : ICommand
{
public Guid AggregateIdentifier { get; set; }
public int? ExpectedVersion { get; set; }
public Guid IdentityTenant { get; set; }
public Guid IdentityUser { get; set; }
public Guid CommandIdentifier { get; set; }
public Command() { CommandIdentifier = Guid.NewGuid(); }
}
CommandQueue
实现了ICommandQueue
接口,该接口定义了一组少量方法用于注册订阅者和覆盖,以及发送和调度命令。您可以将其视为命令的服务总线。
事件
Event基类包含聚合标识符和版本号属性,以及事件被引发/发布的租户和用户身份的属性。这确保了每个事件日志条目都与特定的租户和用户相关联。
您可以将EventQueue
视为事件的服务总线。
聚合
AggregateState
类中有一个小巧的黑魔法。Apply
方法使用反射来确定在将事件应用于聚合状态时应调用哪个方法。我不太喜欢这样,但我找不到任何方法可以避免。幸运的是,代码非常易于阅读和理解。
/// <summary>
/// Represents the state (data) of an aggregate. A derived class should be a POCO
/// (DTO/Packet) that includes a When method for each event type that changes its
/// property values. Ideally, the property values for an instance of this class
/// should be modified only through its When methods.
/// </summary>
public abstract class AggregateState
{
public void Apply(IEvent @event)
{
var when = GetType().GetMethod("When", new[] { @event.GetType() });
if (when == null)
throw new MethodNotFoundException(GetType(), "When", @event.GetType());
when.Invoke(this, new object[] { @event });
}
}
快照
实现快照的源代码比我最初开始这个项目时想象的要干净和简单。逻辑有些复杂,但Snapshots命名空间只有约240行代码,所以我在这里不添加细节。我将其留给您,最耐心的读者,如果到目前为止还有的话。:-)
指标
我将用一些基本指标来结束本文。(稍后将提供更多。)
以下是NDepend对Timeline程序集产生的分析报告。
源代码并不完美,正如您所见,但确实获得了“A”级评级,技术债务估计仅为1.3%。项目也很紧凑,在我写这篇文章时只有439行代码。
注意:NDepend根据程序集.pdb符号文件中的方法序列点数计算代码行数(LOC)。Visual Studio的计算方式不同;对于Timeline项目,它报告1,916行源代码,其中277行是可执行代码。
时间允许的话,我将更新本文并提供运行时性能结果。
在此期间,非常欢迎您的评论和批评。
历史
- 2020年4月8日:初始版本
- 2020年5月11日:小幅编辑