65.9K
CodeProject 正在变化。 阅读更多。
Home

CQRS 在 Windows Azure 上 - 事件溯源

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.65/5 (29投票s)

2014年1月23日

CPOL

15分钟阅读

viewsIcon

106374

downloadIcon

442

如何使用 Windows Azure 存储表、Blob 或文件作为事件溯源的事件存储

引言

虽然事件溯源并非 CQRS 的强制要求,并且事件溯源也用于 CQRS 之外的场景,但这两者经常一起使用。在本文中,我将探讨一种使用 Windows Azure Storage(表、Blob 或文件)作为持久化机制的实现。

事件溯源尤其适合那些读写数据存储需要独立扩展,并且业务规则中很少包含跨实体限制的业务领域。它也可以作为一种部分架构,与更适合基于模型的(数据库或 NoSQL)方法使用该技术的系统部分结合使用。

然而,事件溯源的最大用例在于,它允许您查看数据在其整个生命周期内的版本控制式历史记录。这为调试提供了非常强大的体验,能够重现任何生产问题,因为数据状态可以像在基于模型的系统中通过代码进行调试一样进行逐步查看。

理解事件溯源

如果您有 45 分钟,我推荐观看此主题的视频

对于熟悉关系数据库模型的开发人员来说,事件溯源可能是一种非常令人困惑的做事方式。它颠倒了现有的数据存储方式,即不存储对象的当前状态并在事件发生时更新状态,而是存储对象发生的全部事件历史记录,并利用这些历史记录来推导出对象的当前状态。

希望以下提示能有所帮助

  1. 事件只会添加到事件列表的末尾(您可以将其概念化为一个没有“弹出”选项的堆栈)。
  2. 事件是根据它们发生的“事物”来存储的,而不是根据它们的事件类型。例如,在银行账户系统中,我们不会为“付款”和“定期存款”设置单独的表——这些只是在银行账户生命周期中发生的不同事件。
  3. 事件不能被删除,也不能修改其内容——如果您需要“撤销”已发生的事情,则需要向事件存储中添加一个反向事件或撤销事件。
  4. 事件用过去时态来描述。

但是,如果您能摆脱 CRUD 的思维模式,您就会看到一些好处

  1. 您会自动获得所有发生事件的完整审计跟踪(如果您的数据库表中包含“最后修改”、“最后修改者”等字段,并且在数据库中更新字段时会触发写入审计表的触发器,那么您已经在使用这种方式了,只是方向相反)。
  2. 您可以以系统首次创建时未曾预料到的方式查询事件历史记录。
  3. 您的事件负载模式可以非常灵活——如果某个属性不适用于某种类型的事件,您就不需要在那里存储“null”。
  4. 您可以将事件流分开存储,这使得架构具有高度的可扩展性,因为您的系统可以跨多台计算机甚至数据中心进行扩展。

图 1:车辆事件存储示例,使用车辆注册号作为聚合标识符

题外话 - 术语快速词汇

在处理事件源时,有几个词可能会引起混淆:聚合(Aggregation)序列(Sequence)。我(也许过于简化)的定义如下:

聚合是事件发生的“事物”。在银行示例中,这可能是一个银行账户。在车辆租赁公司中,这可能是一辆车,等等。

每个聚合都必须有唯一标识。通常,业务领域已经有可用的唯一标识符,但如果没有,您可以创建它们。

序列是事件发生的顺序——这几乎总是实现为递增数字(尽管在基于文件的流中,我使用文件指针作为事件的开始)。

发生到任何聚合实例的事件都存储在一个称为事件流(或事实日志 Fact Log)的顺序存储中,这非常类似于该实例数据的版本控制视图。就像版本控制允许您随时查看源代码文件的内容一样,事件流也可以用来在任何给定时间点重构聚合实例的状态。

架构

上面的白板概览显示了事件流在 CQRS 架构中的位置。在 Azure 存储中,有多种方法可以存储事件流的底层事件——在我的例子中,我为四种方法提供了实现:SQL、表、文件或 AppendBlob。根据您选择使用的底层技术,实现方式会有所不同。

1. 使用 Azure 存储“表”持久化事件

事件可以由一个 CLR 类表示。通常有一个空的 IEvent 接口来表明开发者的意图,即给定类是一个事件。

存储事件时,我们需要添加聚合标识符和序列号——因此,这两个在存储事件的接口中进行了指定。

''' <summary>
''' Additional properties that uniquely identify an event
''' </summary>
Public Interface IEventIdentity
 
    ''' <summary>
    ''' Get the identifier by which this events aggregate is uniquely known
    ''' </summary>
    ''' <remarks>
    ''' Most implementations use a GUID for this but if you have a known unique identifier 
    ''' then that can be used instead - e.g. ISBN, CUSIP, VIN etc.
    ''' </remarks>
    Function GetAggregateIdentifier() As String
 
    ''' <summary>
    ''' The event version 
    ''' </summary>
    ReadOnly Property Version As UInteger
 
    ''' <summary>
    ''' The event that is identified by this event identity
    ''' </summary>
    ReadOnly Property EventInstance As IEvent
 
