65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 .NET TPL(任务并行库)释放多核机器的强大功能进行并行快速压缩

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.79/5 (63投票s)

2009 年 12 月 28 日

CPOL

13分钟阅读

viewsIcon

162796

downloadIcon

3740

并行快速压缩使用 TPL 实现对多核系统的完全利用。快速压缩以 nX 的速度压缩文件,其中 n = 机器中的处理器数量。

引言

如今,大多数应用程序、库和服务器都是为单核 CPU 机器设计的。这些软件系统在多核机器上运行并不会自动获得性能改进。考虑一个在单核机器上为千名客户端提供服务的 Web 服务器。此 Web 服务器仅对给定文件执行压缩操作并进行存储。切换到在多核机器上运行此 Web 服务器并不会简单地提高性能/吞吐量。要实现性能改进的唯一方法是编写针对多核的软件系统。不幸的是,编写能够利用多核处理器的软件系统并不容易。Microsoft .NET Framework 任务并行库 (TPL) 旨在简化这项工作。本文介绍了一个 FastCompress 库,它使用 TPL 在多核机器上实现令人印象深刻的性能改进。有多令人印象深刻?答案取决于您系统中的处理器数量。如果您有 4 个处理器,您将看到大约 4 倍(4 倍)的改进,8 个处理器则为 8 倍等。

对于 TPL 新手来说,以下是 MSDN(参考 1)的简要概述:

"TPL 是由 Microsoft® Research、Microsoft Common Language Runtime (CLR) 团队和并行计算平台团队协作创建的。TPL 是 Parallel FX 库的主要组成部分,该库是 Microsoft .NET Framework 的下一代并发支持。TPL 不需要任何语言扩展,并且与 .NET Framework 3.5 及更高版本兼容。

任务并行库 (TPL) 旨在使编写能够自动使用多个处理器的托管代码更加容易。使用该库,您可以方便地在现有顺序代码中表达潜在的并行性,其中公开的并行任务将在所有可用处理器上并发运行。这通常会带来显著的加速。"

下面的 Fast Compress 应用程序是在多核机器(4 个处理器)上测试的。下面捕获了使用 TPL 和不使用 TPL 的两种情况。您可以观察到,当使用 TPL 时,速度几乎比不使用 TPL 时快 4 倍。

为什么多核应用程序、库和服务器有用

回答这个问题的一个简单方法是:搜索“免费午餐结束了”。第一个结果将向您展示为什么需要对软件系统的并发性进行根本性的转变(参考 2)。主要的处理器制造商已无法进一步提高 CPU 性能(速度)。因此,硬件制造商正朝着多核硬件架构发展。这意味着什么?假设现在是 1998 年,我正在销售 App_Sweet,这是一个被数百万用户使用的应用程序。Intel 发布了新的双倍速度的 CPU。我所要做的就是,只需在新计算机(带有 2 倍速度的处理器)上运行我的 App_Sweet。我可以看到我的应用程序性能提升(即,免费午餐)。但现在,情况已经改变了。如果我在新的多核计算机上运行我的 App_Sweet,我可能不会看到多少性能提升。因为我需要为多核处理器设计/构建/编写我的应用程序(即,免费午餐结束了)。现在,如果我想满足用户(通过提高我的应用程序性能),我需要更改我的应用程序设计以释放底层多核架构的强大功能。有趣的是,性能增益可能接近 n 倍,其中 n 是处理器的数量。这种额外的计算能力可以转化为软件系统的性能、附加功能、生产力提高、可用资源使用效率等。

简而言之,如果您有一个企业需要运营并雇佣了 10 名员工,您会将工作委托给所有 10 名员工还是只委托给一名?如果您的答案是您会将工作委托给所有 10 名员工,那么您就会喜欢使用 TPL。

使用代码

必备组件

步骤 1:仅运行,您需要从此处安装 .NET Framework 4 Beta 2http://www.microsoft.com/downloads/details.aspx?familyid=DED875C8-FE5E-4CC9-B973-2171B61FE982&displaylang=en

步骤 2:要编译和运行,您需要从此处安装 Visual Studio 2010 Beta 2:http://msdn.microsoft.com/en-us/vstudio/dd582936.aspx

