65.9K
CodeProject 正在变化。 阅读更多。
Home

事件流会计

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.67/5 (2投票s)

2014年5月2日

CPOL

4分钟阅读

viewsIcon

20901

在财务软件中使用事件源应用程序架构。

引言

事件溯源 是一种架构模型,它基于存储对象生命周期中发生的事件序列,而不是存储该对象的某个时间点的状态。

它非常适合金融应用,因为(通常)任何账户余额都是导致该余额的事件的结果,并且我们希望能够既查看数字的底层推导过程,又能够回溯到给定时间点的估值。

事件溯源允许我们通过过滤掉事件或通过在备用流中播放事件来执行“假设分析”。

快速示例 – 一个简单的银行账户

银行账户可能是最简单的金融流,仅由单一货币的存款和取款组成。 在此模型中,事件只是余额调整的金额——存款为正,取款为负。

[ - - +50 - - +50 - - -100  - ->]

在本示例中,估值仅在实际存款或取款发生时才会发生变化——因此,通过播放事件流直到结束,我们可以看到当前余额现在为 0。

第二个例子 – 股票账户

对于股票账户,有两个流。 一个是持仓流,其中记录了股票的买入或卖出,另一个是定价流,其中记录了标的股票的价格。

[ - - +50 - - +50 - - -100  - ->]
[178.1 - - - - 178.0 - - - - 178.9 - - >]

这些可以组合成一个估值流,该估值流是当前持仓乘以最新价格的组合。 持仓事件和/或定价事件都将触发估值。

[ 0 - - 8905.00 - - 17810.00 - - 17800.00 - - 0.00 - - 0.00 ]

第三个例子 – 以非基础货币计价的持仓

如果您的股票以非基础货币持有,那么持有货币与账户基础货币之间的汇率变化也会影响估值。

[ - - +50- - +50- -  -100- -> ]
[178.1 - - - - 178.0 - - - - 178.9 -> ]
[1.1- - -  1.2 - - -  1.3 - - -  1.4 -> ] 

事件

对于任何事件,您都需要知道——它何时发生,它位于哪个流中,它的价值是多少。 从上面的例子中,您还可以看到,该值可以是绝对金额(例如,价格或汇率),也可以是变化(或增量)金额。

对于作为增量金额流的事件流,写入当前值的定期快照可能会有所帮助。 这使我们能够通过转到该点之前的快照,然后播放该快照时间之后的任何增量事件,将流回放到任何给定点。

派生事件

流可以是源数据,也可以通过一个或多个源流的函数派生。 对于派生事件,使用函数从其组成流中计算值,并且对于任何这些流上的任何事件都会触发此函数。

复式记账

在复式记账中,持有事件记录在两个(或更多)流中——一个代表事件影响的每个总账账户(最常见的是一个用于资产,一个用于负债)。 传统上,这是通过对输入流应用过账规则来过账到相关的两个(或更多)账户流来完成的。

快照

为了最大限度地缩短读取访问时间,此架构的基础应支持读取和创建快照。 这些快照可以被视为数据传输对象,以便集成到任何基于它们的系统(例如 MVC 应用程序)中。

定义所需操作的接口看起来像

    /// <summary>
    /// Repository to write to objects that are held in snapshot form
    /// </summary>
    /// <typeparam name="TEntity">
    /// The type of entity in the repository
    /// </typeparam>
    /// <typeparam name="TKey">
    /// The type of the key to uniquely identify the entity
    /// </typeparam>
    /// <remarks>
    /// This does not inherit from IRepositoryWrite 
    /// because it is possible that some background task will be
    /// creating the snapshots independent of front-end write requests
    /// </remarks>
    public interface IRepositoryWriteSnapshot<TKey, TEntity> where TEntity : IKeyedEntity<TKey>
    {
        /// <summary>
        /// Request that a snapshot as-of-now be taken for the given entity aggregate
        /// </summary>
        /// <param name="key">
        /// The unique identifier of the entity that we want snapshotted
        /// </param>
        /// <remarks>
        /// This does not return any value as it is designed to 
        /// operate asynchronously by setting a "needs snapshot" flag to prevent
        /// the snapshot generator system being flooded
        /// </remarks>
        void RequestSnapshot(TKey key);

        /// <summary>
        /// Delete and regenerate any snapshots post the as-of synchronization value
        /// </summary>
        /// <param name="key">
        /// The unique identifier of the entity that we want snapshotted
        /// </param>
        /// <param name="synchronization">
        /// The synchronization point for which we want the data regenerated
        /// </param>
        void RegenerateFromAsOf(TKey key, long synchronisation);
    }

读取端可以作为常用存储库模式的扩展来实现

    /// <summary>
    /// Repository to read objects that are held in snapshot form
    /// </summary>
    /// <typeparam name="TEntity">
    /// The key-identified type of entity we are reading
    /// </typeparam>
    /// <typeparam name="TKey">
    /// The type of the key
    /// </typeparam>
    /// <remarks>
    /// Where an object is based on one or more event streams,
    /// this allows for retrieval of the state of that object as at a given
    /// snapshot time.  This extends repository-read
    /// </remarks>
    public interface IRepositoryReadSnaphot<TKey, TEntity>
        : IRepositoryRead<TKey, TEntity>  where TEntity : IKeyedEntity<TKey>
    {
        /// <summary>
        /// Did any record matching the key exist as at the given point in time
        /// </summary>
        /// <param name="key">
        /// The unique aggregate identifier of the record we are seeking
        /// </param>
        /// <returns>
        /// True if a matching record is found
        /// </returns>
        bool ExistedAsOf(TKey key, long synchronisation);

        /// <summary>
        /// Get the entity state as it was at the given point in time
        /// </summary>
        /// <param name="key">
        /// The aggregate identifier of the record for which we want a point-in-time view
        /// </param>
        /// <param name="synchronisation">
        /// The synchronisation point for which we want the data
        /// </param>
        /// <returns></returns>
        Nullable<TEntity> GetByKeyAsOf(TKey key, long synchronisation);
    }

实际上,如果审计或法律要求,您可能需要保留旧快照,如果它们已向外部发送。 需要一种版本控制机制来确保任何显示(UI 或打印)都使用最新版本的快照。

关注点

这种类型的架构需要考虑的最大问题之一是幂等性——使系统能够免疫于事件多次运行时的错误。 实际上,这往往意味着用执行绝对值的消息(例如“将帐户余额设置为 $745.20”)替换执行增量的消息(例如“将帐户减少 $50”)。

但是,与其人为地创建绝对事件,不如深入挖掘直到获得本质上是绝对的事件。 在上面的例子中,拥有绝对的存款取款事件比拥有“将余额设置为 x”的派生事件更好。

另一个挑战是排序——重要的是以正确的按时间顺序将事件添加到流中。 实际上,需要使用普遍接受的时间戳。

历史

  • 2014 年 5 月 2 日:初始概念
© . All rights reserved.