CQRS:交叉检查其工作原理






4.97/5 (138投票s)
探讨 CQRS 应用程序的工作原理,并附赠一个演示应用程序
引言
大约 3 年前,我参加了一个由某位老兄主讲的关于 CQRS(命令查询职责分离)的讲座。当时我想,嗯,这是个不错的主意,但我不太清楚它如何应用到我的日常工作中。所以我就有点忘了它。大约 2 年过去了,我工作的地方雇佣了一个新人,结果发现他就是那个做 CQRS 讲座的人。那个人是 Ashic Mahtab,现在我有幸与他共事。与 Ashic 日复一日地共事再次激发了我对 CQRS 的兴趣。不止一次,当我的代码出现问题,Ashic 会听到我对着屏幕咒骂,然后他会不经意地走过来,和我一起分析问题,然后带着一种了然(又名沾沾自喜)的微笑,解释说使用 CQRS 和事件溯源永远不会发生这种事。Ashic 是一个很好的 CQRS 知识库,并曾被邀请审阅微软模式与实践书籍《CQRS 与事件溯源之旅》,并将 Greg Young 列为他的朋友/同事之一。可以想象,在我学习 CQRS 的过程中,Ashic 对于我提出想法/问题非常有帮助。因此,我通过他验证了很多问题,所以我希望我在这里呈现的材料是准确的。
Ashic,如果你正在读这篇文章,非常感谢你容忍我(有时)愚蠢的问题,并感谢你总是花时间用一种易于理解的方式向我解释事情,这几乎总能让事情最终变得更有意义。先生,我向您致敬。
不管怎样,我决定我需要更好地理解这个模式/架构。所以我开始阅读大量相关资料,观看一些视频,阅读一些相关书籍,并研究市面上的相关框架。这篇文章和附带的演示应用就是这些努力的成果。
在本文中,我将同时涵盖 CQRS 和事件溯源。本文旨在记录我尝试理解 CQRS 并向他人解释的过程。
那么 CQRS 究竟是什么?
如引言中所述,CQRS 代表“命令查询职责分离”,但这是什么意思呢?让我们来看看一些知名技术专家的引述。
Bertrand Meyer 在其著作《面向对象软件构造》中,引入了“命令查询分离”一词来描述一个原则,即对象的方法要么是命令,要么是查询。查询返回数据,不改变对象的状态;命令改变对象的状态,但不返回任何数据。这样做的好处是,你可以更好地理解系统中哪些操作会改变状态,哪些不会。
CQRS 将这一原则更进一步,定义了一个简单的模式。
“CQRS 简单来说就是将原来只有一个的对象创建成两个。这种分离是基于方法是命令还是查询(与 Meyer 在命令和查询分离中使用的定义相同:命令是任何改变状态的方法,而查询是任何返回值的方法)。”
——Greg Young,CQRS,基于任务的UI,事件溯源时代!
关于这个简单模式,重要且有趣的是在构建企业系统时如何、在何处以及为何使用它。使用这个简单的模式可以帮助你应对各种架构挑战,例如实现可伸缩性、管理复杂性以及管理系统中某些部分不断变化的业务规则。
“CQRS 是一个简单的模式,它严格地将处理命令输入到一个自治系统的职责与处理对同一系统的无副作用的查询/读取访问的职责分开。因此,解耦允许任意数量的同构或异构的查询/读取模块与命令处理器配对。这个原则为事件溯源、最终一致性状态复制/扇出,从而实现高可伸缩性的读取访问提供了一个非常合适的基础。简单来说,你不会通过处理命令的同一个服务模块来响应查询。在 REST 术语中,GET 请求连接到一个与 PUT、POST 和 DELETE 请求不同的东西上。”
——Clemens Vasters(CQRS 顾问邮件列表)
一些不错的 CQRS 资源
- Martin Fowler 的博客 : https://martinfowler.com.cn/bliki/CQRS.html
- 微软 CQRS 书籍 : https://msdn.microsoft.com/en-us/library/jj591573.aspx
- Greg Young 关于 CQRS 的视频 : https://www.youtube.com/watch?v=JHGkaShoyNs
- Mark Nijhof 的 CQRS 示例/书籍 : https://github.com/MarkNijhof/Fohjin
那么事件溯源究竟是什么?
我相当喜欢 Martin Fowler 对此的描述方式
我们可以查询应用程序的状态以了解世界的当前状态,这能回答许多问题。然而,有时我们不仅想知道我们现在在哪里,还想知道我们是如何到达那里的。
事件溯源确保对应用程序状态的所有更改都作为一系列事件被存储。我们不仅可以查询这些事件,还可以使用事件日志来重建过去的状态,并以此为基础自动调整状态以应对追溯性更改。
https://martinfowler.com.cn/eaaDev/EventSourcing.html
如果这听起来没道理,可以想一想典型的 CRUD 操作,我们会将对象的当前状态存储在数据库中。而使用事件溯源,我们会根据某个生成的 ID(通常是一个 Guid),即聚合的 ID,来存储与特定对象相关的事件。如果我们想知道对象的当前状态,我们会检索所有过去的事件,并按顺序重放它们,从而得到当前状态。简而言之,这就是事件溯源的工作原理。
注意: 如果你想使用事件溯源,你应该使用 CQRS,但如果你想创建一个 CQRS 系统,你不一定需要事件溯源。
那么我们为什么要使用 CQRS?
在我见过的大多数应用程序中,我认为可以公平地说,你可能会有类似这样的结构
- UI 代码
- 用于协调客户端远程调用的服务 (REST / SOAP)
- DTO 转换
- 验证层
- 业务逻辑
- 仓库 / 工作单元
- 数据访问层
- 数据访问对象
这可真是不少层。考虑到大多数应用程序的读取次数远多于写入次数,让读取操作也经过所有这些分层真的有意义吗?可能没有。也许一个更合理的模型是,当我们想要写入某些内容时使用这些层,而当我们想要读取某些内容时,我们可以假定数据是有效的,因此,我们是否可以绕过大部分这些层,直接从数据库中获取数据?这无疑是使用 CQRS 的一个令人信服的理由。
事件溯源也可以与 CQRS 一起使用,当这样做时,它增加了对给定聚合所执行更改的完整审计能力。这是通过将事件存储在某种形式的事件存储中,并在聚合上重放它们来实现的。
这是使用 CQRS 的两个令人信服的理由。
它是银弹吗?
我个人认为它并不适用于你的应用程序的每个部分。例如,想象一下这个非常常见的基于 Web 的场景:
“用户必须在某些输入字段中输入他们的用户名和密码,之后系统必须尝试通过某种查找机制来识别用户。如果获取到用户,系统应该检索一些个人资料信息,这些信息将保存在会话中 4 小时或直到用户注销”
我个人认为这不适合 CQRS(其他人可能不同意),因为它意味着一种更偏向请求/响应类型的操作,其中需要根据用户登录请求立即得到响应。这与典型的 CQRS 安排有些冲突。使用 CQRS,我们应该能够清楚地将执行某项操作的命令与检索某项内容的查询分开。在这种情况下,我们确实需要对一个命令几乎立即做出响应,并且我们不希望/不打算执行任何额外的查询,我们需要立即为当前请求获得响应。所以它并不是一个合适的 CQRS 候选场景(至少在我看来是这样)。
与之形成对比的是这种场景:
“用户可以修改一个尚未发货的开放订单,且今天与发货日期之间的差值大于 24 小时。用户可以修改订单的某些属性,例如
- 送货地址
- 数量
系统应允许用户请求的更改被存储到订单中,并应通知用户更改已完成,并将很快得到处理”
这(在我看来)更适合使用 CQRS 来建模,因为它并非真正的请求/响应类型的操作,因为更改详细信息的命令可以清晰地与获取修改后订单详细信息的查询分离开来。
尽管完全异步地执行 CQRS 操作可能会引发其他棘手的问题,我接下来会讨论这一点。
代码在哪里
本文的演示代码可以从我的 GitHub 账户获取
https://github.com/sachabarber/SachaBarber.CQRS.Demo
必备组件
我已经尽力将演示代码的先决条件降到最低。我为你考虑周到,对读模型使用了嵌入式 RavenDB,并创建了一个内存事件存储,但仍然需要安装以下软件
- 一些 DDD 知识(聚合/仓库等)
- Erlang (Rabbit MQ 需要这个) : https://erlang.org.cn/download.html
- Rabbit MQ : https://rabbitmq.cn/install-windows.html
我建议确保 RabbitMQ 服务器被设置为使用系统账户自动作为 Windows 服务启动。
CQRS 架构应用程序的构建模块
下图展示了 CQRS 应用程序的典型构建模块/流程。
点击查看大图
上述构建模块可能会有一些细微的改动,但这似乎是构建 CQRS 应用程序最普遍接受的架构。我将在下面对每个模块进行一些详细描述,但当我们讨论整个演示应用程序时,我们会更详细地回顾这些内容。
客户端
客户端实际上可以是任何形式的客户端,一个 WinForms 应用,WPF 应用,网站等等。只要它能发送命令,它就可以成为一个客户端。
我在这里使用的演示应用是一个 WPF 客户端,它利用 WCF 向领域模型发送命令。
命令总线
命令总线实际上只是用于为传入的命令获取一个处理器。有一些很酷的库可以做类似的事情,比如 Jimmy Bogard 的 MediatR。但问题是,那个库对于需求来说有点小题大做了。因此,我使用了一点简单的反射。
命令处理器
这些是处理执行命令逻辑的处理器。传入的命令和命令处理器之间应该是一对一的映射关系。
定义域
领域模型是系统的核心部分,它将来自事件存储的历史事件重放到领域模型聚合上。或者将任何新的事件(通过传入的命令)应用到聚合上,之后任何新的事件才会被持久化到事件存储中。
服务
这些是不适合放在聚合根或值对象中的任何领域服务,它们是领域的一部分。
存储库
仓库模式主要用于两个目的
- 将任何未提交的事件存储到事件存储中。
- 根据 ID 从事件存储中加载一个聚合(正如我已经说过的,这个 ID 通常是在聚合的第一个事件存入事件存储之前生成的一个 Guid)
注意:这是写入“写”模型,我们将事件存储在事件存储中。这是因为本文使用了带事件溯源的 CQRS。
事件总线
这用于发布已应用于领域模型的事件。
事件处理程序
这些是处理事件逻辑的处理器。一个事件可能关联多个处理器。例如,假设你有多个读模型,一个供 UI 读取,另一个用于驱动某些报表需求等等。所以可能需要不止一个事件处理器,但这取决于你的具体需求。
对于这个演示应用,事件和事件处理器之间是一对一的映射,因为只有一个读模型,没有其他事件监听器。
因此,对于演示应用来说,正是在这个环节更新了读模型。读模型可以是一个关系型数据库,甚至可以是 NoSQL,但它应该是客户端需求的非规范化表示。或者至少是某种能让客户端非常容易处理读模型中数据的表示形式。处理后的事件和读模型所需的表示形式之间可能存在也可能不存在很好的对等关系,甚至可能需要以某种方式转换数据。这也可能被称为“投影”,我们稍后会更多地看到这一点。
注意:这是在写入“读”模型(但仅在领域内部,客户端不能写入读模型,只能读取)
瘦数据层
如前所述,我们希望读模型尽可能易于处理,因此只需要一个简单的数据访问层。也许需要一个极其简单的仓库,但仅此而已,没有验证/没有业务逻辑,只是一个从读模型中获取数据的方法。
查询门面
我们应该允许查询几乎直接在读模型上执行。记住,从客户端的角度来看,它是一个只读模型。因此,客户端甚至可以直接从读模型中读取数据。
同步 vs 异步(最终一致性)
当我开始这个演示应用时,我立刻意识到的一件事是,如果你选择一切都采用异步方式而不是同步方式,复杂性会有巨大的差异。我的意思是,如果你走同步路线,你可能会有这样的东西(伪代码)
同步代码(伪代码)
如果我们假定系统的所有部分都是同步的。也就是说,我们使用的 CQRS 架构内部是完全同步的组件。那么我们可能会有类似这样的代码
FireCommand(new CommitOrderCommand(Guid.NewGuid(), [] { "bag of peas"});
Orders = readModelService.GetAllOrders();
这里我们没有问题,因为我们只是发出一个命令(它会与 CQRS 领域模型的写模型通信),让它运行完成,然后从读模型中读取数据,此时我们期望我们的新订单已经在那里了。一切都好
异步代码(伪代码)
这种方法使用了最终一致性的思想,我们向领域模型写入数据(通过命令发起),但我们知道系统的所有部分都是异步的。也就是说,我们使用的 CQRS 架构的内部是完全异步的组件。这可能意味着使用进程间总线,例如
- NetMQ
- RabbitMQ
- NServiceBus
- MassTransit
- MSMQ
- Azure ServiceBus
FireCommand(new CommitOrderCommand(Guid.NewGuid(), [] { "bag of peas"});
......
......
基本上,我们现在无法再预测从读模型更新的正确时机。我们应该在什么时候做呢?我们怎么知道什么时候该做呢?答案在于从 CQRS 系统的事件处理器传递的一条消息,这条消息告诉那些感兴趣的各方(主要是用户界面 UI)领域模型中发生了某些事情。这条消息可以包含实际数据,也可以仅仅作为触发器,表明领域模型中发生了某个事件,然后让这个新消息的消费者自行处理。
后一种方法是我在应用程序中采用的方法(其中 RabbitMQ 用于此目的)。在这种方法中,读模型首先响应事件存储发布的事件进行更新,然后 CQRS 框架在读模型更新后推送一条消息,订阅此消息的各方将收到通知,此时他们可能会选择做一些事情,比如通过再次从读模型读取数据来更新自己(如果订阅者是 UI,则更新 ViewModel/View)。
这确实提出了一个有趣的问题。假设你(从 UI)发出一个命令,以更改领域聚合根对象的某些内容。比如说你有一个订单,你向 CQRS 系统发出了一个“更改地址命令”。你的 UI 中仍然保留着带有当前地址的原始订单(正如我们上面假设的),并且在未来的某个时间点,UI 将被通知领域模型中发生的某个事件,此时 UI 会从读模型中读取数据。读模型只有在整个工作流完成后(至少是更新到读模型的那一步)才会包含新的地址。所以在此之前,UI 的状态将与读模型不同(有些人会称之为不一致)。
这被称为“最终一致性”,这是你在处理完全异步系统时必须习惯的事情。数据最终会变得一致,或者它可能会进入一种错误状态,甚至可能需要一些手动干预才能使其再次一致(不过这是最坏的错误情况)。
使用框架还是不使用?
现在,与我共事的 Ashic 总是说,他认为使用一个框架来做 CQRS 不是一个好方法,因为有时你可能需要比框架提供的更多的控制。这当然是真的,也是一个常见的问题。话虽如此,这个不使用框架的建议来自于一个在从零开始编写这些系统方面有丰富 CQRS 经验的人,而我没有,但我仍然想让一个 CQRS 应用程序工作起来。
所以我决定使用一个框架,我四处寻找并发现了不少,但我最喜欢的一个叫做 CQRSlite。这是 Greg Young 的一个旧框架的扩展版本。它提供了以下功能
- 命令发送和事件发布
- 通过会话实现的工作单元,带聚合跟踪
- 用于获取和保存聚合的仓库
- 乐观并发检查
- 带处理器自动注册的进程内总线
- 快照功能
- 带并发检查和更新到最新版本的缓存
我发现它用起来非常直观,并且没有后悔使用它的决定。
演示应用程序
这个演示应用是一个小型的 WPF 应用。我们不会在本节剖析任何代码,那将在稍后讨论。在本节中,我只想介绍我们将要涵盖的主要场景,以及 UI 是如何实现它们的。
警告
#1
演示应用程序使用 WCF 进行 UI 和服务器端(写模型)之间的通信。为了让一切有机会启动并准备就绪,在 UI 尝试执行任何操作之前,有一个 10 秒的硬编码延迟。你不应该更改此设置。在正常环境中,WCF 服务会一直运行,所以这个伪延迟是不需要的。
#2
由于我不想让你们为了安装太多额外的软件而烦恼,所以我尽可能多地使用了内存中的东西。因此,有一个内存中的事件存储。这意味着如果应用程序被终止/崩溃,事件不会被持久化。由于这个特性,我们也需要确保读模型在每次运行时都被清理。这个已经为你处理好了,不用担心。我只是不希望有人在停止演示应用并再次启动时,看到之前保存的东西都消失了而感到太惊讶。这是特意设计的,而且是为了你好
如何运行演示应用
一旦你获得了代码库并安装了先决条件,你应该(以管理员身份)打开 Visual Studio 并确保以下两个项目被设置为启动项目
- SachaBarber.CQRS.Demo.Domain
- SachaBarber.CQRS.Demo.WPFClient
演示应用中包含了什么?
本文提供的演示应用包含以下场景
创建一个新订单
演示应用启动时会显示一堆漫画。你可以选择你想要的漫画,然后点击添加按钮,这将允许输入一些最基本的信息。这最终会(通过写模型)创建一个新订单,从而导致一个 OrderCreatedEvent 通知返回到 UI(通过 RabbitMQ 消息)。此时,UI 将显示一个 toast 风格的消息,并通过从读模型中读取数据来更新自身。当前订单视图最初不会显示,但用户可以选择使用侧边栏按钮(屏幕右侧)来显示它。
以下截图向您展示了如何执行此场景
点击查看大图
您可以选择您订单中想要的商品,并点击顶部的按钮来创建一个新订单。
一旦你点击新订单按钮,你可以填写一些关于订单的最基本信息。
点击查看大图
当订单在领域模型中创建并且也已添加到读模型后,UI 将会收到一个 RabbitMQ 通知,之后 UI 会从读模型中读取数据,这将显示读模型中的所有订单列表。订单视图可以从右侧的按钮进入。
点击查看大图
这是订单视图,显示了来自读模型的订单。
删除一个订单
在当前订单视图中,用户可以选择删除一个订单。这最终会(通过写模型)删除请求的订单,这将导致一个 OrderDeletedEvent 通知(通过 RabbitMQ 消息)返回到 UI。此时,UI 将显示一个 toast 风格的消息,并通过从读模型中读取数据来更新自己。
更改订单地址
在当前订单视图中,用户可以选择进入所选订单的编辑模式(用户也可以取消编辑模式)。
此时用户可以编辑订单地址。用户编辑完地址后,可以通过点击一个按钮来更新订单地址。
这最终将更新订单地址(通过写模型),并导致一个 OrderAddressChangedEvent 通知(通过 RabbitMQ 消息)返回到 UI。届时,UI 将显示一个 toast 风格的消息,并通过从读模型中读取数据来更新自身。
好了,到目前为止我们已经讲了很多内容,希望现在我们已经了解了
- 什么是 CQRS
- 什么是事件溯源
- 我们正在使用 CQRSlite 框架
- 演示应用是做什么的
那么,是时候开始研究一些代码了。
写模型
本节讨论写模型。当领域模型的客户端发出一个命令时,写模型就会发挥作用。对于演示应用来说,这意味着 UI 将会发出命令。
演示 UI 如何发出命令
演示应用调用一个支持 async/await 的 WCF 服务。其中有标准的 WCF 内容,一个代理/ClientChannel 等等。我不会用这些来烦你,但以下是从 UI 发送命令时 WCF 服务的典型用法
await orderServiceInvoker.CallService(service =>
service.SendCommandAsync(new ChangeOrderAddressCommand()
{
ExpectedVersion = this.Order.Version,
Id = this.Order.OrderId,
NewAddress = this.Order.Address
}));
这是 UI 调用的 WCF 服务。它甚至使用了 async/await(很酷吧)。
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
//Useful when debugging, see App.Config too
//[ErrorHandlerBehavior]
public class OrderService : IOrderService
{
private readonly OrderCommandHandlers commandHandlers;
private readonly IReadModelRepository readModelRepository;
public OrderService(
OrderCommandHandlers commandHandlers,
IReadModelRepository readModelRepository)
{
this.commandHandlers = commandHandlers;
this.readModelRepository = readModelRepository;
}
public async Task<bool> SendCommandAsync(Command command)
{
await Task.Run(() =>
{
var meth = (from m in typeof(OrderCommandHandlers)
.GetMethods(BindingFlags.Public | BindingFlags.Instance)
let prms = m.GetParameters()
where prms.Count() == 1 &&
prms[0].ParameterType == command.GetType()
select m).FirstOrDefault();
if (meth == null)
throw new BusinessLogicException(
string.Format("Handler for {0} could not be found",
command.GetType().Name));
meth.Invoke(commandHandlers, new[] { command });
});
return true;
}
}
独立的命令
请注意,只有一个 SendCommandAsync(Command command)
方法。那么它是如何工作的呢?这实际上是标准的 WCF 功能。如果我们检查 Command
类,就会清楚了。我们只是使用了 [KnownType]
属性。
[DataContract]
[KnownType(typeof(CreateOrderCommand))]
[KnownType(typeof(ChangeOrderAddressCommand))]
[KnownType(typeof(DeleteOrderCommand))]
public abstract class Command : ICommand
{
[DataMember]
public Guid Id { get; set; }
[DataMember]
public int ExpectedVersion { get; set; }
}
命令处理器是如何被调用的?
在演示应用的 OrderService
中,构造函数有一个由 IOC 满足的对 OrderCommandHandlers
的依赖。这个对象包含了演示应用的所有命令处理器。这只是我在演示应用中的实现方式,你也可以用其他方式实现,完全取决于你。总之,有一个 OrderCommandHandlers
类,它在 OrderService
中通过反射被调用,其中会根据传入的 Command
类型定位并调用正确的 Handle(..)
方法。
这是 OrderCommandHandlers
类的代码
public class OrderCommandHandlers : ICommandHandler<CreateOrderCommand>,
ICommandHandler<ChangeOrderAddressCommand>,
ICommandHandler<DeleteOrderCommand>
{
private readonly ISession _session;
public OrderCommandHandlers(ISession session)
{
_session = session;
}
public void Handle(CreateOrderCommand command)
{
var item = new Order(
command.Id,
command.ExpectedVersion,
command.Description,
command.Address,
command.OrderItems.Select(x => new OrderItem()
{
OrderId = x.OrderId,
StoreItemDescription = x.StoreItemDescription,
StoreItemId = x.StoreItemId,
StoreItemUrl = x.StoreItemUrl
}).ToList());
_session.Add(item);
_session.Commit();
}
public void Handle(ChangeOrderAddressCommand command)
{
Order item = _session.Get<Order>(
command.Id, command.ExpectedVersion);
item.ChangeAddress(command.NewAddress);
_session.Commit();
}
public void Handle(DeleteOrderCommand command)
{
Order item = _session.Get<Order>(
command.Id, command.ExpectedVersion);
item.Delete();
_session.Commit();
}
}
命令实际上做了什么?
嗯,这要看情况,但在我们进入可能的场景之前,让我们谈谈在所有命令处理方法中都会发生的事情。那就是使用 ISession
对象的 Add()
/Get()
和 Commit()
方法。下面展示的是 ISession
的实现(这是 CQRSlite 框架的一部分)
ISession 实现
public class Session : ISession
{
private readonly IRepository _repository;
private readonly Dictionary<Guid, AggregateDescriptor> _trackedAggregates;
public Session(IRepository repository)
{
if(repository == null)
throw new ArgumentNullException("repository");
_repository = repository;
_trackedAggregates = new Dictionary<Guid, AggregateDescriptor>();
}
public void Add<T>(T aggregate) where T : AggregateRoot
{
if (!IsTracked(aggregate.Id))
_trackedAggregates.Add(aggregate.Id,
new AggregateDescriptor
{
Aggregate = aggregate,
Version = aggregate.Version
});
else if (_trackedAggregates[aggregate.Id].Aggregate != aggregate)
throw new ConcurrencyException(aggregate.Id);
}
public T Get<T>(Guid id, int? expectedVersion = null) where T : AggregateRoot
{
if(IsTracked(id))
{
var trackedAggregate = (T)_trackedAggregates[id].Aggregate;
if (expectedVersion != null && trackedAggregate.Version != expectedVersion)
throw new ConcurrencyException(trackedAggregate.Id);
return trackedAggregate;
}
var aggregate = _repository.Get<T>(id);
if (expectedVersion != null && aggregate.Version != expectedVersion)
throw new ConcurrencyException(id);
Add(aggregate);
return aggregate;
}
private bool IsTracked(Guid id)
{
return _trackedAggregates.ContainsKey(id);
}
public void Commit()
{
foreach (var descriptor in _trackedAggregates.Values)
{
_repository.Save(descriptor.Aggregate, descriptor.Version);
}
_trackedAggregates.Clear();
}
private class AggregateDescriptor
{
public AggregateRoot Aggregate { get; set; }
public int Version { get; set; }
}
}
这个类提供的主要功能是
- 添加新聚合的能力,这只是将聚合添加到一个被跟踪的聚合列表中
- 跟踪聚合的能力。这些是在特定会话中见过的聚合
- 获取聚合的能力。这将工作委托给一个
IRepository
实现,我们接下来会看 - 提交当前会话期间所做更改的能力
CacheRepository
在演示应用中,ISession
实现所使用的确切 IRepository
是一个 CacheRepository
,它装饰了一个常规的 IRepository
实现。总之,这里是 CacheRepository
的实现(这是 CQRSlite 框架的一部分)
public class CacheRepository : IRepository
{
private readonly IRepository _repository;
private readonly IEventStore _eventStore;
private readonly MemoryCache _cache;
private readonly Func<CacheItemPolicy> _policyFactory;
private static readonly ConcurrentDictionary<string, object> _locks =
new ConcurrentDictionary<string, object>();
public CacheRepository(IRepository repository, IEventStore eventStore)
{
if(repository == null)
throw new ArgumentNullException("repository");
if(eventStore == null)
throw new ArgumentNullException("eventStore");
_repository = repository;
_eventStore = eventStore;
_cache = MemoryCache.Default;
_policyFactory = () => new CacheItemPolicy
{
SlidingExpiration = new TimeSpan(0,0,15,0),
RemovedCallback = x =>
{
object o;
_locks.TryRemove(x.CacheItem.Key, out o);
}
};
}
public void Save<T>(T aggregate, int? expectedVersion = null)
where T : AggregateRoot
{
var idstring = aggregate.Id.ToString();
try
{
lock (_locks.GetOrAdd(idstring, _ => new object()))
{
if (aggregate.Id != Guid.Empty && !IsTracked(aggregate.Id))
_cache.Add(idstring, aggregate, _policyFactory.Invoke());
_repository.Save(aggregate, expectedVersion);
}
}
catch (Exception)
{
_cache.Remove(idstring);
throw;
}
}
public T Get<T>(Guid aggregateId) where T : AggregateRoot
{
var idstring = aggregateId.ToString();
try
{
lock (_locks.GetOrAdd(idstring, _ => new object()))
{
T aggregate;
if (IsTracked(aggregateId))
{
aggregate = (T)_cache.Get(idstring);
var events = _eventStore.Get(aggregateId, aggregate.Version);
if (events.Any() && events.First().Version != aggregate.Version + 1)
{
_cache.Remove(idstring);
}
else
{
aggregate.LoadFromHistory(events);
return aggregate;
}
}
aggregate = _repository.Get<T>(aggregateId);
_cache.Add(
aggregateId.ToString(),
aggregate,
_policyFactory.Invoke());
return aggregate;
}
}
catch (Exception)
{
_cache.Remove(idstring);
throw;
}
}
private bool IsTracked(Guid id)
{
return _cache.Contains(id.ToString());
}
}
在我们查看常规的 IRepository
实现之前,上述代码中有一个极其重要的一点。请看在 Get(..)
方法中,事件存储是如何被查询以获取所有历史事件的,这些事件随后被用来填充聚合的当前状态。这就是事件溯源部分的工作原理。我们基本上是根据过去的事件来加载聚合的状态。
上述代码做的另一件事是缓存它看到的任何聚合一段时间。毕竟它是一个缓存仓库。
Repository 类
常规的 Repository
实现(即非缓存的),负责将未提交的聚合事件存储到事件存储中,并通过 IOC 注入的 IEventPublisher
实现(内部应用程序总线)通知读模型(稍后会详细介绍此功能)。它还负责通过使用 IOC 注入的 IEventStore
实现,为特定聚合加载所有历史事件。在演示应用中,Repository
实例中的方法是由 CacheRepository
调用的。
这是 Repository
类的代码(这是 CQRSlite 框架的一部分)
public class Repository : IRepository
{
private readonly IEventStore _eventStore;
private readonly IEventPublisher _publisher;
public Repository(IEventStore eventStore, IEventPublisher publisher)
{
if(eventStore == null)
throw new ArgumentNullException("eventStore");
if(publisher == null)
throw new ArgumentNullException("publisher");
_eventStore = eventStore;
_publisher = publisher;
}
public void Save<T>(T aggregate, int? expectedVersion = null)
where T : AggregateRoot
{
if (expectedVersion != null && _eventStore.Get(
aggregate.Id, expectedVersion.Value).Any())
throw new ConcurrencyException(aggregate.Id);
var i = 0;
foreach (var @event in aggregate.GetUncommittedChanges())
{
if (@event.Id == Guid.Empty)
@event.Id = aggregate.Id;
if (@event.Id == Guid.Empty)
throw new AggregateOrEventMissingIdException(
aggregate.GetType(), @event.GetType());
i++;
@event.Version = aggregate.Version + i;
@event.TimeStamp = DateTimeOffset.UtcNow;
_eventStore.Save(@event);
_publisher.Publish(@event);
}
aggregate.MarkChangesAsCommitted();
}
public T Get<T>(Guid aggregateId) where T : AggregateRoot
{
return LoadAggregate<T>(aggregateId);
}
private T LoadAggregate<T>(Guid id) where T : AggregateRoot
{
var aggregate = AggregateFactory.CreateAggregate<T>();
var events = _eventStore.Get(id, -1);
if (!events.Any())
throw new AggregateNotFoundException(id);
aggregate.LoadFromHistory(events);
return aggregate;
}
}
哇哦。这真是个大弯路,让我们回到正轨
好了,抱歉,正如我所说,那有点跑题了。在我们开始研究 ISession
工作流程之前,我们正在讨论命令是如何被处理的。让我们回顾一下命令处理器的代码是什么样子的。这里再展示一次
这是 OrderCommandHandlers
类的代码
public class OrderCommandHandlers : ICommandHandler<CreateOrderCommand>,
ICommandHandler<ChangeOrderAddressCommand>,
ICommandHandler<DeleteOrderCommand>
{
private readonly ISession _session;
public OrderCommandHandlers(ISession session)
{
_session = session;
}
public void Handle(CreateOrderCommand command)
{
var item = new Order(
command.Id,
command.ExpectedVersion,
command.Description,
command.Address,
command.OrderItems.Select(x => new OrderItem()
{
OrderId = x.OrderId,
StoreItemDescription = x.StoreItemDescription,
StoreItemId = x.StoreItemId,
StoreItemUrl = x.StoreItemUrl
}).ToList());
_session.Add(item);
_session.Commit();
}
public void Handle(ChangeOrderAddressCommand command)
{
Order item = _session.Get<Order>(
command.Id, command.ExpectedVersion);
item.ChangeAddress(command.NewAddress);
_session.Commit();
}
public void Handle(DeleteOrderCommand command)
{
Order item = _session.Get<Order>(
command.Id, command.ExpectedVersion);
item.Delete();
_session.Commit();
}
}
好了,精神焕发,准备摇滚......所以回到命令被处理时它们做了什么。
- 如果聚合是新的,将调用 IOC 注入的
ISession.Add()
方法,然后调用ISession.Commit()
- 如果聚合不是新的,将调用 IOC 注入的
ISession.Get(..)
方法。
你应该仔细研究一下上面的代码,看看它是如何详细工作的。我现在想向你展示的是,当我们处理一个现有聚合的命令时会发生什么。让我们以 DeleteOrderCommand
为例。
聚合应用 (Aggregate Apply)
那么让我们跟随 DeleteOrderCommand
的流程,到目前为止它给了我们一个聚合,我们在该聚合上调用了 Order
聚合的 Delete()
方法。让我们看看那个方法吧。
这是 Order 聚合
public class Order : AggregateRoot
{
private string description;
private string address;
private bool isDeleted;
private List<OrderItem> orderItems;
private void Apply(OrderCreatedEvent e)
{
Version = e.Version;
description = e.Description;
address = e.Address;
isDeleted = false;
orderItems = e.OrderItems;
}
private void Apply(OrderDeletedEvent e)
{
Version = e.Version;
isDeleted = true;
}
public void Delete()
{
ApplyChange(new OrderDeletedEvent(Id, Version));
}
private void Apply(OrderAddressChangedEvent e)
{
Version = e.Version;
address = e.NewOrderAddress;
}
public void ChangeAddress(string newAddress)
{
if (string.IsNullOrEmpty(newAddress))
throw new ArgumentException("newAddress");
ApplyChange(new OrderAddressChangedEvent(Id,
newAddress,Version));
}
private Order() { }
public Order(
Guid id,
int version,
string description,
string address,
List<OrderItem> orderItems
)
{
Id = id;
ApplyChange(new OrderCreatedEvent(id, description,
address, orderItems, version));
}
}
可以看到,Apply(..)
方法有多个重载。基本上每个命令动作都有一个。我们将继续跟随 .Delete()
方法。它只是简单地调用 ApplyChange(new OrderDeletedEvent(Id, Version));
要理解这个 ApplyChange(..)
方法在做什么,我们需要进入 CQRSlite 框架的 AggregateRoot
代码。那么让我们看看它。
如下所示
public abstract class AggregateRoot
{
private readonly List<IEvent> _changes = new List<IEvent>();
public Guid Id { get; protected set; }
public int Version { get; protected set; }
public IEnumerable<IEvent> GetUncommittedChanges()
{
lock (_changes)
{
return _changes.ToArray();
}
}
public void MarkChangesAsCommitted()
{
lock(_changes)
{
Version = Version + _changes.Count;
_changes.Clear();
}
}
public void LoadFromHistory(IEnumerable<IEvent> history)
{
foreach (var e in history)
{
if (e.Version != Version + 1)
throw new EventsOutOfOrderException(e.Id);
ApplyChange(e, false);
}
}
protected void ApplyChange(IEvent @event)
{
ApplyChange(@event, true);
}
private void ApplyChange(IEvent @event, bool isNew)
{
lock (_changes)
{
this.AsDynamic().Apply(@event);
if (isNew)
{
_changes.Add(@event);
}
else
{
Id = @event.Id;
Version++;
}
}
}
}
可以看到,这个基类负责管理特定聚合的事件。这包括从历史记录中加载它们,以及计算出哪些已应用的事件是新的(对于当前会话而言)并需要应用到事件存储中。
当我们调用 ApplyChange(..)
方法时,这将导致实际聚合的针对正确事件的 Apply(..)
重载方法被调用。一旦事件在聚合上被重放,聚合就可以根据已应用的事件更改其唯一的内部状态。
例如,这是 Order
聚合的 Apply(OrderDeletedEvent e)
方法,从中可以看到 Order
聚合的内部状态被改变了。
private void Apply(OrderDeletedEvent e)
{
Version = e.Version;
isDeleted = true;
}
这个工作流程相当复杂,所以你可能需要再读一遍(甚至多遍)上面的内容,才能有一个正确的理解
事件存储
CQRSlite 框架包含了以下用于事件存储的接口。
public interface IEventStore
{
void Save(IEvent @event);
IEnumerable<IEvent> Get(Guid aggregateId, int fromVersion);
}
因此,我们需要创建一个该接口的实现,来处理领域模型产生/需要的事件的保存和获取(以应用到聚合上)。
因为我想让这个演示应用的依赖尽可能少,所以我选择了一个内存事件存储,但要换成另一个应该不会太难。有一些不错的选择,比如
- NEventStore,它为相当多的数据库(NoSQL 和关系型)提供了适配器
- GetEventStore (Greg Young 是这个项目的合伙人之一)
不管怎样,就像我说的,这个演示应用包含一个内存事件存储,如下所示
public class InMemoryEventStore : IEventStore
{
private readonly Dictionary<Guid, List<IEvent>> _inMemoryDB =
new Dictionary<Guid, List<IEvent>>();
public IEnumerable<IEvent> Get(Guid aggregateId, int fromVersion)
{
List<IEvent> events;
_inMemoryDB.TryGetValue(aggregateId, out events);
return events != null
? events.Where(x => x.Version > fromVersion)
: new List<IEvent>();
}
public void Save(IEvent @event)
{
List<IEvent> list;
_inMemoryDB.TryGetValue(@event.Id, out list);
if (list == null)
{
list = new List<IEvent>();
_inMemoryDB.Add(@event.Id, list);
}
list.Add(@event);
}
}
希望那里没有什么太吓人的东西,它本质上是对一个 Dictionary<TKey,TValue>
的包装,所以我认为它没问题。
读模型
本节讨论读模型。当新事件被存储在事件存储中,并通过 CQRSlite 框架的 Repository
类在领域模型内部发布后,读模型就会发挥作用。
事件处理程序
任何在当前会话中存储并最终持久化到事件存储的事件,也会使用一个简单的内部进程发布器在领域模型内部发布。然后会有事件处理器对这些已发布的事件进行操作。事件处理器的任务是更新读模型。
如果你使用的是同步调用(即调用一个命令然后能够直接从读模型中读取),到这里我们就完成了。但由于我选择创建一个完全异步的系统(只是为了看看兔子洞有多深),事情到这里还没有结束。
那么继续……一旦读模型更新了,我们还需要告诉读模型的任何消费者,某些东西已经改变了。对于演示应用来说,这是事件处理器的工作。
让我们看看实现这一点的各个部分。
已发布事件的来源
在 CQRSlite 框架中,常规的 IRepository
实现代码中有以下代码,可以看到有一行代码会发布任何保存到事件存储的事件(见这行 _publisher.Publish(@event)
)。我们早些时候在讨论写模型/命令端时看到过这个。这是传入写入和基于持久化事件更新读模型之间的交叉点(记住,演示应用使用的是 CQRS + 事件溯源)。
public void Save<T>(T aggregate, int? expectedVersion = null)
where T : AggregateRoot
{
if (expectedVersion != null && _eventStore.Get(
aggregate.Id, expectedVersion.Value).Any())
throw new ConcurrencyException(aggregate.Id);
var i = 0;
foreach (var @event in aggregate.GetUncommittedChanges())
{
if (@event.Id == Guid.Empty)
@event.Id = aggregate.Id;
if (@event.Id == Guid.Empty)
throw new AggregateOrEventMissingIdException(
aggregate.GetType(), @event.GetType());
i++;
@event.Version = aggregate.Version + i;
@event.TimeStamp = DateTimeOffset.UtcNow;
_eventStore.Save(@event);
_publisher.Publish(@event);
}
aggregate.MarkChangesAsCommitted();
}
IEventPublisher
在 CQRSlite 框架的常规 IRepository
中对 IEventPublisher.Publish(..)
的调用,应该会导致一个处理该已发布事件的事件处理器被调用。 CQRSlite 框架有一个 IEventPublisher
的实现,但我选择自己实现一个(因为我想使用 async/await
),如下所示。 CQRSlite 框架一个非常好的地方是它的可插拔性非常强。也就是说,如果你不喜欢默认实现,直接换掉就行。
下面的代码是一个非常简单的实现,它试图为传入的事件找到正确的 IBusEventHandler
,并调用事件处理器的 Handle
方法。IBusEventHandler
数组由一个 IOC 容器(Castle)提供。
public class BusEventPublisher : IEventPublisher
{
private readonly IBusEventHandler[] _handlers;
private Dictionary<Type,MethodInfo> methodLookups =
new Dictionary<Type, MethodInfo>();
public BusEventPublisher(IBusEventHandler[] handlers)
{
_handlers = handlers;
foreach (var handler in _handlers)
{
var meth = (from m in handler.GetType()
.GetMethods(BindingFlags.Public | BindingFlags.Instance)
let prms = m.GetParameters()
where prms.Count() == 1 && m.Name.Contains("Handle")
select new
{
EventType = prms.First().ParameterType,
Method = m
}).FirstOrDefault();
if (meth != null)
{
methodLookups.Add(meth.EventType, meth.Method);
}
}
}
public void Publish<T>(T @event) where T : IEvent
{
var theHandler = _handlers.SingleOrDefault(
x => x.HandlerType == @event.GetType());
if (theHandler == null)
throw new BusinessLogicException(
string.Format("Handler for {0} could not be found",
@event.GetType().Name));
Task.Run(() =>
{
methodLookups[@event.GetType()].Invoke(
theHandler, new[] {(object) @event});
}).Wait();
}
}
典型的事件处理器实现
让我们在这里稍作停顿……我们进行到哪里了?
我们已经看到了需要被处理的新领域事件的来源,这是由 CQRSlite 的常规 IRepository
实现完成的。
我们也看到了这些事件是如何在领域模型内部发布的,这是由 CQRSlite 的 IEventPublisher
实现完成的。
那么事件处理器是什么样子的,它对传入的事件做了什么?
一个典型的事件处理器看起来像这样,从下面的代码中可以看到,一个典型的事件处理器会接受以下依赖项
IReadModelRepository
:允许事件处理器对读模型进行更改IInterProcessBus
:允许领域模型向任何读模型消费者进程广播读模型更改通知(RabbitMQ 用于此目的,显然消费者需要一些 RabbitMQ 订阅者代码来实现这一点)
public class OrderCreatedEventHandler : IBusEventHandler<OrderCreatedEvent>
{
private readonly IReadModelRepository readModelRepository;
private readonly IInterProcessBus interProcessBus;
public OrderCreatedEventHandler(
IReadModelRepository readModelRepository,
IInterProcessBus interProcessBus)
{
this.readModelRepository = readModelRepository;
this.interProcessBus = interProcessBus;
}
public Type HandlerType
{
get { return typeof (OrderCreatedEvent); }
}
public async void Handle(OrderCreatedEvent orderCreatedEvent)
{
await readModelRepository.AddOrder(new ReadModel.Models.Order()
{
OrderId = orderCreatedEvent.Id,
Address = orderCreatedEvent.Address,
Description = orderCreatedEvent.Description,
Version = orderCreatedEvent.Version,
OrderItems = orderCreatedEvent.OrderItems.Select(x =>
new ReadModel.Models.OrderItem()
{
OrderId = x.OrderId,
StoreItemId = x.StoreItemId,
StoreItemDescription = x.StoreItemDescription,
StoreItemUrl = x.StoreItemUrl
}).ToList()
});
interProcessBus.SendMessage("OrderCreatedEvent");
}
}
万一还不够清楚,事件处理器将被用来根据传入的领域事件更新读模型中需要发生的任何变化。正如我已经说过的,我选择了一个完全异步的例子,所以演示应用的代码需要通知读模型的消费者关于读模型的变更。下面将对此进行更详细的讨论。
数据库
对于读模型,我选择使用 NoSQL 数据库。然而,这也可以很容易地是一个关系型数据库。不过,如果你确实选择走传统关系型数据库的路线,你应该考虑如何对事件中包含的数据进行非规范化,这样 UI(或读模型的其他消费者)就能通过查询尽可能少的表来获得他们需要的一切。理想情况下,真实模型应该将 UI 中特定视图(或者如果某种报表软件是读模型消费者的话,就是一个报表)所需的一切都放在一个单独的表中。这个表中可能存在重复的数据,但这没关系,我们在这里关心的是速度,以只读格式尽快获得我们需要的一切。
如果你确实选择使用传统的关系型数据库,有一件事你可能会发现很有帮助,那就是某种投影库。GitHub 上有一个很好的库叫做 Projac。它允许你接收某种类型 T
并将其投影成一些 SQL 代码。
不过就像我说的,我正在为读模型使用一个 NoSQL 数据库,即 RavenDB 嵌入式版,我选择它的原因有两个
- 我需要的读模型非常适合文档数据库
- 通过使用 RavenDB 嵌入式版本,我可以直接通过 NuGet 安装它,这样就不会给你们增加额外的负担
演示应用的读模型是通过一个非常简单的仓库实现的。我本可以对此大做文章,创建一个工作单元的实现(因为 RavenDB 基于 NHibernate 的 ISession
概念,事实上我有那部分代码,如果有人感兴趣可以告诉我),这并不难,但为了简洁起见,我决定让事情尽可能简单。
总之,这是读模型仓库
public interface IReadModelRepository
{
Task<bool> CreateFreshDb();
Task<bool> SeedProducts();
Task<List<T>> GetAll<T>();
Task<bool> AddOrder(Order order);
Task<bool> DeleteOrder(Guid orderId);
Task<bool> UpdateOrderAddress(Guid orderId, string newAddress, int version);
Task<Order> GetOrder(Guid orderId);
}
public class ReadModelRepository : IReadModelRepository
{
private IDocumentStore documentStore = null;
private string dataDir = @"C:\temp\RavenDb";
public ReadModelRepository()
{
}
public async Task<bool> CreateFreshDb()
{
documentStore = new EmbeddableDocumentStore
{
DataDirectory = dataDir,
UseEmbeddedHttpServer = true
};
documentStore.Initialize();
//Add order Index
if (documentStore.DatabaseCommands.GetIndex("Order/ById") == null)
{
documentStore.DatabaseCommands.PutIndex(
"Order/ById",
new IndexDefinitionBuilder<Order>
{
Map = ords => from order in ords
select new { Id = order.Id }
});
}
var storeItems = await this.GetAll<StoreItem>();
if (!storeItems.Any())
{
await SeedProducts();
}
await DeleteAllOrders();
return true;
}
public Task<bool> SeedProducts()
{
return Task.Run(() =>
{
using (IDocumentSession session = documentStore.OpenSession())
{
CreateStoreItem(session,"RatGood.jpg","Rat God");
CreateStoreItem(session, "NeverBoy.jpg", "Never Boy");
CreateStoreItem(session, "Witcher.jpg", "Witcher");
CreateStoreItem(session, "Eight.jpg", "Eight");
CreateStoreItem(session, "MisterX.jpg", "Mister X");
CreateStoreItem(session, "CaptainMidnight.jpg", "Captain Midnight");
session.SaveChanges();
}
return true;
});
}
public Task<List<T>> GetAll<T>()
{
List<T> items = new List<T>();
return Task.Run(() =>
{
using (IDocumentSession session = documentStore.OpenSession())
{
int start = 0;
while (true)
{
var current = session.Query<T>()
.Customize(x => x.WaitForNonStaleResults())
.Take(1024).Skip(start).ToList();
if (current.Count == 0) break;
start += current.Count;
items.AddRange(current);
}
}
return items;
});
}
public Task<bool> AddOrder(Order order)
{
return Task.Run(() =>
{
using (IDocumentSession session = documentStore.OpenSession())
{
session.Store(order);
session.SaveChanges();
}
return true;
});
}
public Task<bool> DeleteOrder(Guid orderId)
{
return Task.Run(() =>
{
using (IDocumentSession session = documentStore.OpenSession())
{
var order = session.Query<Order>()
.SingleOrDefault(x => x.OrderId == orderId);
session.Delete(order);
session.SaveChanges();
}
return true;
});
}
public Task<bool> UpdateOrderAddress(Guid orderId, string newAddress, int version)
{
return Task.Run(() =>
{
using (IDocumentSession session = documentStore.OpenSession())
{
var order = session.Query<Order>()
.SingleOrDefault(x => x.OrderId == orderId);
order.Address = newAddress;
order.Version = version;
session.SaveChanges();
}
return true;
});
}
public Task<Order> GetOrder(Guid orderId)
{
return Task.Run(() =>
{
using (IDocumentSession session = documentStore.OpenSession())
{
return session.Query<Order>()
.SingleOrDefault(x => x.OrderId == orderId);
}
});
}
private void CreateStoreItem(IDocumentSession session, string imageUrl,
string description)
{
StoreItem newStoreItem = new StoreItem
{
StoreItemId = Guid.NewGuid(),
ImageUrl = imageUrl,
Description = description
};
session.Store(newStoreItem);
}
private async Task<bool> DeleteAllOrders()
{
await Task.Run(() =>
{
var staleIndexesWaitAction = new Action(() =>
{
while (documentStore.DatabaseCommands.GetStatistics()
.StaleIndexes.Length != 0)
{
Thread.Sleep(10);
}
});
staleIndexesWaitAction.Invoke();
documentStore.DatabaseCommands
.DeleteByIndex("Order/ById", new IndexQuery());
staleIndexesWaitAction.Invoke();
});
return true;
}
}
通知 UI 变更
正如我在这篇文章中多次提到的,我在这篇文章中做出的决定是采用一个完全异步的系统,只是为了看看在现实生活中会遇到什么样的困难(我知道我有点自虐)。因此,我们需要一种方法来通知读模型的消费者(在演示应用中是 UI),读模型发生了变化,并且读模型的消费者应该根据这个传入的通知刷新其数据(或者它也可以选择不刷新,这由消费者决定)。
我决定使用 RabbitMQ 来完成这项工作。你也可以使用其他技术,但我选择了 RabbitMQ。
我选择发送一个简单的通知消息(事件名称作为 string
),而不是某个 DTO。这样,消费者可以决定他们需要从读模型中读取什么。
显然,从读模型到读模型消费者的事件通信有两个部分,即进程间通知的生产者/消费者。我们将在这里分别看看这两个部分。
进程间总线(生产者)
这通常会在领域模型中事件处理器的末尾被调用,就在读模型更新之后。正如我所说,对我而言,这是一个简单的字符串,说明了刚刚改变了读模型的事件类型。这里没有太多可说的,我们只是使用 RabbitMQ 在一个队列上发布一个字符串(刚刚改变了读模型的事件的名称)。
public class InterProcessBus : IInterProcessBus
{
private readonly string busName;
private readonly string connectionString;
public InterProcessBus()
{
this.busName = "InterProcessBus";
this.connectionString = ConfigurationManager.AppSettings["RabbitMqHost"];
}
public void SendMessage(string message)
{
var factory = new ConnectionFactory() { HostName = connectionString };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var bytes = Encoding.ASCII.GetBytes(message);
channel.ExchangeDeclare(busName, "fanout");
channel.BasicPublish(busName, string.Empty, null, bytes);
}
}
}
}
那么这就是读模型(领域模型)方面的情况,但是读模型的消费者呢?正如我所说,在演示应用中,这是一个 UI。那么让我们接下来看看那是什么样子的。
UI 如何读取读模型(消费者)
第一部分是使用一个 RabbitMQ 订阅者来监听来自 RabbitMQ 发布者的消息。这在下面的 InterProcessBusSubscriber
代码中有所展示
public class InterProcessBusSubscriber : IInterProcessBusSubscriber, IDisposable
{
private readonly string busName;
private readonly string connectionString;
private CancellationTokenSource cancellationToken;
private Task workerTask;
private Subject<string> eventsSubject = new Subject<string>();
public InterProcessBusSubscriber()
{
this.busName = "InterProcessBus";
this.connectionString =
ConfigurationManager.AppSettings["RabbitMqHost"];
StartMessageListener();
}
private void StartMessageListener()
{
cancellationToken = new CancellationTokenSource();
workerTask = Task.Factory.StartNew(
() => ListenForMessage(), cancellationToken.Token);
}
public void Dispose()
{
CancelWorkerTask();
}
private void CancelWorkerTask()
{
if (workerTask == null) return;
cancellationToken.Cancel();
workerTask.Wait();
workerTask.Dispose();
}
private void ListenForMessage()
{
var factory = new ConnectionFactory() { HostName = connectionString };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(busName, "fanout");
bool durable = true;
bool exclusive = false;
bool autoDelete = false;
var queue = channel.QueueDeclare(
Assembly.GetEntryAssembly().GetName().Name,
durable, exclusive, autoDelete, null);
channel.QueueBind(queue.QueueName, busName, string.Empty);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue.QueueName, false, string.Empty, consumer);
while (true)
{
if (cancellationToken.IsCancellationRequested)
break;
BasicDeliverEventArgs ea;
consumer.Queue.Dequeue(10, out ea);
if (ea == null)
continue;
var message = Encoding.ASCII.GetString(ea.Body);
Task.Run(async () =>
{
await Task.Run(() =>
{
eventsSubject.OnNext(message);
});
});
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
public IObservable<string> GetEventStream()
{
return eventsSubject.AsObservable();
}
}
我决定从这个 RabbitMQ 订阅器中暴露一个 RX 的 IObservable<string>
。这样做很好,因为这意味着任何使用这个 RabbitMQ 订阅器的 ViewModel 也可以(使用 RX)订阅这个 IObservable<string>
,在其中可以选择使用任何标准的 RX/LINQ 操作符,这非常有用。下面是这个过程如何工作的一个例子。
public OrdersViewModel(
IInterProcessBusSubscriber interProcessBusSubscriber,
OrderServiceInvoker orderServiceInvoker,
IMessageBoxService messageBoxService)
{
orderEvents = new List<string>()
{
"OrderCreatedEvent","OrderAddressChangedEvent","OrderDeletedEvent"
};
growlNotifications.Top = SystemParameters.WorkArea.Top + topOffset;
growlNotifications.Left = SystemParameters.WorkArea.Left +
SystemParameters.WorkArea.Width - leftOffset;
var stream = interProcessBusSubscriber.GetEventStream();
disposables.Add(stream.Where(x => orderEvents.Contains(x))
.Subscribe(async x =>
{
var newOrders = await orderServiceInvoker.CallService(service =>
service.GetAllOrdersAsync());
this.Orders = new List<OrderViewModel>(
newOrders.Select(ord => new OrderViewModel(
ord, messageBoxService, orderServiceInvoker)));
this.HasOrders = Orders.Any();
if (this.HasOrders)
{
growlNotifications.AddNotification(new Notification
{
Title = "Orders changed",
ImageUrl = "pack://application:,,,/Images/metroInfo.png",
Message =
"New/modified orders have been obtained from the ReadModel. " +
"Click on the right hand side panel to see them"
});
}
})
);
}
}
在这段 ViewModel 代码中有几个值得注意的地方。
- 通过使用 RX,我们能够只对我们当前所在 ViewModel 感兴趣的事件做出反应
- 我们可以使用事件名称来决定在 ViewModel 中执行什么操作。在这种情况下,所有的事件都只是简单地导致 ViewModel 通过再次从读模型中读取来刷新订单列表(通过一个简单的 WCF 调用
orderServiceInvoker.CallService(service => service.GetAllOrdersAsync())
),这个调用只是使用了我们之前看到的ReadModelRepository
。由于我希望在应用运行时清理 RavenDB,我需要一个地方来执行这个操作,所以不得不将对读模型的访问放在一个服务(WCF)层调用之后。如果我们使用的是关系型数据库,我们可以简单地让客户端直接调用一些 ADO / 微型 ORM 代码。
这基本上就是演示应用在读模型发生变化后如何更新的。
进一步的考量
以下两点是我知道但没有在演示中实现的事情
- 事件版本控制
- 快照
话虽如此,我并不是一个完全的混蛋,肯定会向您解释这些项目,并且还会向您指出一些很好的资源,这些资源会更详细地向您介绍它们。
事件版本控制
当使用事件溯源时,您将事件存储在事件存储中。这个事件存储只能插入新事件和读取历史事件,仅此而已。因此,当您更改领域逻辑以及属于此行为的事件时,您不能回到事件存储中对所有属于相同行为的历史事件进行一次性转换。事件存储需要保持完整,这是它的强大之处之一。
所以你创建了一个原始事件的新版本,这个新版本携带的信息比原始版本更多或更少。
http://cre8ivethought.com/blog/2010/02/09/cqrs-event-versioning
Mark 向你展示了一种在需要多个事件版本时可行的方法。这是一个很好的起点。
快照
事件溯源中可能变得有问题的一个部分是那些生命周期长而复杂的对象。在几乎所有情况下,一个对象的生命周期相对较短——也许只有十几个事件左右。但有些情况下,一个对象可能会存在非常非常长的时间并且被频繁使用。Greg 在他的一次演讲中举了一个例子,一个对象每天会产生数千个新事件。加载这个对象可能会很昂贵,因为你必须加载自该对象创建以来的所有状态转换。
解决这个问题的一个捷径是快照的概念。你向聚合发送某种快照命令消息,它会产生一个包含其所有状态的快照消息——有点像将领域对象转换为 DTO,只不过这是从领域对象内部而不是外部产生的。
一旦我们有了这个快照消息,我们就将它持久化。然后,当从存储中加载对象时,我们加载直到并包括最后一个快照的所有事件。这使我们能够将对象恢复到某个状态,然后“重放”自最后一个快照以来的所有事件。
http://blog.jonathanoliver.com/event-sourcing-and-snapshots/
正如我已经说过的,这并不是我在演示代码库中实现的功能,但这是 CQRSlite 框架中所支持的,并且不需要太多努力就可以添加。你只需要做以下事情
- 切换到使用
SnapshotAggregateRoot
而不是AggregateRoot
类型,并实现一些额外的方法 - 切换为使用 SnapshotRepository 而不是常规的 Repository
在其中一个测试用例中有一些快照工作的示例,如果你也想包含快照功能,那会是一个很好的起点。
就这些
这次我想说的就这么多了。我真心希望这对你们中的一些人有用。通过想要写一篇关于这个主题的文章,我确实学到了很多。我坚信,如果你能学到足以向别人解释某件事,那么你已经对它有了很好的洞察力。当然,我可能说的是一派胡言,但我当然希望不是,并且希望任何可能偶然发现这篇文章并阅读它的 DDD/CQRS 实践专家能够纠正我任何完全错误的地方。
话虽如此,我在这上面花了不少时间,所以我很乐观(也许是盲目乐观)地认为我做得还算不错。
无论如何,如果你喜欢你所看到的,欢迎留言,或者更好的是投上一票,那将是太棒了。
致谢
特别感谢那支才华横溢的开发团队,他们告诉我没有所谓的金弹。实际上应该是银弹。接下来他们会告诉我没有澳洲神兽、天空豹或者飞翔的水獭。天哪。