使用 async/await 实现有限状态机






4.97/5 (19投票s)
使用 async/await 实现有限状态机的一种新方法
引言
本文旨在展示如何利用 .NET 4.5 提供的 async/await 特性,轻松实现有限状态机模式。
背景
根据维基百科:
有限状态机被设想为一个抽象机,它可以处于有限数量的状态之一。机器一次只能处于一个状态。
如果您正在处理,例如,系统中不同部分之间的复杂交互,这种模式可能非常有用。
FSM 最明显的用途是客户端-服务器协作,因此我将在文章中坚持使用此示例。
问题
假设我们需要实现一个主-从(master-worker)计算系统。用户向主节点调度一个作业,主节点将作业分解为任务,然后发送给一组工作节点执行。每个工作节点执行任务并将结果发送回主节点。
假定主节点和工作节点都是有限状态机,我们可以定义它们的状态。
主节点的状态:
- 等待作业
- 规划作业(将作业分解为多个任务)
- 调度任务(将一部分任务发送给选定的工作节点)
- 等待结果
工作节点的状态:
- 等待任务
- 执行任务
- 发送结果
那么主节点的工作流程将是:
工作节点的状态和转换非常简单,因此我省略了它们。
该图清楚地描述了主节点正在发生的事情。
让我们看看如何在 C# 中实现它。
典型实现
有限状态机由状态和转换组成。当发生特定事件(例如,工作节点已连接或任务结果已接收)并且当前状态有效(例如,正在执行的任务数量等于 0)时,将激活每个转换。
因此,目标是用 C# 编写所描述的工作流程(图)。
为了简单起见,让我们假设我们有两个类:Master
和 Worker
。
Worker
类目前并不那么有趣,所以我省略了它的实现细节,而专注于 Master
。
由于事件驱动的设计(每个事件处理程序都可以从不同的线程调用),代码必须是线程安全的。我们可以使用锁,或者依赖于线程安全集合/原语,或者使用同步上下文。我更喜欢第三种选择,因为它能保持代码的整洁和简单。
class Master {
public SynchronizationContext SynchronizationContext { get; set; }
}
所有事件处理程序都必须在指定的同步上下文中调用,因此我们不再需要担心竞态条件。
在实际应用程序中,创建类似外观(facade)的事件处理程序以确保处理程序是在当前同步上下文中调用的,这是一个好习惯:
public class OnEventOccured(...) {
this.SynchronizationContext.Post(o => this.OnEventOccuredInternal(...), null);
}
此外,我们希望能够测试主节点的某些关键行为(例如,它应该等待工作节点连接,或者它应该在所有已连接的工作节点之间调度任务)。
Master
类有一个内部状态和公共事件处理程序。
首先,我们 显式 定义我们的状态:
public enum MasterState {
WaitingForJob,
PlanningJob,
SchedulingTasks,
WaitingForResults
}
然后 我们定义事件处理程序。每个事件处理程序由以下部分组成:
- 验证逻辑(事件处理程序用当前的
Master
状态调用是否可以?) - 转换验证(是否可以更改当前状态?)
- 状态更改(从一个状态转换为另一个状态)
public void OnPlanningJob() {
if (this.State != MasterState.WaitingForJob)
throw new InvalidOperationException("Invalid transition");
this.State = MasterState.PlanningJob;
this.Tasks = this.CurrentJob.Lines.Select(line => new JobTask {Text = line}).ToList();
this.OnSchedulingTasks();
}
private void OnSchedulingTasks() {
if (this.State != MasterState.PlanningJob)
throw new InvalidOperationException("Invalid transition");
this.State = MasterState.SchedulingTasks;
this.ScheduleTasks();
}
private void OnWaitingForResults() {
if (this.State != MasterState.SchedulingTasks)
throw new InvalidOperationException("Invalid transition");
this.State = MasterState.WaitingForResults;
}
public void OnTaskResultReceived(TaskResult result) {
if (this.State != MasterState.WaitingForResults)
throw new InvalidOperationException("Invalid state");
this.Results.Add(result);
this.Tasks.RemoveAll(task => task.Id == result.TaskId);
if (this.Tasks.Count > 0)
return;
this.CurrentJob.Result = this.Results.Sum(x => x.Data);
this.JobFinished(this.CurrentJob);
this.OnWaitingForJob();
}
private void OnWaitingForJob() {
this.Results.Clear();
this.State = MasterState.WaitingForJob;
if (this.JobQueue.Any()) {
this.CurrentJob = this.JobQueue.Dequeue();
this.OnPlanningJob();
}
}
public void OnWorkerConnected(Guid workerId) {
this.Workers.Add(workerId);
if (this.State == MasterState.SchedulingTasks)
this.ScheduleTasks();
}
public void OnWorkerDisconnected(Guid workerId) {
this.Workers.Remove(workerId);
}
代码按预期工作,但有一个问题:很难跟踪这些事件处理程序之间的工作流程。
工作流程分布在整个类中,因此很难在代码中进行更改。为了证明这一点,让我们为“等待工作节点”操作添加超时功能。
首先,我们添加一个名为 WaitingForWorker
的新状态:
internal enum MasterState {
WaitingForJob,
PlanningJob,
WaitingForWorker,
SchedulingTasks,
WaitingForResults
}
然后我们添加一个计时器:
Timer workerTimeoutTimer =
new Timer(this.CheckWorkerTimeout, null, Timeout.Infinite, Timeout.Infinite);
为该状态添加的新转换是:
public void OnWaitingForWorker() {
if (this.State != MasterState.PlanningJob)
throw new InvalidOperationException("Invalid transition");
this.State = MasterState.WaitingForWorker;
if (this.Workers.Count == 0)
this.workerTimeoutTimer.Change(this.WorkerTimeout, Timeout.InfiniteTimeSpan);
else
this.OnSchedulingTasks();
}
计时器的回调是:
private void CheckWorkerTimeout(object parameter) {
this.SynchronizationContext.Post(o => {
if (this.State != MasterState.WaitingForWorker)
return;
this.WorkerTimedout();
this.OnWaitingForJob();
}, null);
}
然后我们修改其他事件处理程序来改变工作流程:
public void OnPlanningJob() {
if (this.State != MasterState.WaitingForJob)
throw new InvalidOperationException("Invalid transition");
this.State = MasterState.PlanningJob;
this.Tasks = this.CurrentJob.Lines.Select(line => new JobTask {Text = line}).ToList();
this.OnWaitingForWorker();
}
public void OnSchedulingTasks() {
if (this.State != MasterState.WaitingForWorker)
throw new InvalidOperationException("Invalid transition");
this.State = MasterState.SchedulingTasks;
this.ScheduleTasks();
}
我们不得不修改两个方法并添加一个额外的方法来改变工作流程。每个新需求都会导致代码复杂性增加。最终,代码将变得难以 维护 和 修改 。
让我们考虑一种实现有限状态机的替代方法。
Async/await 实现
在 .NET 4.5(和 C# 5.0)中,Microsoft 引入了一种使用 async
和 await 关键字处理异步方法链的新方法。
基本上,当您创建一个异步方法时,编译器会将其转换为类似 yield
关键字创建的状态机。有关更多详细信息,请参阅本文。
让我们利用这个闪亮的新功能重写我们之前的代码:
private async void ExecuteJobs() {
SynchronizationContext.SetSynchronizationContext(this.SynchronizationContext);
var cancellationToken = this.cancellationTokenSource.Token;
while (cancellationToken.IsCancellationRequested == false) {
var job = await this.GetJobAsync(cancellationToken);
var tasks = job.Lines.Select(line => new JobTask {Text = line}).ToList();
var results = new List<TaskResult>();
var workers = await this.GetWorkersAsync(cancellationToken);
var i = 0;
foreach (var jobTask in tasks)
this.ScheduleTask(jobTask, workers[i++%workers.Count]);
while (cancellationToken.IsCancellationRequested == false) {
var result = await this.ReceiveTaskResultAsync(cancellationToken);
tasks.RemoveAll(task => task.Id == result.TaskId);
results.Add(result);
if (tasks.Count == 0)
break;
}
job.Result = results.Sum(x => x.Data);
this.JobFinished(job);
}
}
就是这样。整个工作流程都包含在一个简单的方法中!所有其他方法都是完全功利性的:
private static Task<TResult> WaitFor<TResult>(
Action<Action<TResult>> subscribe,
Action<Action<TResult>> unsubscribe,
CancellationToken cancellationToken) {
var source = new TaskCompletionSource<TResult>(cancellationToken);
cancellationToken.Register(OnCancelled<TResult>, source);
Action<TResult> handler = null;
handler = result => {
unsubscribe(handler);
source.TrySetResult(result);
};
subscribe(handler);
return source.Task;
}
private Task<Job> ReceiveJobAsync(CancellationToken cancellationToken) {
return WaitFor<Job>(
handler => this.JobReceived += handler,
handler => this.JobReceived -= handler,
cancellationToken);
}
private Task<TaskResult> ReceiveTaskResultAsync(CancellationToken cancellationToken) {
return WaitFor<TaskResult>(
hanlder => this.TaskResultReceived += hanlder,
handler => this.TaskResultReceived -= handler,
cancellationToken);
}
private async Task<IReadOnlyList<Guid>> GetWorkersAsync(CancellationToken cancellationToken) {
if (this.connectedWorkers.Count == 0)
return new[] {await this.WaitForWorkerToConnect(cancellationToken)};
return this.connectedWorkers;
}
private Task<Guid> WaitForWorkerToConnect(CancellationToken cancellationToken) {
return WaitFor<Guid>(
hanlder => this.WorkerConnected += hanlder,
handler => this.WorkerConnected -= handler,
cancellationToken);
}
private static void OnCancelled<TResult>(object parameter) {
var source = (TaskCompletionSource<TResult>) parameter;
source.TrySetCanceled();
}
private async Task<Job> GetJobAsync(CancellationToken cancellationToken) {
while (this.jobQueue.Count == 0)
await this.ReceiveJobAsync(cancellationToken);
return this.jobQueue.Dequeue();
}
public void OnJobReceived(Job job) {
this.jobQueue.Enqueue(job);
this.JobReceived(job);
}
public void OnTaskResultReceived(TaskResult result) {
this.TaskResultReceived(result);
}
public void OnWorkerConnected(Guid worker) {
this.connectedWorkers.Add(worker);
this.WorkerConnected(worker);
}
private void OnWorkerDisconnected(Guid worker) {
this.connectedWorkers.Remove(worker);
this.WorkerDisconnected(worker);
}
private event Action<Job> JobReceived = _ => { };
private event Action<Guid> WorkerConnected = _ => { };
private event Action<Guid> WorkerDisconnected = _ => { };
private event Action<TaskResult> TaskResultReceived = _ => { };
所有状态都是隐式的,这意味着您可以轻松修改工作流程。
让我们看看它如何处理第一个示例中添加超时功能的流程更改:
private async void ExecuteJobs() {
SynchronizationContext.SetSynchronizationContext(this.SynchronizationContext);
var cancellationToken = this.cancellationTokenSource.Token;
while (cancellationToken.IsCancellationRequested == false) {
var job = await this.GetJobAsync(cancellationToken);
var tasks = job.Lines.Select(line => new JobTask {Text = line}).ToList();
var results = new List<TaskResult>();
var workers = default (IReadOnlyList<Guid>);
var localTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
localTokenSource.CancelAfter(this.WorkerTimeout);
try {
workers = await this.GetWorkersAsync(localTokenSource.Token);
} catch (OperationCanceledException) {
this.WorkerTimedout();
continue;
}
var i = 0;
foreach (var jobTask in tasks)
this.ScheduleTask(jobTask, workers[i++%workers.Count]);
while (cancellationToken.IsCancellationRequested == false) {
var result = await this.ReceiveTaskResultAsync(cancellationToken);
tasks.RemoveAll(task => task.Id == result.TaskId);
results.Add(result);
if (tasks.Count == 0)
break;
}
job.Result = results.Sum(x => x.Data);
this.JobFinished(job);
}
}
所有修改都在一个方法中完成,我们仍然可以用肉眼跟踪工作流程。
现在让我们看看我们如何测试这段代码。
测试
方法 ExecuteJobs
由多个“部分”组成,这些“部分”在同步上下文中的不同时间执行。
典型的测试将由三个部分组成:
- 执行(使用事件处理程序与
Master
进行交互) - 响应(让齿轮运转起来)
- 验证(断言结果)
我们需要做的第一件事是控制这些“部分”的执行流程。为此,我们必须实现另一个同步上下文的版本:
public class TestSynchronizationContext : SynchronizationContext {
private readonly Queue<KeyValuePair<SendOrPostCallback, object>> queue =
new Queue<KeyValuePair<SendOrPostCallback, object>>();
public override void Post(SendOrPostCallback d, object state) {
if (d == null)
throw new ArgumentNullException("d");
this.queue.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
}
public override void Send(SendOrPostCallback d, object state) {
throw new NotSupportedException("Synchronously sending is not supported.");
}
public bool RunOperations() {
if (this.queue.Count == 0)
return false;
while (this.queue.Count > 0) {
var item = this.queue.Dequeue();
item.Key(item.Value);
}
return true;
}
}
方法 RunOperations
运行 ExecuteJobs
方法所有挂起的“部分”。因此,在 RunOperations
执行之后,我们可以肯定地说所有转换都已完成,Master
已准备好进行交互。
这里有一些完整的测试:
public class Tests {
private readonly MasterAsyncAwait master;
private readonly List<Tuple<JobTask, Guid>> scheduledTasks =
new List<Tuple<JobTask, Guid>>();
private readonly TestSynchronizationContext syncContext = new TestSynchronizationContext();
public Tests() {
SynchronizationContext.SetSynchronizationContext(this.syncContext);
this.master = new MasterAsyncAwait {
SynchronizationContext = this.syncContext,
ScheduleTask = (task, workerId) =>
this.scheduledTasks.Add(Tuple.Create(task, workerId))
};
}
[Fact]
public void SchedulesJob() {
this.master.Start();
var workerId = new Guid("1eb02478-cc31-4759-97b4-5d459d802e73");
this.master.OnJobReceived(new Job {Lines = new[] {"test", "ok"}});
this.master.OnWorkerConnected(workerId);
this.syncContext.RunOperations();
Assert.NotEmpty(scheduledTasks);
Assert.True(scheduledTasks.All(task => task.Item2 == workerId));
}
[Fact]
public void SplitsTasksBetweenWorkersFairly() {
this.master.Start();
var workerId1 = new Guid("1eb02478-cc31-4759-97b4-5d459d802e73");
var workerId2 = new Guid("ffe9c75e-9b41-46fe-b75d-a8bb2167d43c");
this.master.OnJobReceived(new Job {Lines = new[] {"test", "ok"}});
this.master.OnWorkerConnected(workerId1);
this.master.OnWorkerConnected(workerId2);
this.syncContext.RunOperations();
Assert.NotEmpty(scheduledTasks);
Assert.True(scheduledTasks.Any(task => task.Item2 == workerId1));
Assert.True(scheduledTasks.Any(task => task.Item2 == workerId2));
}
[Fact]
public void ComputesJobResult() {
var result = 0;
this.master.JobFinished = job => result = job.Result;
this.master.Start();
this.master.OnJobReceived(new Job {Lines = new[] {"test"}});
this.master.OnWorkerConnected(new Guid("1eb02478-cc31-4759-97b4-5d459d802e73"));
this.syncContext.RunOperations();
this.master.OnTaskResultReceived(new TaskResult {
TaskId = this.scheduledTasks.Single().Item1.Id,
Data = 4
});
this.syncContext.RunOperations();
Assert.Equal(4, result);
}
}
如您所见,我们无法显式断言当前状态,因为它被隐式处理,并且查看发生了什么的唯一方法是检查 Master
的输出。因此,每个测试都是一个场景(规范),您进行模拟并最终断言输出。
您还可以公开一些关于内部发生情况的信息:
public MasterStatus Status { get; private set; }
private async void ExecuteJobs() {
...
this.Status = MasterStatus.WaitingForWorker;
...
}
// Collection of connected workers
public IReadOnlyCollection<Guid> ConnectedWorkers { get; private set; }
总结
每种方法都有其优点和缺点。
典型的实现难以维护,但您可以显式验证每个状态,并且不必重放完整场景。相反,您可以设置当前状态,进行操作,并确保转换发生并且新状态有效。
Async/await 实现为您提供了简洁性和灵活性。但是,您无法在单元测试中将该类视为 FSM,您只有一堆交互方法和一个输出。请明智地使用它。