End Interface

在本例中,聚合标识符被实现为 string,这样业务部门就可以决定实际使用的唯一标识符。为了清晰起见,我还添加了一个接口,可以用来设置这些聚合标识符,但这完全是可选的。

''' <summary>
''' Interface to be implemented by any class that provides an aggregate 
''' identity
''' </summary>
''' <remarks>
''' This allows for different objects to define their aggregate identity differently - 
''' for example books might aggregate by ISBN, Stocks by CUSIP, 
''' cars by vehicle registration number etc
''' </remarks>
Public Interface IAggregateIdentity
 
    ''' Get the identifier by which this events aggregate is uniquely known
    Function GetAggregateIdentifier() As String
 
End Interface

将版本转换为行标识符

由于版本是递增数字,而 Azure 表需要 string 作为其行键,因此您需要用零填充版本号,以便以正确排序的方式存储它。

    Private Const VERSION_FORMAT As String = "0000000000000000000"
    Public Shared Function VersionToRowkey(ByVal version As Long) As String

        If (version <= 0) Then
            Return Long.MaxValue.ToString(VERSION_FORMAT)
        Else
            Return (Long.MaxValue - version).ToString(VERSION_FORMAT)
        End If

    End Function

保存事件记录

事件记录本身(除了分区键和行键之外的所有内容)可以是任何继承 IEventIdentity 的 .NET 类。由于此记录可以拥有的字段是动态的(取决于事件类型——从上面可知,不同类型的事件存储在同一个事件存储中),因此我们必须使用 DynamicTableEntity 类,并用传入的事件类的属性填充它。

Public Shared Function MakeDynamicTableEntity(ByVal eventToSave As IEventContext) 
          As DynamicTableEntity

        Dim ret As New DynamicTableEntity

        ret.PartitionKey = eventToSave.GetAggregateIdentifier()
        ret.RowKey = VersionToRowkey(eventToSave.Version)

        'Add the event type - currently this is the event class name
        ret.Properties.Add("EventType", 
              New EntityProperty(eventToSave.EventInstance.GetType().Name))

        'Add the context
        If (eventToSave.SequenceNumber <= 0) Then
            'Default sequence number is the current UTC date
            ret.Properties.Add("SequenceNumber", 
                New EntityProperty(DateTime.UtcNow.Ticks))
        Else
            ret.Properties.Add("SequenceNumber", 
                New EntityProperty(eventToSave.SequenceNumber))
        End If

        If (Not String.IsNullOrWhiteSpace(eventToSave.Commentary)) Then
            ret.Properties.Add("Commentary", 
                 New EntityProperty(eventToSave.Commentary))
        End If

        If (Not String.IsNullOrWhiteSpace(eventToSave.Who)) Then
            ret.Properties.Add("Who", New EntityProperty(eventToSave.Who))
        End If


        If (Not String.IsNullOrWhiteSpace(eventToSave.Source)) Then
            ret.Properties.Add("Source", New EntityProperty(eventToSave.Source))
        End If

        'Now add in the different properties of the payload
        For Each pi As System.Reflection.PropertyInfo In _
                    eventToSave.EventInstance.GetType().GetProperties()
            If (pi.CanRead) Then
                ret.Properties.Add(pi.Name, MakeEntityProperty(pi, eventToSave.EventInstance))
            End If
        Next pi
    End Function 

然后,通过反射将 DynamicTableEntity 转换回适当的事件类,就是读取事件类型,创建该类型的实例,然后从 DynamicTableEntity.Properties 集合中填充其属性。

关注点

使用 Windows Azure Table Storage 时,使用分区键和行标识符的查询速度非常快。而不使用这些查询的查询则慢得多——因此,将这两个字段映射到聚合标识符和序列号是一个非常明智的起点。

2. 使用 Azure 存储“Append Blob”持久化事件

Append Blob 是 Windows Azure 存储中一种新的、特殊的二进制大对象存储类型,它经过优化,您只能在文件末尾添加数据。

Append Blob 的最大尺寸为 195MB(或 50,000 个事件),因此通常的设置是为每个唯一的事件流创建一个 Blob。这还允许高度的并行性。

每个事件流所需的额外元数据(如记录数)可以存储在 Azure Blob 的文件元数据中。

        Protected Const METATDATA_DOMAIN As String = "DOMAIN"
        Protected Const METADATA_AGGREGATE_CLASS As String = "AGGREGATECLASS"
        Protected Const METADATA_SEQUENCE As String = "SEQUENCE"
        Protected Const METADATA_RECORD_COUNT As String = "RECORDCOUNT"
        Protected Const METADATA_AGGREGATE_KEY As String = "AGGREGATEKEY"

