65.9K
CodeProject 正在变化。 阅读更多。
Home

任务并行库:3/n

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.93/5 (77投票s)

2011年2月23日

CPOL

13分钟阅读

viewsIcon

233585

downloadIcon

1947

探究任务并行库的使用。

引言

这是我计划的一系列关于TPL的文章的第三部分。上次我介绍了Continuations,并且涵盖了这方面的内容

  • 更多TPL背景信息
  • Continuations,那是什么
  • 简单的Continuation
  • WPF同步
  • 继续“WhenAny”
  • 继续“WhenAll”
  • 使用Continuation进行异常处理
  • 将Continuation用作管道
  • 在Continuation的前置项中捕获异常
  • 取消Continuation

这次我们将介绍如何使用Parallel for/foreach循环。我们还将介绍如何执行通常的TPL操作,例如取消和处理异常,以及如何跳出并行循环,如何在循环内使用线程局部存储,以及如何跳出循环。

文章系列路线图

这是我可能撰写的6篇文章中的第3篇,希望大家会喜欢。下面是我打算涵盖的内容的大纲

  1. 启动任务/触发操作/异常处理/取消/UI同步
  2. 延续 / 取消链式任务
  3. Parallel For / 自定义分区器 / 聚合操作(本文)
  4. 并行 LINQ
  5. 管道
  6. 高级场景 / 任务的 v.Next

现在,我知道有些人会直接阅读本文并说它与MSDN上已有的内容相似,我部分同意这一点;但是,我选择继续撰写这些文章有几个原因,如下所示

  • 只有前几篇文章会显示与MSDN相似的想法;之后,我认为我将深入探讨的内容将不会在MSDN上出现,而是我个人对TPL进行研究的结果,我将在文章中概述这些内容,因此您将受益于我的研究,您可以直接阅读……是的,不错。
  • 这里会有实时输出的截图,这是MSDN上没有太多内容的部分,这可能会帮助一些读者加深对文章内容的理解。
  • 也许这里有一些读者从未听说过任务并行库,因此不会在MSDN上看到它,您知道老话怎么说,您首先需要知道您在寻找什么。
  • 我喜欢关于线程的文章,所以喜欢写它们,所以我写了它们,将会写它们,已经写了它们,并将继续写它们。

说了这么多,如果人们在阅读本文后,真的认为它与MSDN过于相似(我仍然希望不会这样),也请告诉我,我将努力调整即将发表的文章以弥补。

目录

本文我将涵盖以下内容

Parallel For/Foreach

我们中的许多人可能会编写大量顺序代码,如下所示

foreach(SomeObject x in ListOfSomeObjects)
{
   x.DoSomething();
}

在这里,我们对某些源中的每个项都执行操作,并且源中的对象之间没有任何关系,我们只是希望对某个对象源集合中的每个项都执行一些操作。我们想对所有这些对象执行操作,并且它们之间没有紧密耦合,这使得此类操作成为并行处理的理想选择,TPL的设计者也是这么认为的,因此他们提供了创建Parallel.ForParallel.Foreach循环的功能。

本文的其余部分将探讨如何在您自己的代码中使用Parallel.ForParallel.Foreach循环。显然,由于我们处理的是并行性,因此会增加一些复杂性,但总体而言,它仍然很容易理解。

创建一个简单的Parallel For/Foreach

演示解决方案项目:SimpleParallel

让我们开始创建一个非常简单的Parallel.ForParallel.Foreach循环;下面是每个循环的示例

List<string> data = new List<string>() 
    { "There","were","many","animals",
      "at","the","zoo"};

//parallel for
Parallel.For(0, 10, (x) =>
{
    Console.WriteLine(x);
});

//parallel for each
Parallel.ForEach(data, (x) =>
{
    Console.WriteLine(x);
});

Console.ReadLine();

没有比这更简单的了。为了证明它能按预期工作,这是结果,绿色代表Parallel.For,棕色代表Parallel.Foreach

