65.9K
CodeProject 正在变化。 阅读更多。
Home

C# 中的并行计算

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.84/5 (63投票s)

2008 年 10 月 1 日

GPL3

11分钟阅读

viewsIcon

233903

downloadIcon

5791

本文将介绍如何使用纯 C# 实现并行计算。

引言

如今,双核 PC 越来越经济实惠,并逐渐成为标准。四核 PC 也越来越近,当然,具有更多 CPU/核心的 PC 也可用。由于有大量耗时的计算任务,这些任务的并行/分布式计算至关重要。因此,随着用户和开发人员获得多 CPU/核心的 PC,人们自然希望利用这些 PC 的所有计算能力,并加载所有核心以并行化耗时的计算。

本文将讨论在 C# 中并行计算的主题,并有效地将它们分布到系统中的所有可用核心上。我们将简要介绍 Microsoft 的并行计算库提供的功能,但文章的主要目的是讨论如何仅使用 .NET 框架的标准功能来实现并行,以及如何使其易于使用,以便对现有代码进行最少的更改即可支持并行。

Microsoft 的解决方案

众所周知,Microsoft 提供了一个 .NET Framework 3.5 的扩展,允许将并行计算分布到系统中的所有可用核心上。还有一个专用博客,其中提供有关该库的各种新闻和信息。

Microsoft 的库功能强大、易于使用,并提供了许多不同的功能,可以解决并行计算的各种任务。例如,让我们看看如何并行化下面的代码,该代码执行两个方阵的乘法。

void MatrixMultiplication( double[,] a, double[,] b, double[,] c )
{
    int s = a.GetLength( 0 );

    for ( int i = 0; i < s; i++ )
    {
        for ( int j = 0; j < s; j++ )
        {
            double v = 0;

            for ( int k = 0; k < s; k++ )
            {
                v += a[i, k] * b[k, j];
            }

            c[i, j] = v;
        }
    }
}

代码的并行版本将如下所示。

void ParalleledMatrixMultiplicationMS( double[,] a, double[,] b, double[,] c )
{
    int s = a.GetLength( 0 );

    System.Threading.Parallel.For( 0, s, delegate( int i )
    {
        for ( int j = 0; j < s; j++ )
        {
            double v = 0;

            for ( int k = 0; k < s; k++ )
            {
                v += a[i, k] * b[k, j];
            }

            c[i, j] = v;
        }
    } );
}

Microsoft 的解决方案效果很好,而且它们的库提供的功能远不止一个简单的 Parallel.For()。但是,有一些问题可能使此库对您的应用程序不太理想。

  • 并行计算扩展针对 .NET Framework 3.5。3.5 版本非常好,并提供了很多功能,但某些应用程序可能仍希望支持早期 .NET Framework 版本,例如 2.0,这使得它们无法使用此扩展。
  • 并行计算扩展尚未包含在标准的 .NET Framework 安装中,而是作为单独的组件提供,该组件也应安装在目标系统上。这可能会使您的产品分发有些复杂,需要您向用户提供特殊说明,说明所有依赖项,或者将扩展的安装合并到您产品的安装中。
  • Microsoft 提供的并行计算扩展旨在运行在 Windows 系统上。但是,如果您正在开发应也能在 Linux 上运行的跨平台应用程序,例如在 Mono 环境下,那么您将无法获得出色的并行化功能。尽管并行扩展很快就会出现在 Mono 中,但它们在当前版本 (1.9) 中尚未提供。

我不确定以上所有要点对您是否重要。我个人对支持 Mono 项目感兴趣,而且我还没有准备好将我所有的项目都切换到 .NET 3.5 版本。

因此,在本文中,我们将讨论自定义的 Parallel.For() 实现,该实现可以与早期 .NET 版本以及 Mono 一起使用,并且可以轻松地集成到任何项目中,因为它只是一个微小的 DLL 程序集。通过我们自己的实现,我们可以保持其使用的简单性,并获得良好的性能,而这种性能不会比 Microsoft 的并行扩展提供的性能差。注意:我们不会实现 Microsoft 提供的并行扩展库的完整模拟,而只会实现 Parallel.For(),它涵盖了大多数并行化任务。

如果您只对 Microsoft 的解决方案感兴趣,那么本文对那些愿意学习如何使用纯 C# 实现易于使用的并行方法的人仍然可能有趣。

