65.9K
CodeProject 正在变化。 阅读更多。
Home

异步运行工作流以提高性能

starIconstarIconstarIconstarIconstarIcon

5.00/5 (9投票s)

2012 年 3 月 7 日

CPOL

4分钟阅读

viewsIcon

41859

downloadIcon

794

在本文中,我们使用不同的技术在 WF4.0 中实现了一个简单的数学计算,以比较它们的执行时间。

介绍 

在本文中,我们实现了三个不同工作流的代码片段。第一个在 Foreach 活动中实现。第二个在 ParallelForeach 中实现,第三个在单独的线程上运行不同工作流实例上的代码片段。本文的目的是演示 WorkflowApplication 对象如何异步运行多个工作流。最后,我将比较这些方法的执行时间。

注意:app.config 中,请记住更改 Trace 标签以正确记录执行信息。

  <system.diagnostics>
    <trace>
      <listeners>
        <clear/>
        <add name="MyFileListener" type="System.Diagnostics.TextWriterTraceListener"
             initializeData="TraceLog.TXT" />
      </listeners>
    </trace>
  </system.diagnostics>

要求 

要学习本文,您需要安装以下软件

  • Visual Studio 2010 或更高版本,以及 .NET 4.0 或更高版本.
  • 本文的代码.

背景

在 WF 4.0 中,WorkflowApplication 类用于运行工作流(也使用 WorkflowInvoker)。使用 WorkflowApplication.Run,我们可以更好地控制工作流的执行。在 WorkflowApplication 中,我们可以为 Completed、Aborted、Idle、OnUnhandledException 设置操作,以处理工作流上的各种事件。我们将看到使用 WorkflowApplication 异步运行工作流如何提高我们代码的性能。

Using the Code

我们的示例是一个 C# 工作流控制台应用程序。首先,我们需要创建三个独立的工作流,如下面的工作流所示。

ForeachWorkflow.xoml:这里使用了一个 Sequence 活动来创建一个简单的数学计算,然后将 Sequence 放入 Foreach 活动中。Foreach 活动将枚举我们集合的元素(itemList 作为参数),我们的 Sequence 将为 itemList 中的每个值执行一次。 

ParallelForeachWorkflow.xoml:在这里,与之前的工作流一样,使用了一个 Sequence 活动,并将其放入 ParallelForeach 活动中。ParallelForeach 与 Foreach 活动的区别在于,如果 ParallelForeach 中的任何嵌入式活动是异步的或进入空闲状态,例如 Delay 活动或派生自 AsyncCodeActivity 的活动,则嵌入的语句会被调度在一起并异步运行。但是,如果嵌入的活动都不是异步的或进入空闲状态,Foreach 和 ParallelForeach 的执行方式将相同。在我们的工作流 ParallelForeachWorkflow.xoml 中,由于 ParallelForeach 中的活动之间有一个 Delay 活动,每个 Sequence 都在不同的线程上执行。

MultiThreadWorkflow.xaml:在此工作流中,没有 Foreach 或 ParallelForeach 活动。迭代将在后续讨论的代码中实现。

 

这是我们模拟长时间计算的自定义活动。这里使用了一个 Delay 活动,它就像一个耗时的计算。

public sealed class LongRunningCalcActivity : CodeActivity
{
    public InArgument<TimeSpan> Duration { get; set; }

    protected override void Execute(CodeActivityContext context)
    {
        Thread.Sleep(context.GetValue<TimeSpan>(Duration));
    }
}

在我们上述工作流的 LongRunningCalcActivity 中,我将 Duration 参数设置为 00:00:01,这意味着延迟 1 秒。

执行工作流

WorkflowApplication 是一个异步类,与在当前线程中运行工作流的 ActivityInvoker 不同,因此 WorkflowApplication.Run() 将异步执行工作流。此外,使用 WorkflowApplication,我们可以更好地控制执行。以下是我创建 WorkflowApplication 并运行 Workflow 的代码。

WorkflowApplication workflowApp = new WorkflowApplication(new MultiThreadWorkflow());
workflowApp.Completed = WorkflowAppCompleted;
workflowApp.Run(); 
WorkflowAppCompleted 是一个在工作流执行完成后调用的方法。在下面的代码中,numberOfThreadsRunning 是一个变量,当一个工作流执行完成时,它会被递减。numberOfThreadsRunning 会一直递减直到达到 0。如果达到 0,则设置 ManualResetEvent,主线程的执行将恢复。这是 WorkflowAppCompleted 方法。
static int numberOfCalcs = 20; // number of workflows
static int numberOfThreadsRunning = numberOfCalcs; 
static ManualResetEvent eventDone = new ManualResetEvent(false);
public static void WorkflowAppCompleted(WorkflowApplicationCompletedEventArgs e)
{
   if (e.CompletionState == ActivityInstanceState.Faulted)
   {
       Trace.WriteLine(string.Format("Workflow {0} Terminated.", e.InstanceId.ToString()));
       Trace.WriteLine(string.Format("Exception: {0}\n{1}",
			e.TerminationException.GetType().FullName,
			e.TerminationException.Message));
   }
   else if (e.CompletionState == ActivityInstanceState.Canceled)
   {
       Trace.WriteLine("Canceled!");
   }
   else
   {
       if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
          eventDone.Set();
   } 
}

