65.9K
CodeProject 正在变化。 阅读更多。
Home

Visual C++ 2010 中的并发运行时

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.98/5 (60投票s)

2010年5月12日

CPOL

49分钟阅读

viewsIcon

175933

downloadIcon

2228

了解 VC10 中的并行算法、并行容器、任务、任务组、代理库、任务调度器等。


引言

如今,大多数软件系统都需要支持并行/并发,以提高速度、吞吐量、效率和健壮性。为此,一个正在运行的进程必须拥有多个线程——有些是按需创建的,有些在等待某个消息来执行工作,有些只是在等待其他线程/进程(或其他内核对象),还有些只是“待命”,参与进程的线程池。 

随着多处理器和多核技术的进步,软件程序员的工作挑战呈指数级增长。他们需要设计能够有效支持多线程环境的系统,并且他们的软件必须能够处理最新的多处理进步。 

虽然在许多开发环境中都有许多可用的并发/并行库,但我只会介绍**Visual C++ 并发运行时**。概念上,“并发”和“并行”是两个不同的术语,在本文中我将它们互换使用。我假设读者对以下内容有相当的了解:

此处提供的程序只能在**Visual C++ 2010 编译器**中编译。

启动程序

让我们直接开始编写第一个使用并发运行时的并行 C++ 程序(从现在开始,我将使用**CR**)。以下代码计算 1 到 100,000 范围内所有数字的总和。

int nSum = 0;
for(int nNumber=1;nNumber<=100000;++nNumber)
   nSum += nNumber;

此代码的并行形式可以是:

int nSum=0;

parallel_for(1, 100001, [&](int n)
{
   nSum += n; 
} );

关于此代码的一些要点

  • parallel_for **不是**语言结构或关键字,它是一个函数。
  • 它定义在 Concurrency 命名空间中。
  • 并行算法的头文件是 <ppl.h>
  • 此函数接受三个参数:
    1. 起始索引值
    2. 应该运行并行循环的索引值**加一**。
    3. 函数参数,它可以是函数指针、具有 () 运算符的类对象,或 lambda 表达式。
  • 我已为捕获所有局部变量(按引用)的函数使用了 lambda。
  • lambda(在花括号内)正在添加数字。
  • 请注意,在闭花括号后的最后一个括号,实际上是结束了函数调用!

完整的代码是:

#include <iostream>
#include <ppl.h>

int main()
{
    using namespace Concurrency;

    int nSum=0;
    
    parallel_for(1, 100001, [&](int n) 
    {
        nSum += n;        
    });

    std::wcout << "Sum: " << nSum ;
}

库和 DLL 怎么样?

好吧,大多数 CR 代码都在模板中,每当模板类/函数实例化时就会被编译。其余代码在 **MSVCR100.DLL** 中,这是 VC10 的标准 VC++ 运行时 DLL;因此,您无需链接任何库即可使用 CR。

操作系统要求

并发运行时所需的最低操作系统是**Windows XP SP3**。它可以在 32 位和 64 位版本的 Windows XP SP3、Windows Vista SP2、Windows 7、Windows Server 2003 SP2 和 Windows 2008 SP2 及更高版本上运行。在 Windows 7 和 Windows Server 2008 R2 版本上,CR 将使用最新的调度功能。这将在本文中讨论。

要分发 Visual C++ 2010 运行时,您可以从 Microsoft 下载中心下载可再发行程序包(32 位/64 位),并将其分发给您的客户。

parallel_for 是否会创建线程?

简单答案是“是”。但总的来说,它本身不会创建线程,而是利用 CR 的功能来并行化任务。有一个 CR 调度器参与线程的创建和管理。我们稍后将研究运行时调度器。目前,我只能说:并行算法、任务、容器、代理等将并发完成任务——这可能只需要一个线程,或者等于逻辑处理器数量的线程,甚至更多。决定权在于调度器和你调用的并发例程。

在我的四核 CPU 上,有 4 个逻辑处理器,在代码进入 parallel_for 函数之前,我只看到进程的一个线程。一旦代码进入 parallel_for,我看到线程数增加到七个!但它只为当前任务使用了 4 个线程,正如我们使用 GetCurrentThreadId 函数所能验证的那样。CR 创建的其他线程用于调度目的。稍后当你看到更多 CR 功能时,你不会抱怨额外的线程。

嗯,总和可能计算不正确!

绝对正确!由于多个线程正在修改同一个 nSum 变量,因此累积可能不正确。(如果你不确定总和是否被捕获……)。例如,如果我计算 1 到 100 之间的总和,正确总和将是 5050,但以下代码在每次运行时都会产生不同的结果。

parallel_for(1, 101, [&](int n) 
{
   nSum += n; 
});
// nSum may be 4968, 4839, 5050, 4216 or any random

简单的解决方案是使用 InterlockedExchangeAdd

LONG nSum=0; // not int, Interlocked requires LONG
parallel_for(1, 101, [&](int n)
{ 
   InterlockedExchangeAdd(&nSum, n);
}); 

显然,这会破坏将累积放在并发执行中的整个目的。嗯,CR 为此提供了高效的解决方案,但我稍后会讨论。

通过这个启动程序,我简要地阐述了 CR。让我以更结构化的方式讨论并发运行时。

并发运行时

并发运行时将以下内容归类为其核心组件:

  • 并行模式库 (Parallel Patterns Library)
  • 异步代理库 (Asynchronous Agents Library)
  • 任务调度器 (Task Scheduler)
  • 资源管理器 (Resource Manager)

以下是并发运行时中的概念:

  • 同步数据结构
  • CR 中的异常处理
  • PPL(并行模式库)中的取消

不要对这些术语感到困惑,我将详细解释它们!

下图描绘了运行时及其不同组件如何在操作系统和应用程序之间进行协调:

Skeleton.JPG

上层为**紫色**,包括并发运行时中重要且最常用的编程元素——代理库并行模式库,这两者都在本文中进行了详细介绍。**橙色**组件可归类为运行时的低层元素。本文仅解释任务调度器。紫色到橙色的渐变组件构成了并发运行时中的同步数据结构,正如您所见,它在 CR 的高层和低层都起着至关重要的作用。本文还阐述了同步原语。

本文的流程如下:

  • 并行模式库 (Parallel Patterns Library)
  • 异步代理库 (Asynchronous Agents Library)
  • 同步数据结构
  • 并发运行时中的异常
  • PPL 中的取消
  • 任务调度器 (Task Scheduler)

并行模式库 (Parallel Patterns Library)

我上面讨论的 parallel_for 函数属于此类。PPL 有以下分类:

  • 并行算法:
    • parallel_for
    • parallel_for_each
    • parallel_invoke
  • 并行容器和对象
    • concurrent_vector
    • concurrent_queue
    • combinable
  • 任务并行:
    • 结构化
    • 非结构化

并行算法

这三个算法会并行执行给定的工作,并等待所有并行执行的任务完成。需要注意的是,CR 可能会以任何顺序调度任何任务,因此不能保证哪个任务(或其一部分)会在另一个任务之前或之后执行。此外,一个或多个任务可能在同一个调用堆栈、同一个线程中执行(即,同一个线程中有 2 个或更多任务,这可能是调用者的调用堆栈/线程,也可能不是),或者可能内联执行。

每个算法都可以接受函数指针、对象或 lambda 表达式(在本文中称为“函数”)。返回值/参数类型取决于算法以及它们的调用方式,如下所述。

  • parallel_for 算法

parallel_forfor 结构非常相似。它并行执行指定的“函数”。它不是 for-loop 的绝对替代品。此函数的简化语法是:

void parallel_for(IndexType first, IndexType last, Function func);
void parallel_for(IndexType first, IndexType last, IndexType step, Function func);

由于这是一个模板函数(上述语法中未显示),IndexType 可以是任何**整数数据类型**。parallel_for 的第一个版本将从 first 开始,每次递增 1,循环直到 last - 1,在每次迭代时**并行地**调用指定的函数。当前索引将作为参数传递给指定的函数。

第二个重载以类似的方式工作,接受一个额外的参数:step,每次迭代应该递增的值。此值必须为非零正数,否则 CR 将抛出 invalid_argument 异常。

并行度由运行时确定。

以下示例计算给定范围内素数的数量。目前,我使用 InterlockedIncrement API;combinable 类提供了更好的解决方案,我稍后将讨论。

LONG nPrimeCount=0;
parallel_for(1, 50001, [&](int n) 
{        
    bool bIsPrime = true;
    for(int i = 2; i <= (int)sqrt((float)n); i++)
    {
        if(n % i == 0)
        {
                bIsPrime = false;
                break;           
        }
    }

    if(bIsPrime)
    {
        // std::wcout<< n <<"is prime.\t";
        InterlockedIncrement(&nPrimeCount);
    }
});

wcout << "Total prime numbers: " << nPrimeCount << endl;

让我们忽略 1 和 2,编写更有效的例程来计算这个,传递 2 作为步长值。