Blob 本身在实例化事件流读取器或写入器类时创建。

 ''' <summary>
 ''' Create a new base for a reader or writer class in the given domain
 ''' </summary>
 ''' <param name="AggregateDomainName">
 ''' The name of the domain to store/retrieve the event streams under
 ''' </param>
 Protected Sub New(ByVal AggregateDomainName As String, ByVal AggregateKey As TAggregationKey)
     MyBase.New(AggregateDomainName)
      'Get the aggregation instance key to use when creating a blob file name
     m_key = AggregateKey
      If (BlobContainer IsNot Nothing) Then
         m_blob = BlobContainer.GetAppendBlobReference(EventStreamBlobFilename)
         If Not m_blob.Exists() Then
             'Make the file to append to if it doesn't already exist
             m_blob.CreateOrReplace()
             'Set the initial metadata
             m_blob.Metadata(METATDATA_DOMAIN) = DomainName
             m_blob.Metadata(METADATA_AGGREGATE_CLASS) = GetType(TAggregate).Name
             m_blob.Metadata(METADATA_AGGREGATE_KEY) = m_key.ToString()
             m_blob.Metadata(METADATA_SEQUENCE) = "0" 'Sequence starts at zero
             m_blob.Metadata(METADATA_RECORD_COUNT) = "0" 'Record count starts at zero
             m_blob.SetMetadata()
         Else
             m_blob.FetchAttributes()
         End If
     End If
 End Sub

然后,它被用作一个两步过程来追加事件:首先将原始事件包装在一个指示事件类型、序列号和其他事件级别元数据的类中,然后将整个内容写入 Append Blob 的末尾。

Public Sub AppendEvent(EventInstance As IEvent) _
   Implements IEventStreamWriter(Of TAggregate, TAggregationKey).AppendEvent
    If (AppendBlob IsNot Nothing) Then
        Dim nextSequence As Long = IncrementSequence()
        Dim evtToWrite As New BlobBlockWrappedEvent(nextSequence, 0, EventInstance)
        'Turn the event into a binary stream and append it to the blob
        Dim recordWritten As Boolean = False
        Try
            Using es As System.IO.Stream = evtToWrite.ToBinaryStream()
                 Dim offset As Long = AppendBlob.AppendBlock(es)
            End Using
            recordWritten = True
        Catch exBlob As Microsoft.WindowsAzure.Storage.StorageException
            Throw New EventStreamWriteException_
                  (DomainName, AggregateClassName, Key.ToString(), _ 
                nextSequence, "Unable to save a record to the event stream - " _
                               & evtToWrite.EventName, exBlob)
        End Try
         If (recordWritten) Then
            IncrementRecordCount()
        End If
    End If
End Sub    

要读取事件,我们只需将 Append Blob 的快照打开为一个流,然后从该流中反序列化包装的事件。

''' <summary>
''' Get a snapshot of the append blob to use when reading this event stream
''' </summary>
''' <returns></returns>
Private Function GetAppendBlobSnapshot() As CloudAppendBlob
    If (AppendBlob IsNot Nothing) Then
        Return AppendBlob.CreateSnapshot()
    Else
        Return Nothing
    End If
End Function

Private Function GetUnderlyingStream() As System.IO.Stream
    If (AppendBlob IsNot Nothing) Then
       Dim targetStream As New System.IO.MemoryStream()
       Try
           GetAppendBlobSnapshot().DownloadToStream(targetStream)
       Catch exBlob As Microsoft.WindowsAzure.Storage.StorageException
           Throw New EventStreamReadException(DomainName, AggregateClassName, m_key.ToString(), 
             0, "Unable to access underlying event stream", exBlob)
       End Try
       targetStream.Seek(0, IO.SeekOrigin.Begin)
       Return targetStream
   Else
       Return Nothing
   End If
End Function

Public Function GetEvents() As IEnumerable(Of IEvent) 
       Implements IEventStreamReader(Of TAggregate, TAggregationKey).GetEvents
    If (AppendBlob IsNot Nothing) Then
       Dim ret As New List(Of IEvent)
       Dim bf As New BinaryFormatter()
       Using rawStream As System.IO.Stream = GetUnderlyingStream()
           While Not (rawStream.Position >= rawStream.Length)
               Dim record As BlobBlockWrappedEvent = _
                   CTypeDynamic(Of BlobBlockWrappedEvent)(bf.Deserialize(rawStream))
               If (record IsNot Nothing) Then
                   ret.Add(record.EventInstance)
               End If
           End While
       End Using
       Return ret
   Else
       Throw New EventStreamReadException_
         (DomainName, AggregateClassName, MyBase.m_key.ToString(),
              0, "Unable to read events - Azure blob not initialized")
   End If
End Function 

3. 使用 Azure 存储“文件”持久化事件

使用 Azure 文件实现的方案与使用 Append Blob 的方案非常相似,只不过它会在文件首次使用时预先分配文件的全部大小,然后在此之后填充。此外,它将事件记录开头的那个文件指针用作该事件的序列号。

