65.9K
CodeProject 正在变化。 阅读更多。
Home

C#中的简单事件溯源演示

starIconstarIconstarIconstarIconstarIcon

5.00/5 (22投票s)

2017年10月24日

CPOL

7分钟阅读

viewsIcon

60153

downloadIcon

1470

关于如何在C#中使用事件溯源的示例。

引言

在研究领域驱动设计 (DDD) 的概念时,我遇到了事件溯源原则。在深入研究了该主题的一些理论后(马丁·福勒 (Martin Fowler) 拥有的下一个网站对此主题有一些很好的见解:https://martinfowler.com.cn/eaaDev/EventSourcing.html),我产生了通过提供一个简单的 C# 示例来实践我的知识的想法,该示例简要地阐述了这种模式。

背景

事件溯源的总体思想是确保应用程序状态的每一次更改都被捕获在事件对象中,并且这些事件对象本身按应用顺序存储。简单来说,事件溯源包含一个对象更改日志。记录这些连续的更改在您想要重放对象所做的更改时会很有用(无论出于何种原因……)。

示例应用程序

示例应用程序保持相当简单。它基于马丁·福勒 (Martin Fowler) 在其网站上提供的Ship Tracking Service 示例(有关详细信息,请参阅“引言”)。本文的其余部分将解释示例代码的实现。请注意,由于我只想解释事件溯源背后的模式,因此我没有引入任何专业的通信基础设施(例如 NServiceBus)和相关的消息队列机制,尽管在生产系统中我们通常应该依赖它们。

使用案例

代码案例非常简单。我们有港口 (Ports)船舶 (Ships)。接下来,一艘船抵达 (Arrivals)离开 (Departures) 某个特定的港口 (Harbor)。当一艘船离开一个港口时,它就处于海上 (AT SEA),否则它就在一个特定的港口,或者在船只处于维护模式的情况下不属于以上任何一种(维护模式在此处不作讨论……)。最后,我们有一个Ship Tracking Service,用于跟踪船舶的抵达或离开。该项目定义了一些“虚拟”的船舶和“虚拟”的港口,并且港口抵达是随机的(因此,一艘船可能在同一港口多次离开和抵达,也许是因为船员把他们的三明治忘在港口了 ;- ))。(Boterhammekes 是荷兰语中“三明治”的意思)。

领域模型

因此,我们相关的领域模型非常简单。我们有一个用户界面,它由一个单独的窗体 FormShipTrackingService 组成,该窗体启动 ShipTrackingService 的实例,其中包含一个每 2 秒被调用的计时器。我们在每次计时器滴答间隔时模拟一艘抵达离开。抵达/离开被称为船舶跟踪事件。因此,在每次船舶跟踪事件发生时,我们会在 UI 中记录此事件:记录抵达时间、记录时间、船舶和港口。一艘船的多次抵达/离开会导致在处理日志中生成多个更改记录。最后,跟踪处理器会在每次跟踪事件(港口更新)时通知船舶更新自身。这就是这个简单用例中发生的一切。了解了这些之后,让我们看一下一些实现细节。

模型

船舶模型

using TrackingService.DomainEvents;
namespace TrackingService.Models
{
    public class Ship
    {
        #region Properties

        public int ShipId { get; set; }
        public string Name { get; set; }
        public Port Location { get; set; }

        #endregion Properties


        #region Public Interface

        public void HandleArrival(ArrivalEvent ev)
        {
            // Here we set the Port to the Port Set by the ArrivalEvent
            Location = ev.Port;
        }

        public void HandleDeparture(DepartureEvent ev)
        {
            // Here we set the Port to the Port Set by the DepartureEvent
            Location = ev.Port;
        }

        #endregion Public Interface
    }
}

船舶模型具有唯一的 ID、名称和位置(港口)。它还包含两个方法,在被跟踪处理器跟踪后用于更新船舶状态。

港口模型

namespace TrackingService.Models
{
    public class Port
    {
        #region Properties

        public int PortId { get; set; }
        public string Name { get; set; }

        #endregion Properties
        
        #region Public Interface

        public override string ToString()
        {
            return Name;
        }
        
        #endregion Public Interface
    }
}

每个港口都有名称,并由唯一的 ID 定义。

域事件类

接下来,我们的示例包含一些域事件类

