Windows Azure 上的 CQRS -身份组和分类器






4.89/5 (2投票s)
一种将集合论(
引言
使用 事件溯源(也称为事件流)作为业务系统的数据存储机制的一个有效批评是,由于在对事件流运行相关投影之前,您不知道任何给定实体(或聚合)的属性,因此运行业务查询更加困难。
身份组和分类器是解决此问题的一种方法。
背景
如果您以前没有使用过事件流/事件溯源,我强烈建议您阅读文章“了解事件溯源”部分:Windows Azure 上的 CQRS - 事件溯源。
身份组
身份组是相同类型的实体的集合,它们共享一些公共属性或特征,这些属性或特征将它们分组到具有业务意义的组中。
例如,如果您有一个银行业务应用程序,实体类型为“银行账户”,那么创建一个具有业务意义的组“透支账户”将是有意义的,该组是当前余额低于零的所有账户的子集。反过来,您可能有一个业务组“拖欠账户”,它是“透支账户”组中超过 30 天未贷记的成员。
因此,身份组可以通过其名称、其父组名称(如果它不是组“全部”的子集)在代码中标识。
Public Interface IIdentifierGroup
''' <summary>
''' The unique name of the identity group
''' </summary>
''' <remarks>
''' This name can be passed as a parameter for a query definition.
''' There are two predefined names:-
''' "Identity" being the group of one specified aggregate identifier and
''' "All" being the group of all
''' instances of an aggregate identifier type.
''' </remarks>
ReadOnly Property Name As String
''' <summary>
''' The name of the outer parent group of which all members
''' must be members of to be checked
''' for membership of this group
''' </summary>
''' <remarks>
''' This can be used to speed up evaluation of
''' group membership by starting from a smaller
''' initial group than "All"
''' If not set then "All" is assumed
''' </remarks>
ReadOnly Property ParentGroupName As String
End Interface
为了类型安全,身份组可以被类型化为仅允许一种实体类型,并且专门用于识别实体实例的唯一标识符。在我们的银行账户示例中,这将是一个“银行账户”类,由唯一的“账户号码”(字符串)唯一标识。
''' <remarks>
''' The group is uniquely named per aggregate identifier type,
''' and is populated by its own projection which decides
''' if any given aggregate identifier is in or out of the group
''' </remarks>
Public Interface IIdentifierGroup(Of TAggregateIdentifier _
As IAggregationIdentifier, TAggregateKey)
Inherits IIdentifierGroup
End Interface
分类器
classifier
是一个类,当对单个实体的事件流运行时,可以根据对事件流中的某些或所有事件执行的函数,将该实体分类为是否属于身份组。
银行帐户实体的 classifier
,如果必须对帐户是否属于“透支账户”类别进行分类,则需要处理任何影响余额的事件(存款、取款、收费、税费、利息等),并保持余额的运行总计。如果分类器到达流的末尾并且运行总计低于零,则分类器认为该帐户在身份组内,否则它在外部。
因此,classifier
是一种非常特殊的投影形式,可以使用许多相同的底层功能。
Public Interface IClassifier
''' <summary>
''' Does the projection handle the data for the given event type
''' </summary>
''' <param name="eventType">
''' The type of the event containing the data that may or may not be handled
''' </param>
''' <returns>
''' True if this event type should get processed
''' </returns>
Function HandlesEventType(ByVal eventType As Type) As Boolean
End Interface
Public Interface IClassifier(Of TAggregate As IAggregationIdentifier, TAggregateKey)
Inherits IClassifier
''' <summary>
''' Perform whatever evaluation is required to handle the specific event
''' </summary>
''' <param name="eventToHandle">
''' The specific event to handle and perform whatever processing is required in order to
''' evaluate the status of the aggregate instance in relation to the identity group
''' </param>
Function Evaluate(Of TEvent As IEvent(Of TAggregate))_
(ByVal eventToHandle As TEvent) As IClassifierEventHandler.EvaluationResult
End Interface
或者,您可以首先运行投影,然后对该投影的结果应用一些逻辑来执行分类。例如,“信贷帐户”的分类器将首先运行“当前余额”投影,然后根据余额是否大于零来运行分类:-
/// <summary>
/// Use the running balance of the account to decide if it is in credit
/// </summary>
public IClassifierDataSourceHandler.EvaluationResult EvaluateProjection
(IRunning_Balance projection)
{
if (projection.Balance > 0)
{
return IClassifierDataSourceHandler.EvaluationResult.Include;
}
return IClassifierDataSourceHandler.EvaluationResult.Exclude;
}
处理器
身份组和分类器是与业务相关的类,它们没有任何实际的实现逻辑来在实际的事件流上运行它们。为此,我们需要特殊的处理器类来执行分类和分组功能
''' <summary>
''' Class to run defined classifiers over an event stream to classify
''' an aggregate instance as being
''' inside or outside of the identity group the classifier pertains to
''' </summary>
''' <typeparam name="TAggregate">
''' The class of the aggregate of which this is an instance
''' </typeparam>
''' <typeparam name="TAggregateKey">
''' The data type of the key that uniquely identifies an instance of this aggregate
''' </typeparam>
Public NotInheritable Class ClassifierProcessor_
(Of TAggregate As IAggregationIdentifier, TAggregateKey, TClassifier As IClassifier)
Implements IClassifierProcessor(Of TAggregate, TAggregateKey, TClassifier)
' The stream reader instance that will be used to run the projections
Private ReadOnly m_streamReader As IEventStreamReader(Of TAggregate, TAggregateKey)
Private ReadOnly m_classifier As IClassifier(Of TAggregate, TAggregateKey)
Public Function Classify(Optional ByVal classifierToProcess As IClassifier_
(Of TAggregate, TAggregateKey) = Nothing) _
As IClassifierEventHandler.EvaluationResult Implements IClassifierProcessor_
(Of TAggregate, TAggregateKey, TClassifier).Classify
If (classifierToProcess Is Nothing) Then
If (m_classifier IsNot Nothing) Then
classifierToProcess = m_classifier
End If
End If
If m_streamReader IsNot Nothing Then
If (classifierToProcess IsNot Nothing) Then
Dim startingSequence As UInteger = 0
Dim retVal As IClassifierEventHandler.EvaluationResult = _
IClassifierEventHandler.EvaluationResult.Unchanged
For Each evt In m_streamReader.GetEvents(startingSequence)
If (classifierToProcess.HandlesEventType(evt.GetType())) Then
retVal = classifierToProcess.Evaluate(evt)
End If
Next
' Return the evaluation status as at the end of the event stream
Return retVal
End If
End If
'If no classification was performed, leave the result as unchanged..
Return IClassifierEventHandler.EvaluationResult.Unchanged
End Function
''' <summary>
''' Create a new classifier processor that will use the given event stream reader
''' to do its processing
''' </summary>
''' <param name="readerTouse">
''' The event stream processor to use
''' </param>
''' <param name="classifier">
''' (Optional) The classifier class that does the actual evaluation
''' </param>
Friend Sub New(ByVal readerTouse As IEventStreamReader(Of TAggregate, TAggregateKey),
Optional classifier As IClassifier(Of TAggregate, TAggregateKey) = Nothing)
m_streamReader = readerTouse
If (classifier IsNot Nothing) Then
m_classifier = classifier
End If
End Sub
快照
鉴于每个事件流都是不可变的,因此如果对它运行的 classifier
是确定性的,则可以获取事件流中给定点的 classifier
状态的快照,然后可以将其用作从该点开始的 classifier
的起点。
同样,任何基于这些确定性 classifier
的身份组也是可快照的。在实践中,我已将一个属性添加到 classifier
本身,开发人员可以使用该属性来指示是否可以将 classifier
写入快照,或者是否每次都必须读取整个事件流。
并行化
因为 classifier
在一个事件流上运行,并且彼此之间不能有任何交互,所以分类过程本质上是并行的。如果身份组上的负载超过了当前可以处理的负载,这允许简单地添加更多 classifier
处理器机器的横向扩展架构。
关注点
处理在事件流上运行业务查询问题的另一种(也是更常见的)方法是将投影持久化到只读数据库中,可以在该数据库上运行业务查询。
历史
- 2016年8月23日:初始设计
- 2016年12月28日:添加了投影评估作为替代分类方法