具有基于约定的注册和延迟执行支持的领域事件





5.00/5 (12投票s)
领域事件通过约定与容器注册,并且仅在事务提交时触发。
在 DDD(领域驱动设计)中,我们建立有界上下文并定义它们如何交换信息,这通常通过触发事件来实现。根据 Eric Evans 的说法,你应该
“描述模型之间的接触点,为任何通信勾勒出明确的翻译,并突出任何共享。”
这些上下文通常会在同一个数据库事务中进行通信。这实际上是期望的,因为它将确保整个过程的完整性。然而,在现实世界中,事情可能会变得棘手。有些操作可能依赖于已提交的事务才能触发。例如,对于在当前数据库事务之外运行的服务来说,就会发生这种情况。您不希望发送一封电子邮件通知用户某些内容已更改,然后在几秒钟后整个操作被回滚。当我们的应用程序使用不支持分布式事务的分布式服务时,也会发生同样的情况。
我们想要实现的目标
让我们看看我们的主要目标。有很多好的方法可以实现领域事件处理器。本实现将使用依赖注入容器来控制事件触发器的生命周期,它将以非常透明的方式在容器中注册事件并支持延迟执行。让我们回顾一下
轻松注册
为了创建一个新的事件处理器,我们**不希望**去查找另一个项目中一个包含数百个事件到处理器绑定的远程类,然后添加新行。为此,我们可能需要考虑使用依赖注入容器。
延迟执行支持
调用与事务无关的代码的处理器应该只在事务提交后触发。
实际示例:电子商务应用程序中支付订单
让我们以电子商务应用程序中支付订单为例。一旦支付完成,就会触发一个事件。然后,处理客户订单的有界上下文会处理该事件,并将订单标记为待发货。此外,管理库存的有界上下文会处理同一个事件,从库存中减去已订购的商品。
使用 Balsamiq Mockups 创建。
触发事件
PaymentService
由依赖注入容器实例化。事件在 PayOrder
方法的最后一行触发。
public class PaymentService : IPaymentService
{
private readonly IDomainEventsRaiser _events;
private readonly IRepository<Order> _orderRepository;
private readonly IRepository<Payment> _paymentRepository;
public PaymentService(IRepository<Payment> paymentRepository,
IRepository<Order> orderRepository, IDomainEventsRaiser events)
{
_paymentRepository = paymentRepository;
_orderRepository = orderRepository;
_events = events;
}
public void PayOrder(int orderId, decimal amount)
{
Order order = _orderRepository.GetById(orderId);
Payment payment = new Payment()
{
OrderId = orderId,
Amount = amount
};
_paymentRepository.Insert(payment);
_events.Raise(new OrderPaidEvent(payment, order.Items));
}
}
IDomainEventsRaiser
负责触发事件,它只需要一个方法来实现此功能
public interface IDomainEventsRaiser
{
/// <summary>
/// Raises the given domain event
/// </summary>
/// <typeparam name="T">Domain event type</typeparam>
/// <param name="domainEvent">Domain event</param>
void Raise<T>(T domainEvent) where T : IDomainEvent;
}
OrderPaidEvent
之所以成为一个事件,是因为它实现了 IDomainEvent
接口。这个 `interface` 实际上并没有暴露任何服务。它唯一的目的是作为 `IDomainEventsRaiser` 的 `Raise` 方法的一个泛型约束,以分组表示事件的类。
public class OrderPaidEvent : IDomainEvent
{
public OrderPaidEvent(Payment payment, IEnumerable<OrderItem> orderItems)
{
Payment = payment;
OrderItems = new List<OrderItem>(orderItems);
}
public Payment Payment { get; private set; }
public List<OrderItem> OrderItems { get; set; }
}
处理事件
正如我们之前所说,我们想要轻松注册。为此,我们将使用一些约定。每个处理器都必须实现 IHandles
接口,如下所示
/// <summary>
/// Handles an event. If there is a database transaction, the
/// execution is delayed until the transaction is complete.
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IHandles<T> where T : IDomainEvent
{
void Handle(T domainEvent);
}
一旦订单支付完成,其相关商品将从库存中减去。换句话说,一旦 OrderPaidEvent
被触发,SubtractInventaryWhenOrderPaidEventHandler
将处理它。IHandles
接口在领域事件和它的处理器之间建立了联系。
public class SubtractInventaryWhenOrderPaidEventHandler : IHandles<OrderPaidEvent>
{
private readonly IInventoryService _inventory;
public SubtractInventaryWhenOrderPaidEventHandler(IInventoryService inventory)
{
_inventory = inventory;
}
public void Handle(OrderPaidEvent domainEvent)
{
foreach (var item in domainEvent.OrderItems)
_inventory.SubtractAvailability(item.InventoryItemId, item.Quantity);
}
}
处理器也是由依赖注入容器创建的。这使得依赖项更容易传递。
同一个事件可以有多个处理器。也就是说,一旦订单支付完成,它就可以被安排发货。这意味着 PlaceOrderWhenPaidEventHandler
将处理同一个事件(我们也可以有一个单一的处理器来处理多个事件)。
public class PlaceOrderWhenPaidEventHandler : IHandles<OrderPaidEvent>
{
private readonly IOrderPlacementService _orderPlacement;
public PlaceOrderWhenPaidEventHandler(IOrderPlacementService orderPlacement)
{
_orderPlacement = orderPlacement;
}
public void Handle(OrderPaidEvent domainEvent)
{
_orderPlacement.PlaceOrder(domainEvent.Payment.OrderId);
}
}
领域事件触发器
现在我们了解了事件是如何被触发和处理的,让我们看看事件触发器是如何实现的
/// <summary>
/// Simple domain events raiser that is functional, but doesn't support deferred execution
/// </summary>
class DomainEventsRaiser : IDomainEventsRaiser
{
/// <summary>
/// Locator of event handlers
/// </summary>
private readonly IServiceProvider _locator;
internal DomainEventsRaiser(IServiceProvider locator)
{
_locator = locator;
}
/// <summary>
/// Raises the given domain event
/// </summary>
/// <typeparam name="T">Domain event type</typeparam>
/// <param name="domainEvent">Domain event</param>
public void Raise<T>(T domainEvent) where T : IDomainEvent
{
//Get all the handlers that handle events of type T
IHandles<T>[] allHandlers = (IHandles<T>[])_locator.GetService(typeof(IHandles<T>[]));
if (allHandlers != null && allHandlers.Length > 0)
foreach (var handler in allHandlers)
handler.Handle(domainEvent);
}
}
正如我们所见,一个定位器会找到处理特定事件的所有处理器,然后依次触发每个处理器。领域事件触发器也是由依赖注入容器创建的。它允许服务在其构造函数中拥有其引用。
同样的原理也可以用于其他需求,例如 CQRS 来处理命令。
基于约定的注册
您可能希望为属于同一概念、具有相似结构并遵循相同注册和解析模式的服务使用基于约定的注册。更“手动”的注册对于不符合第一条规则且需要更具体需求的服务可能更有意义。
在本节中,我们将使用 **Unity**,但任何不错的容器都应该提供类似的功能。
Container.RegisterTypes(new EventHandlersConvention());
这将使容器注册所有实现 IHandles<T>
的类。这使得 DomainEventsRaiser
可以使用一个定位器来查找特定事件的处理器。
/// <summary>
/// Register the conventions that allows domain events to be raised and handled
/// </summary>
class EventHandlersConvention : RegistrationConvention
{
public override Func<Type, IEnumerable<Type>> GetFromTypes()
{
return WithMappings.FromAllInterfaces;
}
public override Func<Type, IEnumerable<InjectionMember>> GetInjectionMembers()
{
return (t) => new InjectionMember[0];
}
public override Func<Type, LifetimeManager> GetLifetimeManager()
{
return WithLifetime.Transient;
}
public override Func<Type, string> GetName()
{
return WithName.TypeName;
}
public override IEnumerable<Type> GetTypes()
{
Type handlerType = typeof(IHandles<>);
return AllClasses.FromLoadedAssemblies(skipOnError: false).
Where(t => !t.IsAbstract &&
t.GetInterfaces().Any(i => i.IsGenericType &&
i.GetGenericTypeDefinition().Equals(handlerType)));
}
}
**重要提示**:
DomainEventsRaiser
的生命周期不是瞬时的。它可以是单例的,但我建议作用域为整个事务的持续时间,因为该对象的**状态(我们在下一节中将看到的事件队列)在不同事务之间是不可重用的**。
添加延迟执行支持
我们已经有了一个功能齐全的领域事件示例,这些事件是使用约定注册的。现在让我们加入一点现实世界的元素。
假设,一旦订单支付完成,就会向客户发送一封电子邮件,告知他们已处理付款。这就是我们现在的流程
使用 Balsamiq Mockups 创建。
我们将使用我们为领域事件提出的领域事件触发器来处理电子邮件发送。然而,电子邮件应该只在事务提交后发送。
因此,在我们的 IHandles 接口
中,我们添加了一个 Deferred
属性
/// <summary>
/// Handles an event. If there is a database transaction, the
/// execution is delayed until the transaction is complete.
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IHandles<T> where T : IDomainEvent
{
void Handle(T domainEvent);
bool Deferred { get; }
}
一个类可以处理多个事件。NotificationsHandler
展示了如何实现这种行为
public class NotificationsHandler : IHandles<OrderPaidEvent>,
IHandles<OrderShippedEvent> //<- multiple events handled by a single handler
{
private readonly IUserNotifier _notifier;
public NotificationsHandler
(IUserNotifier notifier) // <-- instantiated by the dependencies container
{
_notifier = notifier;
}
public bool Deferred { get { return true; } } //<- 'true' here indicates that
//all these events should be invoked only after the transaction is committed
public void Handle(OrderPaidEvent domainEvent)
{
_notifier.Notify("Yay! Your payment has been processed.");
}
public void Handle(OrderShippedEvent domainEvent)
{
_notifier.Notify(string.Format("Your order has finally been shipped.
Address : \"{0}\"", domainEvent.Order.ShipmentAddress));
}
}
DeferredDomainEventsRaiser
DeferredDomainEventsRaiser
取代了我之前提到的简单 DomainEventsRaiser
。它将延迟的处理器添加到队列中,并在事务提交后分派它们。请注意:
- 并非所有事件都会被延迟。只有那些
IHandles.Deferred = true
的事件。 - 此外,如果没有正在进行的事务,事件永远不会被延迟。
/// <summary>
/// Domain events handler that supports deferred execution
/// </summary>
class DeferredDomainEventsRaiser : IDomainEventsRaiser
{
/// <summary>
/// Locator of event handlers
/// </summary>
private readonly IServiceProvider _resolver;
/// <summary>
/// Collection of events queued for later execution
/// </summary>
private readonly ConcurrentQueue<Action> _pendingHandlers = new ConcurrentQueue<Action>();
/// <summary>
/// Data access state manager
/// </summary>
private readonly IDbStateTracker _dbState;
public DeferredDomainEventsRaiser(IServiceProvider resolver, IDbStateTracker dbState)
{
_resolver = resolver;
_dbState = dbState;
_dbState.TransactionComplete += this.Flush;
_dbState.Disposing += this.FlushOrClear;
}
/// <summary>
/// Raises the given domain event
/// </summary>
/// <typeparam name="T">Domain event type</typeparam>
/// <param name="domainEvent">Domain event</param>
public void Raise<T>(T domainEvent) where T : IDomainEvent
{
//Get all the handlers that handle events of type T
IHandles<T>[] allHandlers =
(IHandles<T>[])_resolver.GetService(typeof(IHandles<T>[]));
if (allHandlers != null && allHandlers.Length > 0)
{
IHandles<T>[] handlersToEnqueue = null;
IHandles<T>[] handlersToFire = allHandlers;
if (_dbState.HasPendingChanges())
{
//if there is a transaction in progress, events are enqueued to be executed later
handlersToEnqueue = allHandlers.Where(h => h.Deferred).ToArray();
if (handlersToEnqueue.Length > 0)
{
lock (_pendingHandlers)
foreach (var handler in handlersToEnqueue)
_pendingHandlers.Enqueue(() => handler.Handle(domainEvent));
handlersToFire = allHandlers.Except(handlersToEnqueue).ToArray();
}
}
foreach (var handler in handlersToFire)
handler.Handle(domainEvent);
}
}
/// <summary>
/// Fire all the events in the queue
/// </summary>
private void Flush()
{
Action dispatch;
lock (_pendingHandlers)
while (_pendingHandlers.TryDequeue(out dispatch))
dispatch();
}
/// <summary>
/// Execute all pending events if there is no open transaction.
/// Otherwise, clears the queue without executing them
/// </summary>
private void FlushOrClear()
{
if (!_dbState.HasPendingChanges())
Flush();
else
{
//If the state manager was disposed with a transaction in progress, we clear
//the queue without firing the events because this flow is pretty inconsistent
//(it could be caused, for instance, by an unhandled exception)
Clear();
}
}
/// <summary>
/// Clear the pending events without firing them
/// </summary>
private void Clear()
{
Action dispatch;
lock (_pendingHandlers)
while (_pendingHandlers.TryDequeue(out dispatch)) ;
}
}
这个新的 interface IDbStateTracker
仅暴露了关于数据库状态的一些事件,并且可以由 UnitOfWork
实现(请参阅本文随附的源代码,也可在 GitHub 上找到)。
/// <summary>
/// Tracks the database's state
/// </summary>
public interface IDbStateTracker : IDisposable
{
/// <summary>
/// Triggered when disposing
/// </summary>
event Action Disposing;
/// <summary>
/// Triggered when the transaction is completed
/// </summary>
event Action TransactionComplete;
/// <summary>
/// Returns true if there are uncommitted pending changes.
/// Otherwise, returns false.
/// </summary>
bool HasPendingChanges();
}
整合
这种方法允许通过使用约定自动注册事件处理器。此外,相同的设计允许事件处理器在线触发或将其执行延迟到事务结束。本文随附的源代码(也可在 GitHub 上找到)包含了此处讨论的原则的完整工作示例。