记录数、聚合类型、聚合键和当前序列号也作为属性存储在每个事件流文件中。

    ''' <summary>
    ''' An event instance wrapped up in a way that allows it to be stored in an Azure file
    ''' </summary>
    ''' <remarks>
    ''' The size of the event is stored in the outer wrapper to allow a file reader to  
    ''' skip over any events it doesn't need to process
    ''' </remarks>
    <Serializable()>
    <DataContract()>
    Public Class FileBlockWrappedEvent

        <DataMember(Name:="EventName", Order:=0)>
        Private ReadOnly m_eventName As String
        ''' <summary>
        ''' The class name of the event
        ''' </summary>
        Public ReadOnly Property EventName As String
            Get
                Return m_eventName
            End Get
        End Property

        <DataMember(Name:="Sequence", Order:=1)>
        Private ReadOnly m_sequence As Long
        ''' <summary>
        ''' The sequence number of this record
        ''' </summary>
        Public ReadOnly Property Sequence As Long
            Get
                Return m_sequence
            End Get
        End Property

        <DataMember(Name:="Version", Order:=2)>
        Private ReadOnly m_version As UInteger
        Public ReadOnly Property Version As UInteger
            Get
                Return m_version
            End Get
        End Property

        <DataMember(Name:="Timestamp", Order:=3)>
        Private ReadOnly m_timestamp As DateTime
        Public ReadOnly Property Timestamp As DateTime
            Get
                Return m_timestamp
            End Get
        End Property

        <DataMember(Name:="Size", Order:=4)>
        Private ReadOnly m_eventSize As UInteger
        Public ReadOnly Property EventSize As UInteger
            Get
                Return m_eventSize
            End Get
        End Property

        ''' <summary>
        ''' The .NET class used to serialize/deserialise the underlying event blob data
        ''' </summary>
        ''' <remarks>
        ''' It is possible to derive this by a lookup table 
        ''' from the event name and version if you 
        ''' prefer not to save the class name in the event record.  
        ''' Usually any storage space critical systems would do this 
        ''' so as to reduce redundant data 
        ''' stored.
        ''' </remarks>
        <DataMember(Name:="Class", Order:=4)>
        Private ReadOnly m_eventClassName As String
        Public ReadOnly Property ClassName As String
            Get
                Return m_eventClassName
            End Get
        End Property

        <DataMember(Name:="Data", Order:=5)>
        Private ReadOnly m_eventData As Byte()

        Public ReadOnly Property EventInstance As IEvent
            Get
                If (String.IsNullOrWhiteSpace(m_eventClassName)) Then
                    Throw New SerializationException
                       ("Unable to determine the event type that wrote this event instance")
                End If

                If (m_eventSize = 0) Then
                    Throw New SerializationException_
                    ("Unable to return the event data for this event instance - size is zero")
                End If

                Dim evtType As Type = Type.GetType(m_eventClassName, True, True)
                If (evtType IsNot Nothing) Then
                    Dim bf As New BinaryFormatter()
                    Using memStream As New System.IO.MemoryStream(m_eventData)
                        Return CTypeDynamic(bf.Deserialize(memStream), evtType)
                    End Using
                End If

                Return Nothing
            End Get
        End Property

        Public Sub New(ByVal sequenceInit As String,
           ByVal versionInit As UInteger,
           ByVal timestampInit As DateTime,
           ByVal eventInstanceInit As IEvent)

            m_eventName = EventNameAttribute.GetEventName(eventInstanceInit)
            m_sequence = sequenceInit
            m_version = versionInit
            m_timestamp = timestampInit

            Dim bf As New BinaryFormatter()
            Using memStream As New System.IO.MemoryStream()
                bf.Serialize(memStream, eventInstanceInit)
                m_eventSize = memStream.Length
                m_eventData = memStream.ToArray()
            End Using
            m_eventClassName = eventInstanceInit.GetType().AssemblyQualifiedName

        End Sub

        Public Sub WriteToBinaryStream(ByVal stream As System.IO.Stream)

            Dim bf As New BinaryFormatter()
            bf.Serialize(stream, Me)

        End Sub

    End Class

4. 使用本地文件存储事件

如果您想对事件溯源系统进行单元测试,或者想在将该技术用于全面项目之前设置一个试用项目来理解其中的概念,您可以使用本地机器上的文件来存储事件流。

为此,它使用聚合标识符创建一个唯一的 文件名,并相应地将事件追加到每个文件中。文件开头有一个“信息记录”,指示了事件流所属的域和完整类名。

        ''' <summary>
        ''' Append an event to the file, without saving the record count
        ''' </summary>
        ''' <param name="EventInstance"></param>
        Private Sub AppendEventInternal(EventInstance As IEvent(Of TAggregate))

            If (MyBase.m_file IsNot Nothing) Then
                Dim evtToWrite As New LocalFileWrappedEvent_
                                 (m_eventStreamDetailBlock.SequenceNumber,
                                  EventInstance.Version,
                                  DateTime.UtcNow,
                                  EventInstance, m_setings.UnderlyingSerialiser)
                If (evtToWrite IsNot Nothing) Then
                    Using fs = m_file.OpenWrite()
                        fs.Seek(0, IO.SeekOrigin.End)
                        'write the event to the stream here..
                        evtToWrite.WriteToBinaryStream(fs)
                        m_eventStreamDetailBlock.SequenceNumber = fs.Position
                    End Using
                    m_eventStreamDetailBlock.RecordCount += 1
                End If
            End If

        End Sub

