CQRS 入门






4.96/5 (267投票s)
在本文中,我将通过一个非常简单的示例来描述 CQRS 模式。
什么是 CQRS
CQRS 代表命令查询职责分离(Command Query Responsibility Segregation)。许多人认为 CQRS 是一种完整的架构,但他们错了。CQRS 只是一个小的模式。这个模式最初由 Greg Young 和 Udi Dahan 提出。他们从 Bertrand Meyer 在其著作《面向对象软件构建》中定义的命令查询分离模式(Command Query Separation)中获得灵感。CQS 的主要思想是:“一个方法要么改变对象的状态,要么返回一个结果,但不能两者兼而有之。换句话说,提问不应该改变答案。更正式地说,方法只有在具有参照透明性且没有副作用时才应该返回值。”(维基百科)因此,我们可以将方法分为两组:
- 命令 - 改变对象或整个系统的状态(有时称为修改器或变异器)。
- 查询 - 返回结果,不改变对象的状态。
在实际情况中,区分它们非常简单。查询将声明返回类型,而命令将返回 void。这种模式应用广泛,使对象推理更容易。另一方面,CQRS 仅适用于特定问题。
许多使用主流方法的应用程序都包含读写两侧通用的模型。为读写两侧使用相同的模型会导致更复杂的模型,这可能非常难以维护和优化。
这两种模式的真正优势在于,您可以将改变状态的方法与不改变状态的方法分离。这种分离在处理性能和调优时非常方便。您可以将系统的读取侧与写入侧分开优化。写入侧被称为领域(domain)。领域包含所有行为。读取侧专门用于报表需求。
这种模式的另一个好处是在大型应用程序中。您可以将开发人员分成较小的团队,分别在系统的不同侧(读取或写入)工作,而无需了解另一侧。例如,从事读取侧工作的开发人员不需要理解领域模型。
查询侧
查询将只包含获取数据的方法。从架构的角度来看,这些将是所有返回 DTO 的方法,客户端消费这些 DTO 以在屏幕上显示。DTO 通常是领域对象的投影。在某些情况下,这可能是一个非常痛苦的过程,特别是当需要复杂的 DTO 时。
使用 CQRS 可以避免这些投影。相反,可以引入一种新的投影 DTO 的方式。您可以绕过领域模型,直接从数据存储中通过读取层获取 DTO。当应用程序请求数据时,可以通过对读取层进行一次调用来完成,该调用返回包含所有所需数据的单个 DTO。
读取层可以直接连接到数据库(数据模型),使用存储过程读取数据不是一个坏主意。直接连接到数据源使得查询非常容易维护和优化。对数据进行反范式化是有意义的。原因在于,数据查询的次数通常远多于领域行为的执行次数。这种反范式化可以提高应用程序的性能。
命令侧
由于读取侧已分离,领域仅专注于命令的处理。现在,领域对象不再需要暴露内部状态。存储库除了 `GetById` 之外,只有少量查询方法。
命令由客户端应用程序创建,然后发送到领域层。命令是指令特定实体执行特定操作的消息。命令的命名方式类似于 DoSomething(例如,ChangeName、DeleteOrder...)。它们指示目标实体执行可能导致不同结果或失败的操作。命令由命令处理程序处理。
public interface ICommand
{
Guid Id { get; }
}
public class Command : ICommand
{
public Guid Id { get; private set; }
public int Version { get; private set; }
public Command(Guid id,int version)
{
Id = id;
Version = version;
}
}
public class CreateItemCommand:Command
{
public string Title { get; internal set; }
public string Description { get;internal set; }
public DateTime From { get; internal set; }
public DateTime To { get; internal set; }
public CreateItemCommand(Guid aggregateId, string title,
string description,int version,DateTime from, DateTime to)
: base(aggregateId,version)
{
Title = title;
Description = description;
From = from;
To = to;
}
}
所有命令都将发送到命令总线,命令总线将每个命令委托给命令处理程序。这表明领域只有一个入口点。命令处理程序的职责是在领域上执行适当的领域行为。命令处理程序应与存储库连接,以提供加载所需实体(在此上下文中称为聚合根 Aggregate Root)的能力,在该实体上将执行行为。
public interface ICommandHandler<TCommand> where TCommand : Command
{
void Execute(TCommand command);
}
public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand>
{
private IRepository<DiaryItem> _repository;
public CreateItemCommandHandler(IRepository<DiaryItem> repository)
{
_repository = repository;
}
public void Execute(CreateItemCommand command)
{
if (command == null)
{
throw new ArgumentNullException("command");
}
if (_repository == null)
{
throw new InvalidOperationException("Repository is not initialized.");
}
var aggregate = new DiaryItem(command.Id, command.Title, command.Description,
command.From, command.To);
aggregate.Version = -1;
_repository.Save(aggregate, aggregate.Version);
}
}
命令处理程序执行以下任务:
- 它从消息基础设施(命令总线)接收命令实例。
- 它验证命令是有效命令。
- 它找到作为命令目标的聚合实例。
- 它调用聚合实例上的适当方法,并传入命令中的任何参数。
- 它将聚合的新状态持久化到存储中。
内部事件
我们首先应该问的问题是什么是领域事件。领域事件是系统中过去发生的事情。事件通常是命令的结果。例如,客户端请求了一个 DTO 并进行了一些更改,这导致发布了一个命令。然后,适当的命令处理程序加载了正确的聚合根并执行了适当的行为。此行为引发一个事件。此事件由特定的订阅者处理。聚合将事件发布到事件总线,事件总线将事件传递给适当的事件处理程序。在聚合根内部处理的事件称为内部事件。事件处理程序除了设置状态之外不应执行任何逻辑。
领域行为
public void ChangeTitle(string title)
{
ApplyChange(new ItemRenamedEvent(Id, title));
}
领域事件
public class ItemCreatedEvent:Event
{
public string Title { get; internal set; }
public DateTime From { get; internal set; }
public DateTime To { get; internal set; }
public string Description { get;internal set; }
public ItemCreatedEvent(Guid aggregateId, string title ,
string description, DateTime from, DateTime to)
{
AggregateId = aggregateId;
Title = title;
From = from;
To = to;
Description = description;
}
}
public class Event:IEvent
{
public int Version;
public Guid AggregateId { get; set; }
public Guid Id { get; private set; }
}
内部领域事件处理程序
public void Handle(ItemRenamedEvent e)
{
Title = e.Title;
}
事件通常与另一个名为事件源(Event Sourcing,ES)的模式相关联。ES 是一种通过保存事件流来持久化聚合状态的方法,以记录聚合状态的变化。
正如我之前提到的,聚合根的每个状态更改都是由事件触发的,并且聚合根的内部事件处理程序除了设置正确状态之外没有其他作用。要获取聚合根的状态,我们必须在内部重播所有事件。这里我必须提到,事件是只写的。您不能更改或删除现有事件。如果您发现系统中的某些逻辑生成了错误的事件,您必须生成一个新的补偿事件,纠正之前错误事件的结果。
外部事件
外部事件通常用于使报告数据库与领域的当前状态同步。这是通过将内部事件发布到领域外部来完成的。当事件发布时,相应的事件处理程序会处理该事件。外部事件可以发布到多个事件处理程序。事件处理程序执行以下任务:
- 它从消息基础设施(事件总线)接收事件实例。
- 它找到作为事件目标的进程管理器实例。
- 它调用进程管理器实例的适当方法,并传入事件中的任何参数。
- 它将进程管理器的新状态持久化到存储中。
但是谁可以发布事件呢?通常,领域存储库负责发布外部事件。
Using the Code
我创建了一个非常简单的示例,演示如何实现 CQRS 模式。这个简单的示例允许您创建和修改日记项。该解决方案由三个项目组成:
- Diary.CQRS
- Diary.CQRS.Configuration
- Diary.CQRS.Web
第一个是包含所有领域和消息对象的基项目。Configuration 项目由 Web 消耗,Web 是此示例的 UI。现在让我们仔细看看主项目。
Diary.CQRS
正如我之前提到的,这个项目包含了这个示例的所有领域和消息对象。CQRS 示例的唯一入口点是命令总线,命令被发送到那里。这个类只有一个泛型方法 Send
。这个方法负责使用 CommandHandlerFactory
创建适当的命令处理程序。如果没有命令处理程序与命令关联,则抛出异常。在其他情况下,会调用 Execute
方法,其中执行行为。行为创建内部事件,并将此事件存储到名为 _changes
的内部字段中。此字段在 AggregateRoot
基类中声明。接下来,此事件由内部事件处理程序处理,该处理程序更改聚合的状态。在处理此行为之后,聚合的所有更改都存储到存储库中。存储库通过比较聚合的预期版本和存储中存储的聚合版本来检查是否存在一些不一致之处。如果这些版本不同,则意味着对象已被其他人修改,并抛出 ConcurrencyException
。在其他情况下,更改存储在事件存储中。
存储库
public class Repository<T> : IRepository<T> where T : AggregateRoot, new()
{
private readonly IEventStorage _storage;
private static object _lockStorage = new object();
public Repository(IEventStorage storage)
{
_storage = storage;
}
public void Save(AggregateRoot aggregate, int expectedVersion)
{
if (aggregate.GetUncommittedChanges().Any())
{
lock (_lockStorage)
{
var item = new T();
if (expectedVersion != -1)
{
item = GetById(aggregate.Id);
if (item.Version != expectedVersion)
{
throw new ConcurrencyException(string.Format("Aggregate {0} has been previously modified",
item.Id));
}
}
_storage.Save(aggregate);
}
}
}
public T GetById(Guid id)
{
IEnumerable<Event> events;
var memento = _storage.GetMemento<BaseMemento>(id);
if (memento != null)
{
events = _storage.GetEvents(id).Where(e=>e.Version>=memento.Version);
}
else
{
events = _storage.GetEvents(id);
}
var obj = new T();
if(memento!=null)
((IOriginator)obj).SetMemento(memento);
obj.LoadsFromHistory(events);
return obj;
}
}
内存事件存储(InMemoryEventStorage)
在这个简单的示例中,我创建了一个 InMemoryEventStorage
,它将所有事件存储在内存中。这个类实现了包含四个方法的 IEventStorage
接口。
public IEnumerable<Event> GetEvents(Guid aggregateId)
{
var events = _events.Where(p => p.AggregateId == aggregateId).Select(p => p);
if (events.Count() == 0)
{
throw new AggregateNotFoundException(string.Format(
"Aggregate with Id: {0} was not found", aggregateId));
}
return events;
}
此方法返回聚合的所有事件,并在聚合没有事件时抛出错误,这意味着聚合不存在。
public void Save(AggregateRoot aggregate)
{
var uncommittedChanges = aggregate.GetUncommittedChanges();
var version = aggregate.Version;
foreach (var @event in uncommittedChanges)
{
version++;
if (version > 2)
{
if (version % 3 == 0)
{
var originator = (IOriginator)aggregate;
var memento = originator.GetMemento();
memento.Version = version;
SaveMemento(memento);
}
}
@event.Version=version;
_events.Add(@event);
}
foreach (var @event in uncommittedChanges)
{
var desEvent = Converter.ChangeTo(@event, @event.GetType());
_eventBus.Publish(desEvent);
}
}
此方法将事件存储到内存中,并为聚合创建每三个事件的快照(memento)。此快照保存聚合的所有状态信息和版本。使用快照可以提高应用程序的性能,因为它不重要加载所有事件,而只加载最近的三个。
当所有事件存储完毕后,它们通过事件总线发布,并由外部事件处理程序消费。
public T GetMemento<T>(Guid aggregateId) where T : BaseMemento
{
var memento = _mementos.Where(m => m.Id == aggregateId).Select(m=>m).LastOrDefault();
if (memento != null)
return (T) memento;
return null;
}
返回聚合的快照。
public void SaveMemento(BaseMemento memento)
{
_mementos.Add(memento);
}
存储聚合的快照。
聚合根(Aggregate Root)
AggregateRoot
类是所有聚合的基类。这个类实现了 IEventProvider
接口。它在 _changes
列表中保存所有未提交的更改信息。这个类还有一个 ApplyChange
方法,用于执行适当的内部事件处理程序。LoadFromHistory
方法加载并应用内部事件。
public abstract class AggregateRoot:IEventProvider
{
private readonly List<Event> _changes;
public Guid Id { get; internal set; }
public int Version { get; internal set; }
public int EventVersion { get; protected set; }
protected AggregateRoot()
{
_changes = new List<Event>();
}
public IEnumerable<Event> GetUncommittedChanges()
{
return _changes;
}
public void MarkChangesAsCommitted()
{
_changes.Clear();
}
public void LoadsFromHistory(IEnumerable<Event> history)
{
foreach (var e in history) ApplyChange(e, false);
Version = history.Last().Version;
EventVersion = Version;
}
protected void ApplyChange(Event @event)
{
ApplyChange(@event, true);
}
private void ApplyChange(Event @event, bool isNew)
{
dynamic d = this;
d.Handle(Converter.ChangeTo(@event,@event.GetType()));
if (isNew)
{
_changes.Add(@event);
}
}
}
事件总线(EventBus)
事件描述系统状态的变化。事件的主要目的是更新读取模型。为此,我创建了 EventBus
类。EventBus
类的唯一行为是将事件发布给订阅者。一个事件可以发布给多个订阅者。在这个示例中,不需要手动订阅。事件处理程序工厂返回所有可以处理当前事件的 EventHandler
列表。
public class EventBus:IEventBus
{
private IEventHandlerFactory _eventHandlerFactory;
public EventBus(IEventHandlerFactory eventHandlerFactory)
{
_eventHandlerFactory = eventHandlerFactory;
}
public void Publish<T>(T @event) where T : Event
{
var handlers = _eventHandlerFactory.GetHandlers<T>();
foreach (var eventHandler in handlers)
{
eventHandler.Handle(@event);
}
}
}
事件处理程序
事件处理程序的主要目的是接收事件并更新读取模型。在下面的示例中,您可以看到 `ItemCreatedEventHandler`。它处理 `ItemCreatedEvent`。利用事件中的信息,它创建一个新对象并将其存储在报告数据库中。
public class ItemCreatedEventHandler : IEventHandler<ItemCreatedEvent>
{
private readonly IReportDatabase _reportDatabase;
public ItemCreatedEventHandler(IReportDatabase reportDatabase)
{
_reportDatabase = reportDatabase;
}
public void Handle(ItemCreatedEvent handle)
{
DiaryItemDto item = new DiaryItemDto()
{
Id = handle.AggregateId,
Description = handle.Description,
From = handle.From,
Title = handle.Title,
To=handle.To,
Version = handle.Version
};
_reportDatabase.Add(item);
}
}
Diary.CQRS.Web
这个项目作为 CQRS 示例的 UI。Web UI 项目是一个简单的 ASP.NET MVC 4 应用程序,只有一个 `HomeController` 包含六个 `ActionResult` 方法:
ActionResult Index()
- 此方法返回 Index 视图,这是此应用程序的主视图,您可以在其中看到所有日记项的列表。ActionResult Delete(Guid id)
- 此方法创建一个新的DeleteItemCommand
并将其发送到CommandBus
。当命令发送后,方法将返回 Index 视图。ActionResult Add()
- 返回 Add 视图,您可以在其中输入新日记项的数据。ActionResult Add(DiaryItemDto item)
- 此方法创建一个新的CreateItemCommand
并将其发送到CommandBus
。当创建新项时,将返回 Index 视图。ActionResult Edit(Guid id)
- 返回所选日记项的 Edit 视图。ActionResult Edit(DiaryItemDto item)
- 此方法创建一个新的ChangeItemCommand
并将其发送到CommandBus
。当项目成功更新后,返回 Index 屏幕。如果发生ConcurrencyError
,则返回编辑视图并在屏幕上显示异常。
在下图中,您可以看到主屏幕和日记项列表。
何时使用 CQRS
一般来说,CQRS 模式在以下情况下可能非常有价值:您拥有高度协作的数据和大型多用户系统,复杂且包含不断变化的业务规则,并为业务带来显著的竞争优势。当您需要跟踪和记录历史更改时,它会非常有帮助。
通过 CQRS,您可以实现出色的读写性能。系统本身支持横向扩展。通过分离读写操作,每个操作都可以得到优化。
当您遇到复杂的业务逻辑时,CQRS 会非常有用。CQRS 强制您不混淆领域逻辑和基础设施操作。
通过 CQRS,您可以将开发任务分配给具有明确接口的不同团队。
何时不使用 CQRS
如果您开发的不是一个高度协作的系统,并且没有多个写入者针对同一逻辑数据集,则不应使用 CQRS。
历史
- 3 月 4 日 - 发布原始版本。