注意:在上面的代码中,使用了 Interlocked.Decrement 来使 numberOfThreadsRunning 的修改成为互斥的。

注意:在许多情况下,我们可能会使用类似以下代码来等待其他线程完成。

WaitHandle[] waitHandles = new WaitHandle[2] { new AutoResetEvent(false), new AutoResetEvent(false) };
//Call Async methods
WaitHandle.WaitAll(waitHandles);

但是,在这种情况下,WaitHandles 的数量必须是有限的(64),否则会抛出异常!

这是完成的代码。

class Program
{
  static int numberOfCalcs = 100;
  static int numberOfThreadsRunning = numberOfCalcs;
  static string workflowInArg = "itemList";
  static ManualResetEvent eventDone = new ManualResetEvent(false);

  static void Main(string[] args)
  {
     List<Double> items = new List<Double>();

     for (int i = 0; i < numberOfCalcs; i++)
        items.Add(System.Convert.ToDouble(i));

     Dictionary<string, object> p = new Dictionary<string, object>();
     p.Add(workflowInArg, items);

     //Linear Foreach
     RunSimpleForeach(p);

     //Parallel Foreach
     RunParallelForeach(p);

     //MultiThreading
     RunMultiThreading();

     Trace.Flush();

 }

 private static void RunMultiThreading()
 {
    Stopwatch sw = Stopwatch.StartNew();
    for (int workflowIndex = 0; workflowIndex < numberOfCalcs; workflowIndex++)
    {
       WorkflowApplication workflowApp = new WorkflowApplication(new MultiThreadWorkflow());
       workflowApp.Completed = WorkflowAppCompleted;
       workflowApp.Run();
    }

    eventDone.WaitOne();
    Trace.WriteLine("**** MultiThread Workflow, Elapsed Time: " + sw.Elapsed.ToString());
 }

 private static void RunParallelForeach( Dictionary<string, object> p)
 {
    Stopwatch sw = Stopwatch.StartNew();
    WorkflowInvoker.Invoke(new ParallelForeachWorkflow(), p);
    Trace.WriteLine("**** Parallel Foreach, Elapsed Time: " + sw.Elapsed.ToString());
 }

 private static void RunSimpleForeach(Dictionary<string, object> p)
 {
    Stopwatch sw = Stopwatch.StartNew();
    WorkflowInvoker.Invoke(new ForeachWorkflow(), p);
    Trace.WriteLine("**** Simple Foreach, Elapsed Time: " + sw.Elapsed.ToString());
 }

public static void WorkflowAppCompleted(WorkflowApplicationCompletedEventArgs e)
{
   if (e.CompletionState == ActivityInstanceState.Faulted)
   {
       Trace.WriteLine(string.Format("Workflow {0} Terminated.", e.InstanceId.ToString()));
       Trace.WriteLine(string.Format("Exception: {0}\n{1}",
			e.TerminationException.GetType().FullName,
			e.TerminationException.Message));
   }
   else if (e.CompletionState == ActivityInstanceState.Canceled)
   {
       Trace.WriteLine("Canceled!");
   }
   else
   {
       if (Interlocked.Decrement(ref numberOfThreadsRunning) == 0)
          eventDone.Set();
   } 
}
} 

执行结果

这是运行我们程序的后的结果。

线程数:100
**** 简单 Foreach,耗时:00:01:40.3350970
**** 并行 Foreach,耗时:00:01:40.0157880
**** 多线程工作流,耗时:00:00:10.0113136

可以看出,多线程工作流比其他工作流快得多! 

注意:以上结果在您的机器上可能会有所不同。但是,在您的机器上,多线程工作流也会比其他工作流快得多!   

讨论  

这里出现的问题是,为什么 Foreach 和 ParallelForeach 活动的结果几乎相同?

原因是,在这两个活动中,工作流实例都在单个线程上运行。WF 运行时使用类似队列的结构在该线程上调度工作。当 ParallelForEach 活动执行时,它只是将子活动调度给运行时。因为这些活动执行的是阻塞的、同步的工作,它们会阻塞工作流运行的线程。解决方法是使用 WorkflowApplication 对象异步运行工作流。另一种方法是将 LongRunningCalcActivity 继承自 AsyncCodeActivity 而不是 CodeActivity,以便在不同线程上运行我们的活动。

结论 

WorkflowApplication 对象异步运行工作流实例。为了有效地利用机器的内存和 CPU 进行长时间运行的工作流,我们可以使用此对象来提高性能。

历史

  • 2012 年 3 月:首次发布。 

© . All rights reserved.