需要注意的一点是,Parallel.ForParallel.Foreach循环完全不保证正确的顺序,正如上面图所示。

跳出和停止并行循环

演示解决方案项目:BreakingAndStopping

我们过去可能都编写过类似这样的代码,我们在串行循环中执行某些操作,并在某个条件下跳出

foreach(SomeClass x in ListofClasses)
{
   if(x.isBest)
   {
      x.Save();
      break;
   }
}

问题是,在使用Parallel.ForParallel.Foreach循环时,您是否也可以执行此类操作?是的,您可以,我们只需要使用一个TPL类,称为ParallelLoopState,我们可以使用TPL的Parallel.ForParallel.Foreach的众多重载之一来使用它。

通过使用ParallelLoopState,我们可以使用两个循环控制方法:Stop()Break()

ParallelLoopState.Stop()

通知System.Threading.Tasks.Parallel循环应在系统最早方便的时候停止执行。

ParallelLoopState.Break()

通知System.Threading.Tasks.Parallel循环应在系统最早方便的当前迭代之后的迭代停止执行。

我们还可以使用ParallelLoopResult结构来获取有关Parallel.ForParallel.Foreach循环工作如何进行的详细信息,我们可以将其用作TPL Parallel.ForParallel.Foreach循环的返回值。以下三个示例也显示了使用TPL ParallelLoopResult结构检查TPL Parallel.ForParallel.Foreach循环状态的方法

这里有一些演示

For循环 Stop()

//parallel for stop
ParallelLoopResult res1 = Parallel.For(0, 10, (x, state) =>
{
    if (x < 5)
        Console.WriteLine(x);
    else
        state.Stop();
});

Console.WriteLine("For loop LowestBreak Iteration : {0}", 
                  res1.LowestBreakIteration);
Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
Console.WriteLine("\r\n");

Foreach循环 Stop()

//parallel foreach stop
ParallelLoopResult res2 = Parallel.ForEach(data, (x, state) =>
{
    if (!x.Equals("zoo"))
        Console.WriteLine(x);
    else
        state.Stop();
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}", 
                  res2.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
Console.WriteLine("\r\n");

Foreach循环 Break()

//parallel for each that actuaally breaks, rather than stops
ParallelLoopResult res3 = Parallel.ForEach(data, (x, state) =>
{
    if (x.Equals("zoo"))
    {
        Console.WriteLine(x);
        state.Break();
    }
});
Console.WriteLine("Foreach loop LowestBreak Iteration : {0}", 
                  res3.LowestBreakIteration);
Console.WriteLine("Foreach loop Completed : {0}", res3.IsCompleted);

这是运行的截图。请注意,当查询时,前两个(ParallelLoopState.Stop()的那些)不提供ParallelLoopResult.LowestBreakIteration。这是因为它们使用了ParallelLoopState.Stop(),因此实际上并未中断。

处理异常

演示解决方案项目:HandlingExceptions

我们在第一篇文章中讲过通用的任务异常处理,并且您可以使用任何这些技术;然而,我发现最简单的方法是使用我们都用于非异步代码的熟悉结构,即:try/catch,我们只需捕获TPL的AggregateException,其模式大致如下

try
{
    .....
    .....

}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'",
            ex.Message));
    }
}

这是基本模式,因此这里有一些针对真实Parallel.ForParallel.Foreach循环的实际异常处理

Parallel.For循环异常处理

//parallel for Exception handling
try
{
    ParallelLoopResult res1 = Parallel.For(0, 10, (x, state) =>
    {
        if (!state.IsExceptional)
        {
            if (x < 5)
                Console.WriteLine(x);
            else
                throw new InvalidOperationException("Don't like nums > 5");
        }
    });
    Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
    Console.WriteLine("\r\n");
}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'",
            ex.Message));
    }
}

Parallel.Foreach循环异常处理

