异步运行工作流以提高性能





5.00/5 (9投票s)
在本文中,我们使用不同的技术在 WF4.0 中实现了一个简单的数学计算,以比较它们的执行时间。
介绍
在本文中,我们实现了三个不同工作流的代码片段。第一个在 Foreach 活动中实现。第二个在 ParallelForeach 中实现,第三个在单独的线程上运行不同工作流实例上的代码片段。本文的目的是演示 WorkflowApplication
对象如何异步运行多个工作流。最后,我将比较这些方法的执行时间。
注意:在 app.config 中,请记住更改 Trace 标签以正确记录执行信息。
<system.diagnostics> <trace> <listeners> <clear/> <add name="MyFileListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="TraceLog.TXT" /> </listeners> </trace> </system.diagnostics>
要求
要学习本文,您需要安装以下软件
- Visual Studio 2010 或更高版本,以及 .NET 4.0 或更高版本.
- 本文的代码.
背景
在 WF 4.0 中,WorkflowApplication
类用于运行工作流(也使用 WorkflowInvoker)。使用 WorkflowApplication.Run
,我们可以更好地控制工作流的执行。在 WorkflowApplication 中,我们可以为 Completed、Aborted、Idle、OnUnhandledException 设置操作,以处理工作流上的各种事件。我们将看到使用 WorkflowApplication
异步运行工作流如何提高我们代码的性能。
Using the Code
我们的示例是一个 C# 工作流控制台应用程序。首先,我们需要创建三个独立的工作流,如下面的工作流所示。
ForeachWorkflow.xoml:这里使用了一个 Sequence 活动来创建一个简单的数学计算,然后将 Sequence 放入 Foreach 活动中。Foreach 活动将枚举我们集合的元素(itemList 作为参数),我们的 Sequence 将为 itemList 中的每个值执行一次。

ParallelForeachWorkflow.xoml:在这里,与之前的工作流一样,使用了一个 Sequence 活动,并将其放入 ParallelForeach 活动中。ParallelForeach 与 Foreach 活动的区别在于,如果 ParallelForeach 中的任何嵌入式活动是异步的或进入空闲状态,例如 Delay 活动或派生自 AsyncCodeActivity
的活动,则嵌入的语句会被调度在一起并异步运行。但是,如果嵌入的活动都不是异步的或进入空闲状态,Foreach 和 ParallelForeach 的执行方式将相同。在我们的工作流 ParallelForeachWorkflow.xoml 中,由于 ParallelForeach 中的活动之间有一个 Delay 活动,每个 Sequence 都在不同的线程上执行。

