跨线程工作流






4.72/5 (7投票s)
轻松声明可以跨越线程的工作流。
引言
在我编写的 Web 服务器中,我需要一个工作流,该工作流能够执行工作流函数,即使这些函数可能跨越两个或更多线程。这里有一个例子
一个单线程监听器接收一个请求。工作流包括插桩(日志记录)和黑名单检查。如果请求通过黑名单检查,则将其排队等待处理。这使得请求监听器能够非常快速地返回监听连接。
另外,另一个线程等待其队列中的请求,并在收到请求时,选择一个工作线程将请求排队。我们在此过程中插桩工作线程的负载。
最后,工作线程本身进行身份验证检查,路由请求以进行任何自定义处理,然后将数据传递给视图引擎进行最终渲染。
因此,正如您所看到的,这是一个线性工作流,但它跨越了三个独立的线程。我下面介绍的实现是一种管理这些工作流而不会产生过多开销的解决方案。
为什么这很酷
通过使用“continuation”类,我们可以在执行工作流时安全地跨越线程。此外,而不是克隆工作流以适应特定线程,我们可以声明一次工作流,并将工作流状态保留在 continuation 实例中。这是将要执行的工作与执行工作的线程解耦的一种简单而有效的方法。换句话说,要执行的工作可以独立于执行工作的线程来声明。更有用的是,工作过程本身可以确定工作是否应该在同一线程上继续,还是推迟到另一个线程进行处理。
因此,我们以很低的成本实现了高水平的抽象。我们可以
- 声明式地定义工作流
- 将线程与工作实现分离
- 允许工作实现决定如何继续工作
- 在同一线程上
- 推迟到另一个线程
流程签名
为了更好地理解工作流的每个步骤,需要记住正在调用什么。这必须是一个具有以下签名的函数:
Func<WorkflowContinuation<T>, T, WorkflowState> doWork
该函数接收一个 WorkflowContinuation<T>
实例和一个 T
实例,并期望返回一个 WorkflowState
。T 是在工作流函数之间传递的数据类型。
WorkflowState
定义如下
public enum WorkflowState { /// <summary> /// Terminate execution of the workflow. /// </summary> Abort, /// <summary> /// Continue with the execution of the workflow. /// </summary> Continue, /// <summary> /// Execution is deferred until Continue is called, usually by another thread. /// </summary> Defer, }
实际的日志记录工作流函数的示例如下所示
/// <summary> /// A workflow item, implementing a simple instrumentation of the client IP address and port. /// </summary> static WorkflowState LogIPAddress( WorkflowContinuation<HttpListenerContext> workflowContinuation, HttpListenerContext context) { Console.WriteLine(context.Request.RemoteEndPoint.ToString()); return WorkflowState.Continue; }
工作流项
WorkflowItem 是工作流函数的轻量级容器
/// <summary> /// A workflow item is a specific process to execute in the workflow. /// </summary> public class WorkflowItem<T> { protected Func<WorkflowContinuation<T>, T, WorkflowState> doWork; /// <summary> /// Instantiate a workflow item. We take a function that takes the Workflow instance associated with this item /// and a data item. We expect a WorkflowState to be returned. /// </summary> /// <param name="doWork"></param> public WorkflowItem(Func<WorkflowContinuation<T>, T, WorkflowState> doWork) { this.doWork = doWork; } /// <summary> /// Execute the workflow item method. /// </summary> public WorkflowState Execute(WorkflowContinuation<T> workflowContinuation, T data) { return doWork(workflowContinuation, data); } }
Workflow 类
Workflow
类管理工作流——要执行的工作流函数集合。
/// <summary> /// The Workflow class handles a list of workflow items that we can use to /// determine the processing of a request. /// </summary> public class Workflow<T> { protected List<WorkflowItem<T>> items; public Workflow() { items = new List<WorkflowItem<T>>(); } /// <summary> /// Add a workflow item. /// </summary> public void AddItem(WorkflowItem<T> item) { items.Add(item); } /// <summary> /// Execute the workflow from the beginning. /// </summary> public void Execute(T data) { WorkflowContinuation<T> continuation = new WorkflowContinuation<T>(this); InternalContinue(continuation, data); } /// <summary> /// Continue a deferred workflow, unless it is aborted. /// </summary> public void Continue(WorkflowContinuation<T> wc, T data) { // TODO: Throw exception instead? if (!wc.Abort) { wc.Defer = false; InternalContinue(wc, data); } } /// <summary> /// Internally, we execute workflow steps until: /// 1. we reach the end of the workflow chain /// 2. we are instructed to abort the workflow /// 3. we are instructed to defer execution until later. /// </summary> protected void InternalContinue(WorkflowContinuation<T> wc, T data) { while ((wc.WorkflowStep < items.Count) && !wc.Abort && !wc.Defer) { WorkflowState state = items[wc.WorkflowStep++].Execute(wc, data); switch (state) { case WorkflowState.Abort: wc.Abort = true; break; case WorkflowState.Defer: wc.Defer = true; break; } } } }
这有一个简单而优雅的地方——工作流是通过调用 Execute
方法启动的。如果一个函数将工作传递给另一个线程,它会返回 Defer 状态。接管工作流的线程可以通过调用 Continue
方法继续工作流。
工作流 Continuation
然而,真正起作用、让这一切生效的是 WorkflowContinuation
类
/// <summary> /// Thread-specific instance that preserves the workflow continuation context for that thread. /// </summary> public class WorkflowContinuation<T> { public int WorkflowStep { get; set; } public bool Abort { get; set; } public bool Defer { get; set; } public Workflow<T> Workflow { get; protected set; } public WorkflowContinuation(Workflow<T> workflow) { Workflow = workflow; } }
嗯,作为“重头戏”,它似乎没做什么!但重点是,这个类跟踪工作流上下文的状态,并允许在将其传递给另一个线程时继续工作流。这做了以下几点:
- 我们可以定义一个特定工作流模式的单个实例
- 我们可以同时使用该实例,因为我们实际上是在实现 Continuation Passing Style——我们将 continuation 状态传递给每个工作流函数。
- 因此,作为流程的工作流是线程安全的,即使我们在不同线程之间共享实例。
唯一的性能损失是,当工作流开始执行时,我们必须创建 WorkflowContinuation
WorkflowContinuation<T> continuation = new WorkflowContinuation<T>(this);
否则,不需要分配其他任何东西就能让这一切工作。
一些示例
一些基本示例应该有助于理解。下面是如何声明一个基本(不是上面图示的)工作流:
static void InitializeWorkflow() { workflow = new Workflow<HttpListenerContext>(); workflow.AddItem(new WorkflowItem<HttpListenerContext>(LogIPAddress)); workflow.AddItem(new WorkflowItem<HttpListenerContext>(handler.Process)); workflow.AddItem(new WorkflowItem<HttpListenerContext>(CommonHandler.CommonResponse)); }
准备好后,我们可以使用我们的数据实例启动工作流,在本例中是 HttpListenerContext:
workflow.Execute(context);
您已经看到了“插桩”的例子
static WorkflowState LogIPAddress( WorkflowContinuation<HttpListenerContext> workflowContinuation, HttpListenerContext context) { Console.WriteLine(context.Request.RemoteEndPoint.ToString()); return WorkflowState.Continue; }
注意工作流 continuation 是如何传递的(尽管我们这里没有使用它),并注意函数是如何返回 Continue
状态的。
现在,在某个时候,数据将被排队,以便另一个线程可以处理它:
public WorkflowState Process( WorkflowContinuation<HttpListenerContext> workflowContinuation, HttpListenerContext context) { // Create a workflow context and queue it. requests.Enqueue(new WorkflowContext(workflowContinuation, context)); semQueue.Release(); return WorkflowState.Defer; }
请注意,continuation 是如何与请求上下文一起入队的。同时也要注意,此函数返回工作流状态 Defer
。这向工作流引擎表明,此工作流的 continuation 被推迟,直到另一个线程接管工作。
最后,接管工作的线程调用 Continue
函数:
ts.WaitOne(); WorkflowContext context; if (ts.TryDequeue(out context)) { // Continue with where we left off for this context's workflow. context.WorkflowContinuation.Workflow.Continue(context.WorkflowContinuation, context.Context); }
结论
就是这样——这个概念实际上很简单。实际上,解释这个概念所需的时间似乎比编写代码实现它所需的时间更长。