LONG nPrimeCount=0;
parallel_for(3, 50001, 2, [&](int n) // Step with 2
{        
   bool bIsPrime = true;
   for(int i = 3; i = (int)sqrt((float)n); i+=2) // Start with 3, increment by 2
   {...}
   ...
}

  • parallel_for_each 算法

此函数在语义上等同于 STL 的 for_each 函数,在语法上也相同。它并行迭代集合,并且像 parallel_for 函数一样,执行顺序是不确定的。此算法本身**不是**线程安全的。也就是说,对集合的任何修改都不是线程安全的,只有读取函数参数是安全的。

简化语法

parallel_for_each(Iterator first, Iterator last, Function func);

该函数必须接受容器底层类型的参数。例如,如果遍历整数向量,则参数类型为 int。对于调用者,此函数只有一个签名,但内部有两个版本:一个用于随机访问迭代器(如 vector、array 或原生数组),一个用于前向迭代器。它在随机访问迭代器上表现良好。
(借助迭代器特性,它可以在**编译时**找到要调用的重载。)

以下代码计算数组中偶数和奇数的数量:

const int ArraySize = 1000;
int Array[ArraySize];
 
// Generate random array
std::generate(Array, Array+ArraySize, rand);

long nEvenCount = 0 , nOddCount = 0;
parallel_for_each(Array, Array+ArraySize, [&nEvenCount, &nOddCount](int nNumber) 
{
   if (nNumber%2 == 0)
      InterlockedIncrement(&nEvenCount);
   else
      InterlockedIncrement(&nOddCount);
} );

正如你所见,它与 parallel_for 大致相同。唯一的区别是函数(lambda)的参数来自容器(整数数组)。重申一下,提供的函数可能按任何顺序调用,因此收到的 nNumber 可能与原始容器中的顺序不同。但是所有元素(在本例中为 1000 个数组元素)都将被此并行例程迭代。

我们也可以使用 STL 的 vector 来调用此例程。我在这里忽略了向量初始化(假设它里面有元素)。

vector<int> IntVector;

// copy(Array, Array+ArraySize, back_inserter(IntVector));
    
nEvenCount = 0 , nOddCount = 0;
parallel_for_each(IntVector.begin(), IntVector.end(), 
      [&nEvenCount, &nOddCount](int n) {... });

同样,也可以使用非随机访问容器。

list<int> IntList;
parallel_for_each(IntList.begin(), IntList.end(), [&](int n) {... });
 
map<int, double> IntDoubleMap;
parallel_for_each(IntDoubleMap.begin(), IntDoubleMap.end(),
   [&](const std::pair<int,double>& element) 
   { // Use 'element' });

重要的是要注意,对于 parallel_for_each,随机访问容器比非随机访问容器更有效。

  • parallel_invoke 算法

此函数将并行执行一组任务。稍后我将讨论在 CR 领域中**任务**是什么。简单来说,任务是一个函数、函数对象或 lambda。parallel_invoke 将并行调用提供的函数,并等待所有提供的函数(任务)完成。此函数重载以接受 2 到 10 个函数作为其任务。与其他并行算法一样,没有保证函数集会按什么顺序调用。它也不保证需要多少线程来完成给定的任务。

简化签名(用于 2 个函数重载):

void parallel_invoke(Function1 _Func1, Function2 _Func2);
// template <typename _Function1, typename _Function2>
// void parallel_invoke(const _Function1& _Func1, const_Function2& _Func2);

使用 parallel_invoke 类似于创建一组线程并对它们调用 WaitForMultipleObjects,但它简化了任务,因为您无需关心线程创建、终止等。

以下示例并行调用两个函数,这两个函数计算给定范围内偶数和奇数的总和。

void AccumulateEvens()
{
    long nSum=0;
    for (int n = 2; n<50000; n+=2)
    {
        nSum += n;
    }
    wcout << "Sum of evens:" << nSum << std::endl;
}

void AccumulateOdds()
{
    long nSum=0;
    for (int n = 1; n<50000; n+=2)
    {
        nSum += n;
    }
    wcout << "Sum of odds:" << nSum << std::endl;
}

int main()
{    
    parallel_invoke(&AccumulateEvens, &AccumulateOdds);
    return 0;
}

在上面的示例中,我传递了函数地址。可以使用 lambda 更优雅地完成同样的事情:

parallel_invoke([]
    {
        long nSum=0;
        for (int n = 2; n<50000; n+=2)
        {
            nSum += n;
        }
        wcout << "Sum of evens:" << nSum << std::endl;
    },
    []{        
        long nSum=0;
        for (int n = 1; n<50000; n+=2)
        {
            nSum += n;
        }
        wcout << "Sum of odds:" << nSum << std::endl;        
    });

lambda 和/或函数指针/类对象之间的选择取决于程序员、编程风格以及最重要的是,取决于并行执行的任务。当然,您也可以同时使用它们。

parallel_invoke([]
    {
        long nSum=0;
        for (int n = 2; n<50000; n+=2)
        {
            nSum += n;
        }
        wcout << "Sum of evens:" << nSum << std::endl; }
   , 
       &AccumulateOdds );

如前所述,parallel_invoke 接受 2 到 10 个参数作为其任务——给定的任何任务都可以是函数指针、lambda 或函数对象。虽然我稍后会提供更多示例,当我会涵盖更多 CR 主题时;让我为了举例而再举一个例子,以便更清楚地说明主题。

让我们使用此函数并行计算几个数字的阶乘:

long Factorial(int nFactOf)
{
    long nFact = 1;
    for(int n=1; n <= nFactOf; ++n)
        nFact *= n;

    return nFact;
}

当你调用 parallel_invoke 时:

parallel_invoke(&Factorial, &Factorial);

你会被编译器错误淹没。为什么?嗯,parallel_invoke 要求任务的参数为 void,返回类型为 void。尽管返回值可以是非 void(与文档相反),但参数必须为零!我们需要(可以)使用 lambda:

long f1, f2, f3; 
parallel_invoke( 
   [&f1]{ f1=Factorial(12); }, 
   [&f2]{ f2=Factorial(18); },    
   [&f3]{ f3=Factorial(24); }
);

请注意,lambda 的类型为 void(void),并且通过引用捕获了所需的变量。请阅读有关 lambda 的内容……

并行容器和对象

并发运行时为 vectorqueue 提供了并发容器类,分别称为 **concurrent_vector** 和 **concurrent_queue**。并行类领域中的另一个类是 combinable

并发容器类提供对其元素的线程安全访问和修改,但*少数操作除外*。

  • concurrent_vector 类

concurrent_vector 类类似于 std::vector 类。它允许对其元素进行随机访问,但与 vector 不同,元素**不**存储在连续内存中(如数组、vector)。但是,对元素的访问以及插入(push_back)是线程安全的。迭代器访问(普通、const、反向、反向-const)也是线程安全的。

由于元素不是连续存储的,因此您不能使用指针算术直接从某个元素访问另一个元素(即 (&v[2] + 4) 将是无效的)。

正如微软在其(**concurrent_vector.h** 头文件中)承认的那样,此类基于 Intel 为 TBB 的 concurrent_vector 的原始实现。

对于示例,我本可以使用标准的线程创建函数;但我更喜欢使用 PPL 的构造来演示 concurrent_vector

concurrent_vector<int> IntCV;

parallel_invoke([&IntCV] // Logical thread 1
{
   for(int n=0; n<10000; ++n)
      IntCV.push_back(n);
},
[&IntCV]  // Logical thread 2
{
   for(int n=0; n<1000;n++)
      IntCV.push_back(n*2);

   // Get the size, thread-safe. Would give different results on different runs,
   // as another task (thread) might be inserting more items.
   wcout << "Vector size: " << IntCV.size() << endl;
});

在这里,将有两个线程在工作,它们都会向 concurrent_vector 插入一些数字。push_back 方法是线程安全的,因此不需要同步原语来修改向量。此外,size 方法也是线程安全的。

到目前为止,如果您无法理解以上代码的任何部分,我敦促您继续阅读 STL 的 vector、C++ lambda,或/和回顾 parallel_invoke!

  • concurrent_queue 类

是的,你猜对了。它是 STL 的 queue 类的并发版本。与 concurrent_vector 一样,concurrent_queue 的重要操作是线程安全的,但少数操作除外。重要操作包括入队(插入)和出队(弹出)以及 empty 方法。要插入元素,我们使用 push 方法。要进行出队操作,我们使用 try_pop 方法。

重要的是要注意,concurrent_queue **不**提供并发安全迭代器支持!

(2009 年 8 月:请给我一些时间来放 concurrent_queue 的示例,尽管它会与 concurrent_vector 几乎相同。对于这两者并发类,我还会列出并发安全或不安全的方法,以及它们与等效 STL 容器的比较。在此之前,请继续阅读下面的新内容!)

  • combinable 类

TLS,或线程局部存储,在概念上是这个类最接近的竞争者。combinable 类方便每个正在运行的任务或线程拥有自己的本地副本进行修改。当所有任务完成后,这些特定于线程的本地副本可以稍后合并。

例如,在上面的 parallel_for 示例中,我们计算了 1 到 100 范围内的所有数字的总和——为了线程安全,我们需要使用 InterlockedExchangeAdd。但我们知道所有线程都会争夺同一个变量来更新(添加)——因此破坏了并行的积极目的。使用 combinable 对象,我们让每个线程拥有自己的变量副本进行更新。让我们在实践中看看:

combinable<int> Count;

int nSum=0;
parallel_for(1, 101, [&](int n) 
{
   int & ref_local = Count.local();
   ref_local += n;
   
   // InterlockedExchangeAdd(&nSum, n);
});

nSum = Count.combine(AddThem);
  • combinable 是模板类,我们为其实例化了 int
  • 方法 combinable::local 返回线程本地变量的**引用**。为了简化示例,我将其存储在 int& 变量中。
  • 然后,将本地引用变量更新为加上当前变量。
  • 就是这样!

不会有对引用变量的争用,因为每个线程都会获得自己的副本。运行时会为你处理。最后,我们调用 combinable::combine 方法来*合并*各个部分。combine 方法迭代所有特定于线程的变量并调用指定的函数。然后返回最终结果。我使用了用户定义的函数 AddThem,它很简单:

int AddThem(int n1, int n2)
{
   return n1 + n2;
}

combine 需要任何具有此签名的函数、函数对象或 lambda:**T f(T, T);** 其中 T 是模板参数类型。当然,参数可以是 const。因此,combine 调用也可以是:

nSum = Count.combine([](int n1,int n2)  // Automatic return type deduction
        { return n1+n2;}     );

这也可以:

plus<int> add_them;
nSum = Count.combine(add_them);

和以前一样,为了收集更多积极的观众回应,我通过声明一个类型为 plus 类的本地变量来简化它,然后将其传递给 combine 方法!对于不知道的人来说,plus 是一个 STL 类,它是一个具有重载**()**运算符的binary_function,它只调用它们上的运算符+。因此,我们可以直接使用 plus,而不是创建带有变量的对象。以下也演示了如何轻松执行其他操作进行组合(尽管并非所有操作都有意义):

combinable<int> Sum, Product, Division, Minus;
    
parallel_for(1, 11, [&](int n))
{
   Sum.local() += n;
   Product.local() *= n;
   Division.local() /= n;
   Minus.local() -= n;

});

wcout << "\nSum: " << Sum.combine(plus<int>()) ;
wcout << "\nProduct: " << Product.combine(multiplies<int>());
wcout<< "\nDivision (?): " << Division.combine(divides<int>());
wcout<< "\nMinus result: " << Minus.combine(minus<int>());

我直接使用了 combinable::local 来修改线程局部变量。然后我使用了不同的类来生成最终结果。

现在……有些事情变得奇怪,也很有趣。它不在 CR 的术语范围内,但我是发现者,所以我必须分享!
乘法和除法的结果将为零。不要问为什么。这里有一个快速、低效的解决方案:

int & prod = Product.local();
if(prod == 0)
   prod = 1;
else
   prod *= n;

除法运算类似。因为,当 CR 第一次分配特定于线程的变量时,它会调用其默认构造函数(或 combinable 构造函数中提供的那个)。现在,编译器提供的 int 的默认构造函数将其设置为零!
所做的代码更改不是最优解决方案。我们需要传递一个构造函数,一种方法是:

combinable<int> Product( []{return 1;}); // Lambda as int's constructor!

当 CR 需要分配新的特定于线程的局部变量时,它将调用这个所谓的构造函数,它返回 1。combinable 的文档称之为初始化器。当然,这个初始化器可以是一个常规函数,也可以是一个函数,返回相同类型并接受零个参数(即 T f())。
请记住,初始化器仅在运行时需要分配新的特定于线程的局部变量时才会被调用。

当你使用用户定义的类进行 combinable 时,你的默认构造函数无论如何都会被调用,你的*组合*操作可能也不会如此简单。

任务并行

如前所述,任务,从概念上讲,不过是线程。CR 可能会在与任务发起者相同的线程中执行给定任务,或者可能只使用一个线程来执行多个给定任务。因此,任务不完全等同于线程。

任务可以被表达为工作的函数式划分。例如,在之前的示例中,我们使用并行算法来*函数式划分*范围内数字的总和,或查找容器中的某些元素。我们通常不会创建多个任务;其中一个等待用户输入,另一个通过套接字响应客户端请求,第三个以异步方式读取文件。所有这些操作都不同,并且不构成逻辑的任务组。除非它们构成执行工作的管道,以逻辑连接的方式,否则它们不能被归类为任务组。

从文件中读取、计算行数/单词数、元音数量、拼写错误等可以被归类为不同的任务,这些任务可以被归类为任务组。在我详细介绍不同类型的任务组之前,让我给你一个例子。

void DisplayEvens()
{
   for (int n=0; n<1000;n+=2)    
      wcout<<n<<"\t";    
}

void DisplayOdds()
{
   for (int n=1; n<1000;n+=2)
      wcout<<n<<"\t";
}

int main()
{
   task_group tg;
   
   tg.run(&DisplayEvens);
   tg.run(&DisplayOdds);

   tg.wait();
}

尽管这是一个糟糕的例子,但我必须展示它以使其简单。
Concurrency::task_group 是用于管理一组任务、执行它们并等待任务完成的类。task_group::run 方法调度给定的任务并立即返回。在上面的示例中,run 被调用了两次以运行不同的任务。两个任务都可能在不同的线程中并行执行。最后,调用 task_group::wait 来等待任务完成。wait 将阻塞直到所有计划的任务完成(或者其中任何一个任务发生异常、取消——我们稍后会看到)。

另一个方法 run_and_wait 可以被调用来合并最后两次调用。也就是说,它将调度任务运行,然后等待所有任务完成。当然,lambda 也可以用来表示一个任务。

tg.run([]
{
   for (int n=0;n<1000;n++)
   {
      wcout << n << "\t";
   }
});
 
// Schedule this task, and wait for all to compete
tg.run_and_wait(&DisplayOdds);

没有必要立即等待任务完成,程序可以继续做其他事情,稍后等待(如果需要)。此外,可以稍后按需调度更多任务。

模板类 task_handle 可用于表示一个任务,它可以接受函数指针、lambda 或函数对象。例如:

task_handle<function<void(void)>> task = &DisplayOdds;

虽然你可能不需要在你的代码中使用 task_handle 类,但你可以使用函数 make_task 来简化声明。

auto task = make_task(&DisplayOdds);

在类似的异类东西进入你的脑海之前,让我继续,我相信这会更容易理解!

有两种类型的任务组:*结构化*和*非结构化*。

  • 结构化任务组

在 PPL 中,结构化任务组由 structured_task_group 类表示。对于少量任务,它的性能优于非结构化任务组。parallel_invoke 算法在内部使用 structured_task_group 来调度指定的任务集。

// From: PPL.H - for parallel_invoke function
structured_task_group _Task_group;

task_handle<_Function1> _Task_handle1(_Func1);
_Task_group.run(_Task_handle1);

task_handle<_Function2> _Task_handle2(_Func2);
_Task_group.run_and_wait(_Task_handle2);

与非结构化任务组不同,你不能在另一个线程上等待任务完成。更多比较如下。

  • 非结构化任务组

名为 task_group 的类表示非结构化任务组。该类在生成任务、等待它们等方面基本相同。非结构化任务组允许你在不同的线程(或多个线程)中创建任务,并在另一个线程中等待它们。简而言之,非结构化任务组比结构化任务组提供了更大的灵活性,并且为此付出了性能代价。

结构化和非结构化任务组之间的显著区别是:

  • 非结构化任务组是线程安全的,结构化任务组不是。这意味着对于 UTG,你可以从不同的线程调度任务,并在另一个线程中等待。但对于 STG,你必须在同一个线程中调度所有任务并等待完成。
  • 所有任务完成后,STG 对象不能重新用于调度更多任务,但你可以重用 UTG 对象来调度更多任务。
  • 由于 STG 不需要跨线程同步,因此其性能优于 UTG。
  • 如果你将 task_handle 传递给 run 或 run_and_wait 方法;task_group(即非结构化)类会复制它。但对于 structured_task_group,你必须确保 task_handle 不会被销毁(例如,超出作用域),因为它不会复制它——它引用了你传递的 task_handle 对象。
  • 多个 STG 必须遵循 FILO 模式。也就是说,如果调度 T1 然后 T2 STG 运行,那么 T2 必须在 T1 对象被销毁之前被销毁(调用析构函数)。示例:
structured_task_group tg1;
structured_task_group tg2; 
 
// The DTOR of tg2 must run before DTOR of tg1
// In this case, it would obviously happen in FILO pattern
// But when you allocate them dynamically, 
// you need to ensure they get destroyed in FILO sequence.
  • (继续上面的 STG 部分)如果一个任务本身创建了 STG 对象,那么这些嵌套/内部 STG 必须在父/外部 STG 完成之前消失。当然,内部对象也必须遵循 FILO。概念性示例:
structured_task_group outer_tg;
auto task = make_task([] {
   structured_task_group inner_tg;
// Assume it schedules and waits for tasks.
});

// Schedule the outer task:
outer_tg.run_and_wait(task);

同意,STG 不容易使用——所以除非必要,否则不要使用它们。使用 UTG(task_group)或 parallel_invoke。对于超过 10 个任务,您可以嵌套 parallel_invoke 本身。

最后说明,STG 的所有操作**都不是**线程安全的,*除了*涉及任务取消的操作。请求任务组取消的方法是 structured_task_group::cancel,它将尝试取消组下的运行任务。可以调用 structured_task_group::is_canceling 方法来检查 TG 是否正在被取消。我们稍后将深入研究 PPL 中的取消。


异步代理库 (Asynchronous Agents Library)

当你需要不同线程相互通信时,你可以使用异步代理库(Agents Library),比标准的同步机制:锁和事件;消息传递函数和编写消息循环;或类似的机制更优雅。

你通过消息传递函数将一个消息传递给另一个线程/任务。消息传递函数利用代理在程序的不同部分之间发送和接收消息。

Agents Library 包含以下组件:

  • 异步消息块 (Asynchronous Message Blocks)
  • 消息传递函数 (Message Passing Functions)
  • 异步代理 (Asynchronous Agents)

开始之前的一些要点:

  • Agents Library 组件位于 Concurrency 命名空间中。
  • 此库是基于模板的。
  • 头文件是 <agents.h>
  • 库和 DLL 的要求与 PPL 相同。

消息块和消息传递函数

这两个组件紧密耦合,因此不可能逐个阐述。消息块是用于存储和检索消息的类集。消息传递函数促进了消息与消息块类之间的传递。

  • single_assignment 类

首先是一个例子:

#include <agents.h>
int main()
{
   single_assignment<long> AssignSum;

   task_group tg;
   tg.run([&AssignSum]
   {
      long nSum = 0;
      for (int n = 1; n < 50000; n++ )
      {            
         nSum += n;
      }

      // Just for illustration, send is not WinSock send!
      Concurrency::send(AssignSum, nSum);
   });
    
    
   wcout << "Waiting for single_assignment...\n";
   receive(AssignSum);

   wcout << "Received.\n" << AssignSum.value();
}

简而言之,上面的程序在不同的线程中计算总和,并在另一个(主线程)中等待其完成。然后显示计算出的值。我使用了 UTG,而不是 STG 来缩短此程序。对于 STG,我们必须使用 task_handle 作为单独的局部变量。

single_assignment 是 Agent 的模板类之一,用于*数据流*。由于我正在计算总和,因此我为其实例化了 long。此类是**一次性写入**的消息块类。多个读取器可以从中读取。多个写入器也可以向其写入,但它只会响应**第一个发送者**的消息。当你学习更多关于数据流消息块类时,你会更好地理解它。

Concurrency::sendConcurrency::receive 是 Agent 用于消息传输的函数。它们接受消息块和消息,然后发送或接收数据。使用语句,

receive(AssignSum);

我们实际上是在等待消息到达 AssignSum 消息块。是的,你感觉对了,此函数将阻塞直到消息到达指定的块。然后我们使用 single_assignment::value 方法来检索收到的内容。返回值将是 long 类型,因为我们将类实例化为 long。或者,我们可以直接调用 value 方法,如果在此消息块上未收到消息,它将等待并阻塞。

同事函数 Concurrency::send 会将消息写入指定的块。这将唤醒 receive。对于 single_assignment 类,如果多次调用 send(从任何线程),它将失败。在 Agent 的库术语中,single_assignment 会*拒绝*第二条消息,因此 send 将返回 false。当你阅读下面的更多内容时,你会了解更多。
(虽然 Concurrency::send 和 WinSock 的 send 同名,但它们的签名不同——可以安全地调用它们而无需命名空间限定。但为了防止潜在的错误/缺陷,你应该使用完全限定的名称)。

可以在哪里或应该在哪里使用这种消息块类型?
在需要一次性信息更新时,从一个或多个线程;并且一个或多个线程愿意读取所需信息。

在我探索更多 Agent 的类之前,有几点要说明:

  • 发送和接收消息的行为称为**消息传播**。
  • 消息包含两个组件:*源*和*目标*。源是发送消息的终结点,目标是接收消息的终结点。
  • 消息块可以是源类型、目标类型或两者兼有。消息块类继承自 Concurrency::ISourceConcurrency::ITarget 或两者兼有,以表示块类型。
  • 消息块可以接受或拒绝消息。它也可以推迟/推迟接受或拒绝消息的决定。
  • 消息可以同步或异步发送。可以使用 sendasend 函数分别发送消息。
  • 可以使用 receivetry_receive 来读取消息。
  • 实例化消息块类的_数据类型_称为该类的**有效负载**类型。对于上面的示例,long 是有效负载类型。

理论够了!让我们继续行动。

  • overwrite_buffer 类

overwrite_buffer 模板类类似于 single_assignment 类,但有一个区别——它可以接收多条消息(即*接受*多条消息)。与 single_assignment 类一样,它只保留**一条**消息。最后发送的消息将是最新消息。如果多个线程发送消息,消息接收的调度和时序决定了最新消息是什么。如果没有消息发送,receive 函数或 value 方法将阻塞。

示例代码:

int main()
{
   overwrite_buffer<float> MaxValue;
   task_group tg;

   tg.run([&MaxValue]
   {
      vector<float> LongVector;
      LongVector.resize(50000);

      float nMax = 1;
      int nIterations = 1;


      // Generate incrementing numbers  ( Lambda modifies nMax )
      generate(LongVector.begin(), LongVector.end(), [&nMax] 
           { return (nMax++) * 3.14159f;});           
      
      nMax = -1.0f;    // Reset nMax to find the actual maximum number
      for(auto iter = LongVector.cbegin(); iter != LongVector.cend(); 
          ++iter,  ++nIterations)
      {
         if( *iter > nMax)
            nMax = *iter;
                    
         // Update the MaxValue overwrite_buffer, on each 100th iteration, 
         // and deliberately sleep for a while
         if(nIterations % 100 == 0)
         {
            send(MaxValue, nMax);
            Concurrency::wait(40);    // Sleep for 40 ms
         }
      }
   });

   tg.run_and_wait([&MaxValue]
   {
     int nUpdates = 50;

      // Show only 50 updates
     while(nUpdates>0)
     {
         wcout << "\nLatest maximum number found: " 
               << MaxValue.value();
 
         wait(500); // Wait 500ms before reading next update
         nUpdates--;
     }
   } );
} 

代码的作用:

任务 1:生成 50000 个升序排列的 float 数字,并将它们放入 vector。现在它试图找出该数组中的最大数字,并更新 nMax 变量。在每次迭代向量的第 100 次时,它会更新 overwrite_buffer 以反映*最新*值。

任务 2:将列出 overwrite_buffer 中的 50 次更新,这本质上意味着它向用户显示向量中找到的最新最大值。在发出*刷新*之前稍作等待。

由于任务 1 将在任务 2 列出所有 50 次更新之前完成,因此最后几次最大值将被重复(显示)。这实际上是找到的最高数字。输出将是:

...
Latest maximum number found: 75084
Latest maximum number found: 79168.1
Latest maximum number found: 82938
Latest maximum number found: 94876
Latest maximum number found: 98645.9
...
Latest maximum number found: 153938
Latest maximum number found: 157080
Latest maximum number found: 157080
Latest maximum number found: 157080
Latest maximum number found: 157080
...

代码说明了一切,我猜我不需要解释。但简而言之。*有效负载*类型是 float。消息等待是在另一个任务中,与 single_assignment 示例不同,在 single_assignment 示例中,主函数(主线程)正在等待。Concurrency::wait 不言自明,它接受以毫秒为单位的超时。

可以在哪里或应该在哪里使用这种消息块类型?

每当一个或多个线程更新共享信息,而一个或多个线程愿意获取最新信息时。来自用户的*刷新*是一个很好的例子。

  • unbounded_buffer 类

需要生产者-消费者模式的多线程程序员,其中一个线程生成一个消息并放入某个消息队列,另一个线程以 FIFO 方式读取这些消息,将欣赏这个类。我们大多数人都使用过 PostThreadMessage 函数或实现了自定义的、并发感知的消息队列类。我们还需要为同一目的使用事件和一些锁。现在,福音来了!

顾名思义,unbounded_buffer 没有界限——它可以存储任意数量的消息。消息的发送和接收以 FIFO 方式进行。当一个消息被*接收*时,它会从内部队列中**移除**,因此同一个消息不能被接收两次。因此,这意味着如果多个线程(目标)从同一个 unbounded_buffer 块读取,那么其中任何一个都会收到一条消息,而另一个则不会。如果消息被目标 A 接收,目标 B 将无法接收它。如果 receive 调用没有待处理消息可供接收,它将阻塞。多个源可以发送消息,它们的顺序(在*不同*源之间)是不确定的。

一个简单的例子:

int main()
{
    unbounded_buffer<int> Numbers;
    
    auto PrimesGenerator = [&Numbers]
    {
        for(int nPrime=3; nPrime<10000;  nPrime+=2)
        {
            bool bIsPrime = true;
            for(int nStep = 3; nStep<sqrtl(nPrime); nStep+=2)
            {
                if(nPrime % nStep==0)
                {
                    bIsPrime=false;
                    break;
                }
            }
            if (bIsPrime)
            {
                send(Numbers, nPrime);
            }
        }        

        wcout << "\n**Prime number generation finished**";

        send(Numbers, 0);    // Send 0 to indicate end
    };
    auto DisplaySums = [&Numbers]
    {
        int nPrimeNumber, nSumOfPrime;

        for(;;)
        {
            nPrimeNumber = receive(Numbers);
            if(nPrimeNumber == 0)    // End of list?
                break;    // Or return

            wcout<<"\nNumber received: "<<nPrimeNumber;

            // Calculate sum (1..n)
            nSumOfPrime=0;
            for(int nStep = 1; nStep<=nPrimeNumber; ++nStep)
                nSumOfPrime += nStep;

            wcout<<"\t\tAnd the sum is: "<<nSumOfPrime;
        }

        wcout << "\n** Received zero **";
    };

    parallel_invoke(PrimesGenerator, DisplaySums);
}

回忆一下 parallel_invoke,它在 STG 中执行任务集?我将两个 lambda 存储在变量中,即 PrimesGeneratorDisplaySums。然后并行调用两者来完成工作。

正如你很容易理解的那样,生成器 lambda 正在查找素数,并将它们放入 unbounded_buffer 消息块,它本质上是一个消息队列。与前两个消息缓冲区类不同,此类不会用新内容替换消息内容。而是将它们放入队列。这就是为什么生成器能够将它们堆积在 Numbers 消息块中。除非被某些*目标*接收,否则不会丢失或删除任何消息。

本例中的目标是 DisplaySums,它从同一个消息块接收数字。receive 将返回插入的第一条消息(即以 FIFO 模式),并**移除**该消息。如果*消息队列*中没有待处理消息,receive 将阻塞。此程序的设计方式是,零(0)值表示消息*结束*。如果你只需要检查消息是否可用,你可以使用 try_receive 函数。try_receive 可以与*面向目标*的其他消息块一起使用,而不仅仅是 unbounded_buffer(即任何继承自 ITarget 的消息块)。

  • call 类

简单来说,call 类就像函数指针。它是一个*仅限目标*的块;这意味着它不能与 receive 函数一起使用,只能与 send 函数一起使用。当你使用 send 函数时,你实际上是在*调用*函数指针(或者说使用了 SendMessage API)。首先是一个简单的例子。

int main()
{
    call Display([](int n)
    {
        wcout << "The number is: " << n;        
    } );

    
    int nInput;

    do 
    {
        wcout << "Enter string: ";
        wcin >> nInput;

        send(Display, nInput);
    } while (nInput != 0);
}

它从用户那里获取一个数字,然后*调用* Display call 块。应该注意的是,send 将*阻塞*直到 Display 完成,因为 Display 只是*目标*。与其他目标块不同,运行时只是为它们放置数据以供以后处理(通过 receive 函数)。

但应该注意的是,call 是*多源*导向的消息块。这意味着它可以从多个任务/线程(通过 sendasend)调用。运行时会将消息放入队列,就像 unbounded_buffer 一样。由于 send 是将消息发送到消息块的同步函数,因此它将一直等待直到完成。使用 sendcall 消息块非常类似于 SendMessage API。

要异步发送消息,我们可以使用 Concurrency::asend 函数。它只会为发送到给定目标块安排消息。消息最终会根据运行时的调度传递给目标。使用 asend 和 call 类似于使用 PostMessage API。

由于它只是一个面向目标的*消息块,因此使用 receivetry_receive 将导致编译器错误。原因很简单:它不继承自 ISource 接口,receive 函数期望继承自 ISource 的消息块。不,你不需要担心这些接口(至少现在不用)——我只是为了让你知道。

要查看 sendasend 之间的区别,只需将 lambda 更改为以下内容:

call Display([](int n)
    {
        wcout << "\nThe number is: " << n;        
        wait(2000); // Suspend for 2 seconds
    } );

如果使用 send 调用它,它将简单地阻塞你的输入(因为 send 要 2 秒才能返回!)。现在,只需将 send 改为 asend,然后快速连续输入几个数字。你会看到那些数字最终被 call 消息块接收。你的输入不会被阻塞,消息也不会丢失!

当然,你可以将 asend 与其他面向目标的消息块一起使用。任何熟练的程序员都会告诉你,你不能在每次都使用 asend。如果你不知道,找出原因!

  • transformer 类

transformer 类既是面向源也是面向目标的*消息块*。并且,顾名思义,它将一种数据转换为另一种数据。它接受两个模板参数,因此可以将一种数据类型转换为另一种数据类型。由于它在逻辑上能够*转换*一种数据到另一种数据,因此它需要一个函数/lambda/函数对象,就像 call 类一样。首先只是一个基本声明:

transformer<int, string> Int2String([](int nInput) -> string
{
   char sOutput[32];
    
   sprintf(sOutput, "%d", nInput);        // or itoa

   return sOutput;
});

我假设你理解 lambda 中的-> 表达式!其他一切都是隐含的,因为你正在阅读和理解 C++ 中的一些好东西。

transformer 构造函数一样,第一个模板参数是其输入类型,第二个是输出类型。你看到它几乎与 call 相同,唯一的区别是它会返回一些东西。

现在让我们*转换*一些东西:

send(Int2String, 128);
auto str = receive(Int2String); // string str;

我只是将一个整数(128)发送到 transformer 消息块,然后从中检索了转换后的消息(以*字符串*形式)。你注意到的一个有趣的事情是 auto 关键字的智能使用——Int2String 是*模板化的*,receive 函数接受 ITarget<datatype> 引用,并返回 datatype——因此 str 的类型被*智能地*推导出来!

显然,你不会仅仅为了将数据类型从一种转换为另一种而使用 transformer 类。它可以有效地用作应用程序组件之间数据传输的*管道*。请注意,unbounded_buffer 类也可用于在组件之间传输数据,但它仅充当消息队列,并且必须由某些代码支持。call 类可以用作消息缓冲区,其中*代码*是目标(接收者),运行时和 call 类本身管理多个发送者向其发送消息,并将它们放入内部队列(以 FIFO 方式)。但是,与 unbounded_buffer 类不同,call 不能与 receive 一起使用(即,它是单目标,多源)。最后,call 本身不能用于将消息*转发*给其他组件。

注意:请记住,所有这些类都有内部消息队列。如果目标不接收输入或不处理消息,它们将被保留在队列中。不会丢失任何消息。send 函数同步地将消息放入队列,直到消息被消息块*接受*、确认接收或*拒绝*,否则不会返回。同样,asend 异步地将消息放入消息块的队列中。所有这些推入和弹出操作都是线程安全的。

transformer 类可用于形成消息管道,因为它接受输入并发出输出。我们需要将多个 transformer 对象*链接*起来以形成管道。一个对象的输出(例如,*左*)将是另一个对象(例如,*右*)的输入。*左*的输出数据类型必须与*右*的输入相同。不仅 transformer 消息块,其他输入/输出消息块也可以链接。

如何链接消息块?

我们需要使用 link_target 方法来链接目标(右侧对象)。有点复杂,但我必须提到:link_target 实际上是 source_block 抽象类的一个方法。source_block 类继承自 ISource 接口。因此,所有源消息块类实际上都继承自 source_block 类。

唉!在实际看到代码之前,文本太多了?现在是看代码的时候了!首先让我们重写 Int2String lambda:

transformer<int, string> Int2String([](int nInput) -> string
{
        string sDigits[]={"Zero ", "One ", "Two ", "Three ", 
               "Four ", "Five ", "Six ", "Seven ", "Eight ", 
               "Nine "};

        string sOutput;

        do {
            sOutput += sDigits [ nInput % 10 ];

            nInput /= 10;
        } while (nInput>0);


        return sOutput;
});

它会将 128 转换为 "Eight Two One"。要实际将数字转换为可显示的字符串,请参阅本文

让我们写另一个转换器,它将计算字符串中的辅音和元音的数量,并将其作为 std::pair 返回:

transformer<string, pair<int,int>> StringCount(
    [](const string& sInput) -> pair<int,int>
{
   pair<int,int> Count;
        
   for (size_t nPos = 0; nPos < sInput.size(); ++nPos)
   {
      char cChar = toupper(sInput[nPos]);

      if( cChar == 'A' || cChar == 'E' || cChar == 'I' ||
           cChar == 'O' || cChar == 'U')
      {
         Count.first++;
      }
      else 
      {
         Count.second++;
      }
   }

   return Count;
} );

对于某些读者来说,在上面的代码中使用 pair 可能会使其读起来有点困难;但相信我,它并不难。pair 的*第一个*组件将包含元音计数,*第二个*将存储辅音(和*其他*)计数。

现在让我们来实现这个数据管道中的最终消息块。因为它结束了管道,所以它不需要是 transformer。我正在使用 call 来显示 pair:

call<pair<int,int>> DisplayCount([](const pair<int,int>& Count)
{
   wcout << "\nVowels: " << Count.first << 
      "Consonants: " << Count.second;
});

DisplayCount 消息块对象显示 pair。*提示*:你可以通过 typedefing pair 来简化代码的可读性。

typedef pair<int,int> CountPair;
// Replace pair<int,int> with CountPair in code.

最后,我们连接管道!

// Int2String sends to StringCount    
Int2String.link_target(&StringCount);

// StringCount sends to DisplayCount
StringCount.link_target(&DisplayCount);

现在向此管道中的第一个消息块发送消息,并等待整个管道完成。

send(Int2String, 12345);

wait(500);

最后的 wait 用于确保管道在 main 函数退出之前完成,否则一个或多个管道消息块将不会执行。在实际程序中,我们可以使用 single_assignment、标准的 Windows 事件或我们稍后将学习的 CR 事件进行同步。这完全取决于管道的实现方式以及管道应该如何终止。

当你将从 Int2String 声明开始的整个代码粘贴到 main 中并运行时,它将显示以下内容:

Vowels: 9               Consonants: 15

你可能想自己调试程序,通过设置断点。需要断点,因为流程并不简单和顺序。或者你可以在控制台输出中放置断点。

  • 数据流和控制流

到目前为止我讨论的 Agent 类基于*数据流*模型。在数据流模型中,程序的各个组件通过发送和接收消息进行通信。当数据在消息块上可用时,就会进行处理,并且数据会被消耗。

而在*控制流*模型中,我们设计程序组件以等待一个或多个消息块中发生的*事件*。对于控制流,尽管使用了数据流组件(即消息块),但我们不需要或读取*数据本身,而是*事件*,即某些数据已到达消息块。因此,我们定义一个事件,用于一个或多个消息块,数据将在其上到达。

虽然在许多情况下,single_assignmentoverwrite_bufferunbounded_buffer 类也可以用于控制流(忽略*实际*数据)。但它们只允许我们等待**一个**数据已到达事件,其次,它们不是为控制流机制设计的。

下面三个类是为控制流设计的。它们可以等待多个消息块的数据已到达事件。概念上,它们类似于 WaitForMultipleObjects API。

  • choice 类

choice 可以等待 2 到 10 个消息块,并返回第一个有消息可用的消息块的索引。所有给定的消息块可以是不同类型的,包括消息块的底层类型(消息块的模板参数)。我们通常不直接声明 choice 对象(这非常危险!)。相反,我们需要使用辅助函数**make_choice**。

single_assignment<int> si;
overwrite_buffer<float> ob;

auto my_choice = make_choice(&si,&ob);

这里,choice 对象 my_choice 被创建来等待两个给定的消息块中的一个。make_choice 辅助函数(屏住呼吸!)实例化以下类型的对象(借助 auto 关键字,你免于输入那些东西):

choice<tuple<single_assignment<int>*,overwrite_buffer<float>*>> // Type
  my_choice(tuple<single_assignment<int>*,overwrite_buffer<float>*> // Variable
    (&si,&ob)); // CTOR call

关于 tuple 类:类似于 std::pair 类。此类最多可以接受 2 到 10 个模板参数,并成为那些数据类型的*元组*。此类是新的 VC++ 10(以及 C++0x)。完全限定名称将是 std::tr1::tuple。这就是为什么 choice 可以有 2 到 10 个消息块等待。我很快就会写一篇关于 STL 新特性的文章!

让我们继续 choice 类。以下代码是 choice 类的最简单示例:

send(ob, 10.5f);
send(si, 20);

size_t nIndex = receive(my_choice);

我们向两个消息块发送了消息,然后等待 my_choice 对象返回。如前所述,choice 类将返回有数据的消息块的零基索引。因此,*接收*来自 choice 的底层类型是 size_t

对于上面的代码,nIndex 将是 0,因为它在第一个消息块中找到消息。这完全取决于 choice 对象是如何构造的——消息块传递给 choice 构造函数的顺序。如果我们注释掉上面的第二行,返回值将是 1。发送消息在哪个消息块上并不重要;重要的是 choice 的初始化顺序。如果我们注释掉两个 send 调用,receive 将阻塞。

因此,我们看到 choice 类类似于 WaitForMultipleObjects API,其中 bWaitAll 参数设置为**false**。它返回找到消息的消息块的索引(根据构造函数中传递的顺序)。

除了使用 receive 函数,我们还可以使用**choice::index** 方法来确定哪个消息块有消息。choice::value 方法需要一个模板参数,它将返回被消耗的消息(即触发choice的消息)。了解所有这三种方法会很有趣(receiveindexvalue):

  • 如果上没有可用消息,将阻塞。
  • 将引用相同的索引/消息,该索引/消息在第一次接收时被接收。
  • 将内在地消耗消息(例如,从 unbounded_buffer 中移除消息)。

请注意,一旦完成一次接收,使用任何一种方法,该 choice 对象就不能再被重复使用。消耗的消息就是被消耗的(逻辑上,因为并非所有消息块类都会移除消息),并且将始终引用相同的消息/消息块。任何其他 send 调用都不会让 choice 接收同一/另一条消息块中的另一条消息!以下代码片段说明了这一点(注释在最后是输出):

// Send to second message-block
send(ob, 10.5f);    
size_t nIndex = receive(my_choice);
wcout << "\nIndex : " << nIndex;

 
// Now send to first message block
send(si, 20);
nIndex = my_choice.index();
wcout << "\nIndex (after resending) : " << nIndex;

 
// Display 
float nData = my_choice.value<float>(); // Template argument is mandatory
wcout << "\nData : " << nData;
 
 
 /* Output:
    Index : 1
    Index (after resending) : 1
    Data : 10.5
*/
  • join 类

概念上,join 类与 choice 类非常相似。唯一的本质区别在于,它类似于 WaitForMultipleObjects,并将 bWaitAll 参数设置为**true**。下面是一个实际的例子:

int main()
{
   join<int> join_numbers(4);
   bool bExit = false;

   single_assignment<int> si_even, si_odd, si_div5;
   overwrite_buffer<int> si_div7;    // Just for illustration
        
   si_even.link_target(&join_numbers);
   si_odd.link_target(&join_numbers);
   si_div5.link_target(&join_numbers);
   si_div7.link_target(&join_numbers);
    
   task_group tg;
   tg.run([&]    // Capture all
   {
      int n;
        
      while(!bExit)
      {
         wcout << "Enter number: ";
         wcin >> n;
            
         if(n%2==0)
            send(si_even, n);
         else
            send(si_odd, n);

         if(n%5==0)
            send(si_div5,n);

         if(n%7==0)
            send(si_div7,n);
      }
   });

   receive(join_numbers);

   wcout << "\n**All number types received";

   bExit = true;
   tg.wait();
}

关于代码:

  • join 对象为 int 类型实例化。它将适用于任何 ITarget<int> 对象。请记住,**仅**适用于 int 目标消息块。join 构造函数中的参数指定了它需要处理的消息块数量。
  • 三个 single_assignment 和一个 overwrite_buffer 消息块,用于 int 类型。通过 join,我们将等待所有这四个消息块中的消息。overwrite_buffer 仅用于说明可以使用任何相同类型的目标块(int)。
  • 使用了 link_targetISource::link_target)方法来设置这四个消息块的目标到此 join 对象。
  • 运行了一个任务组,该任务组会从用户那里请求一些数字。根据数字,它会向该消息块发送消息。
  • main 函数等待 join_members,它只会在所有四个消息块都收到输入时返回。
  • 设置了 bExit 为 true,这将触发任务退出。但请注意,它可能仍会请求一个额外的数字(姑且认为这是一个 bug,但忽略它)。

因此,您可以看到,通过 choicejoin 类以及 receive 函数,您可以实现 WaitForMultipleObjects 所提供的相同功能。两个类之间的唯一区别是选择*全部*或选择*任何*。join 也有一些偏差,我将简要介绍。

超时参数怎么样?

我们长期以来一直在使用 receive 函数,只传递消息块作为参数。这样,函数会无限期地阻塞。receive 函数接受另一个参数——超时参数——它是默认参数。默认超时是无限的(定义为 COOPERATIVE_TIMEOUT_INFINITE)。receive 函数本身是重载的,但所有版本都接受超时,并且所有版本都将其作为最后一个默认参数。超时单位是毫秒。因此,要在所有数字类型都收到之前等待 2 分钟,在前面的示例中,我们可以将 receive 函数调用更改为:

receive(join_numbers, 1000 * 2 * 60);

正如你可以隐含地理解的,receive 函数,带超时参数,可以与所有消息块类型一起使用。

如果 receive 函数超时怎么办?

它将抛出 Concurrency::operation_timed_out 异常,该异常继承自 std::exception。我稍后将详细介绍并发运行时中的异常。

你可能已经注意到,choice 类允许任何类型的消息块,其消息块的底层类型可以不同。但 join 类只接受相同底层类型的消息块。好吧,choice 工作在 tuple 之上;而 join 工作在 vector 之上。出于同样的原因,choice 限制为最多 10 个消息块等待,但 join 可以接受任意数量的消息块。我没有测试 join 到最高范围,但肯定超过了 WaitForMultipleObject 的 64 个句柄限制!(如果你能/已经测试过,请告诉我)。

注意:Agents Library 的任何类/函数都不使用 Windows 同步原语(互斥锁、关键部分、事件等)。但它使用计时器、计时器队列和原子函数。

Joins 可以是*贪婪*的或*非贪婪*的,这在 multitype_join 类下进行了说明。

  • multitype_join 类

是的,顾名思义,multitype_join 可以等待多种类型的消息块(包括消息块的底层类型)。由于它接受多种类型,因此你可以使用模板和 tuple 来实例化 multitype_join 对象,或者使用**make_join** 辅助函数。功能上,它与 join 类相同,只是它将源消息块的数量限制为最多 10 个。创建 multitype_join 对象的示例:

single_assignment<int> si_even, si_odd;
overwrite_buffer<double> si_negative_double;    // double

auto join_multiple  = make_join(&si_even, &si_odd, &si_negative_double);

// Wait
receive(join_numbers);

**Join 的贪婪性:**两种类型的 join 都可以是贪婪的或非贪婪的。它们的贪婪性在尝试从它们那里 receive 信息时生效。在创建对象时,我们指定贪婪或非贪婪模式:

  • join_type::greedy - 贪婪 join 更高效,但可能导致*活锁*(一种死锁,请参阅此处),或者它们可能会无限期阻塞(即使没有死锁)。在 receive 请求时,它们会接收来自所有消息块的消息,并一直等待直到所有消息都被接收。一旦从任何块接收到消息,它就不会返回。
  • join_type::nongreedy - 它将轮询所有消息块以获取消息,并且仅在成功从所有消息块接收到所有消息后才返回。它*保证*能正常工作,因为它不会导致饿死(给其他接收者)或死锁。可能会发生这种情况:它在消息块 A 中找到消息,并尝试在消息块 B 中定位消息。当它在消息块 B 中找到消息时,它会再次尝试查找消息块 A 中的消息,而该消息可能已被另一个接收者移除(尽管*移除*功能取决于消息块)。非贪婪 join 效率较低。

我们创建贪婪和非贪婪 join 对象如下:

// Greedy join
join<int, greedy> join_numbers(411);

// Non-greedy join (DEFAULT)
join<int, non_greedy> join_numbers(411);

join 对象的默认创建模式是非贪婪的。

我们可以使用 make_join 来创建非贪婪的 multitype_join,并使用 make_greedy_join 来创建贪婪的 multitype_join 对象。

  • timer 类

timer 类以给定的时间间隔生成消息。这是一个仅源的消息块,意味着你不能用它来 send,但只能用 receive 函数。*发送*功能就是*计时器*本身,它会以固定的时间间隔触发。消息的触发可以是一次性的或连续的。通常,你不会调用 receive,而是将另一个消息块附加为消息的目标。例如:

int main()
{
   // call object as the target of timer
   call<int> timer_target([](int n)
   {
      wcout << "\nMessage : " << n << " received...";
   })

   timer<int> the_timer(2000, // Interval in ms
            50,        // Message to send
            &timer_target,    // The target
            true);        // Repeating: Yes
    
    
   // Start the timer
   the_timer.start();

   wcout << "  ** Press ENTER to stop timer**  ";
   wcin.ignore();

   the_timer.stop();

   wcout << "\n\n** Timer stopped**";
}

timer 的构造函数不言自明。以下方法对 timer 很重要:

    • timer::start - 计时器不会自动启动,您必须通过调用此方法来启动计时器。如果计时器是非重复的,它只执行一次(在给定超时后)。如果是重复的,它将在给定的超时值上连续重复。
    • timer::stop- 顾名思义。如果计时器未运行,则无效。
    • timer::pause - 对于非重复计时器,它与调用 stop 方法相同。对于重复计时器,它会暂停计时器,可以通过再次调用 start 来恢复。

计时器的目标可以是任何目标消息块。例如,如果我们将 timer_target 的类型更改为如下:

unbounded_buffer<int> timer_target;

在计时器停止之前,它将简单地填满这个 unbounded_buffer 对象!

至此,我们结束了消息块类。我讨论过的消息块类的摘要:

传播 源限制 目标限制 备注
single_assignment

源和目标,两者。

它们既可以是接收者也可以是发送者。

无限

无限

**一次性写入**消息块。拒绝任何其他消息。接收时,返回第一条消息。
overwrite_buffer 写多,但**读一**个消息块。覆盖新发送的消息。不删除消息。
unbounded_buffer 写多,读多消息块,维护**FIFO 顺序**。一旦接收就删除消息,如果*没有*待处理消息,receive 调用将阻塞。
transformer

只允许 1 个接收者。

见备注。

**将**一条消息转换为另一条消息类型。在形成管道时很有用。使用 link_target 连接目标。transformer 是用 lambda/函数创建的。只允许一个目标,否则会引发异常(本文尚未讨论)。
join 模拟 WaitForMultipleObjects,采用**等待所有**原则。一次只能有一个接收者,否则会引发异常(CR 中的异常将在下面讨论)。基于 vector,允许等待任意数量的消息块。
multitype_join

10

对于**不同**底层消息块类型,与上面相同。基于 tuple 类。同时等待时引发异常。
choice 模拟**等待任意一个**等待原则。基于 tuple 类,因此将消息块限制为最多 10 个。不允许同时等待。
call 仅目标

无限

-不适用-

模拟**函数指针**机制。构造函数需要一个函数/lambda 作为消息目标。基于模板,lambda/函数接收相同类型。允许多个发送者。将待处理消息保留在 FIFO 中。函数/Lambda 调用意味着消息已被消耗/移除。
timer 仅源

-不适用-

1

以**固定时间间隔**/*一次*将消息发送到指定的消息块目标。请注意,它运行在 Windows 计时器队列计时器之上。

异步代理 (Asynchronous Agents)

异步代理(或代理)可用于以更结构化和面向对象的方式专门化任务。它有一组状态(*生命周期*):**已创建**、**可运行**、**已启动**、**已完成**和**已取消**。代理与其它任务/线程异步运行。简单来说,代理可以让你将一个线程写成一个单独的类(MFC 程序员可能将其与 CWinThread 相比)。

Agents Library 有一个**抽象**基类 Concurrency::agent。它有一个名为 run 的纯虚函数,你在派生类中实现它。你实例化你的类,然后调用 agent::start 方法。这是正确的猜测——运行时调用(调度)你的 run 方法。

示例派生:

class my_agent : public agent
{
   void run()
   {
      wcout << "This executes as a separate task.";
      done();
   }
};

由于 run 是一个虚函数,由 CR 调用,所以无论你将实现放在公共还是私有部分都没有关系。

下面是使它工作的代码:

int main()
{
    my_agent the_agent;
    the_agent.start();

    agent::wait(&the_agent);
}

现在是描述:

  • agent 是抽象类,run 是纯虚函数。
  • 我继承了它到 my_agent 类,实现了所需的 run 方法,它只不过是 void run(void)
  • 调用了 agent::done 方法,让运行时知道它已经完成。它将此代理的状态设置为**agent_done**。更多内容见下文。
  • main 中,创建了一个名为 the_agent 的对象,并通过 agent::start 方法启动了它。start 方法本质上将代理调度为*异步*运行——即,作为一个单独的任务/线程。
  • 然后我使用静态方法 agent::wait 等待代理完成。还有其他等待的变体,我们很快就会看到。

代理的生命周期

代理从其初始状态终端状态有一个生命周期,如下图所示(摘自 MSDN):

Agent.png

正如您所见,代理有五个阶段。实线和函数名称代表程序员的调用,虚线代表运行时进行的调用。代理不会遵循所有这些生命周期阶段,因为它可能在某个阶段终止。以下枚举成员(agent_status 枚举类型)描述了代理的每个阶段。

代理状态 状态含义
agent_created 代理对象已创建,尚未调度。
agent_runnable 代理已调度运行,但尚未开始执行。start 方法会执行此操作。
agent_started 代理已启动并正在运行。调用 start 方法后,运行时会异步执行此操作。
agent_done 代理已成功完成其执行。覆盖的 run 方法将为此状态显式调用此 done 方法。
agent_canceled 代理在进入已启动阶段之前已被取消。如果其他代理/任务调用了 agent::cancel 方法,则会发生这种情况。

需要注意的是,一旦代理进入已启动阶段,就无法取消它。它将继续运行!

您可能会问,为什么需要显式调用 agent::done 方法?run 函数返回并由运行时知晓其完成是否不够?
嗯,run 重写只是一个代理可以开始自己的工作的方法。它可能有一个或多个消息块用于发送、接收和等待。例如,call 消息块被调度为从其他源获取输入,并调用 done(或有条件地调用 done)。在 run 方法内部调用 done 不是强制性的,但您在代理完成时调用它。agent::wait(和其他等待函数)仅在对等待的代理调用了 done 后才返回。

以下示例说明了这一点

class generator_agent : public agent
{
   call<int> finish_routine;
public:

   single_assignment<int> finish_assignment;

   generator_agent() : // agent' constructor
      finish_routine([this](int nValue)  // Sets the call object with a lambda, captures 'this'
      {
         wcout << "\nSum is: " << nValue;
      
         // Here we call agent::done
         done(); // this->done();
      })
   {
   }

   void run()
   {
      wcout <<"\nGenerator agent started...\n";
      finish_assignment.link_target(&finish_routine);
   }
};

尽管该代理的实现有点复杂,但它例证了如何隐藏代理的内部细节,同时仍然与程序中的其他代理/任务进行通信。此示例并不好,仅用于说明。在此,call 负责调用 donesingle_assignment 通过 link_target 函数将 call 设置为其目标。下面我将说明为什么我将 finish_assignment 放在公共区域。

class processor_agent : public agent
{
   ITarget<int>& m_target;

public:
   processor_agent(ITarget<int>& target) : m_target(target)  {}

   void run()
   {
      wcout << "\nEnter three numbers:\n";
      
      int a,b,c;
      wcin >> a >> b >> c;

      send(m_target, a+b+c);
      done();
   }
};

processor_agent 将接受 ITarget<int> 类型的对象作为引用。run 方法很简单。它将消息发送到指定的(在构造时设置的)目标。processor_agent 对其他代理(或目标)一无所知,它只需要一个 ITarget 的子类型 int。要接受 ITarget 的任何子类型,您可以使 processor_agent 成为模板类。

下面是 main 函数,它使用两个代理进行通信,并使用 agent::wait_for_all 函数等待两个代理完成。processor_agent 的构造函数需要一个 ITarget<int> 对象,因此将 generator.finish_assignment 传递给它。

int main()
{
   generator_agent generator;
   processor_agent processor(generator.finish_assignment);

   // Start
   generator.start();
   processor.start();

   // Wait
   agent* pAgents[2] = {&generator, &processor};
   agent::wait_for_all(2, pAgents); // 2 is the number of agents.
}

本段让您了解 Concurrency Library 的讨论的上层已完成。它包括并行模式库异步代理库。CR 的下层包括任务调度程序资源管理器。我将只讨论任务调度程序。还有一个单独适用的组件是**同步数据结构**,我将在下面讨论它,然后再详细介绍任务调度程序。


同步数据结构

并发运行时支持以下三种同步原语,它们是并发感知的。它们与 CR 的协作式任务调度程序协同工作。我将在本节之后讨论任务调度程序。简而言之,协作式调度不会将计算资源(即 CPU 周期)交给系统中的其他线程,而是将其用于调度程序中的其他任务。以下类型已公开供 CR 中的数据同步使用

  • 临界区 - critical_section
  • 读/写锁 - reader_writer_lock
  • 事件 - event

头文件:concrt.h

与标准的 Windows 同步原语不同,临界区和读/写锁是非重入的。这意味着如果一个线程已经锁定/拥有一个对象,尝试重新锁定同一个对象将引发 improper_lock 类型的异常。

  •  critical_section 类

表示并发感知的临界区对象。既然您正在阅读此内容,我相信您知道什么是临界区。由于它是非重入的,它将把处理资源让给其他任务,而不是抢占它们。critical_section使用 CRITICAL_SECTION Windows 数据类型。以下是 critical_section 类的方法

方法 描述 备注
lock 获取当前线程/任务的锁。如果临界区已被另一个线程/任务锁定,则调用将阻塞。 如果临界区已被当前线程/任务获取,运行时将引发 improper_lock 异常。
try_lock

尝试锁定临界区,而不阻塞。即使 CS 已被同一线程锁定,也不会引发异常。

如果成功,则返回 true,否则返回 false
unlock 解锁已获取的临界区对象。 如果对象被当前任务/线程锁定,则会引发 improper_unlock。

由于上述方法在函数有多个返回点、引发异常或程序员忘记解锁临界区时不够安全,因此 critical_section 包含一个名为 critical_section::scoped_lock 的子类。scoped_lock 类只是父类 critical_sectionRAII 包装器。此类除了构造函数和析构函数外,别无他物。以下示例说明了这一点

不可靠的方案

critical_section cs; // Assume defined in class, or somewhere
 
void ModifyOrAccessData()
{
   cs.lock();

   // Do processing. We have lock.
   // No other threads are using it.
   // This function, however may have multiple return statements
   // and putting unlock everywhere is cumbersome, and misakes
   // may happen. Also, exceptions may pose problems...
   
   cs.unlock();
}
可靠的方案,由 RAII 支持
void ModifyOrAccessData()
{
   critical_section::scoped_lock  s_lock(cs);  // CTOR locks it.
   
   // Do processing. Return from anywhere, any how. 
   // Forget about unlocking the critical section. 
   // The DTOR or scoped_lock will do it for us, even 
   // when exceptions occur. 
} 

需要注意的是,如果锁已被同一线程/任务持有,并且 scoped_lock 用于此,则会发生 improper_lock。同样,如果 scoped_lock 已成功获取锁,并且您显式解锁它,scoped_lock 的析构函数将引发 improper_unlock 异常。代码片段显示了两点:

void InvalidLock()
{ 
  cs.lock(); // Lock it
  ...

  critical_section::scoped_lock s_lock(cs); // EXCEPTION!
}

void InvalidUnlock()
{
  critical_section::scoped_lock s_lock(cs);
  ... 
  cs.unlock();

  // The DTOR of scoped_lock, which would be called ASAP this function 
  // is about to unwind, would attempt to unlock the critical section.
  // And that thing would raise an exception!
}

如前所述,阻塞的 lock 不会抢占处理资源,而是会尝试将其交给其他任务。稍后将详细介绍。

  •  reader_writer_lock 类

假设您的程序中有多个线程正在访问和修改一个数据或数据结构。例如,任意数据类型的数组。当然,您可以使用临界区或其他同步原语来控制对该共享数据的访问。但是,该共享数据主要用于读取,而不是用于更新/写入操作。在这种情况下,用于读取的通常是徒劳的。

reader_writer_lock 类允许多个读取者同时读取,而不会阻塞其他尝试读取共享数据的线程/任务。但是,写入锁只允许一个线程/任务。

  • critical_section 类一样,reader_writer_lock 类也是非重入的。一旦线程获取了(任何类型的)锁,并尝试从同一线程重新获取锁(任何类型),就会导致异常。
  • 由于此类也是并发感知的,因此它不会交出处理资源,而是将其用于其他任务。
  • reader_writer_lock 类是写优先类。它优先处理写入者,读取者可能会被饿死。但是,请求将按 FIFO 顺序提供(为两种锁类型维护单独的请求队列)。
  • 只读锁不能升级为写锁,除非释放只读锁。同样,写锁也不能降级为只读锁,除非释放写锁。

注意:如果共享数据涉及比读取更频繁的写入,建议您使用其他锁定机制。此类在主要用于读取的上下文中将提供最佳性能。

reader_writer_lock使用 Windows Vista 及更高版本中可用的精简读/写锁。SRW 锁和 reader_writer_lock 有以下显著区别

  • SRW 既不是写优先也不是读优先——授予锁的顺序是未定义的。
  • 对于调度,SRW 使用抢占模型,而 RWL 使用协作模型。
  • 由于 SRW 的大小相当小(指针大小),并且不维护请求队列,因此它比 RWL 更快地授予锁。但是,根据程序设计,SRW 的性能可能不如 RWL。
  • SRW 依赖于操作系统(API),RWL 仅依赖于并发运行时。

reader_writer_lock 类显示的方法

方法 描述 备注
lock

获取写(即读+写)锁。直到获得写锁为止一直阻塞。

如果写锁已被当前线程/任务获取,运行时将引发 improper_lock 异常。(另请参阅下表)
try_lock

尝试获取写锁,而不阻塞。

如果成功,则返回 true,否则返回 false。(见下表)
lock_read 获取只读锁并一直阻塞直到获得它。 仅当写锁已被获取时才引发 improper_lock。否则继续。请参阅下表。
try_lock_read

尝试获取写锁,而不阻塞。

如果成功,则返回 true,否则返回 false。(见下表)
unlock 释放锁(无论获取的是哪种类型的锁)

如果未获取锁,则引发 improper_unlock

写锁请求被链接。因此,运行时将立即选择下一个待处理的写锁并解除其阻塞。

...

本文仍在撰写中。还有很多内容需要补充。但我还是先发布。由于这是一个新主题,我需要时间来阅读和尝试这些概念。这不到我期望在本篇文章中解释内容的 80%。肯定会添加更多示例、源文件和外部链接;并且书写错误将被纠正!
  • 修订 1(8 月 10 日):解释了 combinable 类
  • 修订 2(8 月 13 日):详细阐述了任务组
  • 修订 3(8 月 15 日):异步库。解释了一些概念和类。
  • 修订 4(8 月 18 日):说明了 transformer 类。
  • 修订 5(8 月 19 日):解释了 choice 类。
  • 修订 6(8 月 21 日):详细阐述了 Joins 并解释了 Timer。提供了摘要。
  • 修订 7(8 月 25 日):详细阐述了临界区、读/写锁。
  • 修订 8(9 月 2 日):添加了示例项目
实用的示例将很快提供,并且示例项目将包含大量示例!

© . All rights reserved.