DomainEvent

namespace TrackingService.DomainEvents
{
    // Domain event is the base event class
    // it simply registers when the according 
    // event occured and is recorded
    public abstract class DomainEvent
    {
        #region Private Storage

        private DateTime _recorded, _occured;

        #endregion Private Storage

        #region Internal Interface

        internal DomainEvent(DateTime occured)
        {
            this._occured = occured;
            this._recorded = DateTime.Now;
        }

        abstract internal void Process();

        #endregion Internal Interface
    }
}

在类层次结构的顶部是 DomainEvent 类。此类定义了常见的属性,如发生时间和记录时间。它还定义了抽象的 Process 方法,该方法必须由派生类实现。

ShippingEvent

using TrackingService.Models;
using TrackingService.Services;
namespace TrackingService.DomainEvents
{
    // The ShippingEvent enherits from the base DomainEvent
    // And adds logic to keep track of the Ship, Port and Trackingtype (Arrival,Departure)
    public abstract class ShippingEvent : DomainEvent
    {

        #region Private Storage

        private Port _port;
        private Ship _ship;
        private TrackingType _trackingType;

        #endregion Private Storage

        #region Public Properties

        public Port Port
        {
            get { return _port; }
            set { _port = value; }
        }
        
        public Ship Ship
        {
            get { return _ship; }
            set { _ship = value; }
        }
        
        public TrackingType TrackingType
        {
            get { return _trackingType; }
            set { _trackingType = value; }
        }

        #endregion Public Properties

        #region Internal Interface

        internal ShippingEvent(DateTime occured, Port port, Ship ship,TrackingType trackingType) : base(occured)
        {
            this._port = port;
            this._ship = ship;
            this._trackingType = trackingType;
        }

        #endregion Internal Interface

        #region Public Interface

        public override string ToString()
        {
            return $"TrackingType: {this.TrackingType} Ship: {this.Ship.Name} Port: {this.Port.Name}";
        }

        #endregion Public Interface

    }
}

ShippingEvent 是基础的 Shipping 类。它将特定于 Shipping 的属性和行为添加到父域事件类中。Shipping 事件类跟踪船舶、港口和跟踪类型(抵达、离开、无),并使用其基类来设置继承的成员值。

Arrival and Departure Events

using TrackingService.Models;
using TrackingService.Services;
namespace TrackingService.DomainEvents
{
    // The arrival Event simply captures the data and has a process method that simply
    // forwards the event to an appropriate domain object (ship in this case)
    public class ArrivalEvent : ShippingEvent
    {

        #region Internal Interface
        
        internal ArrivalEvent(DateTime arrivalTime, Port port, Ship ship, TrackingType trackingType) : base(arrivalTime, port, ship,trackingType)
        {
        }

        internal override void Process()
        {
            Ship.HandleArrival(this);
        }

        #endregion Internal Interface
    }
}

using TrackingService.Models;
using TrackingService.Services;
namespace TrackingService.DomainEvents
{ 
    // The departure Event simply captures the data and has a process method that simply
    // forwards the event to an appropriate domain object (ship in this case)
    public class DepartureEvent : ShippingEvent
    {
        #region Internal Interface

        internal DepartureEvent(DateTime departureTime, Port port, Ship ship,TrackingType trackingType) : base(departureTime,port,ship,trackingType)
        {

        }

        internal override void Process()
        {
            Ship.HandleDeparture(this);
        }

        #endregion Internal Interface
    }
}

ArrivalEventDepartureEvent 类非常相似。第一个用于通知 Ship 类已抵达某个 Port,后者用于通知已从某个 Port 离开。

EventProcessor

namespace TrackingService.DomainEvents
{
    // The Event Processor Processes the Events 
    // as received from the TrackingService
    public class EventProcessor<T>  where T : ShippingEvent
    {
        #region Private Storage

        private IList<T> _eventLogger = new List<T>();

        #endregion Private Storage


        #region Public Interface

        public void ProcessEvent(T e)
        {
            e.Process();
            _eventLogger.Add(e);
        }

        public int CountEventLogEntries()
        {
            return _eventLogger.Count;
        }

        public List<T> GetEvents()
        {
            return _eventLogger as List<T>;
        }

        #endregion Public Interface
    }
}

