65.9K
CodeProject 正在变化。 阅读更多。
Home

Windows Azure 上的 CQRS - 命令端

starIconstarIcon
emptyStarIcon
starIcon
emptyStarIconemptyStarIcon

2.25/5 (4投票s)

2014年1月22日

CPOL

6分钟阅读

viewsIcon

18717

概述如何将配对的 Azure 队列和 Azure 表用于 CQRS 应用程序的命令端

引言

本文展示了一种使用 Windows Azure 存储(特别是队列和表存储)来实现命令查询职责分离(CQRS)架构的命令端的方法。它是用 VB.NET 实现的,但如果 C# 是你选择的语言,它可以非常快速地转换为 C#。
它不涉及架构的查询端以及事件溯源的相关概念,我希望随着此代码所来自的应用程序的构建,能够逐步实现这些内容。

背景

简单来说,CQRS 是一种架构模式,它将命令(执行动作)与查询(获取信息)分离。实际上,这意味着命令端和查询端不需要共享一个公共模型,并且这个垂直分离层通常与命令或查询的定义和实现(在命令处理程序或查询处理程序中)之间的垂直分离层相匹配。

根据我的经验,当您在短时间内与分布式团队开发应用程序时,CQRS 非常有用,因为它减少了“模型争用”的风险,这种风险可能在每个人同时更新模型时发生。它也适用于需要记录应用程序中发生的所有更改的审计跟踪的财务场景。

(如果您不熟悉 CQRS,我建议您从这篇优秀的文章开始,以了解其工作原理和原因。)

定义命令

您可以执行的每种操作都被视为自己的命令,并且每个命令都有一个定义——涵盖命令是什么以及需要哪些附加参数。您需要为系统使用的每种命令类型创建一个独立的类,该类定义命令是什么以及执行命令所需的有效载荷。

例如,如果您有一个基于银行账户的系统,您可能有一个 `OpenAccount` 命令,它包含开设银行账户所需的数据的参数,例如账户所有者、开户分行、货币面额等;而 `DepositFunds` 命令则需要一个账户号码和金额。您还可以有一个完全不需要任何参数的命令。

为了使给定的类在语义上被识别为命令,它派生自标准接口 `ICommandDefinition`。这还添加了两个强制属性——一个用于标识每个命令实例的唯一标识符,以及一个用于识别命令的人类可读名称。
许多实现不使用人类可读的命令名称——您可以选择使用或不使用。

''' <summary>
''' Interface defining what is required of a class that defines a command to be executed
''' </summary>
''' <remarks>
''' Commands can have named parameters that have meaning to the command handler
''' </remarks>
Public Interface ICommandDefinition

    ''' <summary>
    ''' Unique identifier of this command instance
    ''' </summary>
    ''' <remarks>
    ''' This allows commands to be queued and the handler to know 
    ''' if a command has been executed or not (for example as an Event Source)
    ''' </remarks>
    ReadOnly Property InstanceIdentifier As Guid

    ''' <summary>
    ''' The unique name of the command being executed
    ''' </summary>
    ReadOnly Property CommandName As String

End Interface

此时,我们可以继续为每个命令定义具体类——例如,我们的 `OpenAccount` 命令定义可能如下所示

Public Class OpenAccountCommandDefinition
    Implements ICommandDefinition

    Private m_instanceid As Guid = Guid.NewGuid

    Public ReadOnly Property InstanceIdentifier As _
           Guid Implements ICommandDefinition.InstanceIdentifier
        Get
            Return m_instanceid
        End Get
    End Property

    Public Overrides ReadOnly Property CommandName As String
       Get
          Return "Open a new account"
       End Get
    End Property

    'Payload for this command
    Public Property BranchIdentifier As String

    Public Property AccountOwner As String

    Public Property DenominationCurrency As String

End Class

然而,为了将命令定义传递给命令处理程序,我正在使用 Windows Azure 存储——具体来说是一个队列和一个匹配的表。队列附加了命令类型和唯一标识符,匹配的表用于保存命令所需的任何附加有效载荷。
Azure 队列上消息的最大大小是 64KB,但命令可能需要比这更多的有效载荷,因此采用了这种双重方法。

分派命令

将有效载荷放入 Azure 表的第一步是识别命令的有效载荷属性,以便将其保存。这可以通过使用反射或通过拥有一个表示命令参数的类并使用它们的字典来支持有效载荷属性来完成。

