Implementing a Finite State Machine Using async/await


4.97/5 (19 votes)

May 1, 2013

CPOL

5 min read


A new approach to implementing a finite state machine using async/await

Introduction

This article shows how the async/await feature introduced in .NET 4.5 makes it easy to implement the finite state machine pattern.

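Background

According to Wikipedia:

A finite state machine is conceived as an abstract machine that can be in one of a finite number of states. The machine is in only one state at a time.

This pattern can be very useful when you are dealing with, for example, complex interactions between different parts of a system.

The most obvious use of an FSM is client-server collaboration, so I will stick to that example throughout the article.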

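The problem

Suppose we need to implement a master-worker computing system. A user schedules a job on the master, the master splits the job into tasks, and the tasks are sent to a set of workers for execution. Each worker executes its task and sends the result back to the master.

Assuming that both the master and the workers are finite state machines, we can define their states.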

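Master states:

  • Waiting for a job
  • Planning the job (splitting the job into tasks)
  • Scheduling tasks (sending a portion of the tasks to the selected workers)
  • Waiting for results

Worker states:

  • Waiting for a task
  • Executing the task
  • Sending the result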

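The master's workflow will then be:

The worker's states and transitions are very simple, so I have omitted them.

The diagram clearly describes what happens in the master.

Let's see how to implement it in C#.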

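Typical implementation

A finite state machine consists of states and transitions. Each transition is activated when a specific event occurs (for example, a worker has connected or a task result has been received) and the current state is valid (for example, the number of tasks being executed equals 0).

So the goal is to write the described workflow (the diagram) in C#.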
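To make the "event plus valid current state activates a transition" rule concrete, here is a minimal sketch of a lookup-table state machine. It is only an illustration and not how the Master class in this article is written: the MasterEvent enum and the TransitionTable class are hypothetical names introduced for this example, and the four transitions are assumed from the states listed earlier.

```csharp
using System;
using System.Collections.Generic;

public enum MasterState { WaitingForJob, PlanningJob, SchedulingTasks, WaitingForResults }

// Hypothetical event names, introduced only for this sketch.
public enum MasterEvent { JobReceived, JobPlanned, TasksScheduled, AllResultsReceived }

public sealed class TransitionTable {
    // Maps (current state, event) to the next state.
    private readonly Dictionary<Tuple<MasterState, MasterEvent>, MasterState> transitions =
        new Dictionary<Tuple<MasterState, MasterEvent>, MasterState> {
            { Tuple.Create(MasterState.WaitingForJob, MasterEvent.JobReceived), MasterState.PlanningJob },
            { Tuple.Create(MasterState.PlanningJob, MasterEvent.JobPlanned), MasterState.SchedulingTasks },
            { Tuple.Create(MasterState.SchedulingTasks, MasterEvent.TasksScheduled), MasterState.WaitingForResults },
            { Tuple.Create(MasterState.WaitingForResults, MasterEvent.AllResultsReceived), MasterState.WaitingForJob }
        };

    // Defaults to the first enum value, WaitingForJob.
    public MasterState Current { get; private set; }

    public void Fire(MasterEvent masterEvent) {
        MasterState next;

        // An event is accepted only if a transition exists for the current state.
        if (!this.transitions.TryGetValue(Tuple.Create(this.Current, masterEvent), out next))
            throw new InvalidOperationException("Invalid transition");

        this.Current = next;
    }
}
```

Calling Fire(MasterEvent.JobReceived) on a fresh table moves it to PlanningJob, while firing an event that has no entry for the current state throws, which mirrors the guard checks used in the handlers of the typical implementation.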

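For simplicity, let's assume we have two classes: Master and Worker.

The Worker class is not that interesting for now, so I will omit its implementation details and focus on Master.

Because of the event-driven design (each event handler can be called from a different thread), the code must be thread-safe. We can use locks, rely on thread-safe collections and primitives, or use a synchronization context. I prefer the third option because it keeps the code clean and simple.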

class Master {
    public SynchronizationContext SynchronizationContext { get; set; }
} 

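All event handlers must be invoked on the specified synchronization context, so, provided that context executes its callbacks one at a time, we no longer need to worry about race conditions.

In a real application, it is good practice to create facade-like event handlers that make sure the actual handler is invoked on that synchronization context: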

public void OnEventOccured(...) {
    this.SynchronizationContext.Post(o => this.OnEventOccuredInternal(...), null);
}

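In addition, we want to be able to test some key behaviors of the master (for example, that it waits for a worker to connect, or that it schedules tasks across all connected workers).

The Master class has internal state and public event handlers.

First, we define our states explicitly: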

public enum MasterState {
    WaitingForJob,
    PlanningJob,
    SchedulingTasks,
    WaitingForResults
} 

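Then we define the event handlers. Each event handler consists of the following parts:

  1. Validation logic (is it valid to call this event handler in the current Master state?)
  2. Transition validation (can the current state be changed?)
  3. State change (the transition from one state to another)
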
public void OnPlanningJob() {
    if (this.State != MasterState.WaitingForJob)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.PlanningJob;
 
    this.Tasks = this.CurrentJob.Lines.Select(line => new JobTask {Text = line}).ToList();
 
    this.OnSchedulingTasks();
}
 
private void OnSchedulingTasks() {
    if (this.State != MasterState.PlanningJob)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.SchedulingTasks;
 
    this.ScheduleTasks();
}
 
private void OnWaitingForResults() {
    if (this.State != MasterState.SchedulingTasks)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.WaitingForResults;
}
 
public void OnTaskResultReceived(TaskResult result) {
    if (this.State != MasterState.WaitingForResults)
        throw new InvalidOperationException("Invalid state");
 
    this.Results.Add(result);
    this.Tasks.RemoveAll(task => task.Id == result.TaskId);
 
    if (this.Tasks.Count > 0)
        return;
 
    this.CurrentJob.Result = this.Results.Sum(x => x.Data);
    this.JobFinished(this.CurrentJob);
    this.OnWaitingForJob();
}
 
private void OnWaitingForJob() {
    this.Results.Clear();
 
    this.State = MasterState.WaitingForJob;
 
    if (this.JobQueue.Any()) {
        this.CurrentJob = this.JobQueue.Dequeue();
        this.OnPlanningJob();
    }
}
 
public void OnWorkerConnected(Guid workerId) {
    this.Workers.Add(workerId);
 
    if (this.State == MasterState.SchedulingTasks)
        this.ScheduleTasks();
}
 
public void OnWorkerDisconnected(Guid workerId) {
    this.Workers.Remove(workerId);
}  

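The code works as expected, but there is one problem: it is hard to follow the workflow across these event handlers.

The workflow is spread over the whole class, which makes it hard to change. To demonstrate this, let's add a timeout to the "waiting for worker" operation.

First, we add a new state called WaitingForWorker: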

internal enum MasterState {
    WaitingForJob,
    PlanningJob,
    WaitingForWorker,
    SchedulingTasks,
    WaitingForResults
} 

Then we add a timer:

Timer workerTimeoutTimer = 
   new Timer(this.CheckWorkerTimeout, null, Timeout.Infinite, Timeout.Infinite); 

The new transition added for this state is:

public void OnWaitingForWorker() {
    if (this.State != MasterState.PlanningJob)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.WaitingForWorker;
 
    if (this.Workers.Count == 0)
        this.workerTimeoutTimer.Change(this.WorkerTimeout, Timeout.InfiniteTimeSpan);
    else
        this.OnSchedulingTasks();
}

The timer callback is:

private void CheckWorkerTimeout(object parameter) {
    this.SynchronizationContext.Post(o => {
        if (this.State != MasterState.WaitingForWorker)
            return;
 
        this.WorkerTimedout();
        this.OnWaitingForJob();
    }, null);
} 

Then we modify the other event handlers to change the workflow:

public void OnPlanningJob() {
    if (this.State != MasterState.WaitingForJob)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.PlanningJob;
 
    this.Tasks = this.CurrentJob.Lines.Select(line => new JobTask {Text = line}).ToList();
 
    this.OnWaitingForWorker();
}
 
public void OnSchedulingTasks() {
    if (this.State != MasterState.WaitingForWorker)
        throw new InvalidOperationException("Invalid transition");
 
    this.State = MasterState.SchedulingTasks;
 
    this.ScheduleTasks();
} 

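We had to modify two methods and add an extra one to change the workflow. Every new requirement increases the complexity of the code. Eventually, the code becomes hard to maintain and modify.

Let's consider an alternative approach to implementing a finite state machine.

Async/await implementation

In .NET 4.5 (and C# 5.0), Microsoft introduced a new way of working with chains of asynchronous methods using the async and await keywords.

Basically, when you write an async method, the compiler transforms it into a state machine similar to the one it generates for the yield keyword. For more details, see this article.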
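To illustrate the comparison with yield, the following sketch uses an iterator whose pause points play the role of states. It is an analogy only, assuming the state names from the list of master states; the IteratorStateMachineDemo class and its members are hypothetical names introduced for this example.

```csharp
using System;
using System.Collections.Generic;

public static class IteratorStateMachineDemo {
    // Each yield return is a point where the method pauses and later resumes,
    // so the position inside the method acts as the current state.
    public static IEnumerable<string> Workflow() {
        yield return "WaitingForJob";
        yield return "PlanningJob";
        yield return "SchedulingTasks";
        yield return "WaitingForResults";
    }

    // Drives the iterator step by step and records the states it passes through.
    public static List<string> Run() {
        var visited = new List<string>();

        using (var machine = Workflow().GetEnumerator()) {
            while (machine.MoveNext())          // resumes after the previous yield
                visited.Add(machine.Current);
        }

        return visited;
    }
}
```

An async method works in a comparable way: each await is a point where the method can pause, and the method resumes from that point once the awaited task completes.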

Let's rewrite our previous code using this shiny new feature:

private async void ExecuteJobs() {
    SynchronizationContext.SetSynchronizationContext(this.SynchronizationContext);
 
    var cancellationToken = this.cancellationTokenSource.Token;
 
    while (cancellationToken.IsCancellationRequested == false) {
        var job = await this.GetJobAsync(cancellationToken);
        var tasks = job.Lines.Select(line => new JobTask {Text = line}).ToList();
        var results = new List<TaskResult>();
        var workers = await this.GetWorkersAsync(cancellationToken);
 
        var i = 0;
 
        foreach (var jobTask in tasks)
            this.ScheduleTask(jobTask, workers[i++%workers.Count]);
 
        while (cancellationToken.IsCancellationRequested == false) {
            var result = await this.ReceiveTaskResultAsync(cancellationToken);
 
            tasks.RemoveAll(task => task.Id == result.TaskId);
            results.Add(result);
 
            if (tasks.Count == 0)
                break;
        }
 
        job.Result = results.Sum(x => x.Data);
 
        this.JobFinished(job);
    }
} 

That's it. The whole workflow is contained in one simple method! All the other methods are purely utilitarian:

private static Task<TResult> WaitFor<TResult>(
    Action<Action<TResult>> subscribe,
    Action<Action<TResult>> unsubscribe,
    CancellationToken cancellationToken) {
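    // The task is cancelled through the registration below; TaskCompletionSource
    // has no constructor that observes a CancellationToken.
    var source = new TaskCompletionSource<TResult>();
 
    cancellationToken.Register(OnCancelled<TResult>, source);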
 
    Action<TResult> handler = null;
 
    handler = result => {
        unsubscribe(handler);
        source.TrySetResult(result);
    };
 
    subscribe(handler);
    return source.Task;
}
 
private Task<Job> ReceiveJobAsync(CancellationToken cancellationToken) {
    return WaitFor<Job>(
        handler => this.JobReceived += handler,
        handler => this.JobReceived -= handler,
        cancellationToken);
}
 
private Task<TaskResult> ReceiveTaskResultAsync(CancellationToken cancellationToken) {
    return WaitFor<TaskResult>(
        handler => this.TaskResultReceived += handler,
        handler => this.TaskResultReceived -= handler,
        cancellationToken);
}
 
private async Task<IReadOnlyList<Guid>> GetWorkersAsync(CancellationToken cancellationToken) {
    if (this.connectedWorkers.Count == 0)
        return new[] {await this.WaitForWorkerToConnect(cancellationToken)};
 
    return this.connectedWorkers;
}
 
private Task<Guid> WaitForWorkerToConnect(CancellationToken cancellationToken) {
    return WaitFor<Guid>(
        handler => this.WorkerConnected += handler,
        handler => this.WorkerConnected -= handler,
        cancellationToken);
}
 
private static void OnCancelled<TResult>(object parameter) {
    var source = (TaskCompletionSource<TResult>) parameter;
 
    source.TrySetCanceled();
}
 
private async Task<Job> GetJobAsync(CancellationToken cancellationToken) {
    while (this.jobQueue.Count == 0)
        await this.ReceiveJobAsync(cancellationToken);
 
    return this.jobQueue.Dequeue();
}
 
public void OnJobReceived(Job job) {
    this.jobQueue.Enqueue(job);
    this.JobReceived(job);
}
 
public void OnTaskResultReceived(TaskResult result) {
    this.TaskResultReceived(result);
}
 
public void OnWorkerConnected(Guid worker) {
    this.connectedWorkers.Add(worker);
    this.WorkerConnected(worker);
}
 
private void OnWorkerDisconnected(Guid worker) {
    this.connectedWorkers.Remove(worker);
    this.WorkerDisconnected(worker);
}
 
private event Action<Job> JobReceived = _ => { };
private event Action<Guid> WorkerConnected = _ => { };
private event Action<Guid> WorkerDisconnected = _ => { };
private event Action<TaskResult> TaskResultReceived = _ => { }; 

All states are implicit, which means you can modify the workflow easily.
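The WaitFor helper in the listing above is what lets the workflow await an event. The sketch below isolates that idea in a stripped-down form, without cancellation: a one-shot handler completes a TaskCompletionSource and then unsubscribes itself. The Sensor class, its ValueReceived event, and the NextValueAsync method are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical event source, introduced only for this sketch.
public class Sensor {
    public event Action<int> ValueReceived = _ => { };

    public void Raise(int value) {
        this.ValueReceived(value);
    }

    // Returns a task that completes with the next raised value.
    public Task<int> NextValueAsync() {
        var source = new TaskCompletionSource<int>();
        Action<int> handler = null;

        handler = value => {
            this.ValueReceived -= handler;   // one-shot: detach before completing
            source.TrySetResult(value);
        };

        this.ValueReceived += handler;
        return source.Task;
    }
}
```

The returned task stays incomplete until Raise is called, and a second Raise has no effect on it because the handler has already been removed.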

Let's see how it handles the workflow change from the first example, adding the timeout:

private async void ExecuteJobs() {
    SynchronizationContext.SetSynchronizationContext(this.SynchronizationContext);
 
    var cancellationToken = this.cancellationTokenSource.Token;
 
    while (cancellationToken.IsCancellationRequested == false) {
        var job = await this.GetJobAsync(cancellationToken);
        var tasks = job.Lines.Select(line => new JobTask {Text = line}).ToList();
        var results = new List<TaskResult>();
        var workers = default (IReadOnlyList<Guid>);
 
        var localTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 
        localTokenSource.CancelAfter(this.WorkerTimeout);
 
        try {
            workers = await this.GetWorkersAsync(localTokenSource.Token);
        } catch (OperationCanceledException) {
            this.WorkerTimedout();
            continue;
        }
 
        var i = 0;
        foreach (var jobTask in tasks)
            this.ScheduleTask(jobTask, workers[i++%workers.Count]);
 
        while (cancellationToken.IsCancellationRequested == false) {
            var result = await this.ReceiveTaskResultAsync(cancellationToken);
 
            tasks.RemoveAll(task => task.Id == result.TaskId);
            results.Add(result);
 
            if (tasks.Count == 0)
                break;
        }
 
        job.Result = results.Sum(x => x.Data);
 
        this.JobFinished(job);
    }
}  

All changes are made in a single method, and we can still follow the workflow with the naked eye.
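The timeout in the listing above relies on a linked token source combined with CancelAfter. The following sketch shows that pattern in isolation, under a few assumptions: Task.Delay with an infinite duration stands in for an event that never arrives, the TimeoutDemo class is a hypothetical name, and the check on the outer token (which the listing above does not perform) is one possible way to tell a timeout apart from a shutdown request.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutDemo {
    // Returns true if the wait ended because of the timeout,
    // false if it ended because the outer token was cancelled.
    public static async Task<bool> WaitTimesOutAsync(TimeSpan timeout, CancellationToken outerToken) {
        // The linked source is cancelled when the outer token is cancelled
        // or when the timeout set by CancelAfter elapses, whichever comes first.
        using (var local = CancellationTokenSource.CreateLinkedTokenSource(outerToken)) {
            local.CancelAfter(timeout);

            try {
                // Stand-in for waiting on an event that never arrives.
                await Task.Delay(Timeout.Infinite, local.Token);
                return false;
            } catch (OperationCanceledException) {
                return !outerToken.IsCancellationRequested;
            }
        }
    }
}
```

Wrapping the linked source in a using block also releases its timer and its registration on the outer token once the wait is over.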

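Now let's see how we can test this code.

Testing

The ExecuteJobs method consists of multiple "parts" that execute at different times on the synchronization context.

A typical test consists of three parts:

  1. Act (interact with the Master through its event handlers)
  2. React (let the gears turn)
  3. Assert (verify the results)

The first thing we need is control over the execution flow of these "parts". To get it, we have to implement our own version of a synchronization context: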

public class TestSynchronizationContext : SynchronizationContext {
    private readonly Queue<KeyValuePair<SendOrPostCallback, object>> queue =
        new Queue<KeyValuePair<SendOrPostCallback, object>>();
 
    public override void Post(SendOrPostCallback d, object state) {
        if (d == null)
            throw new ArgumentNullException("d");
 
        this.queue.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }
 
    public override void Send(SendOrPostCallback d, object state) {
        throw new NotSupportedException("Synchronously sending is not supported.");
    }
 
    public bool RunOperations() {
        if (this.queue.Count == 0)
            return false;
 
        while (this.queue.Count > 0) {
            var item = this.queue.Dequeue();
            item.Key(item.Value);
        }
 
        return true;
    }
}

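The RunOperations method runs all pending "parts" of the ExecuteJobs method. So, after RunOperations returns, we can be sure that all transitions have completed and the Master is ready for further interaction.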
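To show how a queueing context gives a test control over when a pending "part" runs, here is a small self-contained sketch. It assumes a simplified context (QueueingContext, a hypothetical name modelled on TestSynchronizationContext) and uses await Task.Yield(), which always posts its continuation to the current synchronization context, as a stand-in for a real asynchronous wait.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Simplified queueing context, modelled on TestSynchronizationContext.
public sealed class QueueingContext : SynchronizationContext {
    private readonly Queue<Action> queue = new Queue<Action>();

    public override void Post(SendOrPostCallback d, object state) {
        this.queue.Enqueue(() => d(state));   // store the callback instead of running it
    }

    public void RunOperations() {
        while (this.queue.Count > 0)
            this.queue.Dequeue()();
    }
}

public static class ContextDemo {
    public static List<string> Run() {
        var log = new List<string>();
        var context = new QueueingContext();
        var previous = SynchronizationContext.Current;

        SynchronizationContext.SetSynchronizationContext(context);
        try {
            Step(log);                    // runs up to the first await, then returns
            log.Add("queued");            // the rest of Step is waiting in the queue
            context.RunOperations();      // the rest of Step runs here
        } finally {
            SynchronizationContext.SetSynchronizationContext(previous);
        }

        return log;
    }

    private static async void Step(List<string> log) {
        log.Add("before yield");
        await Task.Yield();               // posts the continuation to the current context
        log.Add("after yield");
    }
}
```

The log ends up as "before yield", "queued", "after yield": nothing after the await executes until the test explicitly calls RunOperations.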

Here are some complete tests:

public class Tests {
    private readonly MasterAsyncAwait master;
    private readonly List<Tuple<JobTask, Guid>> scheduledTasks = 
                   new List<Tuple<JobTask, Guid>>();
    private readonly TestSynchronizationContext syncContext = new TestSynchronizationContext();
 
    public Tests() {
        SynchronizationContext.SetSynchronizationContext(this.syncContext);
 
        this.master = new MasterAsyncAwait {
            SynchronizationContext = this.syncContext,
            ScheduleTask = (task, workerId) => 
                this.scheduledTasks.Add(Tuple.Create(task, workerId))
        };
    }
 
    [Fact]
    public void SchedulesJob() {
        this.master.Start();
 
        var workerId = new Guid("1eb02478-cc31-4759-97b4-5d459d802e73");
 
        this.master.OnJobReceived(new Job {Lines = new[] {"test", "ok"}});
        this.master.OnWorkerConnected(workerId);
 
        this.syncContext.RunOperations();
 
        Assert.NotEmpty(scheduledTasks);
        Assert.True(scheduledTasks.All(task => task.Item2 == workerId));
    }
 
    [Fact]
    public void SplitsTasksBetweenWorkersFairly() {
        this.master.Start();
 
        var workerId1 = new Guid("1eb02478-cc31-4759-97b4-5d459d802e73");
        var workerId2 = new Guid("ffe9c75e-9b41-46fe-b75d-a8bb2167d43c");
 
        this.master.OnJobReceived(new Job {Lines = new[] {"test", "ok"}});
        this.master.OnWorkerConnected(workerId1);
        this.master.OnWorkerConnected(workerId2);
 
        this.syncContext.RunOperations();
 
        Assert.NotEmpty(scheduledTasks);
        Assert.True(scheduledTasks.Any(task => task.Item2 == workerId1));
        Assert.True(scheduledTasks.Any(task => task.Item2 == workerId2));
    }
 
    [Fact]
    public void ComputesJobResult() {
        var result = 0;
 
        this.master.JobFinished = job => result = job.Result;
 
        this.master.Start();
        this.master.OnJobReceived(new Job {Lines = new[] {"test"}});
        this.master.OnWorkerConnected(new Guid("1eb02478-cc31-4759-97b4-5d459d802e73"));
        this.syncContext.RunOperations();
 
        this.master.OnTaskResultReceived(new TaskResult {
            TaskId = this.scheduledTasks.Single().Item1.Id,
            Data = 4
        });
 
        this.syncContext.RunOperations();
 
        Assert.Equal(4, result);
    }
}  

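As you can see, we cannot assert the current state explicitly, because it is handled implicitly, and the only way to see what happened is to inspect the Master's output. So each test is a scenario (a specification) in which you simulate events and finally assert the output.

You can also expose some information about what is going on inside: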

public MasterStatus Status { get; private set; }
 
private async void ExecuteJobs() {
   ...
   this.Status = MasterStatus.WaitingForWorker;
   ...
}    
 
// Collection of connected workers
public IReadOnlyCollection<Guid> ConnectedWorkers { get; private set; } 

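Summary

Each approach has its pros and cons.

The typical implementation is hard to maintain, but you can verify each state explicitly and you don't have to replay a complete scenario. Instead, you can set the current state, perform an action, and make sure that the transition occurs and the new state is valid.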
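As a rough sketch of that testing style, the class below exposes a settable state so that a test can start from any state and check a single transition. It is an assumption-laden miniature rather than the Master class from this article: ExplicitMaster and ExplicitState are hypothetical names, and only one transition is modelled.

```csharp
using System;

// Hypothetical mirror of the master states, used only in this sketch.
public enum ExplicitState { WaitingForJob, PlanningJob, SchedulingTasks, WaitingForResults }

public class ExplicitMaster {
    // Settable so that a test can place the machine in any state directly.
    public ExplicitState State { get; set; }

    public void OnWaitingForResults() {
        if (this.State != ExplicitState.SchedulingTasks)
            throw new InvalidOperationException("Invalid transition");

        this.State = ExplicitState.WaitingForResults;
    }
}
```

A test can set State to SchedulingTasks, call OnWaitingForResults, and assert that State is now WaitingForResults, without first replaying the job and worker events that would normally lead there.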

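The async/await implementation gives you simplicity and flexibility. However, you cannot treat the class as an FSM in unit tests; all you have is a set of interaction methods and an output. Use it wisely.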