//parallel foreach Exception handling
try
{
    ParallelLoopResult res2 = Parallel.ForEach(data, (x, state) =>
    {
        if (!x.Equals("zoo"))
            Console.WriteLine(x);
        else
            throw new InvalidOperationException("Found Zoo throwing Exception");
    });
    Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
    Console.WriteLine("\r\n");
}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'",
            ex.Message));
    }
}

为了向您展示所有内容的运行情况,这里有一张截图

可以看出,我们收到了由Parallel.For循环抛出的两个AggregateException;这是因为两个抛出throw new InvalidOperationException("Don't like nums > 5")的迭代已经被调度,因此运行了,导致发生并捕获了两个AggregateException

取消并行循环

演示解决方案项目:CancellingLoops

我们在第一篇文章中讲过通用的任务取消,并且当您使用TPL时,这个想法仍然是一样的。如何取消并行操作?使用CancellationToken,就是这样。

根据我们从前两篇文章中了解到的信息,我们知道我们应该捕获AggregateException,并且我们通常会在并行代码中使用一段代码,类似这样

token.CancellationToken.ThrowIfCancellationRequested();

到目前为止我们一直在这样做。不幸的是,这并不是我们在使用Parallel.ForParallel.Foreach循环时需要做的事情,并且会导致一个未处理的“OperationCancelledException”,如下面的图所示

那么我们必须怎么做?嗯,在这种特定情况下(即取消),这非常简单。我们必须确保提供一个专门针对OperationCancelledException的捕获处理程序,并且似乎无论我们是否使用该行

token.CancellationToken.ThrowIfCancellationRequested();

都没有关系。当其相应的CancellationToken被取消时,Parallel.ForParallel.Foreach循环总是会抛出一个OperationCancelledException。我想这取决于个人喜好,是否包含该行,最终而言。由于这似乎对是否抛出OperationCancelledException没有影响,因此我选择不在Parallel.ForParallel.Foreach循环演示中包含该部分代码。

下面是Parallel.ForParallel.Foreach循环的完整列表,它们都将在5秒后被同一个CancellationToken取消。

List<string> data = new List<string>() 
    { "There", "were", "many", "animals", 
      "at", "the", "zoo" };

CancellationTokenSource tokenSource = new CancellationTokenSource();

Task cancelTask = Task.Factory.StartNew(() =>
{
    Thread.Sleep(500);
    tokenSource.Cancel();
});

ParallelOptions options = new ParallelOptions()
{
    CancellationToken = tokenSource.Token
};

//parallel for cancellation
try
{
    ParallelLoopResult res1 = Parallel.For(0, 1000, options, (x, state) =>
    {
        if (x % 10 == 0)
            Console.WriteLine(x);

        Thread.Sleep(100);
    });

    Console.WriteLine("For loop LowestBreak Iteration : {0}", 
                      res1.LowestBreakIteration);
    Console.WriteLine("For loop Completed : {0}", res1.IsCompleted);
    Console.WriteLine("\r\n");
}
catch (OperationCanceledException opCanEx)
{
    Console.WriteLine("Operation Cancelled");
}
catch (AggregateException aggEx)
{
    Console.WriteLine("Operation Cancelled");
}

//parallel foreach cancellation
try
{
    ParallelLoopResult res2 = Parallel.ForEach(data,options, (x, state) =>
    {
        Console.WriteLine(x);
        Thread.Sleep(100);
    });
    Console.WriteLine("Foreach loop LowestBreak Iteration : {0}", 
                      res2.LowestBreakIteration);
    Console.WriteLine("Foreach loop Completed : {0}", res2.IsCompleted);
    Console.WriteLine("\r\n");
}
catch (OperationCanceledException opCanEx)
{
    Console.WriteLine("Operation Cancelled");
}
catch (AggregateException aggEx)
{
    Console.WriteLine("Operation Cancelled");
}

Console.ReadLine();

这是输出

我们确实看到了取消消息,并且Parallel.Foreach没有调度任何内容。

但是,这看起来很奇怪,对吧?

