65.9K
CodeProject 正在变化。 阅读更多。
Home

并行扩展 for .NET Framework - 第一部分 (介绍 & PLINQ)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (16投票s)

2008年11月14日

CPOL

8分钟阅读

viewsIcon

72575

downloadIcon

973

本文介绍了 .NET Framework 的 PFX,并讨论了 PLINQ。

引言

Microsoft 最近发布了 Visual Studio 2010 CTP。除了 Dynamic C# 之外,这个版本中最有趣的功能之一是任务并行库。TPL 和 PFX (并行框架) 作为 VS2008 和 .NET Framework 3.5 的 CTP 可用。任务并行库旨在针对多核或多处理器系统。它旨在利用先进硬件的优势,而无需担心它们。下面的文章讨论了 PFX 中可用的 API,并探讨了性能方面。

开发环境

并发编程和并行编程

并发和并行编程是老生常谈的流行词。两者都非常密切相关。并发编程是指多个交互式计算任务的同时执行,这些任务可以实现为单独的程序,或者由单个程序创建的一组进程或线程。这些任务可以在单个处理器、多个处理器或分布式网络上执行。并行编程是将一个大问题分解成小问题,并同时执行每个块。并行计算有不同的形式,如位级、指令级、数据和任务并行。在这两种形式中,在任务并行库的上下文中重要的是数据并行和任务并行。

数据并行

在数据并行中,重点是将数据分发到不同的并行计算节点。对于执行一组指令的多处理器系统,可以通过让每个处理器在分布式数据的不同部分上执行相同的任务来实现数据并行。可以由单个线程控制,也可以由多个线程控制。然而,数据处理对于每个线程来说都是相同的。

任务并行

任务并行是每个处理器在相同或不同数据上执行不同的线程。线程可以执行相同的代码,也可以执行不同的代码。对于一个简单的双处理器机器的例子,如果我们想做两个任务,那么 CPU ‘a’ 将执行任务 ‘A’,CPU ‘b’ 将执行任务 ‘B’。

任务并行强调处理的分布式(并行化)性质(即线程),而不是数据(数据并行)。

并行扩展概述

并行扩展是 Microsoft 提供的一组 API,主要设计用于利用具有多核和多处理器系统的优势,通过数据并行、任务并行和并行硬件上的协调。

它帮助开发人员利用并行硬件的优势,而无需担心可伸缩性和性能。最好的地方是,随着处理器数量的增加,开发人员无需重写代码。

扩展主要针对并发和并行编程的三个领域

  1. 声明式数据并行
  2. 声明式指的是您希望如何处理数据,而不是您希望如何处理它。LINQ 向我们介绍了基于声明式编程模型的一组新 API。PLINQ 向我们介绍了一组适用于声明式数据并行的新 API。

  3. 命令式数据并行
  4. 并行扩展为循环等交互式数据操作提供了一个 API。并行扩展引入了 For 和 ForEach 等方法,这些方法具有自动将循环中的工作分发到并行硬件上运行的机制。

  5. 命令式任务并行
  6. 并行扩展引入了任务并行库,该库提供了一个 API,用于以任务而非数据的形式来表达并行性。该扩展将任务调度到并行硬件上运行,并为我们提供了取消或等待的方法。

    该扩展包含基于调度器的任务并行库和 PLINQ。该调度器是一个健壮、高效且可伸缩的引擎,旨在利用协作式调度和工作窃取算法来实现快速、高效且最大化的 CPU 利用率。我们也可以在这个调度器之上编写自己的扩展。

声明式数据并行 (PLINQ)

声明式数据并行通过 Parallel LINQ 实现。PLINQ 是 LINQ to Objects 的实现,它并行执行查询,并根据可用处理器数量进行扩展。使用 PLINQ 执行查询与使用 LINQ 执行查询类似。PLINQ 支持 System.Collection.Enumerable 支持的所有查询运算符。

但是,不支持 LINQ to SQL 和 LINQ to Entities,因为查询应该由数据库或查询运算符执行。

将现有代码转换为 PLINQ 非常容易。您需要添加对 System.Threading 程序集的引用。接下来,您需要通过调用 AsParallel 扩展方法将数据源包装到 IParallelEnumerable<T> 接口中。

LINQ

var q = from file in fileList  
         where file.Length > 10000 
        select file; 

PLINQ

var q = from file in fileList.AsParallel()
         where file.Length > 10000                      
        select file;

使用 AsParallel 方法可以确保编译器使用 ParallelEnumerable<T> 而不是查询接口的 Enumerable<T>。然而,与 LINQ 一样,PLINQ 具有延迟执行,即查询直到 foreach 循环遍历它,或者调用 GetEnumerator,或者调用 ToListToArrayToDictionary 方法时才会被评估。现在,从性能角度来看,这是一个非常重要的事情。