创建命令定义后,需要将其分派。命令分发器负责将命令传输到命令处理程序。在 Azure 分发器的情况下,这是一个两步过程——首先将有效载荷保存到 `commands` 表中,然后向队列添加一条消息。执行此操作的(部分)代码片段是

        Dim applicationStorageAccount As CloudStorageAccount
        Dim applicationQueueClient As CloudQueueClient
        Dim applicationTableClient As CloudTableClient

        Public Sub Send(command As ICommandDefinition) Implements ICommandDispatcher.Send

            ' 1) Save the command data in a commands table for the handler to use
            If (applicationTableClient IsNot Nothing) Then
                Dim commandTable As CloudTable = _
                    applicationTableClient.GetTableReference("commandsparameterstable")
                'Table may not exists yet, so create it if it doesn't
                commandTable.CreateIfNotExists()
                Dim cmdRecord As New CommandTableEntity(command)
                '\\ Insert or update the record
                Dim insertOperation As TableOperation = _
                                    TableOperation.InsertOrReplace(cmdRecord)
                Dim insertResult = commandTable.Execute(insertOperation)
            End If

            ' 2) Queue the command to execute
            If (applicationQueueClient IsNot Nothing) Then
                Dim queue As CloudQueue = applicationQueueClient.GetQueueReference_
                                            ("commandsqueue")
                ' Queue may not yet exist so create it is it doesn't
                queue.CreateIfNotExists()
                Dim msg As CloudQueueMessage = New CloudQueueMessage_
                   (command.GetType.Name & "::" & command.InstanceIdentifier.ToString())
                If (msg IsNot Nothing) Then
                    queue.AddMessage(msg)
                End If
            End If

        End Sub

遵循队列和表的命名标准非常重要——所有字母小写且没有标点符号。

