并行编程在 .NET 中的内部原理






4.88/5 (64投票s)
.NET 4 引入了一个强大的 Task 库,用于支持代码在并行处理器上运行。它通过使用新编写的任务库(System.Threading.Tasks)在 mscorlib 4.0 中简单地生成线程到多个进程。Task 库包含 For、ForEach 和 Invoke 等方法。
引言
.NET 4 引入了一个强大的 Task 库,用于支持代码在并行处理器上运行。它通过使用新编写的任务库(System.Threading.Tasks
)在 Mscorlib 4.0 中简单地生成线程到多个进程。Task 库包含 For
、ForEach
和 Invoke
等方法来支持 .NET 语言中的并行,我将在后面详细讨论这些方法。
让我们看看 .NET 4.0 在引入并行扩展方面发生了哪些变化。
CLR 线程池 4.0
.NET 2.0 的旧线程池仅支持 1-4 个处理器,其中“单队列”、“单锁”模式就足够了。在 .NET 4.0 中改进的线程池最多可以容纳 16-256 个处理器。
- 新的 ThreadPool 显著减少了将工作项推送到池或从池中拉出的同步开销。在早期版本中,ThreadPool 队列实现为由一个大锁保护的链表,而新版本的队列基于新的数组样式、无锁、GC 友好的
ConcurrentQueue<T>
类。 - CLR 4.0 线程池还支持每个任务的本地队列,并实现了一种工作窃取算法,该算法可以在队列之间进行负载均衡。
- 在 CLR 线程池的 4.0 版本中,“线程注入和退休 算法”发生了变化。在运行线程多于 CPU 的情况下,新的线程池会推迟(即暂时降低并发级别)创建新线程,而不是以每 0.5 秒一个线程的速度创建新线程。
MSCORLIB 4.0
System.Collections.Concurrent 命名空间
System.Collections.Concurrent
命名空间提供了一些线程安全集合类,在多个线程并发访问集合时,应使用它们来代替 System.Collections
和 System.Collections.Generic
命名空间中相应的类型。
System.Threading.Tasks 命名空间
System.Threading.Tasks
命名空间提供了一系列类型,用于简化编写并发和异步代码的工作。主要类型是 System.Threading.Tasks.Task
,它表示一个可以等待和取消的异步操作,以及 System.Threading.Tasks.Task(Of TResult)
,它是一个可以返回值任务。Factory
类提供了创建和启动任务的静态方法,而 System.Threading.Tasks.TaskScheduler
类提供了默认的线程调度基础结构。
.NET 4 取消框架
.NET 4 的一个非常有趣的补充是一组新类型,专门用于构建支持取消的应用程序和库。这些新类型实现了丰富的功能,可以方便安全地进行取消,并有助于简化过去困难、易出错且不可组合的情况。
该框架的基础是两个新类型:CancellationToken
是一个表示“潜在取消请求”的结构体。这个结构体作为参数传递给方法调用,方法可以轮询它或注册一个在请求取消时触发的回调。CancellationTokenSource
是一个提供发起取消请求机制的类,它有一个 Token
属性用于获取关联的令牌。将这两个类合并成一个会很自然,但这种设计允许将两个关键操作(发起取消请求与观察和响应取消)清晰地分开。特别是,只接受 CancellationToken
的方法可以观察取消请求但不能发起取消请求。
工作原理
现在您已经充分了解了 .NET 中用于实现代码并行化的变化。在我们的例子中,让我们从并行化的一个简单实体开始,称为 Task
。新的 System.Threading.Tasks.Task
类负责线程分配和同步;是的,它在内部使用线程,但方式非常高效。
Task
类实现了 IThreadPoolWorkItem
接口,该接口负责执行线程池队列中的工作项。任务并行化的完整流程如下图所示。
ThreadPoolTaskScheduler
类使用 ThreadPool API 将 Task
(IThreadPoolWorkItem
)推送到全局队列。工作线程从全局队列(QueueSegment
)中拾取任务。处理完成后,它通过调用 System.Threading
的手动重置事件来通知任务。如果在任务中创建了任务,则该任务不会被推送到全局队列,而是保存在本地队列(工作窃取队列)中。工作窃取队列中的任何任务项都可以被任何其他工作线程共享。工作窃取是如何工作的?工作线程首先查看本地队列;如果没有要拾取的工作项,它会搜索全局队列。如果仍然没有工作,它会查找其他队列中的任何待处理工作项。因此,在任务处理过程中,没有线程会空闲,这实际上实现了机器所有核心的 100% 利用率。
我认为本地队列和工作窃取队列有很多好处,这是传统线程池所缺乏的,例如全局队列的争用更少,工作线程得到有效利用,等等。需要注意的是,巨大的性能提升也源于将基于链表的队列更改为基于数组的队列;所有本地和全局队列都是基于数组的,这意味着没有用于并发的大锁。
现在让我们开始使用 Visual Studio。
我在我的双核机器上运行了下面的代码示例,最好在四核上尝试。
//In Static void Main
DoSomeBigStuff(1, 10000);
DoSomeBigStuff(2, 10000);
//In some class
public void DoSomeBigStuff(int inp,int limit)
{
ProcessItem();
if (inp > limit)
{
return;
}
DoSomeBigStuff(++inp, limit);
}
public void ProcessItem()
{
Thread.SpinWait(100000);
}
在我的双核系统上运行这段代码得到的结果是 8431ms,但如果您查看 CPU 性能,CPU 只使用了 50%。
让我们对上面的代码做一些修改,并将 DoSomeBigStuff
放入一个任务中。
Task t1 = new Task(() =>
{
DoSomeBigStuff(1, 10000);
});
Task t2 = new Task(() =>
{
DoSomeBigStuff(2, 10000);
});
CPU 的情况完全改变了,现在开始使用两个核心。代码在 4589 ms 内完成,这是一个巨大的改进。现在我无需考虑扩展我的代码;只需在一台 8 核机器上运行相同的代码,它将花费不到 1000ms。Task
类封装了处理线程数(基于处理器)和工作项分配的逻辑。
线程会带来一些开销,因此单个任务可能会花费更长的时间 - 在这种情况下,从单线程示例的约 82 毫秒到并行处理中的 300 毫秒不等。
在决定何时以及在应用程序中并行化什么时,应该考虑此开销。对您的应用程序而言,更重要的是:它完成得更快,还是使用的总 CPU 时间更少?另一个考虑是工作项的顺序。
让我们看一下线程。检查线程调试窗口,您可以看到除了主线程外,还生成了两个新线程。
并行库已启动两个线程来执行与核心数相等的工作。另外创建了一个线程,它负责处理从队列到主线程的回调。
查看单个线程的调用堆栈;它执行的是与我们在上面的图中所讨论的相同操作。
您可以弄清楚 Execute
方法的作用;它只是将项推送到全局队列。我想在这里讨论的有趣方法是 ThreadPoolWorkQueue.Dispatch()
。让我们一步一步地研究这个方法。
ThreadPoolGlobals.workQueue.MarkThreadRequestSatisfied();
这会减少未完成的线程请求,这意味着它从池中获取线程。
ThreadPoolWorkQueueThreadLocals tl =
ThreadPoolGlobals.workQueue.EnsureCurrentThreadHasQueue();
这会创建一个本地队列(如果当前工作线程尚未创建)。
ThreadPoolGlobals.workQueue.Dequeue(tl, out callback, out missedSteal);
这会从工作队列中出队任务。Dequeue
方法足够智能,可以在全局或本地队列中找到任务;然后它会尝试从其他队列窃取任务。
callback.ExecuteWorkItem();
ThreadPool.NotifyWorkItemComplete();
ThreadPoolGlobals.workQueue.EnsureThreadRequested();
执行工作项并通知线程池工作已完成。如果成功,它将线程返回给池。
对我来说,这一切看起来很简单,但却是处理线程的有效逻辑。下图将很好地说明 Task 库处理线程的部分。
额外内容
Task Parallel 库还提供了一个可以使用的类,称为 Parallel
。Parallel
类中存在的 API 用于迭代,如 For
、ForEach
和 Invoke
。所有这些扩展方法都使用上面讨论的相同的 Task 库。
在继续之前,我们将看到另一个示例。
传统 For 循环 | 并行 For 循环 |
---|---|
for (int i=0; i < 1000; i++){
Calculate(i);
}
public void Calculate(int pos){
double result = new Random().NextDouble();
for (int i = 0; i < 20000; i++){
result += Math.Acos(new
Random().NextDouble());
}
}
|
Parallel.For(0, 1000, delegate(int i)
{
Calculate(i);
}
);
完成时间是传统 For 循环的一半。 |
Parallel.For
直接跳转到 ForWorker
方法,参数如下,在 Parallel
类中。
ForWorker<object>(fromInclusive, toExclusive,
s_defaultParallelOptions, body, null, null, null, null);
在内部,ForWorker
方法创建一个 ParallelForReplicatingTask
实例,该实例继承自 Task
,默认设置为 InternalTaskOptions.SelfReplicating
。ForWorker
创建一个空的根任务,迭代任务作为根任务的子任务创建。
rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);
此方法启动子任务的自复制过程,并将它们排队到本地队列。Parallel
类决定线程工作数,该数量等于处理器数量。
在 Parallel
类所有方法中,都已考虑到如果收到任何取消任务请求,线程不会被中止,而是会进行软取消。
在自复制时,.NET 必须确保以下几点:
- 自复制应该有一个跨副本的通信机制,用于通信整体活动的完成。
- 仅当此通信和分区管理的成本远低于并行性带来的潜在好处时才使用。
- 不要假设任务总是被复制。只有在有可用资源时才会被复制。出于同样的原因,不要假设会有特定数量的副本。
- 在某些情况下,副本的数量可能远远超过机器上的核心数量。
- 当可能正确处理同一步骤的多次执行时,您可以选择使用乐观并发。
我认为他们在处理线程工作数、使用并发队列和有效的取消框架方面做得很好。
以下是演示源代码
以下是在双核和四核上运行源程序后的结果
一个额外项目
Visual Studio 团队发布了 Concurrency Analyzer:如果您有 VS 2010,可以在 Analyze->Launch Performance Wizard->Concurrency 下看到此选项。
它会生成一份完整的分析报告,包含关于线程和核心使用情况的图表。如果您正在构建一个多线程密集型应用程序,那么您应该进行此分析。还可以查看 Concurrency Analyzer 视频。
做得好,.NET 并行扩展团队!!
参考文献
- http://blogs.msdn.com/b/pfxteam/archive/2009/05/22/9635790.aspx
- http://aviadezra.blogspot.com/2009/06/net-clr-thread-pool-work.html
- http://aviadezra.blogspot.com/2009/04/task-parallel-library-parallel.html
- http://blogs.microsoft.co.il/blogs/sasha/archive/2008/06/11/ waltzing-through-the-parallel-extensions-june-ctp-collection-classes.aspx
- http://aviadezra.blogspot.com/2009/04/task-parallel-library-parallel.html
- http://msdn.microsoft.com/en-us/magazine/cc163329.aspx
- http://blogs.msdn.com/b/pedram/archive/2008/04/03/more-on-self-replicating-tasks.aspx