使用Azure Event Grid和Durable Functions实现“无服务器”CQRS





5.00/5 (11投票s)
使用Azure Event Grid和Durable Functions作为无服务器CQRS架构的骨干
引言
Serverless(无服务器)是当今云计算领域的热门话题,因为它提供了纯粹基于需求的计费模式,并且没有任何固定成本。这将使您的企业能够根据需求进行扩展或缩减,从而带来更精简、更专注于业务的 IT 预算。
当然,没有任何系统是真正无服务器的。在 Azure Event Grid 以及大多数无服务器云解决方案中,与基于虚拟机架构相比,其区别在于云提供商负责您的应用程序的托管、启动和扩展。这种共享基础设施模型的成本节省也传递给用户,使其成为构建大规模系统的最经济高效的方式。
背景
Event Grid
Azure Event Grid 是微软在其 Azure 云中提供的一组互联技术,旨在促进事件驱动的架构。
它包含事件触发器,这些触发器可以设置为在状态更改事件(例如新文件被加载到 Azure Blob 或新行被添加到 Azure 表中)发生时触发,或者在时间触发时触发,或者实际上是从外部系统(例如 IoT 传感器等)接收。
这些触发器通过 Event Grid 连接,将事件从触发器路由到目的地,目的地可以是 Azure Function、Logic App、Azure Automation 甚至是 Webhook 事件。
CQRS 和事件溯源
CQRS 是一种应用程序设计模式(或架构),它与单一模型架构(如 MVC 或 MVVM)不同之处在于,负责更改系统状态的部分(命令端)与负责获取系统状态的部分(查询端)完全分开。
事件溯源是一种数据架构,它颠覆了我们存储数据的方式。我们不再存储任何给定对象的当前状态并更新该状态,而是存储该对象发生的每一次更改的完整历史记录,并允许我们通过对该事件历史记录运行称为投影的代码来推导出对象的当前状态。
示例系统:农场管理应用程序
CQRS 更传统的示例系统通常基于高度交易性的业务,例如金融服务、零售银行或保险。然而,为了表明该架构可以用于这些狭窄范围之外的领域,我将使用一个虚构的混合用途农场管理系统。
分析阶段
在进行基于事件的系统的设计的早期阶段,最佳方法是使用一种称为 事件风暴 的分析工具。这是一种协作式领域发现练习,其中一小组领域专家和一个或多个引导者使用便利贴和非常长的设计空间来映射领域内所有感兴趣的事件、事件的含义以及触发事件的原因。通常需要进行多次事件风暴会议,以发现领域的完整范围,然后再进行任何详细设计。
同样,在事件风暴会议(以及其中咨询的专家)中定义系统的领域边界也是一个好主意。
这允许您在您能够控制的系统部分与您无法控制的外部因素(如政府、监管机构、数据提供商等)之间建立一个共享层(本质上是一个接口)。通过这样做,您可以允许您的系统在受控环境中进行演进,从而节省大量精力。
聚合体和事件设计
使用 CQRS 设计器,下一步是设置领域中的聚合体以及已识别的可能发生在这些聚合体上的事件。对于每个事件,我们添加尽可能多的与之相关的属性(即使我们还不确定这些属性是否会有用,但捕获不需要的数据总比需要却没捕获的数据要好)。
在我们的农场管理系统中,为我们设计此事件宇宙选择的聚合体是 cow
、field
、barn
和 tractor
。对于每种聚合体类型,我们需要决定一个唯一标识符,我们将使用它来标识该聚合体类型的单个实例。
在某些情况下,我们可以重用现有的唯一标识符 - 例如,牛将具有畜群编号和耳标号的唯一组合,而每辆拖拉机将具有唯一的底盘号。在其他情况下,我们可以决定自己的唯一标识符,例如增量编号或 GUID。只要标识符保证唯一且“查找”步骤不是太繁重,它实际上并没有太大区别。
下一步是为进入系统的每个状态更改原因定义一个命令。这些命令可能与单个聚合体有关,但也可能与已知组中的多个实例有关,或者单个外部命令可能会导致多个聚合体类型的事件发生。例如,AI 检查将适用于一头牛,TB 检查将针对已知一群牛,而金融贷款抵押可能会同时涵盖拖拉机、田地甚至牛。
然后,您可以将系统中的每个事件分解为“我影响了哪些聚合体”以及“我对这些聚合体做了什么”的组合。然后,我们选择受影响聚合体的事件流,并将创建的事件追加到其事件流的末尾。
要从系统中获取数据,我们创建投影。这些是简单的代码片段,它们遍历事件流,并依次决定“我是否关心这个事件”,如果关心,则“我从这个事件中执行什么内部属性更新?”。重要的是要记住,投影是 CQRS 查询端的一部分,因此它本身不应导致任何状态更改。
在设计了投影之后,您可以将它们组合成查询,这些查询定义了要为其运行投影的聚合体,然后定义要应用于返回数据的函数(以类似 map-reduce 的方式)。查询可以通过组合来连接,以生成报告样式的输出。
您还可以拥有内部触发的投影,这些投影会更新读取缓存中的数据,从而决定聚合体实例是否属于设计的业务分组。我称之为分类器以区分它们在系统中的作用,但它们在功能上与其他投影相同,只是结果是一个单一的“是/否”值。
在 Azure Event Grid 上的实现
1) 命令
第一步是为您的域创建一个新的 Azure 资源组。您可以使用 Azure 门户或 Azure 命令行界面来完成此操作。
或
az group create --name eventFarmyardDemo --location westus2
对于每个命令,我们需要决定命令如何进入我们的系统。即使是一个小型农场,数据输入范围也很广泛 - 每笔交易提交的表格和收据、手动数据录入、IoT 传感器或自动数据捕获系统等等。对于每一个,我们需要从 Blob 存储(本质上是基于云的文件系统)、Event Hubs 或通过 自定义主题(监听其他事件)列表中选择最佳事件发布者来触发我们域中的命令。
对于传感器数据和自动数据捕获,通常使用 Event Hubs,但在大多数其他情况下,我会使用 Blob 存储,将新命令实现为写入特定“收件箱”命令位置的文件,其中包含执行该命令所需的数据,并订阅 Microsoft.Storage.BlobCreated
事件。
无论哪种情况,我们都希望触发命令的操作将命令事件推送到事件网格。为了使用我们的领域知识而不是隐式命令(添加到 [x] 的新文件),我们希望为命令定义自定义订阅。这还在触发器和我们域中的命令本身之间创建了一个共享层,允许触发器在以后更改(例如,如果手动数据输入过程自动化)。
这里的想法是,如果您更改数据进入系统的方式,您只需将新触发器重定向到现有的业务命令,而无需做任何其他事情。
2) 命令处理程序
命令处理程序必须在特定命令被触发时做出响应,并执行该命令所需的任何操作,直到将事件写入受影响聚合体的事件流。
实现命令处理程序的一个好方法是使用 Azure Logic App。
您可以使用 Event Grid 自定义主题作为命令处理程序的触发器。
命令处理程序必须验证命令是否允许继续执行,如果允许,则必须发出命令所需的所有领域事件。然后,这些事件(使用另一个自定义主题)将传递给事件处理程序。
请注意,主题名称必须是全局唯一的,因为它由 DNS 条目表示,因此对于实际应用程序,我强烈建议在每个自定义主题中包含域名和可能的应用程序名称。
或者,如果您希望从已编译的 Azure Function App 中响应 Event Grid 主题,您将需要使用 EventGridTrigger 属性和一个 EventGrid
参数来接收来自事件网格的数据。
[FunctionName("OnCreateLeagueCommand")]
public static async void OnCreateLeagueCommand(
[EventGridTrigger] EventGridEvent eventGridEvent,
TraceWriter log)
{
// The payload of the event is in eventGridEvent.Data
}
然后,您将使用 Azure 门户 将函数连接到事件网格主题(作为订阅)。
每个命令处理程序都分解为一系列步骤,每个步骤都由一个独立执行的 Azure Function 支持,该函数将其状态写入为该命令创建的事件流。这允许使用一些并行处理来处理命令,或者在阻止命令完成的问题得到解决后恢复已停滞的命令。
有一个“命令状态”投影,它返回命令的当前状态,并且每个步骤处理函数都可以执行此投影,以测试命令是否处于可以运行的状态。
/// <summary>
/// The different states a command can be in - to prevent them being processed
/// in an invalid state
/// </summary>>>
public enum CommandState
{
/// <summary>>
/// A new command that has just been created
/// </summary>
Created = 0,
/// <summary>
/// A command that has been validated and can proceed
/// </summary>
Validated = 1,
/// <summary>
/// A command marked as invalid
/// </summary>
Invalid = 2,
/// <summary>
/// A multi-step command can be valid but not yet complete
/// so a status of "In progress" would indicate that
/// </summary>
InProgress = 3,
/// <summary>
/// A command marked as complete
/// </summary>
Completed =4
}
在顶层,一个 Azure Function 响应事件网格主题并启动一个 Azure Durable Functions 编排器函数。
// Get the command request details out of the event grid data request
var jsondata = JsonConvert.SerializeObject(eventGridEvent.Data);
CommandRequest<Create_New_League_Definition> cmdRequest = null;
if (!string.IsNullOrWhiteSpace(jsondata))
{
cmdRequest = JsonConvert.DeserializeObject
<CommandRequest<Create_New_League_Definition>>(jsondata);
}
if (null != cmdRequest)
{
// Create a new command
// Make sure the command has a new identifier
if (cmdRequest.CommandUniqueIdentifier == Guid.Empty)
{
cmdRequest.CommandUniqueIdentifier = Guid.NewGuid();
}
// Using Azure Durable functions to do the command chaining
string instanceId =
await createLeagueCommandHandlerOrchestrationClient.StartNewAsync
("OnCreateLeagueCommandHandlerOrchestrator", cmdRequest);
log.LogInformation
($"Run OnCreateLeagueCommandHandlerOrchestrator orchestration
with ID = '{instanceId}'.");
}
这会将每个步骤函数作为 Durable Function Activity 调用。
[ApplicationName("The Long Run")]
[DomainName("Leagues")]
[AggregateRoot("League")]
[CommandName("Create League")]
[FunctionName("OnCreateLeagueCommandHandlerOrchestrator")]
public static async Task OnCreateLeagueCommandHandlerOrchestrator
([OrchestrationTrigger] DurableOrchestrationContext context,
Microsoft.Extensions.Logging.ILogger log)
{
CommandRequest<Create_New_League_Definition&g; cmdRequest =
context.GetInput<CommandRequest<Create_New_League_Definition>>();
if (null != cmdRequest)
{
ActivityResponse resp =
await context.CallActivityAsync<ActivityResponse>
("CreateLeagueCommandLogParametersActivity", cmdRequest);
#region Logging
if (null != log)
{
if (null != resp)
{
log.LogInformation($"{resp.FunctionName}
complete: {resp.Message } ");
}
}
#endregion
if (null != resp)
{
context.SetCustomStatus(resp);
}
// - - - 8<- - - - - - - - - -
每个 Activity 只完成其链中的一步,然后返回。有一个标准的响应类 ActivityResponse
,可用于传递有关所调用 Activity 健康状况的信息。我更喜欢这样做,而不是依赖异常来处理业务逻辑,这样异常日志就只包含实际的基础设施问题。
[ApplicationName("The Long Run")]
[DomainName("Leagues")]
[AggregateRoot("League")]
[CommandName("Create League")]
[FunctionName("CreateLeagueCommandValidationAction")]
public static async Task<bool> CreateLeagueCommandValidationAction
([ActivityTrigger] CommandRequest<Create_New_League_Definition> cmdRequest,
ILogger log)
{
if (null != log)
{
log.LogInformation($"CreateLeagueCommandValidationAction
called for command : {cmdRequest.CommandUniqueIdentifier}");
}
return await ValidateCreateLeagueCommand
(cmdRequest.CommandUniqueIdentifier.ToString(), log);
}
此外,每个步骤还会将其状态写入该唯一命令的基础事件流,并且可以独立查询该事件流。这允许应用程序实时显示命令的处理过程,并且在出现业务问题时可以提供更好的调试体验,因为您可以复制命令执行的基础事件流,以查看执行了哪些步骤以及结果是什么。
public static async Task LogCommandValidationError(Guid commandGuid,
string CommandName,
bool fatal,
string errorMessage)
{
EventStream commandEvents = new EventStream(@"Command",
CommandName,
commandGuid.ToString());
if (null != commandEvents)
{
await commandEvents.AppendEvent
(new TheLongRun.Common.Events.Command.ValidationErrorOccured
(errorMessage,fatal ));
}
}
虽然 Durable Functions 框架本身创建了一个表来记录执行情况,但我更倾向于将应用程序分开,以便系统事件进入该内置表中,而有业务意义的事件进入它们自己的单独事件流。
3) 持久化事件流
每当发生事件时,我们需要将其持久化到发生该事件的聚合体实例的事件流中。这意味着任何针对该聚合体运行的投影都将在运行时考虑此事件,以推导出聚合体实例的状态。
在此实例中,我们所做的只是找到合适的 AppendBlob
以供聚合体实例使用,并将事件追加到其中 - 在我的例子中,使用了 CQRS on Azure framework code 中的 AppendBlob
类。
在事件持久化后,会引发一个通知事件,以通知任何感兴趣的各方(例如,任何依赖于该实例的投影或分类器),要求它们读取最新事件并相应地更新其状态。
这(与命令处理程序一样)是通过在每个事件持久化后触发一个自定义事件来完成的。*请注意,目前无法(也不建议)在从 Azure 无服务器函数调用的任何代码中使用二进制序列化,因此使用的是 JSON。*
查找正确事件流并将其追加到事件的过程被封装在一个名为 EventStream
的类中。
EventStream tractorEvents = new EventStream(@"Cloud Farm",
"Tractor",
tractorName );
tractorEvents.AppendEvent(new ServicedEvent(DateTime.UTCNow, "Serviced" ));
4) 投影
可以将投影连接到每种可能导致该投影状态更改的事件类型的不同事件主题。这些都实现为每种事件类型的特定投影事件处理程序。当投影处理并处理一个事件时,如果这会导致其状态更改,它将将其当前状态持久化到缓存存储,然后引发一个“投影已更改”通知,其他业务流程和标识符组分类代码可以由此触发。
这同样将使用自定义事件主题来完成,就像事件通知传播事件一样。
如果事件已被处理但投影状态未改变,则无需发布通知。
投影通过一个名为 Projection
的类查找其底层事件流。
Projection getCommandState = new Projection(@"Command", COMMAND_NAME, commandGuid.ToString(), nameof(Command_Summary_Projection));
处理这些事件的底层逻辑保存在投影定义中,并通过调用 Process
方法将其输入到投影中。
Command_Summary_Projection cmdProjection =
new Command_Summary_Projection(log );
getCommandState.Process(cmdProjection);
5) 分类器
分类器是一种特殊的投影类型,可用于确定某个实例是否属于某个命名集合。例如,在上述业务案例中,您可能有一个分类器来根据买入、卖出和转移等事件来定义哪些动物在您的农场。
与投影一样,当分类器被通知运行时,并且该运行导致分类器状态发生更改,它将作为一个自定义事件网格主题发布通知消息。
6) 查询
对于每个定义的可以针对系统运行的查询,都会使用一个特定的自定义主题来请求执行查询。查询附带的有效负载指定了传递给系统的参数,以及用于通知调用者查询完成的通知方法,并且可能传递回结果。
此外,我们应该考虑计算查询名称和参数的哈希值,以便实现查询结果缓存 - 因为理想情况下,如果缓存结果可用,我们永远不想完全处理相同的查询两次。
7) 查询处理程序
查询处理程序必须在特定查询被触发时做出响应,并执行获取给定查询数据所需的任何投影和聚合功能。
支撑查询的 Logic App 的编写方式与命令的 Logic App 相同,但增加了一个步骤,即提供一个通知,告知查询结果可用(以及这些结果的 URI),作为回调到命令实例传入的地址。
这可以通过将查询结果作为 Blob 持久化到 Azure Blob 存储的某个区域来完成,该区域可以被监视,并且写入结果又可以触发任何所需的进一步下游处理。
每个查询都有一个跟踪查询处理进度的事件流。这允许一定程度的并行性(一种项目然后聚合的模式,类似于 Map -> Reduce 模式),其中多个投影运行程序或分类器运行程序实例可以同时运行,并在完成时将结果返回到查询事件流。
有一个投影运行在这个查询事件流上,以决定还有哪些工作需要完成——如果查询已完成——一个 Azure Function 会将结果发布到 ReturnPath
属性中指定的任何返回地址。
这背后的方法与命令处理程序完全相同:启动一个 Durable Function 编排,然后该编排调用 Durable Function Activities 来执行查询的各个步骤。唯一的额外步骤是,运行查询的结果本身被写入该查询执行的底层事件流。这意味着,如果我们以后需要再次获取这些结果,我们可以运行一个投影来获取“查询时”的结果。
安全
Azure 主题可以通过角色或个人进行安全保护——对于业务系统,创建对应于应用程序不同用例的角色,并将个人用户分配给适用于他们的特定角色是很有意义的。这为您提供了对系统数据如何进入系统的不同方式进行细粒度控制,按命令类型进行区分,从而防止滥用。
如果您需要额外的访问控制(例如,为给定用户提供特定命令的特定配额),则需要在命令处理程序中进行编码,并将任何“非法”命令记录到“毒消息”表中进行分析。
可扩展性
在 Azure Event Grid 上构建 CQRS 系统的主要优势之一是它非常容易扩展——添加新的输入源或从现有的已捕获事件流创建新的查询和操作。
如果我们设计并添加一个新的命令(作为一个新的订阅主题),现有的应用程序逻辑不受影响,并且我们只需通过更改事件网格中的路由即可轻松地交换事件处理程序。
添加 DevOps
当命令被 CQRS 系统拒绝或失败时,我们将一条日志记录写入 Blob 存储中一个名为 command-errors 的特定容器。
然后可以设置一个 Logic App 来监视该文件夹,并且每当有新 Blob 写入该位置时,它会使用 Visual Studio 连接器创建一个 bug 记录。(还有 Git 和许多其他缺陷跟踪或应用程序生命周期管理系统的连接器)。
经验教训
在构建这个架构的实际业务示例时(不幸的是,尚未公开),我学到了以下经验。
- 避免在触发器中放置任何业务逻辑(而是使用命令-处理程序架构)。
- 连接 Application Insights 以了解您的业务应用程序的内部情况。
- 警惕领域混杂——您可以在 Event Grid 中创建一个防腐层。
- 小心区分哪些可以并行处理,哪些不能。(如果事情必须按顺序发生,那么让它们通过 Durable Functions 编排 作为函数管道相互触发。)
- 只编写您自己能写出的代码(业务逻辑)——不要自己实现安全、跟踪等功能。
历史
- 2017 年 8 月 22 日:第一个版本 - 想法输出
- 2018 年 3 月 10 日:添加了来自真实世界示例的经验教训
- 2018 年 8 月 19 日:向命令和查询处理机制添加了详细信息
- 2018 年 12 月 9 日:关于 Durable Functions 库如何适应此框架的额外详细信息