Microsoft .NET 4.0 并行编程介绍






4.93/5 (135投票s)
介绍 .NET 4.0 的并行编程功能。
引言
"能力越大,责任越大"……随着硬件世界的蓬勃发展,处理能力不断提升,开发者有责任利用这种力量。好吧,这个比喻有点牵强,但我很喜欢《蜘蛛侠》电影,总想找个机会用它的口号……看来我还是得继续寻找 :)
说真的,多核(和多处理器)机器如今已占主导地位。不久的将来,正如一些主要供应商报告的那样,您将无法找到核心数少于 8 的机器。因此,传统的串行程序编写方式将无法默认地利用这种力量。这些程序应该以一种能够利用多核力量的方式来编写。
我们必须这样做吗?有人可能会问。在多核机器上运行我的程序难道不会自动提高性能吗?毕竟,不是有更多的线程吗?答案是否定的!在多核机器上运行串行程序几乎不会带来任何性能提升。除了 IO 操作,您的程序一次仍然只使用一个核心的一个线程。线程切换会发生(而且还会产生开销),但始终只有一个线程处于活动状态(同样,IO 操作除外,它可以发生在不同的线程上)。
多线程编程是开发者能够同时利用多个线程的方式。虽然它已经存在很多年了,但由于其复杂性,多线程编程经常被避免。开发者不得不将大部分精力投入到编写干净的多线程代码上,而不是专注于业务问题。
多线程与多核有什么关系?即使只有一个核心,我们不能编写多线程代码吗?是的,我们可以。但同样,只要我们只有一个核心,一次就只有一个线程工作,因为所有线程都属于同一个核心……这很符合逻辑。当您的多线程程序在多核机器上运行时,属于不同核心的线程将能够一起运行,因此您的程序将利用硬件的力量。
在继续之前,最后一点说明:多线程编程无论您拥有的机器是多核机器还是多处理器机器,都没有区别。这是硬件的区分;底层来说,您的代码仍然与线程一起工作。
并行 vs. 并发
嗯,这两个概念都相关,但并不相同:最简单的定义是,并发是指以并发模式执行多个不相关的任务。这些不相关的任务共享系统资源,但它们没有共同要解决的问题;也就是说,它们在逻辑上是独立的。现在,将这些任务想象成线程。并且,由于这些线程共享系统资源,您会遇到并发编程普遍存在的问题,例如死锁和数据争用。这使得并发编程极其困难,调试也变得复杂。
什么是并行性?并行性是指将一个特定的任务分解成一组相关的任务,以并发方式执行。太棒了!所以,再次将这些任务想象成线程,并行编程仍然存在并发编程的相同问题(死锁、数据争用等),并引入了新的挑战,最重要的是,在这些相关线程之间共享和划分数据。这使得并行编程更加困难,调试也更加复杂。:(
然而,并非全是坏消息。.NET 4.0 并行编程是一个巨大的进步。新的 API 解决了并行编程的许多问题(但不是全部),并大大简化了并行调试……
设置 .NET 并行编程的背景
前面的章节介绍了多线程编程及其需求。本节将逐步介绍 .NET Framework 中并行编程的背景。
- 我们拥有多核机器,需要考虑一种利用其力量的方式。因此,需要考虑并行编程。
- 并行编程到底是什么?它是关于如何将一块工作分解成多个并发单元。
- 我们有哪些划分选项?选项 1 是静态划分。静态划分是“硬编码”您要使用的单元和线程数。例如,说您想将大量工作分成“n”个单元,每个单元在单个核心上运行一个线程。这种方法的问题在于它不具可扩展性。虽然这个解决方案在一台有“n”个核心的机器上效果很好,但对于拥有“n+m”个核心的机器来说,它很快就会变得不可扩展。
- 解决划分问题的自然演进应该是动态划分。这意味着您将工作划分为每个可用核心一个单元。虽然这可以解决可扩展性问题,但它会引入另一个问题:负载不平衡。在实际问题中,所有工作单元花费相同时间的可能性非常小。这意味着一些核心将完成它们的工作,而其他核心仍将忙碌,无法利用现在空闲的核心。
- 沿着解决方案链向上移动,我们现在有了另一个选项:将工作划分为最小可行单元,这将利用最可行数量的可用线程。这将解决负载不平衡问题,因为工作被划分为最小的可能单元,每个单元将在可用线程上运行。然而,现在我们遇到了一个问题:线程开销。创建和销毁线程是一个昂贵的过程。
- 这使我们达到了最终的划分选项:.NET 线程池。 .NET 线程池维护一个调度程序和一个线程队列。当一个线程完成工作后,它不会被销毁,而是被放入队列中,以便下一个计划执行的工作单元可以拾取。线程池通过
System.Threading.ThreadPool
命名空间公开,并且自 .NET v 4.0 之前就已可用。
等等,那么 .NET 4.0 并行编程模型中究竟有什么新东西,如果上述讨论适用于 v 4.0 之前?答案将在检查线程池的不足之处时得到。线程池是一种“即发即弃”的编程模型。一旦您通过代码创建了一个线程,您基本上无法对其执行任何操作。不支持继续、组合、监视数据流、执行工作取消和工作等待。
那么解决方案是什么?它是 .NET 4.0 基于任务的编程。在 Framework 的 v 4.0 中,您可以对任务进行编程,而不是对低级线程进行编程。现在,您可以将工作分配给任务而不是线程。这将为您提供对并行工作的更大能力和控制,我们稍后会看到。当然,如果您想使用更底层的线程细节,仍然可以根据需要直接使用线程池。
.NET 4.0 并行编程
.NET 4.0 的并行编程模型由以下部分组成:
- 任务并行库 (TPL):这是前面章节讨论的基于任务的编程的基础。它包括:
Task
类:您将对其进行编码的工作单元,而不是以前的线程模型。Parallel
类:一个static
类,它公开了一些并行性质问题的基于任务的版本。更具体地说,它包含以下方法:For
Foreach
Invoke
- Parallel LINQ (PLINQ):构建在 TPL 之上,并公开熟悉的 LINQ 作为并行扩展。
关于示例
讨论的所有示例都可下载。为了说明主要思想,下面各节将展示一些代码示例。
所有示例都使用 VS2010 RC1 构建;无需其他任何东西。为了感受并行编程的价值,您需要一台多核(或多处理器)机器;这里显示的结果是在双核机器上运行示例的结果。
示例 1:任务
第一个示例展示了新的任务编程模型。如前所述,与线程池相比,任务提供了对线程编程的更多控制。事实上,您现在处理的是任务而不是线程。
static void Main(string[] args)
{
Task t = Task.Factory.StartNew(() =>
{
Console.WriteLine("I am the first task");
});
var t2 = t.ContinueWith(delegate
{
//simulate compute intensive
Thread.Sleep(5000);
return "Tasks Example";
});
//block1
//string result = t2.Result;
//Console.WriteLine("result of second task is: " + result);
//end block1
//block2
//t2.ContinueWith(delegate
// {
// Console.WriteLine("Here i am");
// });
//Console.WriteLine("Waiting my task");
//end block2
Console.ReadLine();
}
在上面的代码中,我们首先使用 Task
类创建一个新任务,并将要在此 Task
中运行的代码(在单独的线程中)传递给 lambda 表达式。
另一个特性是能够“继续”一个正在运行的 Task
。这意味着一旦 Task
完成(在我们的示例中是 Task
"t
"),就可以开始运行另一个 Task
"t2
",并通过匿名方法(通过委托)提供要运行的代码。
现在取消注释块 1。在这里,我们看到了另一个特性,即从 Task
获取结果。请注意,我们通过“return
”语句为 Task
"t2
" 分配了一个名为“Tasks Example”的值。现在在块 1 中,我们可以检索该值。这引发了一个问题:如果语句“t2.Result
”在“t2
”完成执行之前执行(回想一下,“t2
”是在与主程序不同的线程上运行的),会发生什么?强制执行此行为(这就是我使用 Thread.Sleep
的原因),您会看到在这种情况下,主线程将在“t2.Result
”处等待,直到“t2
”完成执行并执行其“return
”语句。真棒!这是以前不可能实现的功能。
现在取消注释块 2。在第一个和第二个“Console.WriteLine
”语句处设置两个断点。运行程序,并注意到“Waiting my task”比“Here I am”先打印。这表明主程序(线程)正在等待另一个线程上运行的任务“t2
”完成。一旦“t2
”完成(通过使用 Thread.Sleep
延迟),它的语句就会被打印出来。
示例 2:Parallel
本示例介绍了 Parallel
类。具体来说,它展示了如何使用新的 For
方法来执行通常串行执行的工作。
static void Main(string[] args)
{
Stopwatch watch;
watch = new Stopwatch();
watch.Start();
//serial implementation
for (int i = 0; i < 10; i++)
{
Thread.Sleep(1000);
//Do stuff
}
watch.Stop();
Console.WriteLine("Serial Time: " + watch.Elapsed.Seconds.ToString());
//parallel implementation
watch = new Stopwatch();
watch.Start();
System.Threading.Tasks.Parallel.For(0, 10, i =>
{
Thread.Sleep(1000);
//Do stuff with i
}
);
watch.Stop();
Console.WriteLine("Parallel Time: " + watch.Elapsed.Seconds.ToString());
Console.ReadLine();
}
上面的代码显示了串行执行一个工作单元(通过 Sleep
操作模拟)与并行执行工作单元之间的执行时间差异。在我的双核机器上,时间减少了一半。这是一个巨大的改进!想象一下您通常在 For
循环(或 Foreach
循环)中完成的所有工作,并考虑通过利用新的 Parallel.For
和 Parallel.Foreach
方法可以获得的改进时间。
一个重要的注意事项:有时并不那么容易。并行化工作时,有时您必须处理诸如同步和死锁之类的特殊问题;您仍然必须处理这些问题,我将在后续示例中进行讨论。
为了变化起见,让我们看看 Parallel.For
方法的一个重载。为了支持从并行循环中退出,Parallel.For
方法有一个重载,它接受 ParallelLoopState
作为 Action
委托中的参数。
Parallel.For(0,1000,(int i, ParallelLoopState loopState) =>
{
If(i==500)
loopState.Break();
//or loopState.Stop();
return; // to ensure that this iteration also returns
//do stuff
});
Break
和 Stop
方法之间的区别在于 Break
确保在退出循环之前执行所有已开始的迭代。
Stop
方法不提供此类保证;它基本上表示循环已完成,应尽快退出。
两种方法都会阻止将来的迭代运行。
示例 3:PLINQ
PLINQ 是一个 LINQ 提供程序,因此您仍然可以使用熟悉的 LINQ 模型。然而,在底层模型中,PLINQ 使用多个线程来评估查询。
static void Main(string[] args)
{
Stopwatch watch;
//for loop
watch = new Stopwatch();
watch.Start();
bool[] results = new bool[arr.Length];
for (int i = 0; i < arr.Length; i++)
{
results[i] = IsPrime(arr[i]);
}
watch.Stop();
Console.WriteLine("For Loop took: " + watch.Elapsed.Seconds);
//LINQ to Objects
watch = new Stopwatch();
watch.Start();
bool[] results1 = arr.Select(x => IsPrime(x))
.ToArray();
watch.Stop();
Console.WriteLine("LINQ took: " + watch.Elapsed.Seconds);
//PLINQ
watch = new Stopwatch();
watch.Start();
bool[] results2 = arr.AsParallel().Select(x => IsPrime(x))
.ToArray();
watch.Stop();
Console.WriteLine("PLINQ took: " + watch.Elapsed.Seconds);
Console.ReadLine();
}
上面的代码展示了如何使用 LINQ(和 For
)与 PLINQ 来执行一个故意耗时的 IsPrime
函数。 AsParallel
扩展允许您的代码使用 PLINQ 而不是 LINQ 运行。使用 AsParallel
的步骤是使用 ParallelQuery
包装器包装数据源,并使查询中剩余的扩展方法绑定到 PLINQ 而不是 LINQ to Objects。
运行示例,您会注意到,尽管使用串行 For
循环和传统 LINQ 的执行时间几乎相同(实际上与许多人的看法相反,LINQ 比 For
循环有性能开销……但这是另一回事),但使用 PLINQ 的执行时间几乎缩短了一半。
示例 4:PLINQ 扩展
PLINQ 具有许多影响工作划分方式的扩展。在本例中,我们将讨论其中一个扩展。请检查下面的代码。
static void Main(string[] args)
{
Stopwatch watch = new Stopwatch();
watch.Start();
int[] src = Enumerable.Range(0, 200).ToArray();
var query = src.AsParallel()
.Select(x => ExpensiveFunc(x));
foreach (var x in query)
{
Console.WriteLine(x);
}
watch.Stop();
Console.WriteLine("Elapsed: " + watch.Elapsed.Seconds.ToString());
Console.ReadLine();
}
private static int ExpensiveFunc(int x)
{
Thread.Sleep(1);
return x;
}
此代码与前面的示例类似。它展示了如何使用 PLINQ 的 AsParallel
扩展来执行函数。现在运行程序。您将看到一些可能没有预料到的结果。请注意,打印了 0 到 199 的值;但是,它们的打印顺序不正确。
思考一下:我们正在使用一个从 0 到 200 的枚举器。然而,由于我们使用的是 PLINQ 而不是 LINQ,因此划分本身不是串行的。工作被分成随机的并行单元;例如,从 0 到 10 分配给线程 A,而从 11 到 30 分配给线程 B,依此类推。一旦一个工作单元完成,它的输出就会立即刷新回最终结果。这就是为什么输出不是按顺序排列的原因。下图对此进行了说明。
现在,编辑代码并使用 AsOrdered
扩展,如下所示:
var query = src.AsParallel().AsOrdered()
.Select(x => ExpensiveFunc(x));
再次运行示例,这次请注意输出是从 0 到 199 按顺序排列的。这次,AsOrdered
扩展强制使用缓冲区,将所有工作单元的输出收集并排序,然后再刷新回最终输出。下图显示了该过程。
示例 5:数据争用
在示例 2 中,我提到在进行多线程编程时可能会发生数据争用。本示例展示了数据争用场景及其解决方法。
static void Main(string[] args)
{
Stopwatch watch = new Stopwatch();
watch.Start();
Parallel.For(0, 100000, i =>
{
Thread.Sleep(1);
counter++;
});
watch.Stop();
Console.WriteLine("Seconds Elapsed: " + watch.Elapsed.Seconds);
Console.WriteLine(counter.ToString());
Console.ReadKey();
}
上面的代码使用 Parallel.For
方法执行一个简单的 Sleep
语句。每次执行循环体时,计数器都会递增。
如果您在多核机器上运行该程序,您期望看到什么数字?100000 是符合逻辑的结果。现在,运行程序,您会发现这个数字小于 100000(如果您得到 100000,那只是运气好,所以请再试一次)。那么这里发生了什么?为什么会出错,尽管它似乎如此明显?答案是发生了数据争用。
让我们解释一下。这里的诀窍在于理解 counter++
语句是如何执行的。在中间语言 (IL) 级别,counter++
不再是单个语句。实际上,它由 JIT(即时编译器)执行 4 个 IL 步骤:
- 获取计数器的当前值
- 获取值
1
- 将两个数字相加
- 将结果保存回计数器
那么这与并行编程和使用 Parallel.For
有什么关系呢?当像现在这样在多个核心上执行时,循环体将并行地在多个线程上执行。下图解释了这种情况:
结果是,由于两个线程同时获得了对变量 counter 的访问权限,因此一个步进增量被忽略了。上述情况发生了多次,您将得到错误的最终值,正如我们在运行程序时所遇到的那样。这是一个典型的数据争用案例。
这种情况下的一种解决方案是锁定变量,这样一旦一个线程正在访问该变量,另一个线程就必须等待,直到锁被释放,才能访问该变量。虽然这会引入时间延迟,但这是我们为解决数据争用而必须付出的代价。
我们可以锁定变量的方法有很多,并且在此处进行详尽的讨论超出了本文的范围,但我将在我的情况下使用最佳选项,即 Interlocked.Increment
方法,如下所示(“lock
”语句是另一个在本例中使用的示例;注释显示了如何使用它代替):
static void Main(string[] args)
{
//Object thisLock = new Object(); //if not using Interlocked
Stopwatch watch = new Stopwatch();
watch.Start();
Parallel.For(0, 100000, i =>
{
Thread.Sleep(1);
#region L
Interlocked.Increment(ref counter);
//lock (thisLock) //lock is another method and is done using two lines
// counter++;
#endregion
//counter++;
});
watch.Stop();
Console.WriteLine("Seconds Elapsed: " + watch.Elapsed.Seconds);
Console.WriteLine(counter.ToString());
Console.ReadKey();
}
示例 6:调试
注意:此示例由 Microsoft 构建。
static void Main(string[] args)
{
var primes =
from n in Enumerable.Range(1, 10000000)
.AsParallel()
.AsOrdered()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
where IsPrime(n)
select n;
foreach (var prime in primes)
Console.Write(prime + ", ");
}
public static bool IsPrime(int numberToTest)
{
if (numberToTest == 2) return true;
if (numberToTest < 2 || (numberToTest & 1) == 0) return false;
int upperBound = (int)Math.Sqrt(numberToTest);
for (int i = 3; i < upperBound; i += 2)
{
if ((numberToTest % i) == 0) return false;
}
// It's prime!
return true;
}
上面的代码使用 PLINQ 来打印给定范围内的所有素数。并行代码本身没有什么特别之处,除了 IsPrime
函数中故意包含了一个逻辑错误:运行程序时,您会注意到数字 9、15 和 25 被报告为素数,这是错误的。现在,假设您想调试 IsPrime
函数并在其中设置断点。运行程序,您会很快注意到这里的问题:由于有多个线程在执行,因此调试行为是不确定的,因为当线程轮流命中断点时,焦点会从一个线程切换到另一个线程。
为了解决这个问题,Microsoft 为我们提供了并行任务窗口(调试菜单 -> 窗口 -> 并行任务)。
通过此窗口,您可以在调试模式下看到所有正在运行的线程。您可以选择冻结除您想要实际调试的线程之外的所有线程,完成后,您可以运行其他线程。
另一个有用的窗口是并行堆栈窗口(调试菜单 -> 窗口 -> 并行堆栈)。
通过此窗口,您可以直观地看到程序正在运行的线程及其来源。
示例 7:死锁
注意:此示例由 Microsoft 构建。
示例 5 解释了数据争用以及如何通过锁来处理它们。然而,您应该注意,锁定可能会以死锁的丑陋形式困扰您。简单地说,死锁是指线程 TA
锁定了资源 R1
,而线程 TB
锁定了资源 R2
,现在 TA
需要 R2
才能继续,而 TB
需要 R1
才能继续。两个线程都卡住了,无法继续,从而导致死锁。
让我们看一个死锁的例子。
static void Main(string[] args)
{
int transfersCompleted = 0;
WatchDeadlock.BreakIfRepeats(() => transfersCompleted, 500);
BankAccount a = new BankAccount { Balance = 1000 };
BankAccount b = new BankAccount { Balance = 1000 };
while (true)
{
Parallel.Invoke(
() => Transfer(a, b, 100),
() => Transfer(b, a, 100));
transfersCompleted += 2;
}
}
class BankAccount { public int Balance; }
static void Transfer(BankAccount one, BankAccount two, int amount)
{
lock (one)
{
lock (two)
{
one.Balance -= amount;
two.Balance += amount;
}
}
}
上面的代码使用 Parallel.Invoke
来并发执行银行账户“a
”到“b
”以及反之的资金转账。在 Transfer
方法内部,对两个对象都应用了锁(使用“lock
”语句),目的是使其对并发执行是线程安全的。这种锁定方法会导致死锁场景。
“WatchDeadlock
”类用于在检测到一定时间内没有新的转账完成后,使用 Debugger.Break
强制中断。
运行程序,看看我们是如何陷入死锁的。
让我们看看 VS 2010 的调试支持如何帮助我们识别此类死锁。在程序运行时,打开并行任务窗口。该窗口会清楚地向您显示死锁情况。不仅如此,它还会具体告诉您导致锁定的两个线程以及每个线程如何等待另一个线程拥有的资源(在本例中是 Bank
对象)。
那么如何避免死锁呢?答案是:干脆别发生。在进行并行编程和使用锁时,请始终以不会发生死锁的方式进行编程!
要点
.NET 4.0 中的新并行编程显然解决了多线程复杂性中的许多问题。它以任务的形式为您提供了对线程的更多控制,它为您提供了执行并行操作的许多方法重载,例如 For
、Foreach
和 Invoke
,它为您提供了 PLINQ 的强大功能,它会为您处理划分,并且它引入了 .NET 4.0 中的新线程安全集合,例如 ConcurrentDictionary
、ConcurrentQueue
和 ConcurrentStack
。此外,改进的调试支持使处理并行编程相关问题更加容易。但是,您仍然需要关注多线程相关概念,例如数据争用和死锁。
更好的消息是,对 Microsoft 而言,并行编程仍然是一个正在进行中的项目,因此我们可以期待在后续版本中获得更多功能和改进的支持。