我提请您注意这一点……请注意,我们只让for循环打印了10和500(好吧,它是由我设计的,只有当索引 % (模数) 10 ==0 时才会打印,所以无法真正确定发生了什么),但这确实表明Parallel.For循环肯定不保证顺序,您不应依赖任何顺序,否则将导致彻底的失败。

TPL会根据需要将Parallel.For委托调度到工作线程上,如上所示。我们可能已经调用了其他索引,但我们真正知道的是,在取消之前,我们调用了索引0和500。100、200、300……呢?顺序不保持,所以请不要期望TPL会保持任何顺序,那不会发生。

话虽如此,它确实很好地演示了Parallel.Foreach的工作根本没有被调度,因为我们在上面的输出中根本没有看到任何输出。这可能会因您PC上有多少个核心而异,我只有2个,所以我看到了这样的输出。

分区以提高性能

演示解决方案项目:OrderedPartitioner

我不知道有多少人足够仔细地注意到,当我们运行Task/Continuation或Parallel.For/Parallel.Foreach循环时,我们实际上是在排队一组将最终在ThreadPool工作线程上运行的工作委托。问题在于,尤其是在Parallel.For/Parallel.Foreach循环的情况下,当委托体很小时,创建/切换这些小委托负载所花费的时间可能会对性能产生不利影响。

问题是,我们能做些什么吗?是的,TPL允许我们创建自己的分区块,其思想是将整体Parallel.For/Parallel.Foreach循环的工作量分成我们自己在代码中指定的块大小。我们实际上是在创建一个自定义分区算法。如果我们使用默认的Partitioner,它会使用一个默认的分区算法,该算法会考虑核心数量等因素,可能不会产生最佳结果。

为了说明这一点,我在下面提供了三个示例

  1. 根本没有分区
  2. 使用默认TPL分区算法(我的笔记本有2个核心)
  3. 使用我们自己的自定义分区逻辑

下面是演示这三种情况的完整代码列表。想法是我使用一个简单的Task来填充一个带有虚拟数据的数组,然后依次运行(并计时)上面概述的三种情况。这三种情况执行相同的任务。它从原始虚拟列表中获取一个项,将其平方,然后将该值写入另一个结果数组。通过确保所有三种情况都执行相同的操作,我们应该能够获得真实的比较。

在被测试的代码块结束时,我们还通过检查结果数组中的0来验证所有元素是否都被命中。当在每种情况下重新初始化结果数据数组时,0才应该存在。我们基本上确保每个原始列表中的元素都被该场景命中。

我只使用标准的Threading同步原语(一个ManualResetEventSlim)来控制三种情况的运行,一次只允许一种运行,并且它们按源文件中声明的顺序运行。

// create the results array
int[] inputData = new int[50000000];
int[] resultData = new int[50000000];
ManualResetEventSlim mre = new ManualResetEventSlim(false);
Random rand = new Random();
Object locker = new Object();

//create some dummy data
Task setupTask = Task.Factory.StartNew(() =>
{
    Parallel.For(0, inputData.Length, (i) =>
    {
        lock (locker)
        {
            inputData[i] = rand.Next(2,10);
        }
    });
    mre.Set();
});



//***********************************************************************************
//
//   SCENARIO 1 : No partitioning at all
//
//***********************************************************************************
mre.Wait();
mre.Reset();

Task timerTask1 = Task.Factory.StartNew(() =>
{
    Stopwatch watch = new Stopwatch();
    watch.Start();
    Parallel.ForEach(inputData, (int item, ParallelLoopState loopState, long index) =>
    {
        resultData[index] = item * item; 
    });
    watch.Stop();
    Console.WriteLine("time ellapsed for No partitioning at all version {0}ms", 
                      watch.ElapsedMilliseconds);
    Console.WriteLine("Number of results with 0 as square : {0}, " + 
             "which proves each element was hit\r\n",
            (from x in resultData where x == 0 select x).Count()); mre.Set();
    mre.Set();
});


