65.9K
CodeProject 正在变化。 阅读更多。
Home

事件流量缓和

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.75/5 (3投票s)

2013年1月25日

CPOL

6分钟阅读

viewsIcon

17840

downloadIcon

257

如何构建一个对象来平缓事件流。

引言

您是否遇到过这样一个对象,它生成的事件快到您的程序没有时间处理?想象一下我们有一个在线商店。每当一个售出订单(以下简称订单)进来时,都会触发一个事件。然后执行某个事件处理程序来更新最新订单列表(或全部订单)。如果在一个更新过程开始后又来了另一个订单怎么办?更新可能需要很长时间,假设是2秒,在这段时间内可能会有几个订单到达。我们不能为每一个订单都开始一个新的更新,总有一天我们需要一个同步系统,允许一次只进行一次更新。不断更新控件可能会导致应用程序冻结。我们将使用一个持久化系统,例如数据库,这样可以更好地进行大查询而不是频繁的小查询。如果我们能用一个对象来平缓这些事件流呢?

有什么用?

在讨论问题或解决方案细节之前,让我们先看看图1中的示例应用程序。左边是使用了事件流平缓器的平缓客户端。右边是实时处理订单的实时客户端。在平缓应用程序中,我们可以看到并浏览订单,至少在每5秒刷新一次之前是这样。而实时应用程序则是一个闪烁的应用程序,除非我们捕获屏幕,否则连读都读不了。如果事件到达的速度稍微快一点,它甚至可能冻结。

图1:示例应用程序。

我们的商店

让我们快速定义本文其余部分将使用的域。我们需要一个实体和一个存储库。

public class Order
{
    public string Title { get; set; }
    public DateTime CreatedAt { get; set; }
}
 
public interface IOrderRepository
{
    IEnumerable<Order> ListFrom(DateTime startDate);
    event EventHandler OrderArrived ;
} 
代码片段1:示例域。

在我们的示例代码中,我们将引用一个名为_orders的订单存储库实例。

用于处理订单的代码(来自我们的表单)是:

_orders.OrderArrived += HandleOrderArrived;
代码片段2:处理订单到达事件。

每次有订单到达时,我们的处理程序都会被执行。然后,处理程序会查询自上次执行以来到达的订单,并使用此列表来更新UI。我们的示例代码展示了这组订单。请参阅项目:RealTimeClient

构建一个对象来解决问题。

我们需要一个对象(平缓器),它有两个成员:一个用于挂接我们想要平缓的事件的处理程序,以及一个它将平缓触发的事件。

public class EventCalmer
{
    public void HandleEvent(object sender, EventArgs e) { ... }
    public event EventHandler EventOccurred ;
} 
代码片段3:一个平缓器。

我们需要将这个对象放在实际事件生成器和我们的代码之间。

_calmer = new EventCalmer(TimeSpan.FromSeconds(5));
...
_orders.OrderArrived += _calmer.HandleEvent;
_calmer.EventOccurred += HandleOrderArrived; 
代码片段4:处理平缓的订单到达事件。

有了这个设置,每次有订单到达时,它只会触发事件,直到一段时间过去(例如5秒)。如果在5秒内有超过3个订单到达,第二个订单不会触发任何事件,但第三个会。平缓器将确保在最后一个订单到达后触发一次事件,这样就不会有订单未被处理。平缓器将触发事件的次数接近(+/-1)所有在总执行时间内可以容纳的5秒执行次数。即总时间除以5秒。如果某个处理程序执行时间超过5秒,平缓器将在前一个执行完成后立即开始下一个执行。如果执行提前完成,它将休眠剩余的时间。

这很难解释。我甚至都不理解自己在写什么。让我们尝试用图来解释

图2:平缓器的工作原理。

这确实改变了一切。标记为Event Received的转换不言自明。Finished将用于从执行状态转换出去。Done将用于从等待状态转换出去。

由于它看起来非常像一个自动机,一个类似自动机的实现可能是一个解决方案。

状态

我们将创建一个枚举,其中包含图2中每个状态的一个值。

private enum CalmerState
{
    Idle,
    Executing,
    ExecutingOneInQueue,
    Waiting,
    WaitingOneInQueue,
}; 
代码片段5:平缓器状态。

转移函数

定义此转移函数的一种简单方法是使用两个静态字典,一个用于Event Received,另一个用于DoneFinished

private static readonly Dictionary<CalmerState, CalmerState> EventReceivedTransitions=
    new Dictionary<CalmerState, CalmerState>
    {
        { CalmerState.Idle, CalmerState.Executing },
        { CalmerState.Executing, CalmerState.ExecutingOneInQueue },
        { CalmerState.Waiting, CalmerState.WaitingOneInQueue },
        { CalmerState.WaitingOneInQueue, CalmerState.WaitingOneInQueue },
        { CalmerState.ExecutingOneInQueue, CalmerState.ExecutingOneInQueue },
    };
 