为了将命令记录保存到 Azure 表中,它必须放在一个继承自 `TableEntity` 的类中,或者,如果您在后台进行操作,则需要一个继承自 `ITableEntity` 的类。
我没有找到很多继承自 `ITableEntity` 的例子,所以我的如下

   ''' <summary>
    ''' A class for storing a command and its parameters in an azure table store
    ''' </summary>
    ''' <remarks>
    ''' The command type and the instance identifier are the partition 
    ''' and row keys to allow any 
    ''' command handler easily to find its command parameter data
    ''' </remarks>
    Public Class CommandTableEntity
        Implements ITableEntity

        Private m_parameters As Dictionary(Of String, CommandParameter) = _
                                     New Dictionary(Of String, CommandParameter)

        ''' <remarks>
        ''' Set this value to '*' in order to blindly overwrite an entity 
        ''' as part of an update operation. 
        ''' </remarks>
        Public Property ETag As String Implements ITableEntity.ETag

        ''' <summary>
        ''' The property that defines what table partition this command will be stored in
        ''' </summary>
        Public Property PartitionKey As String Implements ITableEntity.PartitionKey

        ''' <summary>
        ''' The property that defines what row this command record will be stored in
        ''' </summary>
        Public Property RowKey As String Implements ITableEntity.RowKey

        Public Property Timestamp As DateTimeOffset Implements ITableEntity.Timestamp

        ''' <summary>
        ''' Has this command been marked as processed
        ''' </summary>
        Public Property Processed As Boolean

        ''' <summary>
        ''' Has this command been marked as in error
        ''' </summary>
        Public Property InError As Boolean

        ''' <summary>
        ''' What handler processed this command
        ''' </summary>
        Public Property ProcessedBy As String

        Public Sub ReadEntity(properties As IDictionary(Of String, EntityProperty), _
          operationContext As OperationContext) Implements ITableEntity.ReadEntity

            For Each propertyPair In properties
                If (propertyPair.Key.Contains("__")) Then
                    'This is a property in the form name[index]...
                    Dim fixedPropertyName As String = _
                              propertyPair.Key.Replace("__", "[").Trim() & "]"
                    Dim parameterName As String = _
                              CommandParameter.GetParameterName(fixedPropertyName)
                    Dim parameterIndex As Integer = _
                              CommandParameter.GetParameterIndex(fixedPropertyName)
                    'Get the command payload from the string
                    Dim parameterPayload As String = propertyPair.Value.StringValue
                    If (Not String.IsNullOrWhiteSpace(parameterPayload)) Then
                        m_parameters.Add(propertyPair.Key, _
                              CommandParameter.Create(parameterPayload))
                    Else
                        '\\ Add a parameter that has no payload/value
                        m_parameters.Add(propertyPair.Key, _
                          CommandParameter.Create(parameterName, parameterIndex))
                    End If
                Else
                    'Named property only...do not persist "ETag", _
                               "Timestamp", "Rowkey" or "PartitionKey" 

                    ' Additional properties for a command 
                    If propertyPair.Key.Equals_
                        ("Processed", StringComparison.OrdinalIgnoreCase) Then
                        If (propertyPair.Value.BooleanValue.HasValue) Then
                            Me.Processed = propertyPair.Value.BooleanValue.Value
                        End If
                    End If

                    If propertyPair.Key.Equals("InError", _
                        StringComparison.OrdinalIgnoreCase) Then
                        If (propertyPair.Value.BooleanValue.HasValue) Then
                            Me.InError = propertyPair.Value.BooleanValue.Value
                        End If
                    End If

                    If propertyPair.Key.Equals("ProcessedBy", _
                        StringComparison.OrdinalIgnoreCase) Then
                        Me.ProcessedBy = propertyPair.Value.StringValue
                    End If

                End If
            Next

        End Sub

        Public Function WriteEntity(operationContext As OperationContext) _
         As IDictionary(Of String, EntityProperty) Implements ITableEntity.WriteEntity

            Dim properties As New Dictionary(Of String, EntityProperty)
            'Skip the ITableEntity properties "ETag", _
            "Timestamp", "Rowkey" or "PartitionKey" 

            'Add all the command parameters...
            For Each param As CommandParameter In m_parameters.Values
                Dim fixedPropertyName As String = _
                CommandParameter.GetParameterKey(param).Replace("[", "__").Replace("]", "")
                properties.Add(fixedPropertyName, New EntityProperty(param.ToString))
            Next

            'Add the other properties
            properties.Add("Processed", New EntityProperty(Me.Processed))
            properties.Add("InError", New EntityProperty(Me.InError))
            properties.Add("ProcessedBy", New EntityProperty(Me.ProcessedBy))

            Return properties

        End Function
    End Class

下一步——从队列中获取命令并处理它——在下一篇文章中...

查询命令

由于我们已在 `InstanceIdentifier` 中为每个命令实例提供了自己的唯一标识符,因此我们可以创建一个用户界面组件来查询命令队列和表的状态,以便执行“我的命令进展如何”类型的查询。

这解决了 CQRS 中最大的困惑领域之一——如果用户的命令是完全异步执行的,他们如何获得反馈。解决方案是让他们通过明确的用户交互或某种轮询机制来询问。

这也意味着任何不需要状态反馈的系统(例如自动化传感器馈送或数据管道)都可以使用与交互式前端完全相同的命令处理功能。

为此,您需要为每个命令提供一个唯一标识符——我使用 GUID,因为这似乎最有意义——以及一系列可以发生在命令上的已定义事件。例如,**已创建**、**步骤已处理**、**已完成**、**暂时性故障**、**致命错误**等。

然后,当您的命令处理程序处理命令时,它只需要在进行过程中将相关事件附加到该事件流中。因此,任何给定命令的状态都可以通过对该事件流运行一个非常简单的投影来派生,并且此状态查询可以与命令的实际处理完全异步地完成。

同一个状态投影可以被命令处理程序端用来决定接下来需要处理哪些命令。同样,通过让命令处理程序向命令的事件流发布一条消息以指示处理已开始(并且不缓存投影),您可以确保可靠的命令出队过程,而不管有多少个命令处理程序实例被启动。

您可以选择是否使用持久性后端存储(例如 Azure Blob)或使用内存事件流实现,这取决于您在记录命令和从故障中恢复方面的业务需求。您还可以将命令事件流复制到开发或测试环境中,并在需要对其进行调试或分析时重播事件。

关注点

如果您正在使用 Windows Azure 存储,我建议从 CodePlex 下载 Azure Storage Explorer 项目。

我目前学到的技巧

  1. 在 Windows Azure 表存储中存储数据时,要特别注意分区键属性的决定——我一开始使用了太高级别的对象(客户端),这意味着我的表只有少量分区,这使得数据访问变慢。
  2. 即使 Windows 存储队列是空的,轮询它也会花费(非常小)的费用,因此根据需求提高或降低轮询速率的方法是一个非常好的主意。

历史

  • v1 - 2014 年 1 月 19 日 所涉及的类的概要
  • v2 - 2014 年 1 月 22 日 重写,强调原因而非方法——删除了代码转储部分
  • v3 - 2016 年 3 月 5 日 添加了关于查询命令的思考
Windows Azure 上的 CQRS - 命令端 - CodeProject - 代码之家
© . All rights reserved.