5. 使用本地内存存储事件

用于单元测试,或用于可以将整个事件流存储在机器本地内存中的场景(例如,当使用该技术实现撤销-重做缓冲区时)。

选择要使用的存储类型

根据我的经验,存储技术的选择取决于您正在构建的系统的具体情况,但我建议使用 Azure 表(甚至 Azure 上的 SQL)如果您希望能够查看事件流,而当您需要最大性能和水平扩展时,则使用 Append Blobs 或 Files。

特别地,如果您的事件流可能具有较高的写入速率(例如,在任何 IoT 仪器仪表场景、多人游戏或交易平台中),那么 AppendBlob 可以很好地扩展并且速度非常快。

为了在不进行大量重写的情况下切换这些选项,我允许通过特定的配置设置进行配置(我认为在这方面还有一些工作要做),以便将聚合类映射到用于存储其事件流和投影的后端技术。

  <CQRSAzureEventSourcingConfiguration>
    <ImplementationMaps>
      <Map AggregateDomainQualifiedName="HospitalWard.Nurse" 
       ImplementationName="InMemoryImplementationExample" 
       SnapshotSettingsName="InMemorySnapshotExample" />
    </ImplementationMaps>
   
    <Implementations>
      <Implementation Name="InMemoryImplementationExample" ImplementationType="InMemory">
       <InMemorySettings />
      </Implementation>
      <Implementation Name="AzureBlobImplementationExample" ImplementationType="AzureBlob">
        <BlobSettings ConnectionStringName="UnitTestStorageConnectionString" />
      </Implementation>
      <Implementation Name="AzureBlobImplementationDomainExample" 
       ImplementationType="AzureBlob">
        <BlobSettings ConnectionStringName=
            "UnitTestStorageConnectionString" DomainName="Test" />
      </Implementation>
      <Implementation Name="AzureFileImplementationExample" ImplementationType="AzureFile">
        <FileSettings ConnectionStringName="UnitTestStorageConnectionString" 
         InitialSize="20000" />
      </Implementation>
      <Implementation Name="AzureSQLImplementationExample" ImplementationType="AzureSQL">
        <SQLSettings ConnectionStringName="UnitTestStorageConnectionString" 
         AggregateIdentifierField="AggregateKey" />
      </Implementation>
      <Implementation Name="AzureTableImplementationExample" ImplementationType="AzureTable">
        <TableSettings ConnectionStringName="UnitTestStorageConnectionString" 
         SequenceNumberFormat="00000000" />
      </Implementation>
      <Implementation Name="LocalFileSettingsExample" ImplementationType="LocalFileSettings">
        <LocalFileSettings EventStreamRootFolder="C:\CQRS\Data\EventStreams" 
         UnderlyingSerialiser="JSON"/>
      </Implementation>
    </Implementations>

    <SnapshotSettings>
      <SnapshotSetting Name="InMemorySnapshotExample" ImplementationType="InMemory">
        <InMemorySettings />
      </SnapshotSetting>
    </SnapshotSettings>
   
  </CQRSAzureEventSourcingConfiguration>

消费事件和投影

为了将您的事件流转化为有意义的东西(至少对希望查询数据的用户而言),您需要创建一个投影。投影是对一组事件的影响的视图。例如,上述汽车事件示例中的财务投影将关注任何影响给定汽车成本或利润的事件。

要消费事件,您需要创建一个“知道”自己处理何种事件以及如何处理它们的类。这些特定的投影可以针对单个聚合的事件流运行,以基于该事件流的底层数据执行某些计算或操作。

任何投影的底层接口是