EventProcessor 类接受一个泛型类型作为参数,在本例中,它被限制为 ShippingEvent 类型(因为 Shipping event 是日志记录的基类)。因此,此类从 ShipTrackingService 获取事件并进行处理。需要注意的是,这个处理类是添加事件溯源的类(通过将接收到的 Arrival 或 Departure 事件添加到事件溯源日志中)。

通用应用程序流程

应用程序入口点

private void FormShipTrackingService_Load(object sender, EventArgs e)
{
    try
    {
        // create the tracking service
        _trackingService = new Services.ShipTrackingService();

        // create the eventprocessor
        _eventProcessor = new EventProcessor<ShippingEvent>();

        // subscribe to the ship tracked event of the tracking service
        _trackingService.ShipTracked += _trackingService_ShipTracked;

        this.SetDataSource();
        SetTimer();

    }
    catch(Exception ex)
    {
        MessageBox.Show(ex.Message);
    }
}

应用程序流程从主窗体的 Load 事件开始。首先,我们创建一个 TrackingServiceEventProcessor 实例。接下来,我们设置 DataSources 并启动一个跟踪 Timer。通过跟踪计时器,我们模拟船舶的抵达/离开。用户界面还订阅了 TrackingService 的 ShipTracked 事件。上述项目将在下面更详细地介绍。

设置数据源

private void SetDataSource()
{
    _shipsBindingSource.DataSource = null;
    _shipsBindingSource.DataSource = _trackingService.Ships;
}

由于我们跟踪的是 Ships 列表,因此我使用 BindingSource 作为船舶网格的数据源。BindingSource 简单地绑定到跟踪服务定义的静态船舶列表。

初始化跟踪模拟计时器

// we simulate ship arrival/departure every 2 seconds
private void SetTimer()
{
    _timer = new System.Windows.Forms.Timer();
    _timer.Interval = 2000;
    _timer.Tick += _timer_Tick;
    _timer.Enabled = true;
}

我们使用一个简单的计时器对象来模拟船舶的抵达/离开。计时器的间隔为 2000 毫秒(2 秒)。

船舶抵达/离开跟踪

private void _timer_Tick(object sender, EventArgs e)
{
    if(_trackingService != null)
    {
        // we simulate tracking events by selecting a RANDOM ship
        // next tracking type is set to Arrival or Departure depending on the 
        // current location of the selected ship
        // if port of selected ship == "AT SEA" then we set the tracking event type
        // as an ARRIVAL and will set ship port to the next port (!= 0) in the Port list
        // if port of selected ship != "AT SEA" then we set the tracking event type
        // as a DEPARTURE and set port to "AT SEA"

        int maxShip = _trackingService.Ships.Count;

        // select a random ship in the list
        _selectedShipId = _randomShip.Next(1, maxShip); 

        // set tracked ship to the current selected id
        _trackingService.TrackedShip = _trackingService.Ships[_selectedShipId];

        // set the tracking event type (Arrival or Departure) depening on the current location (PortId 0 is AT-SEA)
        _trackingService.TrackingType = _trackingService.TrackedShip.Location.PortId == 0 ? TrackingType.Arrival : TrackingType.Departure;

        // set the time of tracking recording
        _trackingService.Recorded = DateTime.Now;

        // create a unique id for the tracking
        _trackingService.TrackingServiceId = Guid.NewGuid();

        // set the new port of the tracking, this is a random port in
        // the list in case of arrival 
        // or port 0 = AT SEA in case of departure
        if(_trackingService.TrackingType == TrackingType.Arrival)
        {
            int maxPort = _trackingService.Ports.Count;
            _selectedPortId = _randomPort.Next(1, maxPort);
        }
        else
        {
            _selectedPortId = 0;
        }
        _trackingService.SetPort = _trackingService.Ports[_selectedPortId];

        // handle the tracking by the tracking service
        // the tracking service will now send an Arrival event or Departure event to the Ship
        _trackingService.RecordTracking(_eventProcessor);

        // augment number of events
        _numberOfEventsCount.Text = _eventProcessor.CountEventLogEntries().ToString();

        // refresh the UI
        this.SetDataSource();
    }
}

如前所述,我们使用 timer_tick 事件来模拟船舶跟踪。代码注释应该足够清楚地说明此事件的行为。

