C# 中的任务并行基础






4.82/5 (42投票s)
一篇解释任务并行编程基础的文章。
前言
.NET Framework 开发人员应该了解任务并行库 (TPL)。数据并行性通常是指根据输入数据执行某个操作,并将输入数据分解为更小的部分。数据会被分配给可用的硬件处理器以实现并行。然后,通常会复制并执行一些独立的操作来处理这些分区。通常,应用于数据集元素的也是相同的操作。
任务并行性是指程序已经分解为可以并行运行的各个部分(语句、方法等)。更重要的是,任务并行性将问题视为可以分解为称为“任务”的序列的指令流,这些任务可以同时执行。为了提高计算效率,构成任务的操作应在很大程度上独立于其他任务中的操作。数据分解的观点侧重于任务所需的数据以及如何将其分解为不同的块。只有当数据块可以相对独立地进行操作时,与数据块相关的计算才会高效。虽然在决定并行化时这两种观点显然相互依赖,但将它们分开学习会更好。一本关于任务和计算密集型异步操作的强大参考资料是 Jeffrey Richter 的书籍《CLR via C#, 3rd Edition》。这本书值得一读。
在这篇简短的文章中,我们将重点介绍 System.Threading.Tasks
Task
对象的一些特性。要执行一个简单的任务,请创建一个 Task
类的实例,并将代表您希望执行的工作负载的 System.Action
委托作为构造函数参数传递。您可以显式创建 Action 委托以引用命名方法、使用匿名函数或使用 lambda 函数。创建 Task
实例后,调用 Start()
方法,您的 Task
将被传递给任务调度程序,由它负责为执行工作分配线程。下面是一个示例代码:
using System;
using System.Threading.Tasks;
public class Program {
public static void Main() {
// use an Action delegate and named method
Task task1 = new Task(new Action(printMessage));
// use an anonymous delegate
Task task2 = new Task(delegate { printMessage() });
// use a lambda expression and a named method
Task task3 = new Task(() => printMessage());
// use a lambda expression and an anonymous method
Task task4 = new Task(() => { printMessage() });
task1.Start();
task2.Start();
task3.Start();
task4.Start();
Console.WriteLine("Main method complete. Press <enter> to finish.");
Console.ReadLine();
}
private static void printMessage() {
Console.WriteLine("Hello, world!");
}
}
要从任务中获取结果,请创建 Task<t>
的实例,其中 T
是将产生的结果的数据类型,并在 Task
体内返回该类型的实例。要读取结果,请调用您创建的 Task
的 Result
属性。例如,假设我们有一个名为 Sum
的方法。我们可以构造一个 Task<tresult>
对象,并将操作的返回数据类型作为泛型 TResult
参数传递。
using System;
using System.Threading.Tasks;
public class Program {
private static Int32 Sum(Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--)
checked { sum += n; }
return sum;
}
public static void Main() {
Task<int32> t = new Task<int32>(n => Sum((Int32)n), 1000);
t.Start();
t.Wait();
// Get the result (the Result property internally calls Wait)
Console.WriteLine("The sum is: " + t.Result); // An Int32 value
}
}
生成
The sum is: 500500
如果计算密集型操作抛出未处理的异常,该异常将被吞没并存储在一个集合中,线程池会允许其返回到线程池。当调用 Wait
方法或 Result
属性时,这些成员将抛出一个 System.AggregateException
对象。您可以使用 CancellationTokenSource
来取消 Task
。我们必须重写 Sum
方法,使其接受一个 CancellationToken
,然后我们就可以编写代码,创建一个 CancellationTokenSource
对象。
using System;
using System.Threading;
using System.Threading.Tasks;
public class Program {
private static Int32 Sum(CancellationToken ct, Int32 n) {
Int32 sum = 0;
for (; n > 0; n--) {
ct.ThrowIfCancellationRequested();
//Thread.Sleep(0); // Simulate taking a long time
checked { sum += n; }
}
return sum;
}
public static void Main() {
CancellationTokenSource cts = new CancellationTokenSource();
Task<int32> t = new Task<int32>(() => Sum(cts.Token, 1000), cts.Token);
t.Start();
cts.Cancel();
try {
// If the task got canceled, Result will throw an AggregateException
Console.WriteLine("The sum is: " + t.Result); // An Int32 value
}
catch (AggregateException ae) {
ae.Handle(e => e is OperationCanceledException);
Console.WriteLine("Sum was canceled");
}
}
}
输出任务已被取消
Sum was canceled
有一种更好的方法来了解任务何时完成运行。当一个任务完成时,它可以启动另一个任务。现在,当执行 Sum
的任务完成后,该任务将启动另一个任务(也在某个线程池线程上)来显示结果。执行以下代码的线程不会阻塞等待这两个任务中的任何一个完成;该线程可以执行其他代码,或者如果它本身是一个线程池线程,它可以返回到池以执行其他操作。请注意,执行 Sum
的任务可能在调用 ContinueWith
之前完成。
using System;
using System.Threading.Tasks;
public class Program {
private static Int32 Sum(Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--)
checked { sum += n; }
return sum;
}
public static void Main() {
// Create Task, defer starting it, continue with another task
Task<int32> t = new Task<int32>(n => Sum((Int32)n), 1000);
t.Start();
// notice the use of the Result property
Task cwt = t.ContinueWith(task => Console.WriteLine(
"The sum is: " + task.Result));
cwt.Wait(); // For the testing only
}
}
生成类似的结果
The sum is: 500500
现在,当执行 Sum
的任务完成后,该任务将启动另一个任务(也在某个线程池线程上)来显示结果。执行上述代码的线程不会阻塞等待这两个任务中的任何一个完成;该线程可以执行其他代码,或者如果它是一个线程池线程,它可以返回到池以执行其他操作。请注意,执行 Sum
的任务可能在调用 ContinueWith
之前完成。这不会有问题,因为 ContinueWith
方法将看到 Sum
任务已完成,并会立即启动显示结果的任务。顺便说一下,任务也支持父/子关系。请查看以下代码:
using System;
using System.Threading;
using System.Threading.Tasks;
public class Program {
private static Int32 Sum(Int32 n)
{
Int32 sum = 0;
for (; n > 0; n--)
checked { sum += n; }
return sum;
}
public static void Main() {
Task<int32[]> parent = new Task<int32[]>(() => {
var results = new Int32[3]; // Create an array for the results
// This tasks creates and starts 3 child tasks
new Task(() => results[0] = Sum(100),
TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[1] = Sum(200),
TaskCreationOptions.AttachedToParent).Start();
new Task(() => results[2] = Sum(300),
TaskCreationOptions.AttachedToParent).Start();
// Returns a reference to the array
// (even though the elements may not be initialized yet)
return results;
});
// When the parent and its children have
// run to completion, display the results
var cwt = parent.ContinueWith(parentTask =>
Array.ForEach(parentTask.Result, Console.WriteLine));
// Start the parent Task so it can start its children
parent.Start();
cwt.Wait(); // For testing purposes
}
}
生成父/子任务结果
5050
20100
45150
在内部,Task
对象包含一个 ContinueWith
任务的集合,这意味着您实际上可以使用单个 Task
对象多次调用 ContinueWith
。重要的是要注意,Task
不会取代线程:它们在线程上运行。当任务完成时,所有 ContinueWith
任务都将被排队到线程池。请记住,当 CLR 初始化时,线程池中没有线程。在内部,线程池维护一个操作请求队列。当您的应用程序想要执行异步操作时,您会调用某个方法,该方法会将条目添加到线程池的队列中。线程池的代码将从该队列中提取条目,并将条目分派到线程池线程。如果没有线程在线程池中,将会创建一个新线程。这就是为什么在编写托管应用程序时,您无需手动创建线程。系统会为每个进程维护一个池,因此线程池只提供静态方法。要调度一个工作项以供执行,您需要调用 QueueUserWorkItem
方法,并将一个 WaitCallback
委托传递给它。但请再次记住,通过创建 Task
对象,我们可以避免调用 ThreadPool
的 QueueUserWorkItem
所带来的限制。
参考文献
- CLR via C#, 第 3 版 Jeffrey Richter。