65.9K
CodeProject 正在变化。 阅读更多。
Home

CQRS - 在 Microsoft Orleans 上托管事件溯源系统

starIconstarIconstarIconstarIconemptyStarIcon

4.00/5 (1投票)

2017年5月20日

CPOL

4分钟阅读

viewsIcon

27949

Microsoft Orleans 如何促进高度可扩展的 CQRS 和基于事件溯源的系统

引言

过去,我曾写过关于命令查询职责分离 (CQRS) 架构以及事件溯源的数据存储/处理理念如何很好地契合其中的内容。然而,任何选择此路线来实现其应用程序的开发者,也必须决定在哪里以及如何托管它。

在一个纯粹的单体应用程序中,我的建议是将其像 MVC 和 MVVM 架构一样嵌入到代码中,但一旦你的系统转向微服务甚至无服务器函数,你就需要考虑将 CQRS/ES 系统托管在它们之外。

Microsoft Orleans 是一个可行的托管方案,它致力于通过抽象化分布式系统中出现的持久化和线程同步的复杂性,极大地简化容错、异步分布式系统的创建。它通过使用 Actor 模型(尽管将 Actor 称为“Grains”)并限制对其的操作,从而使其线程安全。

背景

如果你是 CQRS 架构和事件溯源的新手,我建议阅读以上文章,或者如果你有 45 分钟的空闲时间,还可以观看这个YouTube 视频。

必备组件

设计决策

第一个问题是:每个投影都应该是独立可用的,还是聚合 grain 应该包含其所有投影?

例如,在上面的 CQRS 域中,“**运行余额**”投影是否应该成为一个独立的 grain,还是应该作为“**银行账户**”grain 的一个方面?

CQRS/ES 的一个批评是需要完全重新加载聚合才能使用它的任何方面。因此,在本篇文章中,我将让每个投影成为自己的 grain。然而,这个设计决策取决于你的业务模型,你应该考虑这两种方法。

创建 Orleans "grains"

第一步是创建一个库,将业务类(来自 CQRS 设计器)包装在 Orleans 接口中,以便它们可以由 Orleans 托管。使用 Orelans Tools 插件创建一个新的接口项目。

然后,在该项目中,添加对由 CQRS 建模域的代码生成创建的 `.EventSourcing` 项目的引用。

包装聚合

你的 CQRS 域中的每个聚合都必须用 grain 接口包装,该接口使用相同的数据类型作为唯一标识符。例如,银行示例中的“`Account`”聚合有一个`string`(账号)来唯一标识它,因此我们将其包装在一个也继承自 `IGrainWithStringKey` 的类中。

然后,在接口中,你需要定义一个任务来处理聚合的每种事件类型,包括序列号的参数(以防止事件被处理两次)。

    /// <summary>
    /// Grain interface IAccountGrain for the Account aggregate
    /// </summary>
    public interface IAccountGrain :
        IGrainWithStringKey ,
        Accounts.Account.IAccount 
    {
        /// <summary>
        /// The account was opened
        /// </summary>
        Task HandleOpenedEvent(int eventSequence, 
               Accounts.Account.eventDefinition.IOpened eventData);

        /// <summary>
        /// The account was closed
        /// </summary>
        Task HandleClosedEvent(int eventSequence, 
               Accounts.Account.eventDefinition.IClosed eventData);

        /// <summary>
        /// Money was paid into the account
        /// </summary>
        Task HandleMoneyDepositedEvent(int eventSequence, 
                Accounts.Account.eventDefinition.IMoney_Deposited eventData);

        /// <summary>
        /// Money was withdrawn from this account
        /// </summary>
        Task HandleMoneyWithdrawnEvent(int eventSequence, 
                Accounts.Account.eventDefinition.IMoney_Withdrawn eventData);
    }