显示事件源日志

private void toolStripButtonShowEvents_Click(object sender, EventArgs e)
{
    this._eventsTextBox.Text = string.Empty;

    // show the events for the selected ship

    try
    {
        // first get the selected ship

        Ship currentShip = (Ship)this._shipsBindingSource.Current;

        if(currentShip != null)
        {
            // get the event logs
            List<ShippingEvent> events = _eventProcessor.GetEvents() as List<ShippingEvent>;

            // filter events for the current ship
            var filterByShip = events.Where(ev => ev.Ship.ShipId == currentShip.ShipId);

            foreach (ShippingEvent ev in filterByShip)
            {
                this._eventsTextBox.Text += ev.ToString() + "\r\n";
            }
                   
        }
    }
    catch(Exception ex)
    {
        MessageBox.Show(ex.Message);
    }
}

每一次船舶事件(抵达/离开)都由 EventProcessor 记录。这是事件溯源机制的核心。用户可以显示跟踪事件列表(即所选船舶状态的变化),如下所示。

跟踪服务

我们还没有详细介绍代码的一个部分,那就是 ShipTrackingService 本身的行为。跟踪服务及其依赖项存储在Services 文件夹中。我将在接下来的子节中简要介绍每个部分。

Tracking Type Enum

namespace TrackingService.Services
{
    public enum TrackingType { Arrival, Departure, None};
}

枚举,包含不同的可能跟踪类型

ShipTracked EventArgs

using TrackingService.Models;
namespace TrackingService.Services
{
    // Holds the properties of the Ship TrackingService
    // that are exposed through event-handeling
    public class ShipTrackedEventArgs : EventArgs
    {
        #region Public Properties

        public Guid TrackingServiceId { get; set; }
        public DateTime Recorded { get; set; }
        public TrackingType TrackingType { get; set; }
        public Ship TrackedShip { get; set; }
        public Port OldLocation { get; set; }
        public Port NewLocation { get; set; }

        #endregion Public Properties
    }
}

ShipTrackedEventArgs 类将由 ShipTrackingService 使用,用于通知其订阅者(在本例中是 ShipTracking-UI...),已发生船舶跟踪 Record。然后 UI 将使用此数据刷新其内容。

Ship Tracking Service

由于 ShipTrackingService 的代码行数太多,我将代码库分成不同的部分。

ShipTrackingService Slice-1: Event Declaration

#region Event Declaration

public delegate void ShipTrackedEventHandler(object sender, ShipTrackedEventArgs e);
public event ShipTrackedEventHandler ShipTracked;

#endregion Event Declaration

我们定义了一个委托,它将提供必要的接口,订阅者(UI)可以使用该接口根据船舶跟踪服务中发生的事件来更新其内容。

ShipTrackingService Slice-2: Instance Variables Declaration

#region Private Storage

private TrackingType _trackingType = TrackingType.None;
private Guid _trackingServiceId;
private DateTime _recorded;
private List<Port> _ports;
private List<Ship> _ships;
private Ship _trackedShip;
private Port _currentPort;
private Port _setPort;

#endregion Private Storage

ShipTrackingService 包含一些状态。

变量 Info(信息)
TrackingType 跟踪类型(抵达、离开、无)。
TrackingServiceId TrackingService 的唯一标识符。
Recorded 跟踪记录的日期/时间。
端口 Dummy Ports.
Ships Dummy Ships.
TrackedShip 当前跟踪船舶的引用。
CurrentPort 当前跟踪港口的引用。
SetPort 在抵达情况下,新目的地港口的引用。

当然,每个后备变量都有其公共属性。

ShipTrackingService Slice-3: Initialization

#region C'tor
public ShipTrackingService()
{
    // initialize ports
    _ports = new List<Port>()
    {

        new Port()
        {
            PortId = 0, Name = "AT Sea"
        },
        new Port()
        {
            PortId = 1, Name = "Port of Shangai"
        },
        new Port()
        {
            PortId = 2, Name = "Port of Antwerp"
        },
        new Port()
        {
            PortId = 3, Name = "Port of Singapore"
        },
        new Port()
        {
            PortId = 4, Name = "Port of Dover"
        }

    };

    // initialize ships
    _ships = new List<Ship>()
    {
        new Ship()
        {
            ShipId = 1, Name = "Ship_1", Location = _ports[0]
        },
        new Ship()
        {
            ShipId = 2, Name = "Ship_2", Location = _ports[0]
        }
        ,new Ship()
        {
            ShipId = 3, Name = "Ship_3", Location = _ports[0]
        },
        new Ship()
        {
            ShipId = 4, Name = "Ship_4", Location = _ports[0]
        }
    };
}
#endregion C'tor