''' <summary>
''' Marker interface to denote anything as being a projection 
''' over the given aggregate identifier
''' </summary>
''' <remarks>
''' The type-safety is to ensure the projection only operates on events of one kind
''' </remarks>
Public Interface IProjection(Of TAggregate As IAggregationIdentifier, TAggregateKey)
    Inherits IProjection

    ''' <summary>
    ''' Perform whatever processing is required to handle the specific event
    ''' </summary>
    ''' <param name="eventToHandle">
    ''' The specific event to handle and perform whatever processing is required
    ''' </param>
    Sub HandleEvent(Of TEvent As IEvent(Of TAggregate))(ByVal eventToHandle As TEvent)

    ' --8<------------------------------------------

End Interface

实现该类的每个特定投影都决定了如何处理事件的数据负载。

        Public Overrides Sub HandleEvent(Of TEvent As IEvent(Of MockAggregate))_
           (eventToHandle As TEvent) _ 
               Implements IProjection(Of MockAggregate, String).HandleEvent

        Select Case eventToHandle.GetType()
            Case GetType(MockEventTypeOne)
                HandleMockEventOne(CTypeDynamic(Of MockEventTypeOne)(eventToHandle))
            Case GetType(MockEventTypeTwo)
                HandleMockEventTwo(CTypeDynamic(Of MockEventTypeTwo)(eventToHandle))
            Case Else
                'Nothing to do with this event type
                Throw New ArgumentException("Unexpected event type - " & _
                      eventToHandle.GetType().Name)
        End Select

    End Sub
    
    ' --8<-------------
    
        Private Sub HandleMockEventOne(ByVal eventToHandle As MockEventTypeOne)

        AddOrUpdateValue(Of Integer)(NameOf(Total), _ 
                ProjectionSnapshotProperty.NO_ROW_NUMBER, Total + _
                     eventToHandle.EventOneIntegerProperty)
        AddOrUpdateValue(Of String)(NameOf(LastString), _ 
                ProjectionSnapshotProperty.NO_ROW_NUMBER, eventToHandle.EventOneStringProperty)

    End Sub

快照

为了允许保存投影的当前状态——既供任何读取器使用,也允许我们在服务中断后重建投影时有一个起点——还定义了一个用于定义投影快照的接口。这是一种说法:“在某个已知的点,它是这样的。”

''' <summary>
''' A snapshot of a projection as at a point in time
''' </summary>
''' <typeparam name="IAggregateIdentity">
''' The type of thing that we are snapshotting that can be uniquely identified
''' </typeparam>
''' <remarks>
''' For entities that have a busy or long history it may be performant to store
''' point-in-time snapshots and only project forward from the most recent 
''' snapshot
''' </remarks>
Public Interface ISnaphot(Of In IAggregateIdentity)

    ''' <summary>
    ''' The version number of the highest event that contributed to 
    ''' this snapshot
    ''' </summary>
    ''' <remarks>
    ''' All events of higher version should be applied to this projection
    ''' so as to get the current as-of-now view
    ''' </remarks>
    ReadOnly Property AsOfVersion As Long

End Interface

实际上,我将这些快照以 JSON 格式保存为 Blob(文件)——这使得 CQRS 架构的查询部分(至少部分)变得像找到快照并读取它一样简单。

事件排序和同步

能够将不同聚合事件的发生顺序排列在一起通常很有用——例如,您可能有一个聚合代表股票价格,另一个聚合代表账户,并且需要将两者结合起来,以便在特定时间点给出账户估值。

为了实现这一点,需要一个主同步字段——这可以是一个递增数字,也可以使用事件发生日期/时间。

为了做到这一点,使用一个 abstract 类作为所有事件类型的基类,它处理同步键。

    ''' <summary>
    ''' Base class for all events - allows for a common synchronizing property
    ''' </summary>
    Public MustInherit Class EventBase

        ''' <summary>
        ''' The creation timestamp of this event 
        ''' </summary>
        ''' <remarks>
        ''' This allows event streams to be combined in a synchronized fashion for 
        ''' multi-aggregate snapshots
        ''' </remarks>
        Public Property SynchronisationStamp As Long

        Public Sub New()
            ' Default the synchronization stamp to now
            SynchronisationStamp = DateTime.UtcNow.Ticks
        End Sub
    End Class

运行投影

现在,我们想在事件流上运行一个投影,以便从发生的事件中读取聚合的当前状态。

因为投影是一个业务对象而不是一个框架对象,我希望将其与事件流本身的底层实现分开。(这也非常方便,可以允许您通过单元测试方法本身创建的内存事件流来对投影进行单元测试)。

为此,我们有一个 IEventStreamReader 接口,它向投影提供事件流。不同事件流后端存储的每个实现都必须支持此接口。

 

''' <summary>
''' Definition for any implementation that can read events from an event stream
''' </summary>
''' <typeparam name="TAggregate">
''' The data type of the aggregate that identifies the event stream to read
''' </typeparam>
''' <typeparam name="TAggregationKey">
''' The data type of the key that uniquely identifies the specific event  
''' stream instance to read
''' </typeparam>
Public Interface IEventStreamReader(Of TAggregate As IAggregationIdentifier, 
                                       TAggregationKey)

    ''' <summary>
    ''' Get the event stream for a given aggregate
    ''' </summary>
    Function GetEvents() As IEnumerable(Of IEvent(Of TAggregate))

    ''' <summary>
    ''' Gets the event stream for a given aggregate from a given starting version
    ''' </summary>
    ''' <param name="StartingVersion">
    ''' The starting version number for our snapshot
    ''' </param>
    ''' <remarks>
    ''' This is used in scenario where we are starting from  
    ''' a given snapshot version
    ''' </remarks>
    Function GetEvents(ByVal StartingVersion As UInteger) _
               As IEnumerable(Of IEvent(Of TAggregate))

    ''' <summary>
    ''' Gets the event stream and the context information recorded for each event
    ''' </summary>
    ''' <remarks>
    ''' This is typically only used for audit trails as all business functionality 
    ''' should depend on the event data alone
    ''' </remarks>
    Function GetEventsWithContext() As IEnumerable(Of IEventContext)

End Interface