private static readonly Dictionary<CalmerState, CalmerState> DoneOrFinishedTransitions=
    new Dictionary<CalmerState, CalmerState>
    {
        { CalmerState.Executing, CalmerState.Waiting },
        { CalmerState.Waiting, CalmerState.Idle },
        { CalmerState.ExecutingOneInQueue, CalmerState.WaitingOneInQueue },
        { CalmerState.WaitingOneInQueue, CalmerState.Executing },
    };
代码片段6:转移函数字典。

平缓器的处理程序和触发器。

标准的事件处理和触发代码将确保事件到达平缓器,并从平缓器传递到存储系统。在触发平缓器事件之前,我们需要将触发时间存储在一个实例变量中。

public void HandleEvent(object sender, EventArgs e)
{
    EventReceived();
}
 
public event EventHandler EventOccurred;
 
private void OnEventOccurred(EventArgs e)
{
    _lastExecutionBeganAt = DateTime.Now;
    if (EventOccurred != null) EventOccurred(this, e);
} 
代码片段7:平缓器的处理程序和触发器。

启动平缓器

当第一个事件收到并执行EventReceived()时,所有魔力就开始了。

private void EventReceived()
{
    lock (_criticalSectionDoor)
    {
        var newState = GetStateToMoveToWhenEventArrives();
        SetCurrentStateTo(newState);
 
        if (_currentState == CalmerState.Executing)
            BeginExecutionThread();
    }
}
 
private CalmerState GetStateToMoveToWhenEventArrives()
{
    return EventArrivedTransitions[_currentState] ;
}
 
private void SetCurrentStateTo(CalmerState newState)
{
    _currentState = newState;
}
代码片段8:启动平缓器。

在一个锁定的块内,通过第一个字典将状态转移到新状态。然后,如果新状态是Executing,这意味着平缓器应该脱离空闲状态,执行就开始了。BeginExecutionThread()方法并不太有趣,它只是启动一个异步调用到另一个方法ExecuteWhileNotIddle()。后者才有趣。

执行线程

private void ExecuteWhileNotIddle()
{
    while (_currentState != CalmerState.Idle)
    {
        if (currentState == CalmerState.Executing || 
            currentState == CalmerState.ExecutingOneInQueue)
            DoExecuteStep();
        else
            DoWaitStep();
 
        UpdateStateAfterDoneOrFinished();
    }
}
代码片段9:执行线程。

只要状态不是空闲,执行线程就会执行步骤。

步骤

有两种类型的步骤:执行(Execute)或等待(Wait)步骤。执行步骤触发事件。等待步骤休眠。

private void DoExecuteStep()
{
    OnEventOccurred(EventArgs.Empty);
}
 
private void DoWaitStep()
{
    var stillNeedToWait = _sleepTime - TimeItTookLastExecution();
 
    if (stillNeedToWait > TimeSpan.Zero)
        Thread.Sleep(stillNeedToWait);
}
 
private TimeSpan TimeItTookLastExecution()
{
    return DateTime.Now - _lastExecutionBeganAt;
}
代码片段10:执行和等待步骤。

Execute触发事件。在生产环境中使用时,一些错误处理将非常有用。现在,如果处理程序中发生错误,平缓器将中断并且无法使用。请注意,事件触发是同步的,因此它将花费与实际处理程序相同的时间。

Wait休眠剩余时间。假设我们希望平缓器每5秒释放一次事件。如果实际处理程序花费2秒,平缓器将休眠3秒。如果实际处理程序花费6秒,平缓器将不会休眠。

等待或执行完步骤后更新状态

在完成以上任何步骤后,我们需要更新自动机状态。这次我们将使用第二个字典。

private void UpdateStateAfterDoneOrFinished()
{
    lock (_criticalSectionDoor)
    {
        var newState = GetStateToMoveToWhenDoneOrFinished();
        SetCurrentStateTo(newState);
    }
}
 
private CalmerState GetStateToMoveToWhenDoneOrFinished()
{
    return DoneOrFinishedTransitions[_currentState]
} 
代码片段11:在执行完步骤后更新状态。

平缓器已准备就绪。

测试

我试图使用TDD来开发这一切,但是很难想到如何在不添加不必要的公共属性或访问私有成员的情况下测试异步代码,所以我没有这样做。我只是创建了几个测试,然后从那里开发了整个东西。项目TrafficCalming.Tests包含了那些测试。

有一些相当有用的集成测试。每个测试大约需要10秒,并且会重复10次。尽管如此,它们从未失败过:

  1. 平缓器最后一次触发事件是在平缓器接收到最后一个事件之后。
  2. 此测试确保处理的事件数量接近(+/-1) (总时间) / (处理时间)。

局限性

我目前能想到唯一一个:事件本身不能包含有意义的信息。即使是生成事件的对象引用也不会传递到平缓的应用程序。

© . All rights reserved.