步骤 3:要看到性能差异的演示,除了上述步骤(步骤 1 和 2)之外,您还需要一台多核机器。

查看快速演示

  1. 下载 QuickDemo.zip 文件。
  2. 将文件解压缩到一个目录(文件夹)。
  3. 运行可执行文件 TestFastCompress.exe
  4. 通过单击“浏览”按钮选择一个文件(选择一个大于几 MB 的较大文件)进行压缩。
  5. 选择“不使用 TPL”选项。然后,单击“压缩”按钮。记下“操作结果”部分中的性能(以毫秒为单位)。
  6. 选择“使用多核提高性能”选项。然后,单击“压缩”按钮。记下“操作结果”部分中的性能(以毫秒为单位)。

根据您机器中的处理器数量,您应该会看到速度提升。通常,速度提升接近 n 倍,其中 n 是您机器中的处理器数量。

故障排除提示:如果从本文下载的测试应用程序(快速演示)无法运行或抛出异常,您可能安装了 .NET Framework 4 Beta 1。请从此链接安装 .NET Framework 4 Beta 2:http://www.microsoft.com/downloads/details.aspx?familyid=DED875C8-FE5E-4CC9-B973-2171B61FE982&displaylang=en

要在您的应用程序中使用 FastCompress 库:在您的应用程序中使用 FastCompress 库很简单。您只需将库(程序集)添加为项目中的引用。然后,开始使用该库。

如何做到这一点

  1. 在 Visual Studio 2010 中,选择“解决方案资源管理器”。
  2. 右键单击“引用”以选择“添加引用…”菜单选项。浏览以选择 FastCompress.dll。然后,单击“确定”将此引用添加到您的项目中。
  3. 使用下面的示例代码
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using BK.Util;
     
    namespace TestFastCompress
    {
    class Program
    {
        static void Main(string[] args)
        {
           FastCompress.doNotUseTPL = true;
           FastCompress.compressStrictSeqential = false;
           System.Console.WriteLine("Time taken for Seq compression = {0}", 
             FastCompress.CompressFast(@"Z:\File1.test.seq", 
             @"Z:\File1.test", true));
           System.Console.WriteLine("Time taken for Seq Un compression = {0}", 
             FastCompress.UncompressFast(@"Z:\File1.test.orgSeq", 
             @"Z:\File1.test.seq", true));
         
           FastCompress.doNotUseTPL = false;
           FastCompress.compressStrictSeqential = false;
           System.Console.WriteLine("Time taken for Parallel " + 
             "compression = {0}", 
             FastCompress.CompressFast(@"Z:\File1.test.pll", 
             @"Z:\File1.test", true));
           System.Console.WriteLine("Time taken for Parallel Un compression = {0}", 
             FastCompress.UncompressFast(@"Z:\File1.test.orgpll", 
             @"Z:\File1.test.pll", true));
        }
    }
    }

性能结果

用于此性能测试的机器的基本配置是

FastCompressionBK/BasicSysCfg.jpg

下面显示了 FastCompress 库的性能结果。X 轴是压缩文件的大小(以兆字节 MB 为单位),Y 轴是压缩所需的时间(以毫秒 ms 为单位)。您可以观察到,使用 TPL 的压缩速度接近快 4 倍。该测试是在一台 4 处理器机器上进行的。

FastCompressionBK/Perf.jpg

FastCompress 库(使用 TPL)的解释

多处理器机器正变得标准化,因为单处理器的时钟速度提升几乎已经停止。为了提高性能,软件组件需要被设计/架构以利用多核架构。例如,考虑一个压缩实用程序应用程序。在 4 处理器机器和一台处理器机器上运行压缩实用程序不会有太大区别。要提高 4 处理器机器上压缩实用程序的性能,需要重新设计压缩实用程序。重新设计应考虑在多个处理器之间分配工作。这似乎是一项简单的任务。高效的动态工作分配给所有可用处理器是一项棘手的任务。并行化所有内容在某些情况下可能会降低性能。此外,如果不考虑未来的业务动态,扩展并行设计可能会带来复杂的问题并导致随机崩溃。随机崩溃非常难以调试,尤其是在并行编程环境中。

