Parallel Computing Concepts in C# 4.0






An article introducing the basics of parallel computing in .NET 4.0
Introduction
.NET 4.0 keeps pace with a computing industry that is chasing ever higher density. The drive has always been to improve performance and to get more work done in less time. Parallel LINQ (PLINQ), the Parallel class, the task parallelism constructs, and the concurrent collections are new to Framework 4.0 and are collectively known as PFX (Parallel Framework). The Parallel class together with the task parallelism constructs is called the Task Parallel Library (TPL). This is a necessary addition to .NET, because CPU clock speeds have stagnated and manufacturers have shifted their focus to increasing core counts. That is a problem for us programmers, because standard single-threaded code does not automatically run faster just because extra cores are available. With that in mind, this article looks at parallel programming in the C# 4.0 language and the .NET 4.0 runtime.

One point needs clarifying: a multithreaded application can run its code concurrently on a single core. That code executes concurrently, but not in parallel. Only when threads execute simultaneously across multiple cores can we say that we have achieved concurrency through parallel programming. Furthermore, data parallelism uses the input data of an operation as the means of dividing it into smaller chunks. The data is distributed across the available hardware processors to achieve parallelism, and this partitioning step typically replicates and runs largely independent pieces of the same program. As a result, exploiting multiple cores is easy for most server applications, where each thread can independently handle a separate client request, but it is harder in desktop applications, because it typically requires that you take your computationally intensive code and do the following (a minimal sketch of these three steps follows the list):
- Partition it into small chunks.
- Execute those chunks in parallel via multithreading.
- Collate the results in a thread-safe fashion.
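As a rough illustration only (the ExpensiveWork function and the input range below are assumptions invented for this sketch, not part of the article's sample code), PLINQ takes care of all three steps: AsParallel partitions the source, the projection runs on multiple threads, and Sum collates the partial results safely.

using System;
using System.Linq;

class PlinqSketch
{
    // Hypothetical CPU-bound work applied to each element.
    static double ExpensiveWork(int n)
    {
        double x = n * 0.001;
        return Math.Sqrt(x) * Math.Sin(x);
    }

    static void Main()
    {
        // AsParallel partitions the range, Select runs on worker threads,
        // and Sum combines the per-thread partial results thread-safely.
        double total = Enumerable.Range(0, 1000000)
                                 .AsParallel()
                                 .Select(ExpensiveWork)
                                 .Sum();
        Console.WriteLine(total);
    }
}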
The PFX libraries have been designed specifically to help in these scenarios. Programming to leverage multiple cores or multiple processors is called parallel programming, and it is a subset of the broader concept of multithreading. There are several strategies for partitioning work among threads: data parallelism (already mentioned, and repeated here) and task parallelism. When a set of tasks must be performed on many data values, we can parallelize by having each thread perform the (same) set of tasks on a subset of the values. This is called data parallelism, because we are partitioning the data between the threads. In contrast, with task parallelism we partition the tasks; in other words, we have each thread perform a different task. In general, data parallelism is easier and scales better to highly parallel hardware, because it reduces or eliminates shared data (and with it contention and thread-safety issues). Data parallelism also leverages the fact that there are often more data values than discrete tasks, which increases the potential for parallelism. The sketch below contrasts the two strategies.
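A minimal, hypothetical contrast (the messages printed below are placeholders, not anything from the article): Parallel.ForEach applies the same operation to every element of a collection (data parallelism), while Parallel.Invoke runs different operations side by side (task parallelism).

using System;
using System.Threading.Tasks;

class Strategies
{
    static void Main()
    {
        int[] values = { 1, 2, 3, 4, 5, 6, 7, 8 };

        // Data parallelism: the same work runs against each value,
        // and the values are partitioned across threads.
        Parallel.ForEach(values, v => Console.WriteLine("squared: " + v * v));

        // Task parallelism: different pieces of work run concurrently.
        Parallel.Invoke(
            () => Console.WriteLine("loading configuration..."),
            () => Console.WriteLine("warming caches..."),
            () => Console.WriteLine("building indexes..."));
    }
}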
Good. So how do we parallelize a loop and use the Parallel Extensions?
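When the iterations of a loop are independent of one another, a plain Parallel.For is enough; the sketch below (the array size and the per-element formula are arbitrary choices for illustration) shows that simplest case. The edit-distance loop used in the rest of this section is not that simple: each cell depends on the cells above it and to its left, which is why the code that follows builds a wavefront out of explicit Task dependencies instead.

using System;
using System.Threading.Tasks;

class ParallelForSketch
{
    static void Main()
    {
        // Each result depends only on its own index, so the iterations
        // can safely be spread across all available cores.
        double[] results = new double[1000000];

        Parallel.For(0, results.Length, i =>
        {
            results[i] = Math.Sqrt(i) * Math.Log(i + 1);
        });

        Console.WriteLine(results[results.Length - 1]);
    }
}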
The algorithm below is built with the Parallel Extensions that PFX provides. In a wavefront computation, each cell of a grid depends on the cells above it and to its left, so the work can only advance in diagonal "waves"; the implementation expresses those dependencies with Task continuations. The file is compiled into a DLL at the command line, or built as a class library in Visual Studio 2010, and is then referenced by a program that demonstrates the timing difference between the sequential and the parallel code.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace System.Threading.Algorithms
{
    public static partial class ParallelAlgorithms
    {
        public static void Wavefront(
            int numRows, int numColumns,
            int numBlocksPerRow, int numBlocksPerColumn,
            Action<int, int, int, int> processBlock)
        {
            // Validate parameters
            if (numRows <= 0) throw new ArgumentOutOfRangeException("numRows");
            if (numColumns <= 0) throw new ArgumentOutOfRangeException("numColumns");
            if (numBlocksPerRow <= 0 || numBlocksPerRow > numRows)
                throw new ArgumentOutOfRangeException("numBlocksPerRow");
            if (numBlocksPerColumn <= 0 || numBlocksPerColumn > numColumns)
                throw new ArgumentOutOfRangeException("numBlocksPerColumn");
            if (processBlock == null)
                throw new ArgumentNullException("processBlock");

            // Compute the size of each block
            int rowBlockSize = numRows / numBlocksPerRow;
            int columnBlockSize = numColumns / numBlocksPerColumn;

            Wavefront(numBlocksPerRow, numBlocksPerColumn, (row, column) =>
            {
                int start_i = row * rowBlockSize;
                int end_i = row < numBlocksPerRow - 1 ?
                    start_i + rowBlockSize : numRows;

                int start_j = column * columnBlockSize;
                int end_j = column < numBlocksPerColumn - 1 ?
                    start_j + columnBlockSize : numColumns;

                processBlock(start_i, end_i, start_j, end_j);
            });
        }
        public static void Wavefront(int numRows, int numColumns,
            Action<int, int> processRowColumnCell)
        {
            // Validate parameters
            if (numRows <= 0) throw new ArgumentOutOfRangeException("numRows");
            if (numColumns <= 0) throw new ArgumentOutOfRangeException("numColumns");
            if (processRowColumnCell == null)
                throw new ArgumentNullException("processRowColumnCell");

            Task[] prevTaskRow = new Task[numColumns];
            Task prevTaskInCurrentRow = null;
            var dependencies = new Task[2];

            // Create a task for each cell
            for (int row = 0; row < numRows; row++)
            {
                prevTaskInCurrentRow = null;
                for (int column = 0; column < numColumns; column++)
                {
                    // In-scope locals for being captured in the task closures
                    int j = row, i = column;

                    // Create a task with the appropriate dependencies
                    Task curTask;
                    if (row == 0 && column == 0)
                    {
                        // Upper-left task kicks everything off, having no dependencies
                        curTask = Task.Factory.StartNew(() => processRowColumnCell(j, i));
                    }
                    else if (row == 0 || column == 0)
                    {
                        // Tasks in the first row depend only on the task to their left;
                        // tasks in the first column depend only on the task above them
                        var antecedent = column == 0
                            ? prevTaskRow[0] : prevTaskInCurrentRow;
                        curTask = antecedent.ContinueWith(p =>
                        {
                            p.Wait(); // Necessary only to propagate exceptions
                            processRowColumnCell(j, i);
                        });
                    }
                    else // row > 0 && column > 0
                    {
                        // All other tasks depend on both the tasks above and to the left
                        dependencies[0] = prevTaskInCurrentRow;
                        dependencies[1] = prevTaskRow[column];
                        curTask = Task.Factory.ContinueWhenAll(dependencies, ps =>
                        {
                            Task.WaitAll(ps); // Necessary only to propagate exceptions
                            processRowColumnCell(j, i);
                        });
                    }

                    // Keep track of the task just created for future iterations
                    prevTaskRow[column] = prevTaskInCurrentRow = curTask;
                }
            }

            // Wait for the last task to be done
            prevTaskInCurrentRow.Wait();
        }
    }
}
To compile:
csc /target:library /out:Algorithms_Wavefront.dll Algorithms_Wavefront.cs
We use this DLL as an additional reference in Visual Studio 2010, or with the '/reference:' switch at the command line. Note the Action delegates: one receives the bounds of a block to process, the other the row and column of a single cell.
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
class Program
{
    static void Main(string[] args)
    {
        Random rand = new Random();
        Stopwatch sw = new Stopwatch();
        int result;

        while (true)
        {
            string s1 = GenerateRandomString(rand);
            string s2 = GenerateRandomString(rand);

            sw.Restart();
            result = SerialTimeLength(s1, s2);
            sw.Stop();
            Console.WriteLine("Serial :\t{0}\t{1}", result, sw.Elapsed);

            sw.Restart();
            result = ParallelTimeLength(s1, s2);
            sw.Stop();
            Console.WriteLine("Parallel:\t{0}\t{1}", result, sw.Elapsed);

            Console.WriteLine("----------------------------------------------------");
            GC.Collect();
        }
    }

    private static string GenerateRandomString(Random rand)
    {
        const int LEN = 10000;
        StringBuilder sb = new StringBuilder(LEN);
        for (int i = 0; i < LEN; i++) sb.Append((char)('a' + rand.Next(0, 26)));
        return sb.ToString();
    }

    private static int SerialTimeLength(string s1, string s2)
    {
        // Classic dynamic-programming edit distance: each cell depends on the
        // cells above, to the left, and diagonally above-left of it.
        int[,] dist = new int[s1.Length + 1, s2.Length + 1];
        for (int i = 0; i <= s1.Length; i++) dist[i, 0] = i;
        for (int j = 0; j <= s2.Length; j++) dist[0, j] = j;

        for (int i = 1; i <= s1.Length; i++)
        {
            for (int j = 1; j <= s2.Length; j++)
            {
                dist[i, j] = (s1[i - 1] == s2[j - 1]) ?
                    dist[i - 1, j - 1] :
                    1 + Math.Min(dist[i - 1, j],
                        Math.Min(dist[i, j - 1],
                                 dist[i - 1, j - 1]));
            }
        }
        return dist[s1.Length, s2.Length];
    }

    private static int ParallelTimeLength(string s1, string s2)
    {
        int[,] dist = new int[s1.Length + 1, s2.Length + 1];
        for (int i = 0; i <= s1.Length; i++) dist[i, 0] = i;
        for (int j = 0; j <= s2.Length; j++) dist[0, j] = j;

        // Process the matrix in blocks; the Wavefront helper guarantees that a
        // block runs only after the blocks above it and to its left have finished.
        int numBlocks = Environment.ProcessorCount * 4;
        System.Threading.Algorithms.ParallelAlgorithms.Wavefront(s1.Length, s2.Length,
            numBlocks, numBlocks, (start_i, end_i, start_j, end_j) =>
            {
                for (int i = start_i + 1; i <= end_i; i++)
                {
                    for (int j = start_j + 1; j <= end_j; j++)
                    {
                        dist[i, j] = (s1[i - 1] == s2[j - 1]) ?
                            dist[i - 1, j - 1] :
                            1 + Math.Min(dist[i - 1, j],
                                Math.Min(dist[i, j - 1],
                                         dist[i - 1, j - 1]));
                    }
                }
            });
        return dist[s1.Length, s2.Length];
    }
}
csc /reference:Algorithms_Wavefront.dll Program.cs
Note the timing difference between the sequential code and the code with the parallelized loop:
Serial : 8811 00:00:06.3117762
Parallel: 8811 00:00:03.6879674
-------------------------------------------------------
Serial : 8791 00:00:06.3353283
Parallel: 8791 00:00:03.6033089
-------------------------------------------------------
Serial : 8813 00:00:06.1330880
Parallel: 8813 00:00:03.5224918
-------------------------------------------------------
Serial : 8787 00:00:06.2184788
Parallel: 8787 00:00:03.7236997
-------------------------------------------------------
Serial : 8810 00:00:06.1260435
Parallel: 8810 00:00:03.5696177
-------------------------------------------------------
Serial : 8800 00:00:06.1468799
Parallel: 8800 00:00:03.5442969
-------------------------------------------------------
Serial : 8807 00:00:06.3541186
Parallel: 8807 00:00:03.5775287
-------------------------------------------------------
Serial : 8819 00:00:06.2730393
Parallel: 8819 00:00:03.6345988
-------------------------------------------------------
Serial : 8801 00:00:06.2168016
Parallel: 8801 00:00:03.6277170
-------------------------------------------------------
Serial : 8762 00:00:06.1840874
Parallel: 8762 00:00:03.5964011
-------------------------------------------------------
Serial : 8810 00:00:06.1708841
Parallel: 8810 00:00:03.8727436
-------------------------------------------------------
Serial : 8799 00:00:06.1792801
Parallel: 8799 00:00:03.6226900
-------------------------------------------------------
Serial : 8822 00:00:06.1833951
Parallel: 8822 00:00:03.7146820
-------------------------------------------------------
Serial : 8794 00:00:06.1674580
Parallel: 8794 00:00:03.6571650
-------------------------------------------------------
Serial : 8803 00:00:06.2745721
Parallel: 8803 00:00:04.0783518
-------------------------------------------------------
Serial : 8787 00:00:06.5583867
Parallel: 8787 00:00:04.9902464
-------------------------------------------------------
Serial : 8803 00:00:06.5382290
Parallel: 8803 00:00:04.3523305
-------------------------------------------------------
Serial : 8792 00:00:06.2359404
Parallel: 8792 00:00:03.6123527
-------------------------------------------------------
Serial : 8810 00:00:06.2226213
Parallel: 8810 00:00:03.6718996
-------------------------------------------------------
Serial : 8816 00:00:06.1712133
Parallel: 8816 00:00:03.6129905
and so on …
What is another example of a computation?
The number π is transcendental, which means that its decimal expansion is infinite and non-repeating, with no pattern. Mathematicians have computed tens of millions of digits of π and found no pattern in them.

The code shown below is an implementation of the midpoint rectangle rule of numerical integration, used to evaluate the integral given below, whose value is π. To approximate the area under the curve, we compute the area of a number of rectangles (num_rects) by finding the midpoint (mid) of each rectangle and taking the value of the function at that midpoint as the rectangle's height (height). We add the heights of all the rectangles together (sum), and once that is done we multiply the sum of the heights by the width of a rectangle (width) to obtain the required total area (area) and with it the value of π.
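For reference, the integral being approximated and the midpoint-rule sum that the code implements (with n standing for num_rects) are:

\[ \pi = \int_0^1 \frac{4}{1+x^2}\,dx \;\approx\; \frac{1}{n}\sum_{i=0}^{n-1} \frac{4}{1+\bigl(\tfrac{i+0.5}{n}\bigr)^2} \]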
#include <stdio.h>
#include <stdlib.h>
#include <math.h>

static long num_rects = 100000;

int main(int argc, char *argv[])
{
    int i;
    double mid, height, width, sum = 0.0;
    double area;

    width = 1.0 / (double) num_rects;
    for (i = 0; i < num_rects; i++) {
        mid = (i + 0.5) * width;
        height = 4.0 / (1.0 + mid * mid);
        sum += height;
    }
    area = width * sum;
    printf("Computed pi = %f\n", area);
    return 0;
}
Computed pi = 3.141593
So if we compute π in managed C#, both serially and in parallel, will the elapsed times come anywhere close? Look at the basic C code again. I have not multithreaded this code. The two work variables, mid and height, are assigned on every iteration; if each thread had a local copy, that copy could be used during the iterations assigned to that thread. The iteration variable i is also updated on every iteration, so each thread needs a local copy of it as well, to avoid interfering with the iterations running on other threads. The sum variable is likewise updated on every iteration, but because its value is used outside the loop we cannot simply give each thread a private copy that is thrown away when the thread finishes its work: each thread has to accumulate a private partial sum that is then folded into the shared total in a thread-safe way. That is exactly the pattern the thread-local overload of Parallel.For (localInit / body / localFinally) provides, as the ParallelPi method below shows.
Here is a file that computes π five times, with the timings running from longest to shortest. The first computation uses a method named SerialLinqPi(). We start with that method to establish a baseline, and then see how the computation can be performed faster and better.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    const int num_steps = 100000000;

    static void Main(string[] args)
    {
        while (true)
        {
            Time(() => SerialLinqPi());
            Time(() => ParallelLinqPi());
            Time(() => SerialPi());
            Time(() => ParallelPi());
            Time(() => ParallelPartitionerPi());
            Console.WriteLine("----");
            Console.ReadLine();
        }
    }

    static void Time<T>(Func<T> work)
    {
        var sw = Stopwatch.StartNew();
        var result = work();
        Console.WriteLine(sw.Elapsed + ": " + result);
    }

    static double SerialLinqPi()
    {
        double step = 1.0 / (double)num_steps;
        return (from i in Enumerable.Range(0, num_steps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }

    static double ParallelLinqPi()
    {
        double step = 1.0 / (double)num_steps;
        return (from i in ParallelEnumerable.Range(0, num_steps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }

    static double SerialPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        for (int i = 0; i < num_steps; i++)
        {
            double x = (i + 0.5) * step;
            sum = sum + 4.0 / (1.0 + x * x);
        }
        return step * sum;
    }

    static double ParallelPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        object monitor = new object();
        // Each thread accumulates into its own local total; the totals are
        // combined under a lock only once per thread.
        Parallel.For(0, num_steps, () => 0.0, (i, state, local) =>
        {
            double x = (i + 0.5) * step;
            return local + 4.0 / (1.0 + x * x);
        }, local => { lock (monitor) sum += local; });
        return step * sum;
    }

    static double ParallelPartitionerPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        object monitor = new object();
        // Partitioner.Create hands each thread a range of indices, so the
        // delegate overhead is paid per range rather than per iteration.
        Parallel.ForEach(Partitioner.Create(0, num_steps), () => 0.0,
            (range, state, local) =>
            {
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    double x = (i + 0.5) * step;
                    local += 4.0 / (1.0 + x * x);
                }
                return local;
            }, local => { lock (monitor) sum += local; });
        return step * sum;
    }
}
As you can probably predict by now, the computation time drops depending on how we partition the data and parallelize the loop:
00:00:05.6078739: 3.14159265359043
00:00:04.0519734: 3.14159265358991
00:00:01.4407659: 3.14159265359043
00:00:01.2224946: 3.14159265358996
00:00:00.8162013: 3.14159265358965
One might think that the simplest form of static decomposition, dividing the loop size by the number of processors to get a per-thread iteration count and handing each thread one contiguous range of iterations, is the way to go. It is actually not the most efficient approach, because it can lead to poor processor utilization. To benefit from data parallelism, the different kinds of decomposition should be studied carefully; the sketch below contrasts a naive static split with the chunked partitioning that ParallelPartitionerPi uses above.
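As a rough, hypothetical sketch of that difference (the SimulatedWork function and its deliberately uneven cost are inventions for this illustration): with a static split, the thread that draws the expensive contiguous range finishes long after the others have gone idle, whereas Partitioner.Create breaks the range into smaller chunks that the scheduler hands out as threads become free.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

class DecompositionSketch
{
    const int N = 100000;

    // Hypothetical work whose cost grows with the index, so a contiguous
    // split leaves the last thread with far more work than the first.
    static double SimulatedWork(int i)
    {
        double acc = 0.0;
        for (int k = 0; k < i / 50; k++) acc += Math.Sqrt(k + 1);
        return acc;
    }

    static void Main()
    {
        int threads = Environment.ProcessorCount;

        // 1) Naive static decomposition: one contiguous range per thread.
        var sw = Stopwatch.StartNew();
        int chunk = N / threads;
        Parallel.For(0, threads, t =>
        {
            int from = t * chunk;
            int to = (t == threads - 1) ? N : from + chunk;
            for (int i = from; i < to; i++) SimulatedWork(i);
        });
        Console.WriteLine("Static split : " + sw.Elapsed);

        // 2) Smaller chunks distributed to threads as they become free.
        sw = Stopwatch.StartNew();
        Parallel.ForEach(Partitioner.Create(0, N), range =>
        {
            for (int i = range.Item1; i < range.Item2; i++) SimulatedWork(i);
        });
        Console.WriteLine("Partitioner  : " + sw.Elapsed);
    }
}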
References
- "Patterns of Parallel Programming" by Stephen Toub of Microsoft