//****************************************************************
//
//   SCENARIO 2 : Use the default TPL partitioning algorithm
//               (affected by your PCs # of cores)
//
//****************************************************************
mre.Wait();
mre.Reset();

//clear results
Parallel.For(0, inputData.Length, (i) =>
{
    resultData[i] = 0;
});

Task timerTask2 = Task.Factory.StartNew(() =>
{
    // create an orderable partitioner
    Stopwatch watch = new Stopwatch();
    OrderablePartitioner<int> op = Partitioner.Create(inputData);
    watch.Reset();
    watch.Start();
    Parallel.ForEach(op, (int item, ParallelLoopState loopState, long index) =>
    {
        resultData[index] = item * item;
    });
    watch.Stop();
    Console.WriteLine("time ellapsed for default TPL partitioning algorithm version {0}ms", 
                      watch.ElapsedMilliseconds);
    Console.WriteLine("Number of results with 0 as square : {0}, " + 
         "which proves each element was hit\r\n",
        (from x in resultData where x == 0 select x).Count()); mre.Set();
    mre.Set();
});


//***********************************************************************************
//
//   SCENARIO 3 : Use our own custom partitioning logic
//
//***********************************************************************************
mre.Wait();
mre.Reset();

//clear results
Parallel.For(0, inputData.Length, (i) =>
{
    resultData[i] = 0;
});

Task timerTask3 = Task.Factory.StartNew(() =>
{
    // create an chunking orderable partitioner
    Stopwatch watch = new Stopwatch();
    watch.Reset();
    watch.Start();
    OrderablePartitioner<Tuple<int, int>> chunkPart = 
             Partitioner.Create(0, inputData.Length, 5000);
    Parallel.ForEach(chunkPart, chunkRange =>
    {
        for (int i = chunkRange.Item1; i < chunkRange.Item2; i++)
        {
            resultData[i] = inputData[i] * inputData[i];
        }
    });
    watch.Stop();
    Console.WriteLine("time ellapsed for custom Partitioner version {0}ms", 
                      watch.ElapsedMilliseconds);
    Console.WriteLine("Number of results with 0 as square : {0}, " + 
         "which proves each element was hit\r\n",
        (from x in resultData where x == 0 select x).Count()); mre.Set();
    mre.Set();
});

mre.Wait();
mre.Reset();

Console.ReadLine();

这是运行此代码的结果(同样,我的笔记本有2个核心,所以您的结果可能不同)

我认为结果不言自明。不进行分区是可以的,然后我们使用默认分区器,情况变得更糟(很可能是因为我的笔记本只有2个核心以及其他因素),但看看当我们完全控制分区大小时会发生什么,这使我们获得了最佳性能。

所以,这就是您所了解的,值得思考。

线程局部存储

有时我们在循环中需要做的是累积某种运行总数,就像在顺序代码中一样

int count =0;
int count =0;

