65.9K
CodeProject 正在变化。 阅读更多。
Home

并行计算和数据并行

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.95/5 (13投票s)

2010年2月2日

CPOL

14分钟阅读

viewsIcon

39243

本文聚焦于基于多核处理器技术的数据并行性。

摘要

微处理器发展趋势正在避免提高时钟速度,而是专注于具有独立执行单元的多核。线程或其他多处理技术将需要多线程实践。微处理器编程和开发的未来可能就是标准化多核技术。多核通常是指两个或两个以上CPU在同一芯片上协同工作。多核技术是一种架构,其中单个物理处理器包含两个或两个以上处理器的核心逻辑。这些处理器被封装在一个单一的集成电路(IC)中。这些单一的集成电路被称为一个die。多核也可以指多个die封装在一起。多核能够使系统执行更多任务,并提高整体系统性能。多核技术可用于台式机、移动PC、服务器和工作站。与双核(在同一IC中包含两个独立处理器(执行核心)的单芯片)形成对比。

在这次并行计算运动中,开发人员不得不研究各种方法,将顺序算法分解成适合并行执行的子问题。这可能会让人感到困惑,因为在文档中,并发和数据并行这两个术语可以互换使用。在编写用于并发的算法时,开发人员需要做出的最重要的设计选择是如何将原始问题划分为独立的子部分。有三种主要方法可以解决这个设计问题:数据并行、任务并行和消息并行。本文旨在重点介绍第一种方法:数据并行。

为了运行示例代码,用户必须安装Microsoft Visual Studio 2005或Visual Studio 2008(尽管Visual Studio 2010由于包含了并行PF扩展,更适合并行计算)。库的选择将是Intel Threading Building Blocks性能库(TBB)。这意味着读者必须下载Intel C++ Compiler for Windows的评估版,安装它以实现与Microsoft Visual Studio的集成,然后安装Intel TBB性能库。TBB的组件定义在tbb命名空间中。我选择Intel TBB的原因是Microsoft和Intel合作研究这些核心的最佳用法,为Windows 7提供更强大的平台。如果他们是使多核技术趋于完善的公司之一,那么我们将使用他们的一个线程库来学习并行计算。因为本文关注的是隐式线程,所以我们也会研究OpenMP编译器指令。当然,最后我们会简要介绍Microsoft用于Visual Studio 2010和.NET 4的并行计算机制。

关于安装Intel TBB的快速说明

对于Windows系统,您必须按以下方式修改Visual Studio项目构建配置(debug,release):在C++属性的“常规”选项中,添加包含目录$(TBB22_INSTALL_DIR)\include。在链接器属性的“常规”选项中,添加此附加库目录:$(TBB22_INSTALL_DIR)\ia32\vc8\lib。在“输入”选项中,添加附加依赖项:tbb_debug.libtbb.lib

什么是数据并行?

数据并行利用了某个操作的输入数据作为划分成小块的手段。数据被分配给可用的处理器以实现并行。这个划分步骤之后通常会复制并在这些划分上执行一些大多独立的程序操作。通常,应用于数据集元素的操作是并发的。考虑一个简单的循环并行。当使用数据并行时,首先要考虑的是如何将迭代空间分解为独立的工作单元。这就是为什么本文将重点介绍Intel Threading Building Blocks性能库,通过利用多核技术来提高应用程序开发的性能。因此,它将通过检查Intel TBB提供的用于循环并行的一些通用算法模板来触及数据并行。读者应该注意到,程序的每个部分并非都由循环组成。但对于可以使用的程序部分,这应该是首选。但是,Intel TBB到底是什么?

简而言之,Intel TBB是一个标准的C++模板库(C++的STD库),用于循环级别的并行,它专注于定义任务,而不是显式线程(如POSIX线程)。通用并行算法、并发容器、底层同步原语和任务调度器构成了Intel TBB。使用TBB的开发人员可以通过将迭代块视为任务来并行执行循环迭代,并允许TBB任务调度器确定任务大小、使用的线程数、任务分配给这些线程以及如何调度这些线程执行。这种数据并行方法提高了可伸缩性。并行度的上限通常要大得多,因为循环计数通常很大,并且取决于需要操作的数据的动态大小。程序需要操作的数据量通常会随着时间的推移而增长,虽然处理器时钟速度已开始放缓,但磁盘使用量的增长并未放缓。数据并行程序中数据大小的增长转化为更多的并行机会,这些机会可以扩展以使用可用的多个处理器。这就是为什么Intel TBB性能库在开发可伸缩应用程序方面发挥重要作用的原因。

