使用 delegate.BeginInvoke/.EndInvoke 等待并行执行






4.59/5 (16投票s)
等待并行执行不需要回调或 WaitHandles
引言
这只是一篇关于一个技巧的短文 - 如何解决一个特殊的线程同步问题。
你只需要调用 Delegate.BeginInvoke()
/ Delegate.EndInvoke()
,不需要 Callback
、Waithandle
和其他东西。
假设,一个(主)线程启动多个子线程,并且在主线程可以继续处理之前,它必须等待所有子线程完成它们的工作。从逻辑上讲,总的等待时间几乎完全是速度最慢的子线程所需的时间(当然,其他线程在那时已经运行完毕)。这怎么实现呢?
首先是完整的代码,用 VB 和 C# 编写
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading
Public Module Module1
Private Sub ThreadOut(ByVal ParamArray args As Object())
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString()
& ": " & String.Concat(args))
End Sub
Private Function ThreadFunc(ByVal halfAmount As Integer) As Integer
ThreadOut("start sleep for ", halfAmount)
Thread.Sleep(halfAmount)
ThreadOut("sleeped for ", halfAmount)
Return halfAmount * 2
End Function
Public Sub Main(ByVal args As String())
Do
Console.WriteLine()
Console.WriteLine("start? y / n")
If Console.ReadKey().KeyChar <> "y"c Then Return
Console.WriteLine()
'[the kernel]
Dim sleepTimes = New Integer() {1500, 500, 2500}
'create a delegate
Dim dlg = New Func(Of Integer, Integer)(AddressOf ThreadFunc)
'collect dlg.BeginInvoke()-calls into a collection of IAsyncResults
Dim asyncResults = From n In sleepTimes _
Select dlg.BeginInvoke(n, Nothing, Nothing)
Dim results = New List(Of Integer)()
Dim sw = System.Diagnostics.Stopwatch.StartNew()
'loop IAsyncResults and collect EndInvoke-ReturnValues as Results
For Each asyncRes In asyncResults.ToArray()
results.Add(dlg.EndInvoke(asyncRes))
Next
'[/the kernel]
'process results in main-thread
Console.WriteLine()
Console.WriteLine(String.Format("Done in {0} ms", sw.ElapsedMilliseconds))
Console.WriteLine()
Console.WriteLine("results:")
For Each itm In results
Console.WriteLine(itm)
Next
Loop
End Sub
End Module
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
class Program {
static private void ThreadOut(params object[]args) {
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + ": "
+string.Concat(args));
}
static private int ThreadFunc(int halfAmount) {
ThreadOut("start sleep for " , halfAmount);
Thread.Sleep(halfAmount);
ThreadOut("sleeped for ", halfAmount);
return halfAmount * 2;
}
static void Main(string[] args) {
while(true) {
Console.WriteLine();
Console.WriteLine("start? y / n");
if(Console.ReadKey().KeyChar != 'y') return;
Console.WriteLine();
//[the kernel]
var sleepTimes = new int[] { 1500, 500, 2500 };
//create a delegate
var dlg = new Func<int, int>(ThreadFunc);
//collect dlg.BeginInvoke()-calls into a collection of IAsyncResults
var asyncResults = from n in sleepTimes select dlg.BeginInvoke(n, null, null);
var results = new List<int>();
var sw = System.Diagnostics.Stopwatch.StartNew();
//loop IAsyncResults and collect EndInvoke-ReturnValues as Results
foreach(var asyncRes in asyncResults.ToArray())
results.Add(dlg.EndInvoke(asyncRes));
//[/the kernel]
//process results in main-thread
Console.WriteLine();
Console.WriteLine(string.Format("Done in {0} ms", sw.ElapsedMilliseconds));
Console.WriteLine();
Console.WriteLine("results:");
foreach(var val in results) Console.WriteLine(val);
}
}
}
让我重复一下要点(已经在内核注释中提到过)
- 从要异步执行的方法创建一个委托
- 循环遍历数据,并通过调用
Delegate.BeginInvoke(dataItem)
开始处理 - 收集每个返回的IASyncResult
。这非常快,因为BeginInvoke()
并不处理任何事情,而只是触发并行执行。 - 循环遍历
IASyncResults
,并调用Delegate.EndInvoke()
- 将结果数据收集到最终结果集合中。
最后一点是技巧:EndInvoke
会阻塞,直到并行进程运行完毕,并可以交付其结果。因此,第一个 EndInvoke
调用会阻塞,并等待子线程结束。之后,它会直接运行到下一个 EndInvoke
调用中。
现在,如果第二个进程比第一个进程快,则第二个 EndInvoke
调用将*不会*阻塞,因为它能够立即交付其结果。否则它也会阻塞,但只阻塞第二个进程比第一个进程慢的时间。
依此类推。
你看到了:总的等待时间将几乎完全是速度最慢的进程所需的时间。
关注点
更好的方法
在大多数情况下,有比让主线程等待更好的方法
- 你可以直接从子线程触发每个结果的通知到主线程,并在评估结果时直接处理它。例如,你可以使用我的文章 AsyncWorker [ ] 中展示的方法来实现这一点(类型)安全且简单。
- 你可以保存一个作业队列,并由*一个*子线程一个接一个地执行它们。通知可以在每个完成的作业或所有作业完成后实现。
线程池行为
运行给定的应用程序,执行并行进程三次,并查看输出

这些数字行由子线程输出(这些数字是线程 ID)。
您可以看到,线程池需要“预热”才能变得真正高效:在第一次执行中,池将这三个作业分派到两个线程中 - 因此总等待时间是作业 #2 和作业 #3 的总和。
第二次执行也比预期的慢得多 - 似乎线程池管理本身花费了大量时间。
从第三次执行开始,有三个子线程立即启动,总等待时间几乎就是最慢的进程所需的时间。
即使启动顺序也混淆了(将其与代码给定的输入数据的顺序进行比较) - 也就是说,并行进程应该如何运行。
将线程池设置为“已预热”。
在某些情况下,在请求它们之前设置 threadpool
中运行线程的最小数量是一个有效的优化
ThreadPool.SetMinThreads(3, 0);
现在性能立即与上面提到的第三次运行一样好。也许您想在之后将数字重置为先前的值,因为线程是一种“昂贵”的资源。