要使用的特定事件流读取器被传递给“投影处理器”,后者可以接受一个投影类并在事件流上运行该投影。

    Public Class ProjectionProcessor(Of TAggregate As IAggregationIdentifier,
                                       TAggregateKey)

    ''' <summary>
    ''' The stream reader instance that will be used to run the projections
    ''' </summary>
    Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate, 
                                        TAggregateKey)

    ''' <summary>
    ''' Process the given projection using the event stream reader 
    ''' we have set up
    ''' </summary>
    ''' <param name="projectionToProcess">
    ''' The class that defines the projection operation we are going to process
    ''' </param>
    Public Sub Process(ByVal projectionToProcess As IProjection(Of TAggregate, 
                                                 TAggregateKey))

        If (m_streamReader IsNot Nothing) Then
            If (projectionToProcess IsNot Nothing) Then
                'Does it support snapshots?
                Dim startingSequence As UInteger = 0
                If (projectionToProcess.SupportsSnapshots) Then
                    'load the most recent snapshot for it

                    'and update the starting sequence
                End If
                For Each evt In m_streamReader.GetEvents(startingSequence)
                    If (projectionToProcess.HandlesEventType(evt.GetType())) Then
                        projectionToProcess.HandleEvent(evt)
                    End If
                Next
            End If
        Else
            'Unable to use this projection as it has no stream reader associated
        End If

    End Sub

    ''' <summary>
    ''' Create a new projection processor that will use the given event 
    ''' stream reader to do its processing
    ''' </summary>
    ''' <param name="readerTouse">
    ''' The event stream processor to use
    ''' </param>
    Friend Sub New(ByVal readerTouse As IEventStreamReader(Of TAggregate, TAggregateKey))
        m_streamReader = readerTouse
    End Sub

End Class

特定的事件流读取器实现类有一个工厂方法,可以为任何给定的聚合实例创建投影处理器,如下所示:

    #Region "Factory methods"

        ''' <summary>
        ''' Creates an azure blob storage based event stream reader for the
        '''  given aggregate  
        ''' </summary>
        ''' <param name="instance">
        ''' The instance of the aggregate for which we want to read 
        ''' the event stream
        ''' </param>
        ''' <returns>
        ''' </returns>
        Public Shared Function Create_
          (ByVal instance As IAggregationIdentifier(Of TAggregationKey)) 
              As IEventStreamReader(Of TAggregate, TAggregationKey)

            Return New BlobEventStreamReader(Of TAggregate, TAggregationKey)
                (DomainNameAttribute.GetDomainName(instance), instance.GetKey())

        End Function

        ''' <summary>
        ''' Create a projection processor that works off an azure
        ''' blob backed event stream
        ''' </summary>
        ''' <param name="instance">
        ''' The instance of the aggregate for which we want to run projections
        ''' </param>
        ''' <returns>
        ''' A projection processor that can run projections over this event stream
        ''' </returns>
        Public Shared Function CreateProjectionProces(
           ByVal instance As IAggregationIdentifier(Of TAggregationKey)) 
                  As ProjectionProcessor(Of TAggregate, TAggregationKey)

            Return New ProjectionProcessor(Of TAggregate, TAggregationKey )
                     (Create(instance))

        End Function

#End Region

创建了这些框架代码后,我们现在可以完全独立于其事件流的底层实现来创建我们的聚合、事件和投影(业务逻辑)类,并使它们能够在这些不同技术之间移植。

运行分类器

分类器是一种特殊的投影,用于决定给定聚合实例是否属于某个已定义的、有业务意义的组。例如,如果您正在实现一个银行系统,您可能有一个分类器,它运行每个账户的事件流,以决定哪些账户属于“逾期账户”组。

设计您的业务代码

一旦有了可作为事件流系统基础的框架,您就必须创建代表该系统业务部分的类:聚合、事件和投影。您可以使用 图形化设计器 来完成此操作,或者如果您愿意,也可以直接用代码创建类。

从聚合标识符(事件可以发生并记录在其上的“事物”)开始,您需要创建一个类来定义该聚合如何被唯一标识。为此,我们需要确定其唯一键的数据类型,并且由于许多系统使用字符串进行数据存储,因此需要一种一致的方式将该唯一键转换为字符串。

如果我们以一个由唯一账号标识的银行账户为例,我们将得到一个类似这样的聚合类:

    ''' <summary>
    ''' An aggregate representing bank account in the cloud bank demo project
    ''' </summary>
    <DomainName("CloudBank")>
    Public Class Account
        Implements IAggregationIdentifier(Of String)

        Private m_bankAccount As String

        Public Sub SetKey(key As String) _ 
               Implements IAggregationIdentifier(Of String).SetKey
            m_bankAccount = key
        End Sub

        Public Function GetAggregateIdentifier() As String _ 
                  Implements IAggregationIdentifier.GetAggregateIdentifier
            Return m_bankAccount
        End Function

        Public Function GetBankAccountNumber() As String _ 
               Implements IAggregationIdentifier(Of String).GetKey
            Return m_bankAccount
        End Function


        Public Sub New(ByVal accountNumber As String)
            m_bankAccount = accountNumber
        End Sub

    End Class

然后,对于可以针对该银行账户发生的每个事件,您都需要创建一个事件类,该类包含有关该事件的所有已知属性。请注意,无需定义任何键事件或强制性——基本上,可以存储有关事件的任何数据都应包含在事件定义中。