Intel TBB是一个性能库,它旨在通过利用多核处理器来实现可伸缩性和数据并行。该库是一种线程库,根据定义,它是线程安全的——它使用基于ISO C++标准模板库的迭代器和算法。Intel TBB还定义了哈希表、向量和队列的并发容器。C++ STL容器不是线程安全的。TBB容器设计用于安全使用,多个线程尝试并发访问容器。您不仅可以在与TBB并行算法结合使用这些容器,还可以在使用其他线程库实现的并发代码中使用它们。

学习Intel Threading Building Blocks涉及理解递归、任务窃取和算法模板。算法模板在TBB中似乎起着最重要的作用。TBB提供了以下类型的通用并行算法用于循环并行:parallel_for和parallel_reduce算法模板以及parallel_scan算法模板。后者被称为scan,它是一个模板函数,用于并行计算前缀计算(y[i] = y[i-1]op(x[i]))。话虽如此,现在让我们考虑TBB的parallel_for模板,从一个简单的循环示例开始。假设您想对数组的每个元素应用一个函数Foo,并且可以安全地并发处理每个元素。下面是执行此操作的顺序代码。下面是原始循环代码

void SerialApplyFoo( float a[], size_t n ) {
  for( size_t i=0;  i< n; ++i )
    Foo(a[i]);

这里的迭代空间类型为size_t,从0到n-1。size_t类型对应于语言运算符sizeof返回的整型数据类型,并在cstring头文件(以及其他头文件)中定义为无符号整型。模板函数tbb::parallel_for将此迭代空间分解为块,并在每个单独的线程上运行每个块。并行化此循环的第一步是将循环体转换为处理块的形式。这种形式是标准模板库(STL)风格的函数对象,称为主体对象,其中operator( )处理一个块。下面显示的类声明了主体对象。

#include "tbb/blocked_range.h"
class ApplyFoo {
float *const my_a;
public:
    void operator( )( const blocked_range& r ) const {
   float *a = my_a;
   for( size_t i=r.begin(); i!=r.end( ); ++i )
 Foo(a[i]);
      }
  ApplyFoo( float a[] ) :
  my_a(a)
     {}
};

注意operator( )的迭代空间参数。blocked_range<t>是库提供的模板类。它描述了类型为T的一维迭代空间。parallel_for类也与其他类型的迭代空间一起工作。库提供了blocked_range2d用于二维空间。ApplyFoo实例需要成员字段来记住原始循环外部但内部使用的所有局部变量。通常,主体对象的构造函数会初始化这些字段,尽管parallel_for不会。

parallel_for模板并行化包含在for循环中的任务。该模板需要两个参数:用于迭代的范围类型,以及执行范围或子范围迭代的主体类型。范围类必须定义复制构造函数和析构函数,以及is_empty()(如果范围为空则返回TRUE)和is_divisible()(如果范围可以分割则返回TRUE)方法,以及一个分割构造函数(将范围分成两半)。可以使用分区器类对象来启发式地查找应分配的最小迭代次数。同样,Intel TBB库提供了两种预定义的范围类型:blocked_rangeblocked_range2D。主体类必须同时定义复制构造函数和析构函数以及operator()operator()将包含原始串行循环的副本,该副本已修改为在来自范围类型的子范围值上运行。

下一个涉及循环并行的模板是parallel_reduce模板。由于两个模板都提供了一个固定数量的独立循环迭代的负载均衡并行执行,parallel_reduce将遍历一个范围并合并由每个任务需求计算出的部分结果,就像parallel_for一样。主体类型需要一个分割构造函数和一个join方法。主体中的分割构造函数复制运行循环体所需的只读数据,并分配约简操作的标识元素来初始化约简变量。join方法根据使用的约简操作组合任务的部分结果。考虑这个使用parallel_reduce算法启动线程并计算数值积分的例子。任务调度器将迭代范围分解成更小的块。循环迭代的块被视为可以由线程执行的独立任务。一旦在每个任务中计算出的partialHeight变量通过Join方法相加,最终高度之和就乘以宽度,并打印出计算出的PI近似值。

#include <stdio.h>
#include "tbb/parallel_reduce.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/blocked_range.h"
#include "tbb/partitioner.h"

using namespace std;
using namespace tbb;
static long num_rects = 100000;
class MyPi {
    double *const my_rects;
public:
    double partialHeight;
    MyPi(double *const width) : my_rects(width), partialHeight(0) {}
    MyPi(MyPi& x, split) : my_rects(x.my_rects), partialHeight(0) {}

    void operator()(const blocked_range<size_t>& r) {
        double rectangleWidth = *my_rects;
        double x;
        for (size_t i = r.begin(); i != r.end(); ++i) {
            x = (i + 0.5) * rectangleWidth;
            partialHeight += 4.0/(1.+ x*x);
        }
    }

    void join(const MyPi& y) {partialHeight += y.partialHeight;}
};

int main(int argc, char* argv[])
{
    double area;
    double width = 1./(double)num_rects;
    MyPi my_block((double *const)&width);
    task_scheduler_init init;
    parallel_reduce(blocked_range<size_t>(0,num_rects), 
                    my_block, auto_partitioner());
    area = my_block.partialHeight * width;
    printf("The value of PI is %f\n",area);
    return 0;
}

要在命令行上编译此代码,您必须设置环境变量

set PATH=%PATH%;.;C:\Program Files\Intel\Compiler\11.1\054\bin\ia32.

如果您在默认目录中编译此文件,那么您只需执行iclvars_ia32.bat命令。使用/MD开关:icl.exe /MD ThePi.cpp tbb_debug.lib。成功编译的输出是:PI的值为3.141593。

下一个示例直接来自Intel TBB示例目录,也使用了parallel_reduce算法。您可以从http://www.threadingbuildingblocks.org/下载此类示例和其他示例。

/
/ Example program that computes number of prime numbers up to n, 
// where n is a command line argument. The algorithm here is a 
// fairly efficient version of the sieve of Eratosthenes. 
// The parallel version demonstrates how to use parallel_reduce,
// and in particular how to exploit lazy splitting.

#include <cassert>
#include <cstdio>
#include <math.h>
#include <cstdlib>
#include <cstring>
#include <cctype>
#include "tbb/parallel_reduce.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tick_count.h"
using namespace std;
using namespace tbb;

typedef unsigned long Number;

//! If true, then print primes on stdout.
static bool PrintPrimes = false;

//! Grainsize parameter
static Number GrainSize = 1000;

class Multiples {
    inline Number strike( Number start, Number limit, Number stride ) {
        // Hoist "my_is_composite" into register for sake of speed.
        bool* is_composite = my_is_composite;
        assert( stride>=2 );
        for( ;start<limit; start+=stride ) 
            is_composite[start] = true;
        return start;
    }
    //! Window into conceptual sieve 
    bool* my_is_composite;

    //! Indexes into window
    /** my_striker[k] is an index into my_composite corresponding to
        an odd multiple multiple of my_factor[k]. */
    Number* my_striker;

    //! Prime numbers less than m.
    Number* my_factor;
public:
    //! Number of factors in my_factor.
    Number n_factor;
    Number m;
    Multiples( Number n ) : 
        is_forked_copy(false) 
    {
        m = Number(sqrt(double(n)));
        // Round up to even
        m += m&1;
        my_is_composite = new bool[m/2];
        my_striker = new Number[m/2];
        my_factor = new Number[m/2];
        n_factor = 0;
        memset( my_is_composite, 0, m/2 );
        for( Number i=3; i<m; i+=2 ) {
            if( !my_is_composite[i/2] ) {
                if( PrintPrimes )
                    printf("%d\n",(int)i);
                my_striker[n_factor] = strike( i/2, m/2, i );
                my_factor[n_factor++] = i;
            }
        }
    }

    //! Find primes in range [start,window_size), advancing my_striker as we go.
    /** Returns number of primes found. */
    Number find_primes_in_window( Number start, Number window_size ) {
        bool* is_composite = my_is_composite;
        memset( is_composite, 0, window_size/2 );
        for( size_t k=0; k<n_factor; ++k )
            my_striker[k] = strike( my_striker[k]-m/2, window_size/2, my_factor[k] );
        Number count = 0;
        for( Number k=0; k<window_size/2; ++k ) {
            if( !is_composite[k] ) {
                if( PrintPrimes )
                    printf("%ld\n",long(start+2*k+1));
                ++count;
            }
        }
        return count;
    }

    ~Multiples() {
        if( !is_forked_copy )
            delete[] my_factor;
        delete[] my_striker;
        delete[] my_is_composite;
    }

    //! True if this instance was forked from another instance.
    const bool is_forked_copy;

    Multiples( const Multiples& f, split ) :
        n_factor(f.n_factor),
        m(f.m),
        my_is_composite(NULL),
        my_striker(NULL),
        my_factor(f.my_factor),
        is_forked_copy(true)
    {}

    bool is_initialized() const {
        return my_is_composite!=NULL;
    }

    void initialize( Number start ) { 
        assert( start>=1 );
        my_is_composite = new bool[m/2];
        my_striker = new Number[m/2];
        for( size_t k=0; k<n_factor; ++k ) {
            Number f = my_factor[k];
            Number p = (start-1)/f*f % m;
            my_striker[k] = (p&1 ? p+2*f : p+f)/2;
            assert( m/2<=my_striker[k] );
        }
    }
};

//! Count number of primes between 0 and n
/** This is the serial version. */
Number SerialCountPrimes( Number n ) {
    // Two is special case
    Number count = n>=2;
    if( n>=3 ) {
        Multiples multiples(n);
        count += multiples.n_factor;
        if( PrintPrimes ) 
            printf("---\n");
        Number window_size = multiples.m;
        for( Number j=multiples.m; j<=n; j+=window_size ) { 
            if( j+window_size>n+1 ) 
                window_size = n+1-j;
            count += multiples.find_primes_in_window( j, window_size );
        }
    }
    return count;
}

//! Range of a sieve window.
class SieveRange {
    //! Width of full-size window into sieve.
    const Number my_stride;

    //! Always multiple of my_stride
    Number my_begin;

    //! One past last number in window.
    Number my_end;

    //! Width above which it is worth forking.
    const Number my_grainsize;

    bool assert_okay() const {
        assert( my_begin%my_stride==0 );
        assert( my_begin<=my_end );
        assert( my_stride<=my_grainsize );
        return true;
    } 
public:
    ---
    bool is_divisible() const {return my_end-my_begin>my_grainsize;}
    bool empty() const {return my_end<=my_begin;}
    SieveRange( SieveRange& r, split ) : 
        my_stride(r.my_stride), 
        my_grainsize(r.my_grainsize),
        my_end(r.my_end)
    {
        assert( r.is_divisible() );
        assert( r.assert_okay() );
        Number middle = r.my_begin + (r.my_end-r.my_begin+r.my_stride-1)/2;
        middle = middle/my_stride*my_stride;
        my_begin = middle;
        r.my_end = middle;
        assert( assert_okay() );
        assert( r.assert_okay() );
    }

    Number begin() const {return my_begin;}
    Number end() const {return my_end;}
    SieveRange( Number begin, Number end, Number stride, Number grainsize ) :
        my_begin(begin),
        my_end(end),
        my_stride(stride),      
        my_grainsize(grainsize<stride?stride:grainsize)
    {
        assert( assert_okay() );
    }
};

//! Loop body for parallel_reduce.
/** parallel_reduce splits the sieve into subsieves.
    Each subsieve handles a subrange of [0..n]. */
class Sieve {
public:
    //! Prime multiples to consider, and working storage for this subsieve.
    Multiples multiples;

    //! Number of primes found so far by this subsieve.
    Number count;

    //! Construct Sieve for counting primes in [0..n].
    Sieve( Number n ) : 
        multiples(n),
        count(0)
    {}

    
    void operator()( const SieveRange& r ) {
        Number m = multiples.m;
        if( multiples.is_initialized() ) { 
            // Simply reuse "multiples" structure from previous window
            // This works because parallel_reduce always applies
            // *this from left to right.
        } else {
            // Need to initialize "multiples" because *this is a forked copy
            // that needs to be set up to start at r.begin().
            multiples.initialize( r.begin() );
        }
        Number window_size = m;
        for( Number j=r.begin(); j<r.end(); j+=window_size ) { 
            assert( j%multiples.m==0 );
            if( j+window_size>r.end() ) 
                window_size = r.end()-j;
            count += multiples.find_primes_in_window( j, window_size );
        }
    }
    void join( Sieve& other ) {
        count += other.count;
    }
    Sieve( Sieve& other, split ) : 
        multiples(other.multiples,split()),
        count(0)
    {}
    // End of signatures required by parallel_reduce
    
};

//! Count number of primes between 0 and n
/** This is the parallel version. */
Number ParallelCountPrimes( Number n ) {
    // Two is special case
    Number count = n>=2;
    if( n>=3 ) {
        Sieve s(n);
        count += s.multiples.n_factor;
        if( PrintPrimes ) 
            printf("---\n");
        // Explicit grain size and simple_partitioner()
        // used here instead of automatic grainsize 
        // determination becase we want SieveRange
        // to be decomposed down to GrainSize or smaller.
        // Doing so improves odds that the working
        // set fits in cache when evaluating Sieve::operator().
        parallel_reduce( SieveRange( s.multiples.m, n, s.multiples.m, GrainSize ), 
                         s, simple_partitioner() );
        count += s.count;
    }
    return count;
}

//! A closed range of Number.
struct NumberRange {
    Number low;
    Number high;
    void set_from_string( const char* s );
    NumberRange( Number low_, Number high_ ) : low(low_), high(high_) {}
};

void NumberRange::set_from_string( const char* s ) {
    char* end;
    high = low = strtol(s,&end,0);
    switch( *end ) {
    case ':': 
        high = strtol(end+1,0,0); 
        break;
    case '\0':
        break;
    default:
        printf("unexpected character = %c\n",*end);
    }
}

//! Number of threads to use.
static NumberRange NThread(0,4);

//! If true, then at end wait for user to hit return
static bool PauseFlag = false;

//! Parse the command line.
static Number ParseCommandLine( int argc, char* argv[] ) {
    Number n = 100000000;
    int i = 1;
    if( i<argc && strcmp( argv[i], "pause" )==0 ) {
        PauseFlag = true;
        ++i;
    }
    if( i<argc && !isdigit(argv[i][0]) ) { 
        // Command line is garbled.
        fprintf(stderr,"Usage: %s [['pause'] n [nthread [grainsize]]]\n", argv[0]);
        fprintf(stderr,"where n is a positive integer [%lu]\n",n);
        fprintf(stderr,"      nthread is a non-negative integer, " 
                       "or range of the form low:high [%ld:%lu]\n", 
                       NThread.low,NThread.high);
        fprintf(stderr,"      grainsize is an optional " 
                "postive integer [%lu]\n",GrainSize);
        exit(1);
    }
    if( i<argc )
        n = strtol(argv[i++],0,0);
    if( i<argc )
        NThread.set_from_string(argv[i++]);
    if( i<argc )
        GrainSize = strtol(argv[i++],0,0);
    return n;
}

static void WaitForUser() {
    char c;
    printf("Press return to continue\n");
    do {
        c = getchar();
    } while( c!='\n' );
}

int main( int argc, char* argv[] ) {
    Number n = ParseCommandLine(argc,argv);

    // Try different numbers of threads
    for( Number p=NThread.low; p<=NThread.high; ++p ) {
        task_scheduler_init init(task_scheduler_init::deferred);
        // If p!=0, we are doing a parallel run
        if( p ) 
            init.initialize(p);

        Number count;
        tick_count t0 = tick_count::now();
        if( p==0 ) {
            count = SerialCountPrimes(n);
        } else {
            count = ParallelCountPrimes(n);
        }
        tick_count t1 = tick_count::now();

        printf("#primes from [2..%lu] = %lu (%.2f sec with ",
            (unsigned long)n, (unsigned long)count, (t1-t0).seconds());
        if( p ) 
            printf("%lu-way parallelism)\n", p );
        else 
            printf("serial code)\n");
    }
    if( PauseFlag ) {
        WaitForUser();
    }
    return 0;
}

此代码的输出很有趣,因为它显示了循环并行如何提高时序因素。转到C:\Program Files\Intel\Compilers\11.1\054\bin\ia32目录并键入iclvars_ia32.bat。然后,使用Intel C++编译器。

#primes from [2..100000000] = 5761455 (0.29 sec with serial code)
#primes from [2..100000000] = 5761455 (0.30 sec with 1-way parallelism)
#primes from [2..100000000] = 5761455 (0.16 sec with 2-way parallelism)
#primes from [2..100000000] = 5761455 (0.16 sec with 3-way parallelism)
#primes from [2..100000000] = 5761455 (0.16 sec with 4-way parallelism)

时间改进不到50%。

查看C:\Program Files\Intel\TBB\v2.2\examples\Getting Started\sub_string_finder\目录

有三个sub_string_finder示例使用parallel_for算法模板:sub_string_finder.cppsub_string_finder_pretty.cppsub_string_finder_extended.cpp。让我们看看sub_string_finder_extended.cpp

#include <iostream>
#include <string>
#include <algorithm>

#include "tbb/parallel_for.h"
#include "tbb/blocked_range.h"
#include "tbb/tick_count.h"

using namespace tbb;
using namespace std;
static const size_t N = 22;

void SerialSubStringFinder ( const string &str, 
           size_t *max_array, size_t *pos_array) {
    for ( size_t i = 0; i < str.size(); ++i ) {
        size_t max_size = 0, max_pos = 0;
        for (size_t j = 0; j < str.size(); ++j)
            if (j != i) {
                size_t limit = str.size()-( i > j ? i : j );
                for (size_t k = 0; k < limit; ++k) {
                    if (str[i + k] != str[j + k]) break;
                    if (k > max_size) {
                        max_size = k;
                        max_pos = j;
                    }
                }
            }
            max_array[i] = max_size;
            pos_array[i] = max_pos;
        }
    }

    class SubStringFinder {
        const string str;
        size_t *max_array;
        size_t *pos_array;
    public:
        void operator() ( const blocked_range<size_t>& r ) const {
            for ( size_t i = r.begin(); i != r.end(); ++i ) {
                size_t max_size = 0, max_pos = 0;
                for (size_t j = 0; j < str.size(); ++j) 
                    if (j != i) {
                        size_t limit = str.size()-( i > j ? i : j );
                        for (size_t k = 0; k < limit; ++k) {
                            if (str[i + k] != str[j + k]) break;
                            if (k > max_size) {
                                max_size = k;
                                max_pos = j;
                            }
                        }
                    }
                    max_array[i] = max_size;
                    pos_array[i] = max_pos;
            }
        }
        SubStringFinder(string &s, size_t *m, size_t *p) : 
        str(s), max_array(m), pos_array(p) { }
    };

    int main(int argc, char *argv[]) {

        string str[N] = { string("a"), string("b") };
        for (size_t i = 2; i < N; ++i) str[i] = str[i-1]+str[i-2];
        string &to_scan = str[N-1]; 

        size_t *max = new size_t[to_scan.size()];
        size_t *max2 = new size_t[to_scan.size()];
        size_t *pos = new size_t[to_scan.size()];
        size_t *pos2 = new size_t[to_scan.size()];
        cout << " Done building string." << endl;

        tick_count serial_t0 = tick_count::now();
        SerialSubStringFinder(to_scan, max2, pos2);
        tick_count serial_t1 = tick_count::now();
        cout << " Done with serial version." << endl;

        tick_count parallel_t0 = tick_count::now();
        parallel_for(blocked_range<size_t>(0, to_scan.size(), 100),
           SubStringFinder( to_scan, max, pos ) );
        tick_count parallel_t1 = tick_count::now();
        cout << " Done with parallel version." << endl;

        for (size_t i = 0; i < to_scan.size(); ++i) {
        if (max[i] != max2[i] || pos[i] != pos2[i]) {
            cout << "ERROR: Serial and Parallel Results are Different!" << endl;
        }
    }
     cout << " Done validating results." << endl;

     cout << "Serial version ran in " 
          << (serial_t1 - serial_t0).seconds() 
          << " seconds" << endl
          << "Parallel version ran in " 
          <<  (parallel_t1 - parallel_t0).seconds() 
          << " seconds" << endl
          << "Resulting in a speedup of " 
          << (serial_t1 - serial_t0).seconds() / (parallel_t1 - parallel_t0).seconds() 
          << endl;
    delete[] max;
    delete[] pos;
    delete[] max2;
    delete[] pos2;
    return 0;
}

当在Intel Duo Core处理器上编译和执行时,结果如下:

Done building string.
Done with serial version.
Done with parallel version.
Done validating results.
Serial version ran in 20.9056 seconds
Parallel version ran in 12.2254 seconds
Resulting in a speedup of 1.71001

因此,通过并行化循环,我们获得了巨大的性能提升。对parallel_for模板函数调用的第一个参数是一个blocked_range对象,它描述了迭代空间。回想一下,blocked_range是Intel TBB中提供的模板类。构造函数接受三个参数:范围的下界和范围的上界。parallel_for函数的第二个参数是要应用于每个子范围的函数对象。我们实现了parallel_for循环的主体,因此在运行时,模板parallel_for会自动将范围划分为子范围,并在每个子范围上调用SubStringFinder函数对象。SubSringFinder类的定义填充了给定子范围内的maxpos数组元素。对r.begin()的调用返回子范围的开始,r.end()方法返回子范围的结束。

上述描述了TBB提供的几种for循环算法。它们作为隐式线程的示例。现在,我们将简要介绍第二个库OpenMP,它采用不同的方法来实现并发。OpenMP通过插入到源代码中的特殊pragmas和指令来实现并发,以指示要并发执行的段。这些pragmas被编译器识别和处理。Intel TBB使用定义的并行算法来执行用户编写的类的中的方法,这些类封装了并发操作。

OpenMP指令标记需要并行执行的代码(称为并行区域)并控制代码如何分配给线程。对于C/C++,OpenMP使用pragmas作为指令。所有OpenMP pragma都具有相同的#pragma omp前缀。后面跟着一个OpenMP构造和一个或多个可选子句来修改构造。要在应用程序中定义并行区域,请使用parallel构造。

#pragma omp parallel

此pragma后面将跟一个由花括号括起来的单个语句或代码块。当应用程序在执行期间遇到此语句时,它将分叉一个线程团队,在每个线程上执行并行区域内的所有语句,并在区域的最后一条语句后合并线程。在许多应用程序中,大量的独立操作存在于循环中。使用OpenMP的循环工作共享构造,您可以分割这些循环迭代并将它们分配给线程进行并发执行。parallel for构造将在pragma后面的单个for循环周围启动一个新的并行区域,并将循环迭代分配给线程团队。在完成分配的迭代后,线程将停在并行区域末尾的隐式屏障处,等待与其他线程合并。

可以将组合的parallel for构造拆分为两个pragma:一个parallel构造和一个for构造,后者必须在词法上包含在并行区域内。当线程团队有并行工作,除了循环迭代之外,可以使用这种分离。您还可以将schedule子句附加到循环工作共享构造,以控制迭代如何分配给线程。静态调度将在循环迭代开始执行之前将迭代分成块并分发给线程;如果有比线程多的块,则使用循环调度。动态调度将为团队中的每个线程分配一个迭代块;当线程完成上一组迭代时,将分配一个新的块,直到所有块都分发完毕。静态和动态调度都可以选择chunk参数,该参数控制每个块的迭代次数。

这是另一个使用OpenMP计算PI值的示例。

#include <stdio.h>
static long num_rects = 1000000;
int main(int argc, char* argv[])
{
    double mid, height, width, sum=0.0;
    int i;
    double area;
    width = 1./(double)num_rects;
#pragma omp parallel for private(mid, height) reduction(+:sum)
    for (i=0; I < num_rects; i++) {
        mid = (i + 0.5)*width;
        height = 4.0/(1.+ mid*mid);
        sum += height;
    }
    area = width * sum;
    printf("The value of PI is %f\n",area);
    return 0;
}

输出是PI的值为3.141593

使用.NET的数据并行

在使用C#等.NET语言时,for循环的迭代空间通常是一系列整数,而foreach循环则遍历集合中的单个元素。那么,我们如何将这些分配给处理器上的核心,或者如何定义循环并行?例如,如果我们必须并行化下面的循环,我们将如何决定使用多少线程,如何最好地调度它们,如何将迭代范围分配给线程等等?

void For(int low, int high, Action<int> body)
{
    for (int i = low; i < high; i++)
    {
        body(i);
    }
}

Parallel.ForParallel.ForEach构造派生自Parallel类。这是一个也计算PI值的程序,但使用了几个构造来计算多个值。添加了一个计时器,以显示Parallel.ForParallel.ForEach构造使用的耗时更少。请检查已在.NET 4.0上编译的代码。

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    const int num_steps = 100000000;

    /// <summary>Main method to time various
    /// implementations of computing PI.</summary>
    static void Main(string[] args)
    {
        while (true)
        {
            Time(() => SerialLinqPi());
            Time(() => ParallelLinqPi());
            Time(() => SerialPi());
            Time(() => ParallelPi());
            Time(() => ParallelPartitionerPi());

            Console.WriteLine("----");
            Console.ReadLine();
        }
    }

    /// <summary>Times the execution of a function and outputs
    /// both the elapsed time and the function's result.</summary>
    static void Time<t>(Func<t> work)
    {
        var sw = Stopwatch.StartNew();
        var result = work();
        Console.WriteLine(sw.Elapsed + ": " + result);
    }

    /// <summary>Estimates the value of PI using a LINQ-based implementation.</summary>
    static double SerialLinqPi()
    {
        double step = 1.0 / (double)num_steps;
        return (from i in Enumerable.Range(0, num_steps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }

    /// <summary>Estimates the value of PI using a PLINQ-based implementation.</summary>
    static double ParallelLinqPi()
    {
        double step = 1.0 / (double)num_steps;
        return (from i in ParallelEnumerable.Range(0, num_steps)
                let x = (i + 0.5) * step
                select 4.0 / (1.0 + x * x)).Sum() * step;
    }

    /// <summary>Estimates the value of PI using a for loop.<summary>
    static double SerialPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        for (int i = 0; i < num_steps; i++)
        {
            double x = (i + 0.5) * step;
            sum = sum + 4.0 / (1.0 + x * x);
        }
        return step * sum;
    }

    /// <summary />Estimates the value of PI using a Parallel.For.</summary>
    static double ParallelPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        object monitor = new object();
        Parallel.For(0, num_steps, () => 0.0, (i, state, local) =>
        {
            double x = (i + 0.5) * step;
            return local + 4.0 / (1.0 + x * x);
        }, local => { lock (monitor) sum += local; });
        return step * sum;
    }

    /// <summary>Estimates the value of PI using
    /// a Parallel.ForEach and a range partitioner.</summary>
    static double ParallelPartitionerPi()
    {
        double sum = 0.0;
        double step = 1.0 / (double)num_steps;
        object monitor = new object();
        Parallel.ForEach(Partitioner.Create(0, 
                 num_steps), () => 0.0, (range, state, local) =>
        {
            for (int i = range.Item1; i < range.Item2; i++)
            {
                double x = (i + 0.5) * step;
                local += 4.0 / (1.0 + x * x);
            }
            return local;
        }, local => { lock (monitor) sum += local; });
        return step * sum;
    }
}

输出

00:00:06.4096024: 3.14159265359043
00:00:04.0270848: 3.14159265358991
00:00:01.5588931: 3.14159265359043
00:00:01.3326232: 3.14159265358968
00:00:00.8320172: 3.14159265358958

注意使用循环并行时的耗时差异。第一个计算使用串行(顺序)机制,而Parallel.ForParallel.ForEach构造用于程序末尾。第一个计算耗时6秒以上,而后者计算耗时约1秒或更短。

诚然,本文只触及了OpenMP和Intel Threading Blocks强大功能的表面。对TBB感兴趣的学生应该努力理解递归在这些循环并行算法中的重要性,并牢固掌握何时使用细粒度锁或粗粒度锁。这当然指向同步、互斥和其他多线程技术,以确保线程在执行和可能访问资源时,能够有序地进行。请记住,循环并行的第一个先决条件是它们必须是线程安全的。

参考文献

  • MSDN Library,用于定义和概念验证
  • 《C# 4.0 in a NutShell》,作者Joseph和Benhari
  • 《Concurrent Programming for Windows》,作者Joe Duffy
© . All rights reserved.