65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 delegate.BeginInvoke/.EndInvoke 等待并行执行

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.59/5 (16投票s)

2010年3月25日

CPOL

3分钟阅读

viewsIcon

54785

downloadIcon

533

等待并行执行不需要回调或 WaitHandles

引言

这只是一篇关于一个技巧的短文 - 如何解决一个特殊的线程同步问题。
你只需要调用 Delegate.BeginInvoke() / Delegate.EndInvoke(),不需要 CallbackWaithandle 和其他东西。

假设,一个(主)线程启动多个子线程,并且在主线程可以继续处理之前,它必须等待所有子线程完成它们的工作。从逻辑上讲,总的等待时间几乎完全是速度最慢的子线程所需的时间(当然,其他线程在那时已经运行完毕)。这怎么实现呢?

首先是完整的代码,用 VB 和 C# 编写

Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading

Public Module Module1

   Private Sub ThreadOut(ByVal ParamArray args As Object())
      Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() 
         & ": " & String.Concat(args))
   End Sub

   Private Function ThreadFunc(ByVal halfAmount As Integer) As Integer
      ThreadOut("start sleep for ", halfAmount)
      Thread.Sleep(halfAmount)
      ThreadOut("sleeped for ", halfAmount)
      Return halfAmount * 2
   End Function

   Public Sub Main(ByVal args As String())
      Do
         Console.WriteLine()
         Console.WriteLine("start? y / n")
         If Console.ReadKey().KeyChar <> "y"c Then Return
         Console.WriteLine()

         '[the kernel]
         Dim sleepTimes = New Integer() {1500, 500, 2500}
         'create a delegate
         Dim dlg = New Func(Of Integer, Integer)(AddressOf ThreadFunc)
         'collect dlg.BeginInvoke()-calls into a collection of IAsyncResults
         Dim asyncResults = From n In sleepTimes _
            Select dlg.BeginInvoke(n, Nothing, Nothing)
         Dim results = New List(Of Integer)()
         Dim sw = System.Diagnostics.Stopwatch.StartNew()
         'loop IAsyncResults and collect EndInvoke-ReturnValues as Results
         For Each asyncRes In asyncResults.ToArray()
            results.Add(dlg.EndInvoke(asyncRes))
         Next
         '[/the kernel]

         'process results in main-thread
         Console.WriteLine()
         Console.WriteLine(String.Format("Done in {0} ms", sw.ElapsedMilliseconds))
         Console.WriteLine()
         Console.WriteLine("results:")
         For Each itm In results
            Console.WriteLine(itm)
         Next
      Loop

   End Sub

End Module
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

class Program {

   static private void ThreadOut(params object[]args) {
      Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + ": "
         +string.Concat(args));
   } 
   static private int ThreadFunc(int halfAmount) {
      ThreadOut("start sleep for " , halfAmount);
      Thread.Sleep(halfAmount);
      ThreadOut("sleeped for ", halfAmount); 
      return halfAmount * 2;
   }
   static void Main(string[] args) {
      while(true) {
         Console.WriteLine();
         Console.WriteLine("start? y / n");
         if(Console.ReadKey().KeyChar != 'y') return;
         Console.WriteLine();

         //[the kernel]
         var sleepTimes = new int[] { 1500, 500, 2500 };
         //create a delegate
         var dlg = new Func<int, int>(ThreadFunc);
         //collect dlg.BeginInvoke()-calls into a collection of IAsyncResults
         var asyncResults = from n in sleepTimes select dlg.BeginInvoke(n, null, null);
         var results = new List<int>();
         var sw = System.Diagnostics.Stopwatch.StartNew();
         //loop IAsyncResults and collect EndInvoke-ReturnValues as Results
         foreach(var asyncRes in asyncResults.ToArray()) 
            results.Add(dlg.EndInvoke(asyncRes));
         //[/the kernel]

         //process results in main-thread
         Console.WriteLine();
         Console.WriteLine(string.Format("Done in {0} ms", sw.ElapsedMilliseconds));
         Console.WriteLine();
         Console.WriteLine("results:");
         foreach(var val in results) Console.WriteLine(val);
      }

   }
}

让我重复一下要点(已经在内核注释中提到过)

  • 从要异步执行的方法创建一个委托
  • 循环遍历数据,并通过调用 Delegate.BeginInvoke(dataItem) 开始处理 - 收集每个返回的 IASyncResult。这非常快,因为 BeginInvoke() 并不处理任何事情,而只是触发并行执行。
  • 循环遍历 IASyncResults,并调用 Delegate.EndInvoke() - 将结果数据收集到最终结果集合中。

最后一点是技巧:EndInvoke 会阻塞,直到并行进程运行完毕,并可以交付其结果。因此,第一个 EndInvoke 调用会阻塞,并等待子线程结束。之后,它会直接运行到下一个 EndInvoke 调用中。

现在,如果第二个进程比第一个进程快,则第二个 EndInvoke 调用将*不会*阻塞,因为它能够立即交付其结果。否则它也会阻塞,但只阻塞第二个进程比第一个进程慢的时间。
依此类推。

你看到了:总的等待时间将几乎完全是速度最慢的进程所需的时间。

关注点

更好的方法

在大多数情况下,有比让主线程等待更好的方法

  • 你可以直接从子线程触发每个结果的通知到主线程,并在评估结果时直接处理它。例如,你可以使用我的文章 AsyncWorker [ ] 中展示的方法来实现这一点(类型)安全且简单。
  • 你可以保存一个作业队列,并由*一个*子线程一个接一个地执行它们。通知可以在每个完成的作业或所有作业完成后实现。

线程池行为

运行给定的应用程序,执行并行进程三次,并查看输出

WaitWithEndInvoke.Png

这些数字行由子线程输出(这些数字是线程 ID)。
您可以看到,线程池需要“预热”才能变得真正高效:在第一次执行中,池将这三个作业分派到两个线程中 - 因此总等待时间是作业 #2 和作业 #3 的总和。

第二次执行也比预期的慢得多 - 似乎线程池管理本身花费了大量时间。

从第三次执行开始,有三个子线程立即启动,总等待时间几乎就是最慢的进程所需的时间。

即使启动顺序也混淆了(将其与代码给定的输入数据的顺序进行比较) - 也就是说,并行进程应该如何运行。

将线程池设置为“已预热”。

在某些情况下,在请求它们之前设置 threadpool 中运行线程的最小数量是一个有效的优化

ThreadPool.SetMinThreads(3, 0); 

现在性能立即与上面提到的第三次运行一样好。也许您想在之后将数字重置为先前的值,因为线程是一种“昂贵”的资源。

© . All rights reserved.