因此,在银行账户中发生存款事件的事件可能看起来像这样:

    ''' <summary>
    ''' Money has been deposited in an account
    ''' </summary>
    <DomainName("CloudBank")>
    <AggregateIdentifier(GetType(Account))>
    <EventAsOfDate(NameOf(MoneyDeposited.DepositDate))>
    Public Class MoneyDeposited
        Implements IEvent(Of Account)

        ''' <summary>
        ''' The effective date of the deposit
        ''' </summary>
        Public Property DepositDate As Nullable(Of DateTime)

        ''' <summary>
        ''' The amount deposited
        ''' </summary>
        Public Property Amount As Decimal

        Public ReadOnly Property Version As UInteger Implements IEvent(Of Account).Version
            Get
                Return 1
            End Get
        End Property

    End Class

请注意,事件定义还包含一个版本号,如果您更改或添加属性,应递增此版本号。还有一个可选属性 EventAsOfDate,允许您指定事件中的哪个属性包含事件实际发生的实际世界日期和时间。

在定义了所有事件类型之后,您就可以继续定义投影,这些投影允许您从聚合的事件流中获取聚合的实时状态。投影类需要知道它处理哪些事件类型,以及对于每种事件类型,当遇到该事件类型时该怎么做。例如,一个用于计算账户当前余额的投影需要处理 MoneyDepositedMoneyWithdrawn 事件,并在遇到它们时更新 CurrentBalance 属性。

    ''' <summary>
    ''' Gets the current balance for any given bank account
    ''' </summary>
    <DomainName("CloudBank")>
    <AggregateIdentifier(GetType(Account))>
    Public Class CurrentBalanceProjection
        Inherits ProjectionBase(Of Account, String)
        Implements IHandleEvent(Of MoneyDeposited)
        Implements IHandleEvent(Of MoneyWithdrawn)

        Private m_currentBalance As Decimal

        Public ReadOnly Property CurrentBalance As Decimal
            Get
                Return m_currentBalance
            End Get
        End Property

        ''' <summary>
        ''' What events does this projection handle
        ''' </summary>
        ''' <param name="eventType">
        ''' The possible event type
        ''' </param>
        ''' <returns>
        ''' True if the event type is one of:
        ''' MoneyDeposited
        ''' MoneyWithdrawn
        ''' </returns>
        Public Overrides Function HandlesEventType(eventType As Type) As Boolean

            If eventType Is GetType(MoneyDeposited) Then
                Return True
            End If

            If eventType Is GetType(MoneyDeposited) Then
                Return True
            End If

            Return False
        End Function

        Public Overrides ReadOnly Property SupportsSnapshots As Boolean
            Get
                Return True
            End Get
        End Property

        Public Overrides Sub HandleEvent(Of TEvent As IEvent)(eventToHandle As TEvent)

            If GetType(TEvent) Is GetType(MoneyDeposited) Then
                HandleMoneyDepositedEvent(CTypeDynamic(Of MoneyDeposited)(eventToHandle))
            End If

            If GetType(TEvent) Is GetType(MoneyWithdrawn) Then
                HandleMoneyWithdrawnEvent(CTypeDynamic(Of MoneyWithdrawn)(eventToHandle))
            End If

        End Sub

        Public Sub HandleMoneyWithdrawnEvent(eventHandled As MoneyWithdrawn) _ 
                          Implements IHandleEvent(Of MoneyWithdrawn).HandleEvent

            m_currentBalance -= eventHandled.Amount

        End Sub

        Public Shadows Sub HandleMoneyDepositedEvent(eventHandled As MoneyDeposited) _ 
                            Implements IHandleEvent(Of MoneyDeposited).HandleEvent

            m_currentBalance += eventHandled.Amount

        End Sub

    End Class

正如您在此示例中看到的,业务逻辑代码与其支持的框架代码之间存在清晰而完整的隔离,从而允许对所有业务规则类进行无模拟测试。

源代码说明

我附加到此项目的源代码包含了我在“Azure 上的 CQRS”系列文章中上传的所有文章的框架代码(和单元测试项目)。

每个项目都有一个自述文件,我建议您阅读它,并且运行单元测试以更好地利用这些代码。

我已从单元测试中删除了 App.Config,因为它包含对我的 Azure 存储帐户的引用。您需要一个 Azure 帐户才能运行云端单元测试,但内存测试和本地文件测试可以在没有任何此类访问的情况下运行。

  <connectionStrings>
    <add name="UnitTestStorageConnectionString" 
     connectionString="TODO - Add your connection string here" providerName="" />
    <!-- UnclassifiedStorageConnectionString -->
    <add name="StorageConnectionString" 
     connectionString="TODO - Add your connection string here" providerName="" />
  </connectionStrings>

进一步阅读/资源

如果您计划实现一个基于事件溯源的系统,特别是如果它是微服务架构的一部分,我推荐“蓝皮书”——《领域驱动设计:软件核心复杂性应对之道》 by Eric Evans。

我还推荐 Greg Young 的博客文章,其中详细介绍了事件溯源,以及他在 YouTube 上发布的关于该主题的各种演讲。

© . All rights reserved.