使用代码

我们将首先介绍如何使用我们自定义的并行例程实现,然后将在下一节介绍实现细节。如上所述,我们的目标是提供类似 Microsoft 在其并行扩展库中提供的功能,并使其易于使用。因此,让我们实现一个 Parallel.For() 的变体,该变体由 Microsoft 提供。我们变体的唯一区别在于,我们只有一个方法的定义,该方法接受 for 循环的开始和停止索引以及作为委托的循环体。下面是我们的方阵乘法代码,但已使用我们的 Parallel.For() 实现进行并行化。

void ParalleledMatrixMultiplicationAForge( double[,] a, double[,] b, double[,] c )
{
    int s = a.GetLength( 0 );

    AForge.Parallel.For( 0, s, delegate( int i )
    {
        for ( int j = 0; j < s; j++ )
        {
            double v = 0;

            for ( int k = 0; k < s; k++ )
            {
                v += a[i, k] * b[k, j];
            }

            c[i, j] = v;
        }
    } );
}

因此,从上面的代码可以看出,使用我们的自定义并行实现与使用 Microsoft 的解决方案一样简单,唯一的区别在于定义 Parallel 类的命名空间名称。

// Microsoft's solution
System.Threading.Parallel.For( 0, s, delegate( int i )
...

// our implementation
AForge.Parallel.For( 0, s, delegate( int i )
...

是的,正如我已经提到的,Microsoft 提供了额外的 Parallel.For() 定义,它们不仅可以与委托一起使用,还可以与 lambda 表达式一起使用。但这只是一个额外的灵活性功能,是需要付出代价的。对我来说,委托已经足够了,而且它们不需要 .NET Framework 3.5。

实现细节

我们的 Parallel.For() 实现的完整代码可以在文章的附件中找到,但在这里,我们将只讨论主要思想,该思想集中在三个主要例程——初始化、作业调度和执行。

我们的并行实现将基于 System.Threading 命名空间中的常规类,这些类在任何版本的 .NET 中都可用——ThreadAutoResetEventManualResetEvent。要并行化 for 循环,我们需要创建一定数量的线程,这些线程将用于执行 for 循环体的迭代,但需要事件来指示线程可用性和作业可用性。默认情况下,我们创建的线程数等于系统中的核心数,但此值可以由用户配置,因此将使用更多(或更少)的线程来并行化循环。

// Initialize Parallel class's instance creating required number of threads
// and synchronization objects
private void Initialize( )
{
    // array of events, which signal about available job
    jobAvailable = new AutoResetEvent[threadsCount];
    // array of events, which signal about available thread
    threadIdle = new ManualResetEvent[threadsCount];
    // array of threads
    threads = new Thread[threadsCount];

    for ( int i = 0; i < threadsCount; i++ )
    {
        jobAvailable[i] = new AutoResetEvent( false );
        threadIdle[i]   = new ManualResetEvent( true );

        threads[i] = new Thread( new ParameterizedThreadStart( WorkerThread ) );
        threads[i].IsBackground = true;
        threads[i].Start( i );
    }
}

为每个线程创建两个事件的背后是什么想法?threadIdle 事件用于指示一个线程是否空闲(可用于某项作业),还是忙于执行某些计算。jobAvailable 事件用于向特定线程发出信号,表示它需要唤醒并执行一些工作。因此,在完成上述初始化后,所有 threadIdle 事件都设置为已触发状态,这意味着它们都处于空闲状态并可用于执行操作,而所有 jobAvailable 事件都设置为未触发状态,这意味着尚无工作可供线程处理。所有线程都在等待这些 jobAvailable 事件,一旦它们被设置为已触发状态,线程就会唤醒并开始执行作业。稍后我们将看到工作线程的函数,这样就能更清楚地了解线程如何获取作业。

现在,是时候看看作业是如何调度的了……

public static void For( int start, int stop, ForLoopBody loopBody  )
{
    lock ( sync )
    {
        // get instance of parallel computation manager
        Parallel instance = Instance;

        instance.currentIndex   = start - 1;
        instance.stopIndex      = stop;
        instance.loopBody       = loopBody;

        // signal about available job for all threads and mark them busy
        for ( int i = 0; i < threadsCount; i++ )
        {
            instance.threadIdle[i].Reset( );
            instance.jobAvailable[i].Set( );
        }

        // wait until all threads become idle
        for ( int i = 0; i < threadsCount; i++ )
        {
            instance.threadIdle[i].WaitOne( );
        }
    }
}

从上面的代码可以看出,作业调度非常简单——保存循环属性,将所有线程标记为忙碌(threadIdle[i].Reset()),向所有线程发出有作业可供它们处理的信号(jobAvailable[i].Set()),然后只需等待线程再次空闲。

最后一步是看看工作线程实际上是如何工作的……

// Worker thread performing parallel computations in loop
private void WorkerThread( object index )
{
    int threadIndex = (int) index;
    int localIndex = 0;

    while ( true )
    {
        // wait until there is job to do
        jobAvailable[threadIndex].WaitOne( );

        // exit on null body
        if ( loopBody == null )
            break;

        while ( true )
        {
            // get local index incrementing global loop's current index
            localIndex = Interlocked.Increment( ref currentIndex );

            if ( localIndex >= stopIndex )
                break;

            // run loop's body
            loopBody( localIndex );
        }

        // signal about thread availability
        threadIdle[threadIndex].Set( );
    }
}

因此,从上面的代码可以看出,所有工作线程只是无所事事地等待,直到有事可做,这由 jobAvailable 事件发出信号。一旦收到事件,线程就会开始工作——使用原子增量安全地接收它们需要处理的循环索引,然后仅在需要时执行循环体,直到整个循环计算完成。

整个实现非常简单,可以使用任何版本的 .NET Framework 完成,这正是我们一开始想要的。现在,是时候测试它并将其性能与 Microsoft 的解决方案进行比较了。

性能测试

我们将如何测试性能?嗯,我们将使用一种简单的技术,只需在一个循环中多次运行我们的例程,并检查我们花费了多少时间。测试代码看起来像这样。

// run specified number of tests
for ( int test = 0; test < tests; test++ )
{
    // test 1
    DateTime start = DateTime.Now;

    for ( int run = 0; run < runs; run++ )
    {
        MatrixMultiplication( a, b, c1 );
    }

    DateTime end = DateTime.Now;
    TimeSpan span = end - start;

    Console.Write( span.TotalMilliseconds.ToString( "F3" ) + "\t | " );
    test1time += span.TotalMilliseconds;
    
    // other tests
    ...

请注意,我们将运行多次测试迭代,因此最后我们还将获得平均性能。

// provide average performance
test1time /= tests;
test2time /= tests;
test3time /= tests;

Console.WriteLine( "------------------- AVG -------------------" );
Console.WriteLine(  test1time.ToString( "F3" ) + "\t | " +
                    test2time.ToString( "F3" ) + "\t | " +
                    test3time.ToString( "F3" ) + "\t | " );

所以,让我们运行它并查看我们的测试结果(以下测试是在 Intel Core 2 Duo CPU - 2.2 GHz 上进行的)。

Matrix size: 50, runs: 200
Starting test with 2 threads

Clear C# | AForge   | MS       |
156.250  | 109.37   | 218.750  |  
171.875  | 93.750   | 125.000  |  
156.250  | 109.375  | 109.375  |  
171.875  | 93.750   | 125.000  |  
156.250  | 93.750   | 125.000  |  
------------------- AVG --------
162.500  | 100.000  | 140.625  |
Matrix size: 100, runs: 100
Starting test with 2 threads

Clear C# | AForge    | MS        |
687.500  | 390.625   | 515.625   |  
718.750  | 390.625   | 406.250   |  
703.125  | 390.625   | 406.250   |  
687.500  | 390.625   | 406.250   |  
734.375  | 390.625   | 406.250   |  
------------------- AVG ----------
706.250  | 390.625   | 428.125   |
Matrix size: 250, runs: 40
Starting test with 2 threads

Clear C#     | AForge    | MS        |
4453.125     | 2484.375  | 2593.750  |  
4609.375     | 2500.000  | 2500.000  |  
4515.625     | 2484.375  | 2500.000  |  
4546.875     | 2484.375  | 2500.000  |  
4671.875     | 2500.000  | 2500.000  |  
------------------- AVG --------------
4559.375     | 2490.625  | 2518.750  |
Matrix size: 1000, runs: 10
Starting test with 2 threads

Clear C#     | AForge       | MS         |
133078.125   | 72406.250    | 72531.250  |  
134875.000   | 72718.750    | 72406.250  |  
135296.875   | 72578.125    | 72375.000  |  
135484.375   | 72531.250    | 75062.500  |  
136500.000   | 72515.625    | 72343.750  |  
------------------- AVG -------------------
135046.875   | 72550.000    | 72943.750  |

从提供的结果中,我们可以看到我们的实现看起来并不比 Microsoft 的解决方案差。是的,我们没有所有功能和灵活性,但我们满足了支持早期 .NET 版本和 Mono 的要求。从上述结果来看,我们的实现甚至表现得更好一些。

进一步分析我们的结果,我们可以看到 Microsoft 的解决方案在第一次运行时需要更多时间,这意味着它们执行更复杂的worker线程初始化,从而花费更多时间。

此外,从我们的结果来看,我们可以看到,减小矩阵大小会导致在工作量太小不适合并行化的情况下,性能提升不那么明显。但我们将在下一节讨论这一点。

在优化之前和之后进行性能分析。

在开始优化某些代码之前和之后立即进行某种形式的性能分析和测试是常见且良好的做法。性能测试将显示您从优化中获得了多少,以及是否获得任何东西。在许多情况下,您可能认为您正在优化代码,使其运行得更快,但实际上,您可能会遇到性能下降。并行化并非万能药,在某些情况下,您的并行化代码可能会运行得更慢。这可能是因为并行工作量非常小,而线程同步所花费的时间要多得多。为了演示这种效果,让我们看看并行化相同矩阵乘法的结果,但这次我们使用小矩阵。

Matrix size: 10, runs: 1000
Starting test with 2 threads

Clear C# | AForge    | MS        |
0.000    | 46.875    | 156.250   |  
15.625   | 15.625    | 46.875    |  
0.000    | 15.625    | 46.875    |  
0.000    | 15.625    | 46.875    |  
0.000    | 15.625    | 31.250    |  
------------------- AVG ----------
3.125    | 21.875    | 65.625    |

从上面的结果可以看出,不使用任何并行化来乘法小矩阵要快得多。并行化此类计算只会导致用于线程同步的所有附加例程花费的时间比实际有用工作的时间要多。因此,在决定使用哪种代码之前,请衡量性能。考虑到 Parallel.For() 语法与常规 for 循环语句的差异不大,因此更改几行代码来执行不同的测试应该不是问题。

结论

因此,可以看出,我们已经成功地实现了自己的小型但易于使用的 Parallel.For() 实现,它的性能相当不错,而且并不比 Microsoft 的并行扩展差多少。当然,Microsoft 提供了更多的灵活性和功能,但许多并行化任务可以通过并行化 for 循环来解决,这正是我们的目标,而且我们已经实现了。

我们在文章中没有讨论其他并行化任务,只讨论了矩阵乘法,因此可以研究更多样本以提供更清晰的结果。我个人已经在另一个处理图像处理和其他事务的项目中使用了这个 AForge.Parallel.For() 实现。图像处理作为可能使用耗时计算的领域之一,是众多可以成功利用并行计算的领域之一。该项目的初步测试已经显示了并行化的提升。

关注点

通过研究现有实现的最佳后台线程数量,可以对现有实现进行一些调整。当前实现默认创建的线程数等于系统中的核心数。但是,可以通过使用 AForge.Parallel.ThreadsCount 属性来更改这一点,该属性允许用户精确指定要创建的后台线程数。例如,如果我们看一下 Microsoft 的解决方案,我们可以发现他们在双核系统上创建了两个以上的线程。检查我系统上的任务管理器,我发现当调用其 Parallel.For() 时,创建了大约 10 个额外的线程。

作为提高并行计算任务性能的另一个方向,可以考虑利用 GPU。例如,NVidia 提供了 CUDA 库,可用于利用其 GPU 进行通用计算。查看 CUDA 网站,我们可以发现它已成功应用于许多不同的应用程序。

致谢

我想感谢 **Israel Lot** 对 AForge.Parallel 实现提供的宝贵意见和贡献。

AForge.NET

虽然文章提供了代码和演示应用程序,但 AForge.Parallel 将成为 AForge.NET framework 的一项新功能,并将发布在 2.0 版本中,届时该类将用于并行化框架的其他类。

© . All rights reserved.