FastCompress 库基于一个简单的设计来并行化压缩。如下图所示,FastCompress 将文件(要压缩的文件)切片以创建一个批次,供处理器独立操作(压缩)。这些切片被分配给每个处理器。这是通过 .NET Framework 4 任务并行库 (TPL) 实现的。请注意,通过使用 Task.Factory.StartNew,我们向 TPL 表达了我们希望并行运行任务的意愿。TPL 会自动扩展到主计算机中的处理器数量。如果主计算机有 8 个处理器,则很可能前 8 个切片将被用于在 8 个处理器上并行执行。这显然会带来大约 8 倍的性能提升。

FastCompressionBK/LibDesign.jpg

下面的代码用于执行并行压缩

while(0 != (read = sourceStream.Read(bufferRead, 0, sliceBytes)))
{
   tasks[taskCounter] = Task.Factory.StartNew(() => 
     CompressStreamP(bufferRead, read, taskCounter, 
     ref listOfMemStream, eventSignal)); // Line 1
   eventSignal.WaitOne(-1);           // Line 2
   taskCounter++;                     // Line 3
   bufferRead = new byte[sliceBytes]; // Line 4
}

Task.WaitAll(tasks);                  // Line 6

在上面的代码中,第一行的 while 循环一次读取 sliceBytes(1 MB)。第二行调用 TPL 方法以触发并行活动,即异步调用 CompressStreamP 方法。在 CompressStreamP 方法完成之前,下一行中的代码将被执行。这里的并行度有点过高。我们需要在这里进行调整。例如,假设我们没有第二行。在第一次迭代中,taskCounter 的值为 0。但在调用 CompressStreamP 之前,taskCounter 可能会增加到 1(第 3 行)。因此,第一次调用 CompressStreamP 可能以 taskCounter = 1 的值结束。详细说明:CompressStramPtaskCounter 参数是按值传递的。但在它被复制到堆栈(推送到堆栈以调用方法)之前,由于第 3 行的增量运算符,该值可能会发生变化。为了避免这种情况,我们有第 2 行,它会等待堆栈复制(堆栈推送和弹出以精确说明)完成。第 3 行增加以传递文件中的下一个 1 MB 切片。第 4 行创建一个新的缓冲区来复制下一个 1 MB 切片。在为系统中可用的处理器安排完所有切片后,第 6 行会等待所有任务(压缩任务)完成。

所有任务完成后,下一步是将它们合并到一个压缩文件中。正如许多人可能猜到的那样,简单的合并是行不通的。这是因为压缩的切片大小可能不同,也就是说,每个 1 MB 的切片不会压缩到 10 KB。这些大小可能会有所不同。在解压缩时,我们需要精确地解压缩与压缩时相同的切片。然后,合并所有这些切片将得到原始文件。如下图所示,我们将压缩的切片合并为文件大小、压缩流、文件大小、压缩流…… 这也有助于我们以后以并行方式解压缩文件。

FastCompressionBK/LibMergeDesign.jpg

执行此任务的代码如下

for (taskCounter = 0; taskCounter < tasks.Length; taskCounter++)
{
   byte[] lengthToStore = 
     GetBytesToStore((int)listOfMemStream[taskCounter].Length);
 
   targetStream.Write(lengthToStore, 0, lengthToStore.Length);
   byte[] compressedBytes = listOfMemStream[taskCounter].ToArray();
   listOfMemStream[taskCounter].Close();
  listOfMemStream[taskCounter] = null;
  targetStream.Write(compressedBytes, 0, compressedBytes.Length);
}

等待所有任务完成后,将合并压缩流。在 for 循环中,每个流被转换为字节数组并写入目标文件流。然后关闭并释放内存流。