由于我们将投影的实现与聚合类分开,我们也需要一个用于投影的 grain 接口,其中只包含该投影关心的事件类型。

    public interface IRunningBalanceGrain
        : IGrainWithStringKey ,
        Accounts.Account.projection.IRunning_Balance
    {

        /// <summary>
        /// Money was paid into the account
        /// </summary>
        Task HandleMoneyDepositedEvent(int eventSequence, 
               Accounts.Account.eventDefinition.IMoney_Deposited eventData);

        /// <summary>
        /// Money was withdrawn from this account
        /// </summary>
        Task HandleMoneyWithdrawnEvent(int eventSequence, 
               Accounts.Account.eventDefinition.IMoney_Withdrawn eventData);

        /// <summary>
        /// The date/time of the last transaction against this account
        /// </summary>
        Task<DateTime> GetLastTransactionDate();

        /// <summary>
        /// The current running balance for the account
        /// </summary>
        Task<decimal> GetBalance();
    }

我们还添加了可用于检索投影当前状态的任务。CQRS 设计器创建的业务类将执行实际的投影逻辑,但我们需要这一组附加函数,以便能够通过 Orleans 基础结构查询它。

由于所有 Orleans grain 类(上述接口的具体实现)都必须继承自基类 Grain,并且不支持多重继承,因此我们使用类的私有实例将 CQRS 设计器派生的类连接到它们的 Orleans grain 包装器中。

    public class AccountGrain :
        Grain, IAccountGrain
    {
        /// <summary>
        /// Private link to the CQRS-DSL created account instance
        /// </summary>
        private Accounts.Account.Account _account;       

        public string GetAggregateIdentifier()
        {
            if (null != _account )
            {
                return _account.GetAggregateIdentifier();
            }
            else
            {
                throw new NullReferenceException("Account instance not initialised");
            }
        }

 // - - 8< - - - - - - -
    }

这也适用于投影包装器的具体实现。

    public class RunningBalanceGrain
        : Grain, IRunningBalanceGrain
    {
        /// <summary>
        /// Private link to the CQRS-DSL created running balance projection instance
        /// </summary>
        private Accounts.Account.projection.Running_Balance _runningBalance;

        /// <summary>
        /// The date/time of the last balance affecting transaction
        /// </summary>
        public DateTime Last_Transaction_Date
        {
            get
            {
                if (null != _runningBalance )
                {
                    return _runningBalance.Last_Transaction_Date;
                }
                else
                {
                    throw new NullReferenceException
                    ("Running balance projection instance not initialised");
                }
            }
        }
 // - - - 8< - - - - - - -
}

这确实需要一些额外的代码来将 CQRS 类连接到 Orleans 包装器,但它仍然保持了业务逻辑(来自 CQRS 设计器类)和实现逻辑(由 Orleans 提供)之间的分离。

为了将 CQRS-DSL 提供的业务类与它所托管的 grain 同步,我们需要在创建 grain 时实例化它。在 Orleans 中,你可以重写 `OnActivateAsync` 方法来实现这一点。

    public class AccountGrain :
        Grain, IAccountGrain
    {
        /// <summary>
        /// Private link to the CQRS-DSL created account instance
        /// </summary>
        private Account _account;
        private void InitialiseAccount()
        {
            if (null == _account )
            {
                _account = new Account(this.GetPrimaryKeyString ());
            }
        }

        public override Task OnActivateAsync()
        {
            InitialiseAccount();
            return base.OnActivateAsync();
        }

     // - - 8< - - - - - -
    }

创建 Silo

这些 grains(实体)的实例需要由一个 **silo** 托管,它本质上是该 grain 的一个虚拟机环境。这与 Orleans 允许的集群管理相结合,意味着你可以快速启动一个真正的分布式 CQRS/ES 应用程序。

关注点

  • 这绝不是在 Microsoft Orleans 上实现 CQRS 的权威方法——我还建议查看 OrleansAkka 项目,特别是如果你使用 F#。
  • 你可以在用户之间持久化 grains 时使用许多存储提供程序,包括各种 Azure 云存储选项。

历史

  • 2017 年 5 月 20 日 - 初始版本(我可能会添加标识符组)
© . All rights reserved.