任务并行库:4/n






4.93/5 (60投票s)
探讨使用任务并行库。
引言
这是我提出的TPL系列文章的第四部分。上次,我介绍了Parallel For和Foreach,并涵盖了这方面的内容
- 并行 For/Foreach
- 创建简单的并行 For/Foreach
- 中断和停止并行循环
- 处理异常
- 取消并行循环
- 为了提高性能而进行分区
- 使用线程本地存储
这一次,我们将研究如何使用并行LINQ,或更广为人知的PLINQ。我们还将研究如何执行通常的TPL操作,例如取消和处理异常,此外,我们还将研究如何使用自定义分区和自定义聚合。
文章系列路线图
这是可能发布的6篇文章中的第4篇,我希望大家会喜欢。下面是我希望涵盖的大致大纲
现在,我知道有些人会简单地阅读本文并说它与MSDN上已有的内容相似,我部分同意这一点,但是出于以下几个原因,我仍然选择承担撰写这些文章的任务
- 只有前几篇文章会展示与MSDN相似的想法,之后,我认为我将涉及的材料将不会在MSDN上,而是我个人对TPL进行研究的结果,我将在文章中概述这些内容,因此您将从我的研究中受益,只需阅读即可…是的,不错
- 这里会有实时输出的屏幕截图,这是MSDN上没有太多内容的,这可能有助于一些读者加深对文章内容的理解
- 可能有一些读者甚至从未听说过任务并行库,因此不会在MSDN上遇到它,你知道老一套了,首先你需要知道你在寻找什么。
- 我喜欢关于线程的文章,所以喜欢写它们,所以我写了它们,将会写它们,已经写了它们,并将继续写它们。
说了这么多,如果人们读完本文后,真的认为它与MSDN太过相似(我仍然希望它不会),也请告诉我,我将尝试调整后续文章以弥补。
目录
总之,本文将涵盖以下内容
PLinq简介
正如现在大多数.NET开发人员所知,.NET内置了查询数据的功能,这被称为LINQ(Language Integrated Query,简称LINQ),它有几种主要形式:LINQ to Objects、LINQ to SQL/EF和LINQ to XML。
我们都可能已经习惯了在日常生活中编写这样的代码
(from x in someData where x.SomeCriteria == matchingVariable select x).Count();
或者
(from x in peopleData where x.Age > 50 select x).ToList();
这是.NET语言中的一个有价值的补充,没有LINQ我肯定不行。问题在于TPL的设计者已经考虑到了这一点,并可能看到了大量的LINQ代码,这些代码只是循环查找某个特定项,或者计算满足某些Predicate<T>
的项,或者我们执行一些聚合,例如普通的LINQ扩展方法Sum()
、Average()
、Aggregate()
等。
现在看来,当我们只是遍历简单查询的结果时,这些查询试图匹配Predicate<T>
,这非常适合并行化。有些其他区域不那么简单(至少在我看来),但仍然是可能的,幸运的是,TPL的设计者在PLINQ中使用时包含了一种方法来完成所有这些事情。
与标准LINQ一样,大多数PLINQ功能通过扩展方法提供,这些方法主要适用于ParallelQuery
和ParallelEnumerable
类。它们提供了许多熟悉的LINQ扩展方法。需要注意的是,使用PLINQ并不保证工作会更快,它只是提供了某种程度的异步性,正如我们稍后将看到的,有时TPL甚至会选择使用顺序版本的查询而不是PLINQ查询,如果它被分析并认为这是一个更好的选择。
这些领域以及普通的取消/异常处理将在本文的其余部分涵盖。
开始之前的重要说明
我正在使用一台双核笔记本电脑运行这些示例,有时为了屏幕截图的方便,我特意选择小数据集,这可能无法显示最佳的计时效果,但它不影响TPL代码的编写方式。所以,您必须忍受我这个小小的疏忽。
有用的PLinq扩展方法
与常规LINQ一样,PLINQ主要通过扩展方法实现。以下是您可能需要使用的大多数常用扩展方法的列表
扩展方法 | 描述 |
AsParallel() | 用于指定您希望数据源被异步查询 http://msdn.microsoft.com/en-us/library/system.linq.parallelenumerable.asparallel.aspx |
WithExecutionMode() | 设置查询的ExecutionMode ,它可以是以下enum 值之一默认值 这是默认设置。PLINQ将检查查询的结构,并且只有在可能导致加速时才会并行化查询。如果查询结构表明不太可能获得加速,那么PLINQ将像普通的LINQ to Objects查询一样执行该查询。 ForceParallelism 并行化整个查询,即使这意味着使用高开销的算法。在您知道查询的并行执行将导致加速,但PLINQ在默认模式下会将其作为顺序执行的情况下使用此标志。 http://msdn.microsoft.com/en-us/library/dd642145.aspx |
AsOrdered() | 启用将数据源视为已排序,覆盖默认的无序 |
WithDegreeOfParallelism() | 设置用于查询的并行度。并行度是用于处理查询的最大并发执行任务数 |
WithMergeOptions() | 设置此查询的合并选项,这些选项指定查询如何缓冲输出 |
WithCancellation() | 设置要与查询关联的System.Threading.CancellationToken |
这些应该有助于您掌握PLinq,我们将在本文的其余部分中看到其中一些方法的实际应用。
简单的PLinq示例
演示项目名称:SimpleParrallelLinq
标准LINQ场景
让我们从一个基本示例开始,好吗?下面所示的演示代码包含3个场景 - 它使用标准的(顺序)LINQ,如下所示
IEnumerable<double> results = StaticData.DummyRandomIntValues.Value
.Select(x => Math.Pow(x, 2));
foreach (int item in results)
{
Console.WriteLine("Result is {0}", item);
}
这一切都很熟悉,希望如此,没什么好说的,只是数据源是本文所有演示中使用的常见数据源,您可以在附带解决方案的ParallelLinq.Common
项目中找到此数据。
可能的PLinq场景
那么,我们如何指定要将某项内容运行为PLinq呢?嗯,实际上很简单,您只需在DataSource
上使用AsParallel()
扩展方法,这很重要,因为
someDataSource.AsParallel()
和
(from x in someDataSource where x.Age > 3 select x).AsParallel()
前者将尝试使用PLinq运行,而后者将在顺序运行查询后应用AsParallel()
,这是不正确的。所以请注意这一点,AsParallel()
需要在PLinq查询中的数据源上。
总之,现在您知道什么是PLinq查询了,让我们继续看第一个示例。
下一个场景可以使用Plinq运行,也可能使用顺序Linq运行?
var results2 = StaticData.DummyRandomIntValues.Value.AsParallel()
.Select(x => Math.Pow(x, 2));
foreach (int item in results2)
{
Console.WriteLine("Result is {0}", item);
}
什么?怎么回事?我们已经按照您告诉我们的那样,在正确的位置指定了AsParallel()
,这是怎么回事?嗯,问题在于,仅仅因为我们指定了AsParallel()
,并不一定意味着查询将被并行运行。实际上发生的是,TPL会分析查询,并确定查询最好是作为顺序查询运行还是作为并行查询运行。这就是为什么它可能不会异步运行。
但是,有一个程序化的方法可以让我们真正地并行化PLinq查询,这将在下面展示。
真正并行的PLinq场景
通过指定更多的扩展方法,我们可以更多地控制TPL为PLinq提供的功能。通过添加一个...
WithExecutionMode(ParallelExecutionMode.ForceParallelism)
…我们可以说我们不在乎TPL的分析阶段怎么想,我们知道得更多,我们想要这个查询并行运行,拜托。
这是一个例子。
var results3 = StaticData.DummyRandomIntValues.Value
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(x => Math.Pow(x, 2));
foreach (int item in results3)
{
Console.WriteLine("Result is {0}", item);
}
总之,这是运行这3个简单场景的结果
如我上面所述,我有时为了演示的目的使用小数据集,所以当您应用大型数据集和更多核心时,这些结果可能会有所不同。
排序
演示项目名称:SimpleOrdering
默认情况下,PLinq在实际异步运行时(可能根本不异步,正如我刚才解释的)不会保留顺序,但假设它是Async
,我们如何让它保留原始数据源中元素的顺序?这可以通过使用AsOrdered()
扩展方法轻松实现,如下所示。
此示例显示了一个顺序的标准LINQ查询,然后是一个PLinq(异步)查询,然后是另一个设置了AsOrdered()
的PLinq查询。请注意,只有顺序和AsOrdered()
的PLinq查询会保留顺序。
所以,如果顺序对您很重要,只需使用AsOrdered()
。
ManualResetEventSlim mre = new ManualResetEventSlim();
//***********************************************************************************
//
// SCENARIO 1 : Sequential (which will maintain order)
//
//***********************************************************************************
IEnumerable<int> results1 = StaticData.DummyOrderedIntValues.Value
.Select(x => x);
foreach (int item in results1)
{
Console.WriteLine("Sequential Result is {0}", item);
}
mre.Set();
//***********************************************************************************
//
// SCENARIO 2 : No Ordering At All
//
//***********************************************************************************
mre.Wait();
mre.Reset();
IEnumerable<int> results2 = StaticData.DummyOrderedIntValues.Value.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(x => x);
foreach (int item in results2)
{
Console.WriteLine("PLINQ Result is {0}", item);
}
mre.Set();
//***********************************************************************************
//
// SCENARIO 3 : Ordered
//
//***********************************************************************************
mre.Wait();
mre.Reset();
IEnumerable<int> results3 = StaticData.DummyOrderedIntValues.Value.AsParallel().AsOrdered()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(x => x);
foreach (int item in results3)
{
Console.WriteLine("PLINQ AsOrdered() Result is {0}", item);
}
这是运行的屏幕截图
使用范围
演示项目名称:ParallelRange
在使用标准LINQ时,使用Range投影来获取一系列值是很常见的。PLinq也提供了一种执行此操作的方法,如下所示
IEnumerable<int> results = (from i in ParallelEnumerable.Range(0, 100).AsOrdered() select i);
foreach (int item in results)
{
Console.WriteLine("Result is {0}", item);
}
运行起来就像这样
在此示例中,我使用ParallelQuery.AsOrderered(..)
扩展方法来保留顺序。
处理异常
在PLinq中处理异常与我们在本系列前面文章中看到的异常处理方式差别不大,我们只需要使用try
/catch
并确保捕获AggregateException
(以及可能的OperationCancelledException
),或者使用第一篇文章中讨论的任何异常处理方法。我通常使用try
/catch
,因为我在不使用TPL时在其他地方也使用它。
在使用PLinq时,最重要的一点是围绕枚举或使用PLinq结果的地方使用try
/catch
。
现在我将展示3种不同的场景,所有场景都使用150个Person
对象的_数据源,并简要讨论每种场景会发生什么,因为您有时可能会对获得的结果感到惊讶。我对一些结果感到非常惊讶,不得不请朋友Steve Soloman(又名Steve“The Thread”)来看,我们一起得出了一些结论,这些结论在下面的3个场景中进行了概述。
使用List<T>作为数据源
IEnumerable<Person> results1 =
StaticData.DummyRandomPeople.Value.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
if (x.Age >= 100)
throw new InvalidOperationException(
"Can only accept items < 100");
else
return x;
});
//Put try catch around the enumerating over the results of the PLINQ query
try
{
foreach (Person item in results1)
{
Console.WriteLine("Result is {0}", item);
}
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("PLinq over List<T> caught exception '{0}'",
ex.Message));
}
}
第一个场景使用List<Person>
对象作为数据源。由于我们使用的是List<T>
,PLinq知道我们需要查询多少项,因此它很可能使用默认的分区器(数据源被分成分区器决定的块),因此我们得到类似这样的结果(结果可能因您的机器而异)
使用IEnumerable<T>作为数据源
下一个场景使用IEnumerable<Person>
IEnumerable<Person> results2 =
StaticData.DummyRandomPeopleEnumerable().AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithDegreeOfParallelism(2)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
if (x.Age >= 100)
throw new InvalidOperationException(
"Can only accept items < 100");
else
return x;
});
//Put try catch around the enumerating over the results of the PLINQ query
try
{
foreach (Person item in results2)
{
Console.WriteLine("Result is {0}", item);
}
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("PLinq over IEnumerable<T> caught exception '{0}'",
ex.Message));
}
}
这个的_问题是,因为我们**没有**指定AsOrdered()
,PLinq不会尝试保持任何顺序,而且由于数据源是Enumerable<Person>
,机灵的您会注意到它不是一个明确长度的列表,因此,PLinq在不枚举的情况下不知道数据源中有多少项,所以它无法使用分区,并且必须枚举_所有_结果,因此这次我们得到的结果完全不同,如下所示(结果可能因您的PC而异)
使用IEnumerable<T>数据源,然后将其用作Ordered()
我想展示的最后一个场景是使用IEnumerable<Person>
,但这次指定了AsOrdered()
子句,这应该会强制PLinq保留顺序。
IEnumerable<Person> results3 =
StaticData.DummyRandomPeopleEnumerable().AsParallel()
.AsOrdered()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.Default)
.WithDegreeOfParallelism(2)
.Select(x =>
{
if (x.Age >= 100)
throw new InvalidOperationException(
"Can only accept items < 100");
else
return x;
});
//Put try catch around the enumerating over the results of the PLINQ query
try
{
foreach (Person item in results3)
{
Console.WriteLine("Result is {0}", item);
}
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("PLinq over IEnumerable<T>
using AsOrdered() caught exception '{0}'",
ex.Message));
}
}
这里的问题是,在后台仍然使用Task
,TPL会在Task
中完成工作,然后获取每个Task
的Task.Result
(这是一个特殊的Trigger
方法,会导致AggregateException
被观察到),然后将合并后的Task.Result
合并成一个有序的结果集。但由于我们在后台使用Task
,并且还使用Task.Result
,我们期望会更快地看到AggregateException
,所以让我们看看结果
这确实是一个模糊的世界……但只要您记住TPL在后台使用任务,您应该没问题的。
取消PLinq查询
演示项目名称:Cancellation
我之前已经写了3篇关于TPL的文章,在每一篇中,我都讨论了如何使用CancellationToken
来取消某些与TPL相关的功能,因此,我现在假设您已经熟悉CancellationToken
的工作原理。
值得注意的是,您如何将CancellationToken
注册到PLinq查询,这可以通过上面扩展方法表中显示的.WithCancellation(tokenSource.Token)
扩展方法轻松实现。
这是一个小例子。在此示例中,我们创建一个新的PLinq查询,该查询使用.WithCancellation(tokenSource.Token)
扩展方法,然后启动一个Task
,该任务将在一段时间后取消PLinq查询。我们显然必须确保捕获OperationCancelledException
和AggregateException
,这在处理TPL时是典型的。
这是完整的代码列表
// create a cancellation token source
CancellationTokenSource tokenSource = new CancellationTokenSource();
IEnumerable<double> results =
StaticData.DummyRandomHugeIntValues.Value
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithCancellation(tokenSource.Token)
.Select(x => Math.Pow(x,2));
// create a task that will wait for 500 ms and then cancel the token
Task.Factory.StartNew(() =>
{
Thread.Sleep(500);
tokenSource.Cancel();
Console.WriteLine("Cancelling");
});
//now try and use the results, and make sure we catch Exceptions
try
{
foreach (int item in results)
{
Console.WriteLine("Result is {0}", item);
}
}
catch (OperationCanceledException opcnclEx)
{
Console.WriteLine("Operation was cancelled");
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
以下是演示所有内容运行的演示
可能提高性能的分区
演示项目名称:CustomPartitioning
要并行化数据源上的操作,关键步骤之一是将源分割成多个部分,多个线程可以并发访问这些部分。PLINQ和任务并行库(TPL)提供默认的分区器,当您编写并行查询或ForEach
循环时,这些分区器会透明工作。对于更高级的场景,您可以插入自己的分区器。
在上一篇文章(Parallel For Partitioning)中,我讨论了当工作负载委托很小时,如何使用自定义分区器来提高并行for
/foreach
循环的速度。PLinq也支持分区,并且有一个很好的MSDN文章讨论了可以使用的不同类型分区。
链接在此:http://msdn.microsoft.com/en-us/library/dd997411.aspx
对于这个项目的演示应用程序,我创建了一个简单的静态分区器(通过继承Partitioner<T>
),它将数据源分割成n个分区块,其中partitionCount
由TPL本身指定,并通过覆盖Partitioner<T>
方法提供给我的自定义分区器。
public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
这是创建PLinq简单自定义分区的完整代码(很可能这比使用标准的PLinq分区器没有优势,但它确实展示了如何创建自己的自定义分区器)。
public class SimpleCustomPartitioner<T> : Partitioner<T>
{
private T[] sourceData;
public SimpleCustomPartitioner(T[] sourceData)
{
this.sourceData = sourceData;
}
public override bool SupportsDynamicPartitions
{
get
{
return false;
}
}
public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
{
IList<IEnumerator<T>> partitioned = new List<IEnumerator<T>>();
//work out how many items will go into a single partition
int itemsPerPartition = sourceData.Length / partitionCount;
//now create the partitions, all but the last one,
//which we treat as special case
for (int i = 0; i < partitionCount - 1; i++)
{
partitioned.Add(GetItemsForPartition(i * itemsPerPartition,
(i + 1) * itemsPerPartition));
}
//now create the last partition
partitioned.Add(GetItemsForPartition((partitionCount - 1) *
itemsPerPartition, sourceData.Length));
return partitioned;
}
private IEnumerator<T> GetItemsForPartition(int start, int end)
{
for (int i = start; i < end; i++)
yield return sourceData[i];
}
}
这是一个使用此分区的简短演示。此代码包含3个场景
- 顺序LINQ
- 使用Plinq(使用默认分区器,因为我们有一个项目数组,TPL和PLinq可以使用默认分区器)
- 使用带有自定义分区的PLing(正如我所说,这可能不会比TPL提供的默认PLinq分区器更好)
无论如何,这是代码
int[] sourceData = StaticData.DummyOrderedLotsOfIntValues.Value;
ManualResetEventSlim mre = new ManualResetEventSlim();
List<string> overallResults = new List<string>();
//***********************************************************************************
//
// SCENARIO 1 : Sequential LINQ
//
//***********************************************************************************
Stopwatch watch1 = new Stopwatch();
watch1.Start();
IEnumerable<double> results1 =
sourceData.Select(item => Math.Pow(item, 2));
// enumerate results
int visited1 = 0;
foreach (double item in results1)
{
Console.WriteLine("Result is {0}", item);
visited1++;
}
watch1.Stop();
overallResults.Add(string.Format("Visited {0} elements in {1} ms",
visited1.ToString(), watch1.ElapsedMilliseconds));
mre.Set();
//***********************************************************************************
//
// SCENARIO 2 : Use PLINQ
//
//***********************************************************************************
mre.Wait();
mre.Reset();
Stopwatch watch2 = new Stopwatch();
watch2.Start();
IEnumerable<double> results2 =
sourceData.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(item => Math.Pow(item, 2));
// enumerate results
int visited2 = 0;
foreach (double item in results2)
{
Console.WriteLine("Result is {0}", item);
visited2++;
}
watch2.Stop();
overallResults.Add(string.Format("PLINQ No Partioner Visited {0} elements in {1} ms",
visited2.ToString(), watch2.ElapsedMilliseconds));
mre.Set();
//***********************************************************************************
//
// SCENARIO 3 : Use PLINQ and custom partitioner
//
//***********************************************************************************
mre.Wait();
mre.Reset();
// create the partitioner
SimpleCustomPartitioner<int> partitioner =
new SimpleCustomPartitioner<int>(sourceData);
Stopwatch watch3 = new Stopwatch();
watch3.Start();
IEnumerable<double> results3 =
partitioner.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(item => Math.Pow(item, 2));
// enumerate results
int visited3 = 0;
foreach (double item in results3)
{
Console.WriteLine("Result is {0}", item);
visited3++;
}
watch3.Stop();
overallResults.Add(string.Format("PLINQ With Custom Partioner Visited {0} elements in {1} ms",
visited3.ToString(), watch3.ElapsedMilliseconds));
//print results of 3 different variations
foreach (string overallResult in overallResults)
{
Console.WriteLine(overallResult);
}
Console.ReadLine();
以下是所有这些运行的结果
可以看到顺序版本确实花费的时间更长,但另外两个PLinq场景之间差别不大,这并不奇怪,我的意思是,我期望编写TPL和PLinq的人们至少能设计出一个与我设计的相媲美的默认分区器,结果证明他们确实做到了。我设计的那个在这个屏幕截图中表现更好,但如果再次运行,情况可能完全不同。
总之,这里的重点是这段代码向您展示了如何编写自己的分区器,我现在希望您知道如何编写。
使用自定义聚合
演示项目名称:CustomAggregation
现在,在使用标准的顺序LINQ时,使用聚合扩展方法非常容易,我们可以这样做,搞定
int sequentialResult = (from x in peopleData where x.Age gt; 50 select x).Count();
但是,当我们把数据源查询分成小分区时,这样做会相当困难,不是吗?嗯,是的,如果我们必须手动编写所有内容,并且处理共享对象的同步,并管理各个Task
,那将是痛苦的。
幸运的是,我们不必这样做,TPL提供了一种方法,这类似于使用我上次讲过的线程本地存储。让我们看看与上面用许多Task
运行的真正并行PLinq示例相同的示例。
这是可以执行与上述顺序LINQ聚合相同操作的代码
int plinqResult =
peopleData.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithDegreeOfParallelism(2) //cores on my laptop
.Aggregate(
// 1st function - initialize the result
0,
// 2nd function - process each person and the per-Task subtotal
(subtotal, person) => subtotal += (person.Age > 50) ? 1 : 0,
// 3rd function - process the overall total and the per-Task total
(total, subtotal) => total + subtotal,
// 4th function - perform final processing
这显然有点复杂,但我们只需从一个初始值开始,然后分块处理,然后将这些块组合起来形成自定义聚合的最终结果,在本例中,这是一个Int
,它是数据源中Age
属性值大于50
的Person
对象的数量。这个特定的演示数据源有150个Person
对象,Age
从1-150开始;
为了证明没有猫腻,这里是结果的屏幕截图。
暂时就这些
这就是我在这篇文章中想说的全部内容。我希望您喜欢它,并想要更多。如果您喜欢这篇文章,并希望有更多内容,能否花点时间留下评论和投票。非常感谢!
希望在下一篇与您见面。