在构造函数代码中,我们初始化了我们的 ShipsPorts。请注意,Location(Port)属性指向“AT-SEA”港口,这意味着在启动我们的 ShipTrackingService 时,每艘船都应该在海上(而不是在港口……)。

ShipTrackingService Slice-4: Ship Tracking

#region Public Interface

public void RecordTracking(EventProcessor<ShippingEvent> eProc)
{
    // Create event depending on TrackingType
    Port OldLocation = TrackedShip.Location;
    ShippingEvent ev;
    if (TrackingType == TrackingType.Arrival)
    {
        ev = new ArrivalEvent(DateTime.Now, SetPort, TrackedShip,TrackingType);
    }
    else
    {
        ev = new DepartureEvent(DateTime.Now, SetPort, TrackedShip, TrackingType);
    }

    // send the event to the event handler (ship) which will update it's status on the provided event data
    eProc.ProcessEvent(ev);

    // notify the UI Tracking List so it can update itself
    ShipTrackedEventArgs args = new ShipTrackedEventArgs()
    {
        TrackingServiceId = TrackingServiceId,
        Recorded = Recorded,
        TrackingType = TrackingType,
        TrackedShip = TrackedShip,
        OldLocation = OldLocation,
        NewLocation = SetPort,

    };
    // notify subscribers ...
    OnShipTracked(args);
}

#endregion Public Interface

#region Protected Interface

// Notify the (UI) Subscribders that a Ship has been tracked
protected virtual void OnShipTracked(ShipTrackedEventArgs args)
{

    if (ShipTracked != null)
        ShipTracked(this, args);

}

#endregion Protected Interface

RecordTracking 方法是我们跟踪服务的核心。此方法从 UI 的 _timer_Tick 方法调用,它执行两件事:首先,它创建一个新的抵达离开事件,并将其委托给事件处理引擎。事件处理引擎将指示相关船舶根据提供的事件数据更新其状态。接下来,它通知已连接的订阅者(在本例中为跟踪服务 UI),已发生跟踪事件。然后,UI 可以采取适当的行动来更新其状态。下面显示了 UI 中的事件处理代码。

private void FormShipTrackingService_Load(object sender, EventArgs e)
{
    try
    {
        ...

        // subscribe to the ship tracked event of the tracking service
        _trackingService.ShipTracked += _trackingService_ShipTracked;

        ...
    }

    ...
}

// Update the UI after RecordTracked has been tracked in TrackingService
private void _trackingService_ShipTracked(object sender, ShipTrackedEventArgs e)
{
    this.TrackingOccuredTextBox.Text +=
    $"TrackingId: {e.TrackingServiceId}\r\n" +
    $"RecordedAt: {e.Recorded.ToLongTimeString()}\r\n" +
    $"TrackingType: {e.TrackingType}\r\n" +
    $"Ship: {e.TrackedShip.Name} Id: {e.TrackedShip.ShipId}\r\n" +
    $"Current Location : {e.OldLocation.Name}\r\n" +
    $"New Location : {e.NewLocation.Name}" +
    "\r\n\r\n";
}

整合所有内容

现在我们已经很好地理解了构成我们应用程序的各个部分,让我们看一下用户界面。顶部是我们船舶列表。屏幕底部是跟踪表,显示所有抵达/离开事件按发生顺序排列。接下来,为了演示事件溯源,屏幕中间是我们单艘船舶的日志详情。以 Ship_3 为例。

一些优化点

我试图让示例尽可能简单,因此我在实现中使用了具体类。在实际环境中,您应该避免直接使用具体类,因为在许多情况下,我们可能有不同形式的 TrackingServices,它们共享一些通用逻辑并添加特定行为。为此,ShipTrackingService 应通过使用接口来实现,并使用依赖注入机制将我们的服务具体实例注入到客户端类中,如下所示。

© . All rights reserved.