任务并行库:3/n






4.93/5 (77投票s)
探究任务并行库的使用。
引言
这是我计划的一系列关于TPL的文章的第三部分。上次我介绍了Continuations,并且涵盖了这方面的内容
- 更多TPL背景信息
- Continuations,那是什么
- 简单的Continuation
- WPF同步
- 继续“WhenAny”
- 继续“WhenAll”
- 使用Continuation进行异常处理
- 将Continuation用作管道
- 在Continuation的前置项中捕获异常
- 取消Continuation
这次我们将介绍如何使用Parallel for/foreach循环。我们还将介绍如何执行通常的TPL操作,例如取消和处理异常,以及如何跳出并行循环,如何在循环内使用线程局部存储,以及如何跳出循环。
文章系列路线图
这是我可能撰写的6篇文章中的第3篇,希望大家会喜欢。下面是我打算涵盖的内容的大纲
- 启动任务/触发操作/异常处理/取消/UI同步
- 延续 / 取消链式任务
- Parallel For / 自定义分区器 / 聚合操作(本文)
- 并行 LINQ
- 管道
- 高级场景 / 任务的 v.Next
现在,我知道有些人会直接阅读本文并说它与MSDN上已有的内容相似,我部分同意这一点;但是,我选择继续撰写这些文章有几个原因,如下所示
- 只有前几篇文章会显示与MSDN相似的想法;之后,我认为我将深入探讨的内容将不会在MSDN上出现,而是我个人对TPL进行研究的结果,我将在文章中概述这些内容,因此您将受益于我的研究,您可以直接阅读……是的,不错。
- 这里会有实时输出的截图,这是MSDN上没有太多内容的部分,这可能会帮助一些读者加深对文章内容的理解。
- 也许这里有一些读者从未听说过任务并行库,因此不会在MSDN上看到它,您知道老话怎么说,您首先需要知道您在寻找什么。
- 我喜欢关于线程的文章,所以喜欢写它们,所以我写了它们,将会写它们,已经写了它们,并将继续写它们。
说了这么多,如果人们在阅读本文后,真的认为它与MSDN过于相似(我仍然希望不会这样),也请告诉我,我将努力调整即将发表的文章以弥补。
目录
本文我将涵盖以下内容
Parallel For/Foreach
我们中的许多人可能会编写大量顺序代码,如下所示
foreach(SomeObject x in ListOfSomeObjects)
{
x.DoSomething();
}
在这里,我们对某些源中的每个项都执行操作,并且源中的对象之间没有任何关系,我们只是希望对某个对象源集合中的每个项都执行一些操作。我们想对所有这些对象执行操作,并且它们之间没有紧密耦合,这使得此类操作成为并行处理的理想选择,TPL的设计者也是这么认为的,因此他们提供了创建Parallel.For
和Parallel.Foreach
循环的功能。
本文的其余部分将探讨如何在您自己的代码中使用Parallel.For
和Parallel.Foreach
循环。显然,由于我们处理的是并行性,因此会增加一些复杂性,但总体而言,它仍然很容易理解。
创建一个简单的Parallel For/Foreach
演示解决方案项目:SimpleParallel
让我们开始创建一个非常简单的Parallel.For
和Parallel.Foreach
循环;下面是每个循环的示例
List<string> data = new List<string>()
{ "There","were","many","animals",
"at","the","zoo"};
//parallel for
Parallel.For(0, 10, (x) =>
{
Console.WriteLine(x);
});
//parallel for each
Parallel.ForEach(data, (x) =>
{
Console.WriteLine(x);
});
Console.ReadLine();
没有比这更简单的了。为了证明它能按预期工作,这是结果,绿色代表Parallel.For
,棕色代表Parallel.Foreach
。
需要注意的一点是,Parallel.For
和Parallel.Foreach
循环完全不保证正确的顺序,正如上面图所示。
跳出和停止并行循环
演示解决方案项目:BreakingAndStopping
我们过去可能都编写过类似这样的代码,我们在串行循环中执行某些操作,并在某个条件下跳出
foreach(SomeClass x in ListofClasses)
{
if(x.isBest)
{
x.Save();
break;
}
}
问题是,在使用Parallel.For
和Parallel.Foreach
循环时,您是否也可以执行此类操作?是的,您可以,我们只需要使用一个TPL类,称为ParallelLoopState
,我们可以使用TPL的Parallel.For
和Parallel.Foreach
的众多重载之一来使用它。
通过使用ParallelLoopState
,我们可以使用两个循环控制方法:Stop()
和Break()
。
ParallelLoopState.Stop()
通知System.Threading.Tasks.Parallel
循环应在系统最早方便的时候停止执行。
ParallelLoopState.Break()
通知System.Threading.Tasks.Parallel
循环应在系统最早方便的当前迭代之后的迭代停止执行。
我们还可以使用ParallelLoopResult
结构来获取有关Parallel.For
或Parallel.Foreach
循环工作如何进行的详细信息,我们可以将其用作TPL Parallel.For
和Parallel.Foreach
循环的返回值。以下三个示例也显示了使用TPL ParallelLoopResult
结构检查TPL Parallel.For
和Parallel.Foreach
循环状态的方法
这里有一些演示
For循环 Stop()
//parallel for stop
ParallelLoopResult res1 = Parallel.For(0, 10, (x, state) =>
{
if (x < 5)
Console.WriteLine(x);
else
state.Stop();
});
Console.WriteLine("For loop LowestBreak Iteration : {0}",
res1.LowestBreakIteration);
Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
Console.WriteLine("\r\n");
Foreach循环 Stop()
//parallel foreach stop
ParallelLoopResult res2 = Parallel.ForEach(data, (x, state) =>
{
if (!x.Equals("zoo"))
Console.WriteLine(x);
else
state.Stop();
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}",
res2.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
Console.WriteLine("\r\n");
Foreach循环 Break()
//parallel for each that actuaally breaks, rather than stops
ParallelLoopResult res3 = Parallel.ForEach(data, (x, state) =>
{
if (x.Equals("zoo"))
{
Console.WriteLine(x);
state.Break();
}
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}",
res3.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res3.IsCompleted);
这是运行的截图。请注意,当查询时,前两个(ParallelLoopState.Stop()
的那些)不提供ParallelLoopResult.LowestBreakIteration
。这是因为它们使用了ParallelLoopState.Stop()
,因此实际上并未中断。
处理异常
演示解决方案项目:HandlingExceptions
我们在第一篇文章中讲过通用的任务异常处理,并且您可以使用任何这些技术;然而,我发现最简单的方法是使用我们都用于非异步代码的熟悉结构,即:try/catch
,我们只需捕获TPL的AggregateException
,其模式大致如下
try
{
.....
.....
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
这是基本模式,因此这里有一些针对真实Parallel.For
和Parallel.Foreach
循环的实际异常处理
Parallel.For循环异常处理
//parallel for Exception handling
try
{
ParallelLoopResult res1 = Parallel.For(0, 10, (x, state) =>
{
if (!state.IsExceptional)
{
if (x < 5)
Console.WriteLine(x);
else
throw new InvalidOperationException("Don't like nums > 5");
}
});
Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
Console.WriteLine("\r\n");
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
Parallel.Foreach循环异常处理
//parallel foreach Exception handling
try
{
ParallelLoopResult res2 = Parallel.ForEach(data, (x, state) =>
{
if (!x.Equals("zoo"))
Console.WriteLine(x);
else
throw new InvalidOperationException("Found Zoo throwing Exception");
});
Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
Console.WriteLine("\r\n");
}
catch (AggregateException aggEx)
{
foreach (Exception ex in aggEx.InnerExceptions)
{
Console.WriteLine(string.Format("Caught exception '{0}'",
ex.Message));
}
}
为了向您展示所有内容的运行情况,这里有一张截图
可以看出,我们收到了由Parallel.For
循环抛出的两个AggregateException
;这是因为两个抛出throw new InvalidOperationException("Don't like nums > 5")
的迭代已经被调度,因此运行了,导致发生并捕获了两个AggregateException
。
取消并行循环
演示解决方案项目:CancellingLoops
我们在第一篇文章中讲过通用的任务取消,并且当您使用TPL时,这个想法仍然是一样的。如何取消并行操作?使用CancellationToken
,就是这样。
根据我们从前两篇文章中了解到的信息,我们知道我们应该捕获AggregateException
,并且我们通常会在并行代码中使用一段代码,类似这样
token.CancellationToken.ThrowIfCancellationRequested();
到目前为止我们一直在这样做。不幸的是,这并不是我们在使用Parallel.For
和Parallel.Foreach
循环时需要做的事情,并且会导致一个未处理的“OperationCancelledException
”,如下面的图所示
那么我们必须怎么做?嗯,在这种特定情况下(即取消),这非常简单。我们必须确保提供一个专门针对OperationCancelledException
的捕获处理程序,并且似乎无论我们是否使用该行
token.CancellationToken.ThrowIfCancellationRequested();
都没有关系。当其相应的CancellationToken
被取消时,Parallel.For
和Parallel.Foreach
循环总是会抛出一个OperationCancelledException
。我想这取决于个人喜好,是否包含该行,最终而言。由于这似乎对是否抛出OperationCancelledException
没有影响,因此我选择不在Parallel.For
和Parallel.Foreach
循环演示中包含该部分代码。
下面是Parallel.For
和Parallel.Foreach
循环的完整列表,它们都将在5秒后被同一个CancellationToken
取消。
List<string> data = new List<string>()
{ "There", "were", "many", "animals",
"at", "the", "zoo" };
CancellationTokenSource tokenSource = new CancellationTokenSource();
Task cancelTask = Task.Factory.StartNew(() =>
{
Thread.Sleep(500);
tokenSource.Cancel();
});
ParallelOptions options = new ParallelOptions()
{
CancellationToken = tokenSource.Token
};
//parallel for cancellation
try
{
ParallelLoopResult res1 = Parallel.For(0, 1000, options, (x, state) =>
{
if (x % 10 == 0)
Console.WriteLine(x);
Thread.Sleep(100);
});
Console.WriteLine("For loop LowestBreak Iteration : {0}",
res1.LowestBreakIteration);
Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
Console.WriteLine("\r\n");
}
catch (OperationCanceledException opCanEx)
{
Console.WriteLine("Operation Cancelled");
}
catch (AggregateException aggEx)
{
Console.WriteLine("Operation Cancelled");
}
//parallel foreach cancellation
try
{
ParallelLoopResult res2 = Parallel.ForEach(data,options, (x, state) =>
{
Console.WriteLine(x);
Thread.Sleep(100);
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}",
res2.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
Console.WriteLine("\r\n");
}
catch (OperationCanceledException opCanEx)
{
Console.WriteLine("Operation Cancelled");
}
catch (AggregateException aggEx)
{
Console.WriteLine("Operation Cancelled");
}
Console.ReadLine();
这是输出
我们确实看到了取消消息,并且Parallel.Foreach
没有调度任何内容。
但是,这看起来很奇怪,对吧?
我提请您注意这一点……请注意,我们只让for
循环打印了10和500(好吧,它是由我设计的,只有当索引 % (模数) 10 ==0 时才会打印,所以无法真正确定发生了什么),但这确实表明Parallel.For
循环肯定不保证顺序,您不应依赖任何顺序,否则将导致彻底的失败。
TPL会根据需要将Parallel.For
委托调度到工作线程上,如上所示。我们可能已经调用了其他索引,但我们真正知道的是,在取消之前,我们调用了索引0和500。100、200、300……呢?顺序不保持,所以请不要期望TPL会保持任何顺序,那不会发生。
话虽如此,它确实很好地演示了Parallel.Foreach
的工作根本没有被调度,因为我们在上面的输出中根本没有看到任何输出。这可能会因您PC上有多少个核心而异,我只有2个,所以我看到了这样的输出。
分区以提高性能
演示解决方案项目:OrderedPartitioner
我不知道有多少人足够仔细地注意到,当我们运行Task
/Continuation或Parallel.For/Parallel.Foreach
循环时,我们实际上是在排队一组将最终在ThreadPool
工作线程上运行的工作委托。问题在于,尤其是在Parallel.For/Parallel.Foreach
循环的情况下,当委托体很小时,创建/切换这些小委托负载所花费的时间可能会对性能产生不利影响。
问题是,我们能做些什么吗?是的,TPL允许我们创建自己的分区块,其思想是将整体Parallel.For/Parallel.Foreach
循环的工作量分成我们自己在代码中指定的块大小。我们实际上是在创建一个自定义分区算法。如果我们使用默认的Partitioner
,它会使用一个默认的分区算法,该算法会考虑核心数量等因素,可能不会产生最佳结果。
为了说明这一点,我在下面提供了三个示例
- 根本没有分区
- 使用默认TPL分区算法(我的笔记本有2个核心)
- 使用我们自己的自定义分区逻辑
下面是演示这三种情况的完整代码列表。想法是我使用一个简单的Task
来填充一个带有虚拟数据的数组,然后依次运行(并计时)上面概述的三种情况。这三种情况执行相同的任务。它从原始虚拟列表中获取一个项,将其平方,然后将该值写入另一个结果数组。通过确保所有三种情况都执行相同的操作,我们应该能够获得真实的比较。
在被测试的代码块结束时,我们还通过检查结果数组中的0来验证所有元素是否都被命中。当在每种情况下重新初始化结果数据数组时,0才应该存在。我们基本上确保每个原始列表中的元素都被该场景命中。
我只使用标准的Threading
同步原语(一个ManualResetEventSlim
)来控制三种情况的运行,一次只允许一种运行,并且它们按源文件中声明的顺序运行。
// create the results array
int[] inputData = new int[50000000];
int[] resultData = new int[50000000];
ManualResetEventSlim mre = new ManualResetEventSlim(false);
Random rand = new Random();
Object locker = new Object();
//create some dummy data
Task setupTask = Task.Factory.StartNew(() =>
{
Parallel.For(0, inputData.Length, (i) =>
{
lock (locker)
{
inputData[i] = rand.Next(2,10);
}
});
mre.Set();
});
//***********************************************************************************
//
// SCENARIO 1 : No partitioning at all
//
//***********************************************************************************
mre.Wait();
mre.Reset();
Task timerTask1 = Task.Factory.StartNew(() =>
{
Stopwatch watch = new Stopwatch();
watch.Start();
Parallel.ForEach(inputData, (int item, ParallelLoopState loopState, long index) =>
{
resultData[index] = item * item;
});
watch.Stop();
Console.WriteLine("time ellapsed for No partitioning at all version {0}ms",
watch.ElapsedMilliseconds);
Console.WriteLine("Number of results with 0 as square : {0}, " +
"which proves each element was hit\r\n",
(from x in resultData where x == 0 select x).Count()); mre.Set();
mre.Set();
});
//****************************************************************
//
// SCENARIO 2 : Use the default TPL partitioning algorithm
// (affected by your PCs # of cores)
//
//****************************************************************
mre.Wait();
mre.Reset();
//clear results
Parallel.For(0, inputData.Length, (i) =>
{
resultData[i] = 0;
});
Task timerTask2 = Task.Factory.StartNew(() =>
{
// create an orderable partitioner
Stopwatch watch = new Stopwatch();
OrderablePartitioner<int> op = Partitioner.Create(inputData);
watch.Reset();
watch.Start();
Parallel.ForEach(op, (int item, ParallelLoopState loopState, long index) =>
{
resultData[index] = item * item;
});
watch.Stop();
Console.WriteLine("time ellapsed for default TPL partitioning algorithm version {0}ms",
watch.ElapsedMilliseconds);
Console.WriteLine("Number of results with 0 as square : {0}, " +
"which proves each element was hit\r\n",
(from x in resultData where x == 0 select x).Count()); mre.Set();
mre.Set();
});
//***********************************************************************************
//
// SCENARIO 3 : Use our own custom partitioning logic
//
//***********************************************************************************
mre.Wait();
mre.Reset();
//clear results
Parallel.For(0, inputData.Length, (i) =>
{
resultData[i] = 0;
});
Task timerTask3 = Task.Factory.StartNew(() =>
{
// create an chunking orderable partitioner
Stopwatch watch = new Stopwatch();
watch.Reset();
watch.Start();
OrderablePartitioner<Tuple<int, int>> chunkPart =
Partitioner.Create(0, inputData.Length, 5000);
Parallel.ForEach(chunkPart, chunkRange =>
{
for (int i = chunkRange.Item1; i < chunkRange.Item2; i++)
{
resultData[i] = inputData[i] * inputData[i];
}
});
watch.Stop();
Console.WriteLine("time ellapsed for custom Partitioner version {0}ms",
watch.ElapsedMilliseconds);
Console.WriteLine("Number of results with 0 as square : {0}, " +
"which proves each element was hit\r\n",
(from x in resultData where x == 0 select x).Count()); mre.Set();
mre.Set();
});
mre.Wait();
mre.Reset();
Console.ReadLine();
这是运行此代码的结果(同样,我的笔记本有2个核心,所以您的结果可能不同)
我认为结果不言自明。不进行分区是可以的,然后我们使用默认分区器,情况变得更糟(很可能是因为我的笔记本只有2个核心以及其他因素),但看看当我们完全控制分区大小时会发生什么,这使我们获得了最佳性能。
所以,这就是您所了解的,值得思考。
线程局部存储
有时我们在循环中需要做的是累积某种运行总数,就像在顺序代码中一样
int count =0;
int count =0;
foreach(SomeObject object in SomeObjectList)
{
if(object.Contains(someKeyWord)
{
count++
}
}
那么,如何使用Parallel.For/Parallel.Foreach
循环来实现这一点呢?答案在于使用ThreadLocalStorage
,它已经存在一段时间了,但TPL赋予了它新的意义。
幸运的是,在构造Parallel.For/Parallel.Foreach
循环时,我们有所有必需的重载来使用ThreadLocalStorage
,只需理解如何使用它。为此,我提供了两个示例,如下所示
演示解决方案项目:ThreadLocalStorage
此示例在单词源列表中搜索特定搜索词,并统计该词出现的次数。重要的是要知道,当我们访问计数变量时,我们实际上是在访问一个需要某种线程同步的共享对象。我选择使用lock(..)
(它实际上是在后台使用Monitor.TryEnter()/Monitor.Exit()
),但您可以使用任何您喜欢的线程同步原语。
int matches = 0;
object syncLock = new object();
string[] words
= new string[] { "the","other","day","I","was",
"speaking","to","a","man","about",
"this","and","that","cat","and","he","told","me",
"the","only","other","person","to","ask","him",
"about","the","crazy","cat","was","the","cats","owner"};
string searchWord = "cat";
//Add up all words that match the searchWord
Parallel.ForEach(
//source
words,
//local init
() => 0,
//body
(string item, ParallelLoopState loopState, int tlsValue) =>
{
if (item.ToLower().Equals(searchWord))
{
tlsValue++;
}
return tlsValue;
},
//local finally
tlsValue =>
{
lock (syncLock)
{
matches += tlsValue;
}
});
Console.WriteLine("Matches for searchword '{0}' : {1}\r\n", searchWord, matches);
Console.WriteLine("Where the original word list was : \r\n\r\n{0}",
words.Aggregate((x, y) => x.ToString() + " " + y.ToString()));
Console.ReadLine();
如果您想使用ThreadLocalStorage
,循环的格式几乎由TPL决定,这是一种您必须学习和遵循的模式。这在.NET 4.0本身中的方法签名和注释看起来是这样的
//
// Summary:
// Executes a for each operation on an System.Collections.IEnumerable{TSource}
// in which iterations may run in parallel.
//
// Parameters:
// source:
// An enumerable data source.
//
// localInit:
// The function delegate that returns the initial state of the local data for
// each thread.
//
// body:
// The delegate that is invoked once per iteration.
//
// localFinally:
// The delegate that performs a final action on the local state of each thread.
//
// Type parameters:
// TSource:
// The type of the data in the source.
//
// TLocal:
// The type of the thread-local data.
//
// Returns:
// A System.Threading.Tasks.ParallelLoopResult structure that contains information
// on what portion of the loop completed.
//
// Exceptions:
// System.ArgumentNullException:
// The exception that is thrown when the source argument is null.-or-The exception
// that is thrown when the body argument is null.-or-The exception that is thrown
// when the localInit argument is null.-or-The exception that is thrown when
// the localFinally argument is null.
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source,
Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal,
TLocal> body, Action<TLocal> localFinally);
这看起来有点吓人,但我认为代码比TPL元数据更容易理解。
这是代码运行时的情况
演示解决方案项目:ThreadLocalStorage2
此示例使用随机double
值初始化源列表,然后将这些输入值相加形成单个输出值。同样,它使用ThreadLocalStorage
来执行此操作,因此,我们再次访问了一个需要某种线程同步的共享变量。同样,我选择使用lock
(..)。另一件需要注意的事情是,由于此演示中我处理的数据类型是double
,因此我必须使用Parallel.For<T>
上的泛型,在这种情况下是double
。
double total = 0;
object syncLock = new object();
double[] values = new double[100];
Random rand = new Random();
Object locker = new Object();
ManualResetEventSlim mre = new ManualResetEventSlim();
//initialise some random values, and signal when done
Task initialiserTask = Task.Factory.StartNew(() =>
{
Parallel.For(0, values.Length, (idx) =>
{
lock (locker)
{
values[idx] = rand.NextDouble();
}
});
mre.Set();
});
//wait until all initialised
mre.Wait();
//now use Thread Local Storage and Sum them together
Parallel.For<double>(
0,
values.Length,
//local init
() => 0,
//body
(int index, ParallelLoopState loopState, double tlsValue) =>
{
tlsValue += values[index];
return tlsValue;
},
//local finally
tlsValue =>
{
lock (syncLock)
{
total += tlsValue;
}
});
Console.WriteLine("Total: {0}", total);
// wait for input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
这是它正在运行的截图
暂时就到这里
这就是本文我想说的全部内容。希望您喜欢它,并想要更多。如果您喜欢这篇文章,并想要更多,您能否花些时间留下评论和投票?非常感谢。
希望在下一篇文章,以及下下一篇,还有下下下一篇中见到您,总共6篇。我最好抓紧时间了。