foreach(SomeObject object in SomeObjectList)
{
    if(object.Contains(someKeyWord)
    {
        count++
    }
}

那么,如何使用Parallel.For/Parallel.Foreach循环来实现这一点呢?答案在于使用ThreadLocalStorage,它已经存在一段时间了,但TPL赋予了它新的意义。

幸运的是,在构造Parallel.For/Parallel.Foreach循环时,我们有所有必需的重载来使用ThreadLocalStorage,只需理解如何使用它。为此,我提供了两个示例,如下所示

演示解决方案项目:ThreadLocalStorage

此示例在单词源列表中搜索特定搜索词,并统计该词出现的次数。重要的是要知道,当我们访问计数变量时,我们实际上是在访问一个需要某种线程同步的共享对象。我选择使用lock(..)(它实际上是在后台使用Monitor.TryEnter()/Monitor.Exit()),但您可以使用任何您喜欢的线程同步原语。

int matches = 0;
object syncLock = new object();

string[] words
    = new string[] { "the","other","day","I","was",
    "speaking","to","a","man","about",
    "this","and","that","cat","and","he","told","me",
    "the","only","other","person","to","ask","him",
    "about","the","crazy","cat","was","the","cats","owner"};

string searchWord = "cat";

//Add up all words that match the searchWord
Parallel.ForEach(
    //source
    words,  
    //local init
    () => 0, 
    //body
    (string item, ParallelLoopState loopState, int tlsValue) => 
    {
        if (item.ToLower().Equals(searchWord))
        {
            tlsValue++;
        }
        return tlsValue;
    },
    //local finally
    tlsValue =>
    {
        lock (syncLock)
        {
            matches += tlsValue;
        }
    });

Console.WriteLine("Matches for searchword '{0}' : {1}\r\n", searchWord, matches);
Console.WriteLine("Where the original word list was : \r\n\r\n{0}",
    words.Aggregate((x, y) => x.ToString() + " " + y.ToString()));
Console.ReadLine();

如果您想使用ThreadLocalStorage,循环的格式几乎由TPL决定,这是一种您必须学习和遵循的模式。这在.NET 4.0本身中的方法签名和注释看起来是这样的

//
// Summary:
//     Executes a for each operation on an System.Collections.IEnumerable{TSource}
//     in which iterations may run in parallel.
//
// Parameters:
//   source:
//     An enumerable data source.
//
//   localInit:
//     The function delegate that returns the initial state of the local data for
//     each thread.
//
//   body:
//     The delegate that is invoked once per iteration.
//
//   localFinally:
//     The delegate that performs a final action on the local state of each thread.
//
// Type parameters:
//   TSource:
//     The type of the data in the source.
//
//   TLocal:
//     The type of the thread-local data.
//
// Returns:
//     A System.Threading.Tasks.ParallelLoopResult structure that contains information
//     on what portion of the loop completed.
//
// Exceptions:
//   System.ArgumentNullException:
//     The exception that is thrown when the source argument is null.-or-The exception
//     that is thrown when the body argument is null.-or-The exception that is thrown
//     when the localInit argument is null.-or-The exception that is thrown when
//     the localFinally argument is null.
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, 
       Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, 
       TLocal> body, Action<TLocal> localFinally);

这看起来有点吓人,但我认为代码比TPL元数据更容易理解。

这是代码运行时的情况

演示解决方案项目:ThreadLocalStorage2

此示例使用随机double值初始化源列表,然后将这些输入值相加形成单个输出值。同样,它使用ThreadLocalStorage来执行此操作,因此,我们再次访问了一个需要某种线程同步的共享变量。同样,我选择使用lock(..)。另一件需要注意的事情是,由于此演示中我处理的数据类型是double,因此我必须使用Parallel.For<T>上的泛型,在这种情况下是double

double total = 0;
object syncLock = new object();

double[] values = new double[100];
Random rand = new Random();
Object locker = new Object();
ManualResetEventSlim mre = new ManualResetEventSlim();

//initialise some random values, and signal when done
Task initialiserTask = Task.Factory.StartNew(() =>
    {
        Parallel.For(0, values.Length, (idx) =>
            {
                lock (locker)
                {
                    values[idx] = rand.NextDouble();
                }
            });
        mre.Set();
    });

//wait until all initialised
mre.Wait();

//now use Thread Local Storage and Sum them together
Parallel.For<double>(
    0,
    values.Length,
    //local init
    () => 0,
    //body
    (int index, ParallelLoopState loopState, double tlsValue) =>
    {
        tlsValue += values[index];
        return tlsValue;
    },
    //local finally
    tlsValue =>
    {
        lock (syncLock)
        {
            total += tlsValue;
        }
    });

Console.WriteLine("Total: {0}", total);

// wait for input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();

这是它正在运行的截图

暂时就到这里

这就是本文我想说的全部内容。希望您喜欢它,并想要更多。如果您喜欢这篇文章,并想要更多,您能否花些时间留下评论和投票?非常感谢。

希望在下一篇文章,以及下下一篇,还有下下下一篇中见到您,总共6篇。我最好抓紧时间了。

© . All rights reserved.