使用并行性时避免陷阱

  • 并行性和 GUI:在 UI 线程中执行的任何密集任务都会阻塞用户界面的响应能力。通常,这意味着此类代码会被卸载到后台线程或工作线程。这种卸载有助于保持 GUI 的响应能力。当后台线程(或工作线程)完成任务时,它会将结果推回 GUI。要记住的重要一点是,其他线程和 GUI 之间的调用必须通过 GUI 线程的上下文进行。也就是说,在后台(或工作)线程的上下文中对 GUI 进行的任何调用都将导致崩溃、状态损坏甚至死锁。然后您将需要 WinDBG SOS 来解决死锁。为避免这种情况,后台线程(或任何工作线程)需要通过“invoke”方法向 GUI 线程发送消息。
  • 共享内存:并行执行的主要原因是速度。为了提高速度,请避免使用共享内存/数据。这并非完全不可避免。将共享内存的使用量降至最低可获得更好的结果。 wherever 共享内存已使用,请使用适当的锁。只读内存可以共享,无需加锁。
  • 过度并行化:许多新开发人员认为并行总是更快。虽然这是正确的,但需要考虑“边际收益递减定律”。计算能力受系统 CPU 数量的限制。创建过多的线程最初会带来快速的执行时间。但一旦超过某个界限,大部分周期将用于上下文切换而不是执行当前任务。设计师在开发高效的应用程序或服务器时需要划定这条界限。
  • 线程安全和线程不安全方法:避免调用线程不安全的方法。因为对这些方法的调用需要使用锁。这些锁通常效率不高,因为整个执行路径被锁定,而不是仅锁定需要锁的代码部分。此外,限制对线程安全方法的调用是个好主意。考虑调用线程安全方法 1000 次,而不是调用一次线程安全方法,并将一个包含 1000 个值的数组传递给它。做出关于线程安全调用的明智选择可以带来出色的性能提升。
  • COM 互操作:使用 COM 组件时,请注意各种线程模型。大多数 Windows API 都可以在 .NET Framework 堆栈中找到。但是,直到今天,并非所有原生 SDK 或 COM 库都可以在 .NET 中使用。在构建面向性能的应用程序/服务器时,了解使用 COM /原生 Win32 API 的幕后影响很重要。

关注点

  1. 通过将任务分配给机器中的所有可用处理器来实现快速压缩。这显著提高了压缩速度。此外,它还可以有效地利用可用的机器资源,为用户带来好处。
  2. 在启动新任务后,AutoResetEvent 对于确保将参数正确复制到函数堆栈中至关重要。这是为了确保方法调用的推送和弹出操作具有正确的值。
  3. 设计保持简单,以避免使用任何同步(除了第 2 点中提到的信号)来提高性能。将 ArrayList 传递给并行压缩方法,抑制了使用并行数据结构的诱惑。
  4. 在多核机器上,FastCompress 的压缩和解压缩速度将比 GZipStream(.NET Framework 4)更快。
  5. Fast Compress 的性能与主机机器中的处理器数量成正比。
  6. 性能指标(如上述性能部分所述)表明,使用 TPL 实现了显著的性能提升。
  7. 通过运行提供的 FastCompression 示例应用程序,也可以验证 FastCompress 的解压缩速度也大约是 n 倍。
  8. FastCompress 库是多线程安全的。
  9. 如果使用不同的压缩哈希值来压缩同一个文件,并行压缩会效率低下(导致文件更大)。最好的方法是使用一个压缩哈希,但将压缩负载分布到不同的处理器。
  10. 为了优化代码,KevinAG 建议的一个很好的替代方案(2)如下:
while(0 != (read = sourceStream.Read(bufferRead, 0, sliceBytes)))
{
   int counterCopy = taskCounter;  // Line 1, the value to be captured and used by the closure.
   tasks[counterCopy] = Task.Factory.StartNew(() =>
     CompressStreamP(bufferRead, read, counterCopy,
     ref listOfMemStream)); // Line 2
   taskCounter++;                     // Line 3
   bufferRead = new byte[sliceBytes]; // Line 4
}
 
Task.WaitAll(tasks);

参考文献

  1. 优化多核计算机的托管代码 -- http://msdn.microsoft.com/en-us/magazine/cc163340.aspx
  2. 免费午餐结束了:软件并发性的根本转变 - http://www.gotw.ca/publications/concurrency-ddj.htm
  3. GZipStream MSDN 参考 - http://msdn.microsoft.com/en-us/library/system.io.compression.gzipstream.aspx
  4. .NET Framework 中的并行编程 -- http://msdn.microsoft.com/en-us/library/dd460693(VS.100).aspx

历史

© . All rights reserved.