MultiThreadWorkflow.xaml:在此工作流中,没有 Foreach 或 ParallelForeach 活动。迭代将在后续讨论的代码中实现。
这是我们模拟长时间计算的自定义活动。这里使用了一个 Delay 活动,它就像一个耗时的计算。
public sealed class LongRunningCalcActivity : CodeActivity { public InArgument<TimeSpan> Duration { get; set; } protected override void Execute(CodeActivityContext context) { Thread.Sleep(context.GetValue<TimeSpan>(Duration)); } }
在我们上述工作流的 LongRunningCalcActivity
中,我将 Duration 参数设置为 00:00:01
,这意味着延迟 1 秒。
执行工作流
WorkflowApplication
是一个异步类,与在当前线程中运行工作流的 ActivityInvoker 不同,因此 WorkflowApplication.Run()
将异步执行工作流。此外,使用 WorkflowApplication
,我们可以更好地控制执行。以下是我创建 WorkflowApplication 并运行 Workflow 的代码。
WorkflowApplication workflowApp = new WorkflowApplication(new MultiThreadWorkflow()); workflowApp.Completed = WorkflowAppCompleted; workflowApp.Run();
WorkflowAppCompleted
是一个在工作流执行完成后调用的方法。在下面的代码中,numberOfThreadsRunning
是一个变量,当一个工作流执行完成时,它会被递减。numberOfThreadsRunning
会一直递减直到达到 0。如果达到 0,则设置 ManualResetEvent,主线程的执行将恢复。这是 WorkflowAppCompleted
方法。static int numberOfCalcs = 20; // number of workflows static int numberOfThreadsRunning = numberOfCalcs; static ManualResetEvent eventDone = new ManualResetEvent(false); public static void WorkflowAppCompleted(WorkflowApplicationCompletedEventArgs e) { if (e.CompletionState == ActivityInstanceState.Faulted) { Trace.WriteLine(string.Format("Workflow {0} Terminated.", e.InstanceId.ToString())); Trace.WriteLine(string.Format("Exception: {0}\n{1}", e.TerminationException.GetType().FullName, e.TerminationException.Message)); } else if (e.CompletionState == ActivityInstanceState.Canceled) { Trace.WriteLine("Canceled!"); } else { if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0) eventDone.Set(); } }
注意:在上面的代码中,使用了 Interlocked.Decrement
来使 numberOfThreadsRunning
的修改成为互斥的。
注意:在许多情况下,我们可能会使用类似以下代码来等待其他线程完成。
WaitHandle[] waitHandles = new WaitHandle[2] { new AutoResetEvent(false), new AutoResetEvent(false) }; //Call Async methods WaitHandle.WaitAll(waitHandles);
但是,在这种情况下,WaitHandles
的数量必须是有限的(64),否则会抛出异常!
这是完成的代码。
class Program
{
static int numberOfCalcs = 100;
static int numberOfThreadsRunning = numberOfCalcs;
static string workflowInArg = "itemList";
static ManualResetEvent eventDone = new ManualResetEvent(false);
static void Main(string[] args)
{
List<Double> items = new List<Double>();
for (int i = 0; i < numberOfCalcs; i++)
items.Add(System.Convert.ToDouble(i));
Dictionary<string, object> p = new Dictionary<string, object>();
p.Add(workflowInArg, items);
//Linear Foreach
RunSimpleForeach(p);
//Parallel Foreach
RunParallelForeach(p);
//MultiThreading
RunMultiThreading();
Trace.Flush();
}
private static void RunMultiThreading()
{
Stopwatch sw = Stopwatch.StartNew();
for (int workflowIndex = 0; workflowIndex < numberOfCalcs; workflowIndex++)
{
WorkflowApplication workflowApp = new WorkflowApplication(new MultiThreadWorkflow());
workflowApp.Completed = WorkflowAppCompleted;
workflowApp.Run();
}
eventDone.WaitOne();
Trace.WriteLine("**** MultiThread Workflow, Elapsed Time: " + sw.Elapsed.ToString());
}
private static void RunParallelForeach( Dictionary<string, object> p)
{
Stopwatch sw = Stopwatch.StartNew();
WorkflowInvoker.Invoke(new ParallelForeachWorkflow(), p);
Trace.WriteLine("**** Parallel Foreach, Elapsed Time: " + sw.Elapsed.ToString());
}
private static void RunSimpleForeach(Dictionary<string, object> p)
{
Stopwatch sw = Stopwatch.StartNew();
WorkflowInvoker.Invoke(new ForeachWorkflow(), p);
Trace.WriteLine("**** Simple Foreach, Elapsed Time: " + sw.Elapsed.ToString());
}
public static void WorkflowAppCompleted(WorkflowApplicationCompletedEventArgs e)
{
if (e.CompletionState == ActivityInstanceState.Faulted)
{
Trace.WriteLine(string.Format("Workflow {0} Terminated.", e.InstanceId.ToString()));
Trace.WriteLine(string.Format("Exception: {0}\n{1}",
e.TerminationException.GetType().FullName,
e.TerminationException.Message));
}
else if (e.CompletionState == ActivityInstanceState.Canceled)
{
Trace.WriteLine("Canceled!");
}
else
{
if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
eventDone.Set();
}
}
}
执行结果
这是运行我们程序的后的结果。
线程数:100
**** 简单 Foreach,耗时:00:01:40.3350970
**** 并行 Foreach,耗时:00:01:40.0157880
**** 多线程工作流,耗时:00:00:10.0113136
可以看出,多线程工作流比其他工作流快得多!
注意:以上结果在您的机器上可能会有所不同。但是,在您的机器上,多线程工作流也会比其他工作流快得多!
讨论
这里出现的问题是,为什么 Foreach 和 ParallelForeach 活动的结果几乎相同?
原因是,在这两个活动中,工作流实例都在单个线程上运行。WF 运行时使用类似队列的结构在该线程上调度工作。当 ParallelForEach 活动执行时,它只是将子活动调度给运行时。因为这些活动执行的是阻塞的、同步的工作,它们会阻塞工作流运行的线程。解决方法是使用 WorkflowApplication 对象异步运行工作流。另一种方法是将 LongRunningCalcActivity 继承自 AsyncCodeActivity 而不是 CodeActivity,以便在不同线程上运行我们的活动。结论
WorkflowApplication
对象异步运行工作流实例。为了有效地利用机器的内存和 CPU 进行长时间运行的工作流,我们可以使用此对象来提高性能。
历史
- 2012 年 3 月:首次发布。