到目前为止,将数据源包装到 IParallelEnumerable 接口中只是确保在执行时使用数据并行算法。您如何处理输出是另一回事。当您使用 IParallelEnumerable<T> 包装数据源时,结果集合也是 IParallelEnumerable<T> 类型。PLINQ 提供以下三种处理输出的方法:

  1. 管道处理模型
  2. 顾名思义,除了处理数据的线程之外,还会添加另一个线程来枚举查询输出。简单来说,除了执行查询的几个线程外,还有一个专门用于枚举的线程。

    输出处理的默认方法是管道处理。所以,这与 LINQ 没有区别。

    foreach(var e in q)
    {
         Console.WriteLine(e);
    }

    一个线程将管理 foreach,而其他线程将执行查询。foreach 循环使用 System.Collection.Generic.IEnumerator<T> 进行迭代。因此,当调用 MoveNext() 时,一组工作线程将执行查询,并从这个调用和所有后续的 MoveNext() 调用返回结果。IParallelEnumarable <T> 有自己的 GetEnumerator(bool usePipeLining) 方法。

    因此,一种显式的输出处理方式是:

    using (var e = fileGroup.GetEnumerator(true))  
    {
       while (e.MoveNext())
       {
          Console.WriteLine(e);
       }
    }

    传递 false 将是停止并获取处理。管道处理的好处是可以减少保存结果的内存需求。但是,它的缺点是生产者线程很多,而消费者线程只有一个,这可能会导致工作分配不均,从而导致处理器效率低下。

  3. 停止并获取处理模型
  4. 顾名思义,停止并获取处理是指开始枚举的线程会停止并加入执行查询的另一个线程来执行查询。一旦获得所有结果,它将恢复对结果的迭代。要使用此功能,我们必须在 IParallelEnumerable<T>GetEnumerator 方法中传递 false

    using (var e = fileGroup.GetEnumerator(false ))  
    {
       while (e.MoveNext())
       {
           Console.WriteLine(e);
       }
    }

    当第一次调用 MoveNext() 时,启动枚举的线程将加入另一个线程并完成查询的执行,即查询在第一次调用 MoveNext() 时完成执行,我们只是遍历内存中的集合。

    该模型比管道模型效率稍高,原因有二。首先,生产者和消费者线程之间的同步开销更少。其次,PLINQ 更清楚数据在哪里以及如何访问它。ToArrayToList 等默认基于停止并获取模型。

  5. 反向枚举模型
  6. 在此模型中,将一个 lambda 函数提供给 PLINQ,说明需要执行什么处理。此函数将与 foreach 一起传递给所有执行查询的线程。IParallelEnumerable<T> 为此公开了一个 ForAll 方法。与管道模型和停止-获取模型不同,它不会收集结果然后处理它们。相反,最终调用的函数通过 ForAll 扩展方法传递给每个线程。这是最快的方法,但它要求传递给 ForAll 的函数是线程安全的,并且最好不带锁和副作用。

    fileGroup.ForAll(q=>Console.WriteLine(q));

并发异常

PLINQ 中的异常与 LINQ 中的异常处理不同。在 LINQ 中,查询是顺序执行的,因此如果发生异常,执行将被中止。然而,在 PLINQ 中,所有查询都是并行执行的。因此,PLINQ 以不同的方式处理异常。例如:

string[] arrStr = new string[]{"1","2","3","v", 
                                "1","2","3","'",
                                "1","2","3","12222222222" };
var query = from a in arrStr.AsParallel()
            select int.Parse(a);
query.ForAll(q=>Console.WriteLine(q));  

每次运行查询时,我们都会得到一个异常。但是,由于 PLINQ 查询是在多个线程上执行的,因此我们可能会得到不同的异常。我使用“可能”是因为这完全取决于 PLINQ 如何划分以及异常何时发生。上面的示例有两个解析错误和一个值过大错误。由于 PLINQ 执行多个线程,因此我可能只会得到两个输入错误,或者一个输入和一个值过大错误,或者可能全部都会出现。实际上,在后台,当任何线程发生异常时,PLINQ 会尝试尽快停止所有其他线程。这可能及时发生,也可能不发生。一旦所有线程都停止,PLINQ 会将所有异常聚合到一个新的 System.Threading.AggregateException 中并抛出它。这些异常作为内部异常以及它们的堆栈跟踪一起可用。我们可以查看内部异常并进行适当的处理。

在我的下一篇文章中,我将讨论命令式数据并行和命令式任务并行。

使用示例应用程序

我附上了一个示例应用程序,用于使用不同的输出处理来比较 LINQ 和 PLINQ 的执行时间。您可以通过查看 FileInformation 类来编辑查询并进行尝试。它不是非常健壮,但可以轻松比较 LINQ 和 PLINQ。

延伸阅读

© . All rights reserved.