65.9K
CodeProject 正在变化。 阅读更多。
Home

C++ 并发编程:第二部分

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.99/5 (18投票s)

2019年3月3日

CPOL

26分钟阅读

viewsIcon

20972

downloadIcon

273

本文是 C++ 并发编程第一部分的延续。我们将讨论同步、future 和 promise 以及 async,并以此总结 C++ 并发的介绍。

并发对多核环境做出了许多承诺,但也带来了一些挑战。如果没有并发,多核就不会比单核更好。如果程序执行速度更快是并发的深远意义,那么令人麻木的调试则是微妙的。同时运行多个任务的尝试可能会引入许多伴随好处的复杂性和困难。

本文是 C++ 并发编程:第一部分 的延续,并假设您现在熟悉 C++ 中并发和多线程的基本概念。

竞态条件 - 同步的需求

在并发程序中,资源和数据在线程之间共享。假设我们想增加一个数字并输出结果。如果我们在多线程环境中解决这个问题,结果将变得不确定和不可预测。下图将使这个想法更清晰。

对于喜欢动手操作的人,请考虑以下代码。

class Race_condition
{
public:
     int n;
     Race_condition() :n(0) {}
     void increment() {
           ++n;
     }
};

我们的 Race_condition 类实现了我们之前谈到的相同场景。它包含一个简单的函数来增加 n。让我们尝试在多线程环境中调用此函数。

int main()
{
     Race_condition racer;

     std::thread t1([&racer] {
           for (int i = 0; i < 10000; ++i) {
                racer.increment();
           }
     });

     std::thread t2([&racer] {
           for (int i = 0; i < 10000; ++i) {
                racer.increment();
           }
     });

     t1.join();
     t2.join();

     std::cout << racer.n << std::endl;
     return 0;
}

我们只创建了 2 个线程来将计数器 racer 增加 10000 次。您也可以尝试创建更多的线程。这里的想法是向您展示,您不需要几十个线程就能遇到问题。尝试执行上面的程序几次,每次都会得到不同的结果。以下是我在我的系统上得到的一些结果。

// 15556
// 14042
// 20000

嗯,如果您只是想生成随机数,这个程序是一个不错的选择,但要获得可预测的结果,它肯定不行。这被称为竞态条件;一种不希望出现的情况,输出取决于序列或时间。增量不是一个原子操作(原子操作意味着处理器可以在同一个总线操作中同时读写同一个内存位置。我们将在文章后面更详细地探讨它)。此外,对整数的访问没有同步。线程正在非连续地访问它。由于同时访问共享数据,可能会出现许多错误;两个线程可能会相互交错,从而导致结果不可预测。这只是一个简单的小问题。想象一下,如果几十个线程处理涉及数百万次计算和运算的巨大数据,那将是多么混乱。

同步

让我们更深入地研究上面的问题。上面例子中的增量意味着三个操作的执行

  • 读取 n 的当前值
  • 将 n 的当前值增加 1
  • 写入 n 的新值

上述操作的序列必须保持为读、增量和写,并根据需要重复相同的序列。但是当我们启动多个线程时,序列会被打乱,因为我们不知道哪个线程可能正在执行上述哪个函数。请注意,这些操作是在单个对象 n 上执行的,并且它持有的数据在所有我们创建的 10 个线程之间共享。为了避免竞态条件,我们需要一种方法来保护共享数据,使其不能被多个执行线程同时访问。以下是解决同步问题的一些建议方案

  • 信号量 — C++ 标准没有定义信号量类型,但提供了您可以编写自己的信号量的可能性。尽管不一定需要,因为大多数信号量的使用最好用互斥量和/或条件变量来代替。
  • 互斥量 — C++ 提供了 std::mutex 类用于同步和避免竞态条件。二元信号量也可以用作互斥量
  • 原子引用 — C++ 引入了作为模板类 std::atomic 的原子类型
  • 监视器 — 保证一次只有一个线程可以在监视器内活动。C++ 不支持监视器。
  • 条件变量std::condition_variable 类提供了在线程之间同步的能力。
  • 比较并交换 — 它将内存位置的内容与给定值进行比较。如果两个值相同,则它将使用新值修改该内存位置的内容。

在这里,我们只探讨 C++ 中同步问题最常见的解决方案,即互斥量、原子类型和条件变量。

互斥(mutex)

互斥是指阻止多个线程同时执行临界区(访问共享资源的代码段)的想法。C++ 提供了 std::mutex 类用于同步,它用于保护共享数据免受多个执行线程的并发访问。它通过锁定和解锁机制工作。一旦互斥量被锁定,当前执行的线程将拥有该资源,直到它被解锁;这意味着在互斥量解锁其所包围的代码块之前,没有其他线程可以执行任何指令。让我们看看如何使用互斥量来解决我们之前示例中的同步问题。

class Race_condition
{
public:
     std::mutex mutex;
     int n;
     Race_condition() :n(0) {}
     void increment() {
          mutex.lock();
             ++n;
          mutex.unlock();
     }
};

您可以通过实例化 std::mutex 来创建一个互斥量,通过调用成员函数 lock() 来锁定它,并通过调用 unlock() 来解锁它。互斥量锁定确保一旦一个线程完成修改,然后另一个线程才能修改数据。现在尝试执行与上面相同的程序,看看每次是否都能得到相同的预期输出。这是我在我的系统上得到的结果。

// 20000
// 20000
// 20000

如果出现错误异常,将抛出 system_error,错误代码可能如下所示

  • resource_deadlock_would_occur(将发生资源死锁)
  • resource_unavailable_try_again(资源不可用,请重试)
  • operation_not_permitted(操作不允许)
  • device_or_resource_busy(设备或资源忙)
  • invalid_argument(无效参数)

让我们看看它是如何工作的。

std::mutex mutex;

try {
    mutex.lock() ;
       // do nothing
    mutex.lock() ;
}

catch (system_error & error) {
    cerr << error.what() << endl;
    cerr << error.code() << endl;
}

这是我在我的系统上得到的结果。

// device or resource busy: device or resource busy

// generic:16

// d:\agent\_work\2\s\src\vctools\crt\crtw32\stdcpp\thr\mutex.c(51): mutex destroyed while busy

但是互斥量还有更多吗?是的!库中有几种互斥量类。例如:

std::mutex

提供基本锁定

std::recursive_mutex

同一个线程可以重复锁定而不会死锁

std::timed_mutex

std::recursive_timed_mutex

尝试获取锁时提供超时,即线程在等待另一个线程完成时可以做其他事情。

std::shared_mutex

允许一个以上的线程拥有互斥量

std::call_once(std::once_flag flag, function)

std::call_oncestd::once_flag 匹配,无论启动多少个线程,函数都只会调用一次。

 

一切都很好,但如果我们忘记在函数结束时解锁互斥量怎么办?直接调用成员函数是不明智的,因为我们必须记住解锁每个代码路径(包括异常),否则我们可能会遇到不希望出现的情况,就像我们在执行上述代码时遇到异常一样。在这种情况下,程序通常会调用 abort。

// abort() has been called

优于互斥量的锁

那么我们有更好的选择吗? 嗯,是的!C++ 提供了一些首选的锁类型。

std::lock_guard 为互斥量实现了资源获取即初始化(RAII)(RAII 描述了资源只在对象生命周期内持有的概念。点击链接了解更多关于此编程习语的信息)。它通常被称为互斥量包装器,提供 RAII 风格的机制,这意味着互斥量将在作用域块的持续时间内被拥有。当 std::lock_guard 实例化时,它会尝试获取它所给定的互斥量的所有权(可以把它想象成在互斥量上调用了 lock())。一旦 lock_guard 对象超出作用域(就像调用了 unlock() 一样),lock_guard 的析构函数就会被调用,释放互斥量。让我们使用我们之前示例中的代码问题,但这次实现 lock_guard

class Race_condition
{
public:
     std::mutex mutex;
     int n;
     Race_condition() :n(0) {}
     void increment() {
          //replaced the mutex.lock() with lock_guard
          std::lock_guard<std::mutex> lock(mutex);
          ++n;
     }
};

我们将一个 mutex 传递给 std::lock_guard<> 的构造函数来获取锁,并且 mutex 将在析构函数中被解锁,从而无需显式调用 unlock()。在您的代码中进行上述修改并执行之前的程序。

std::unique_lock 提供了比 std::lock_guard 更多的功能和能力。unique_lock 是一个管理互斥量对象的对象,在锁定和解锁状态下都具有唯一的所有权。它不需要拥有与其关联的互斥量的锁,并且允许您在实例之间转移锁的所有权。(我们将在文章后面讨论死锁避免时探讨 unique_lock 的这种灵活性。现在请坚持住)。至少,它可以像 std::lock_guard 一样使用,以简化互斥量的锁定和解锁并提供异常安全性。请看下面的语法。

{
    std::mutex mutex;
    std::unique_lock<std::mutex> lock(mutex);

    //do something
}

但它不仅仅是锁定和解锁互斥量。让我们看看它提供了哪些更多功能。

unique_lock::try_lock

如果互斥量尚未锁定,则锁定互斥量

unique_lock::try_lock_for

在指定时间段内尝试锁定互斥量

unique_lock::try_lock_until

尝试锁定互斥量直到指定时间点

unique_lock::operator=

移动赋值 unique_lock

unique_lock::swap

交换 unique_lock

unique_lock::release

释放互斥量

unique_lock::owns_lock

拥有锁

unique_lock::operator bool

如果拥有锁则返回 true,否则返回 false

unique_lock::mutex

获取互斥量

我还不打算深入实现以上每个功能,因为那超出了本文的范围。现在我将把它留给您探索和实验。

死锁

到目前为止,互斥量的使用似乎轻而易举,因为它们确保临界区中的代码在任何时候都只能由单个线程访问。锁定和解锁机制提供了排他性,但猫还在袋子里。让我们编写一个简单的程序来释放魔鬼。

void print() {
     std::cout << "Critical Data" << std::endl;
}

int main() {
     std::mutex mutex1, mutex2;

     std::thread t1([&mutex1, &mutex2] {
          std::cout << "Acquiring First mutex..." << std::endl;
          mutex1.lock();
          print();         

          std::cout << "Acquiring second mutex..." << std::endl;
          mutex2.lock();
          print();

          mutex1.unlock();
          mutex2.unlock();
     });

     std::thread t2([&mutex1, &mutex2] {
          std::cout << "Acquiring second mutex..." << std::endl;
          mutex2.lock();
          print();

          std::cout << "Acquiring First mutex..." << std::endl;
          mutex1.lock();
          print();
         
          mutex2.unlock();
          mutex1.unlock();
     });

     t1.join();
     t2.join();

     return 0;
}

在上面的代码中,我们将互斥量对象 mutex1mutex2 作为参数传递给线程 t1t2。还可以看到我们正在获取多个锁。

执行上面的程序,看看会发生什么。这是我在我的系统上得到的结果。

注意:您的输出可能不同。这是因为在多线程环境中没有严格的执行顺序。您的程序也有可能成功运行并且不会陷入死锁情况(我稍后会解释原因)。不,代码没有问题。多运行几次程序,您就会发现死锁。

程序就卡在那里,之后什么也没发生。这种情况被称为死锁(一种状态,其中一个进程持有一个资源并等待被另一个进程获取的某个其他资源,因此两个进程最终无限期地相互等待做某事)。当您不知道幕后发生了什么时,这种情况会让您抓狂。我自己第一次遇到死锁情况时,花了好几个小时试图找出问题所在以及如何解决。但让我为您简化一下。事情是这样的。

我们的线程 t1t2 都可以访问这两个互斥锁。线程 t1 尝试按 mutex1.lock()mutex2.lock() 的顺序获取锁,而线程 t2 尝试按 mutex2.lock()mutex1.lock() 的顺序获取锁。如果线程 t1 获取互斥量 mutex1 而线程 t2 获取互斥量 mutex2,则可能发生死锁。现在 t1 将等待互斥量 mutex2,而 t2 将等待互斥量 mutex1。两个互斥量将相互等待释放资源,从而导致死锁情况。

我曾经执行过一次程序,它运行得很好。没有死锁。这是因为即使可能发生死锁,如果线程 t1 在线程 t2 尝试获取锁之前获取并释放 mutex1mutex2 的互斥锁,它也不会发生。当然,线程运行的顺序取决于 CPU 调度器如何调度它们。

最后但同样重要的是要在这里提到:上面的代码代表了糟糕的代码示例。切勿在持有另一个锁的同时尝试获取锁,因为它很可能会导致死锁。不惜一切代价避免嵌套锁和此类代码。这只是为了通过示例使想法清晰。

避免死锁

死锁是噩梦,必须不惜一切代价避免它们。避免死锁最常见的方法之一是按相同的顺序锁定互斥量。mutex1 总是先于 mutex2 锁定,这样我们就不会再面临死锁。在我们的例子中,这非常直接,可以通过应用这个简单规则来避免死锁。看这里

void print() {
     std::cout << "Critical Data" << std::endl;
}

int main() {
     std::mutex mutex1, mutex2;

     std::thread t1([&mutex1, &mutex2] {
           std::cout << "Acquiring First mutex..." << std::endl;
           mutex1.lock();
           print();

           std::cout << "Acquiring second mutex..." << std::endl;
           mutex2.lock();
           print();

           mutex1.unlock();
           mutex2.unlock();
     });

     std::thread t2([&mutex1, &mutex2] {
        //changing the order of acquiring mutexes to avoid deadlock
           std::cout << "Acquiring First mutex..." << std::endl;
           mutex1.lock();
           print();

           std::cout << "Acquiring Second mutex..." << std::endl;
           mutex2.lock();
           print();

           mutex1.unlock();
           mutex2.unlock();
     });

     t1.join();
     t2.join();

     std::cout << “Program ran succesfully” << std::endl;
     return 0;
}

现在我们的程序将不再陷入死锁。这是输出。

但并非所有情况下都如此简单,例如当互斥量保护同一个类的不同实例时。在这种情况下,通常需要应用一些 C++ 提供的额外功能。

有两种常见的模式用于避免死锁。

  • 同时锁定互斥量,然后创建守卫
  • 创建守卫,然后同时锁定互斥量

这两种模式的代码实际上是等效的,可以相互替换。工作原理如下。

{
   std::lock(mutex1, mutex2);
   std::lock_guard<std::mutex>  lock1(mutex1, std::adopt_lock);
   std::lock_guard<std::mutex>  lock2(mutex2, std::adopt_lock);

   //do something
}

在这里,我们首先获取锁(避免死锁),然后创建守卫以正确释放它们。std::lock() 使用死锁避免算法并锁定所有作为参数传递的对象。std::adopt_lock() 用作 lock_guard 构造函数的可能参数(也可以与 unique_lock 一起使用)。它假定调用线程已经拥有锁,这很正确,因为我们刚刚锁定了它们。现在 std::adopt 包装器将采用互斥量的所有权,并在控制流出作用域时立即释放它。

这是实现相同功能的另一种方法。

{
    std::unique_lock<std::mutex> lock1 (mutex1, std::defer_lock);
    std::unique_lock<std::mutex> lock2 (mutex2, std::defer_lock);
    std::lock(lock1, lock2);
 
    //do something
}

在这里,我们首先创建锁(不获取它们),然后使用 std::lock 同时获取它们,而没有死锁的风险。std::defer_lock 不获取互斥量的所有权,而是假定调用线程稍后会调用 lock 来获取互斥量。同样,一旦控制流出作用域,包装器就会释放锁。

这里需要注意的一点是,您不应该连续两次调用 std::lock 来单独获取互斥量,否则您将再次陷入死锁情况。

 {
    std::unique_lock<std::mutex> lock1 (mutex1, std::defer_lock);
    std::unique_lock<std::mutex> lock2 (mutex2, std::defer_lock);

    std::lock(lock1);
    std::lock(lock2); //deadlock!!!!
}

C++ 17 添加了 std::scoped_lock,它也是一个互斥量包装器,提供 RAII 机制,用于拥有一个或多个互斥量,只要作用域块处于活动状态。它使用与 std::lock 相同的死锁避免算法,一次性锁定任意数量的互斥量。当创建 scoped_lock 对象时,它将尝试获取给定互斥量的所有权。一旦控制流离开创建 scope_lock 对象的范围,该对象将被销毁,互斥量将按相反顺序释放。因此,它更优雅地解决了死锁问题。请观察语法。

{
    std::scoped_lock<std::mutex, std::mutex> lock(mutex1, mutex2);

    //do something
}

线程间的同步

不仅共享数据需要保护,我们还需要同步不同线程之间的操作。一个线程可能需要等待事件发生或另一个线程完成任务。例如,一个线程通知另一个线程它已经完成数据读取,而另一个线程现在可以进一步处理数据。因此,需要一种等待/通知机制来在访问共享资源时同步线程。这比轮询等其他方法更高效。C++ 标准库提供了条件变量来处理此类问题。

std::condition_variable 类是一种同步原语,用于阻塞调用线程,直到被“通知”恢复。该类提供了一些函数:

condition_variable::wait()

在互斥量上等待直到被通知

condition_variable::wait_for()

等待超时或直到被通知

condition_variable::wait_until()

等待直到指定时间点或被通知

condition_variable::notify_one

唤醒一个等待线程

condition_variable::notify_all

唤醒所有等待线程

让我们实现一个简单的发送方/接收方场景,看看它是如何工作的。

std::mutex mutex;
std::condition_variable cv;
bool ready = false;

void print()
{
       std::cout << "Waiting for other thread to signal ready!" << std::endl;
       std::unique_lock<std::mutex> lock(mutex);
       while (!ready)
       {
              cv.wait(lock);
       }
       std::cout << "thread is executing now...." << std::endl;
}

void execute()
{
       std::cout << "Thready is ready to be executed!" << std::endl;
       ready = true;
       cv.notify_all();
}

int main()
{
       std::thread t1(print);
       std::thread t2(print);
       std::thread t3(execute);
      
       t1.join();
       t2.join();
       t3.join();

       return 0;
}

请注意,任何需要在 std::condition_variable 上等待的线程,都需要首先获取 std::unique_lock。等待操作将原子性地释放互斥量(这就是我们使用 unique_lock 提供的灵活性,即等待线程将在等待时释放互斥量),并暂停线程的执行。在条件变量被通知后,线程将被唤醒,并重新获取互斥量。

当我们需要处理数据(但在处理之前)时,也可以使用解锁 std::unique_lock 的灵活性,因为数据处理可能非常耗时,并且将互斥量锁定超过必要的时间不是一个好主意。

数据竞争和原子操作

我在谈论竞态条件时说过增量不是原子操作,但当时没有深入探讨。现在是时候讨论什么是原子操作和原子类型了。

当处理器在同一总线操作中同时读取和写入一个位置时,该操作被称为原子操作。原子意味着不可分割和不可约简,这意味着它们不能被分成子任务,因此原子操作必须完全执行,否则根本不执行。因此,它保证以单个事务执行,这意味着其他线程将看到操作开始之前的系统状态或操作完成之后的系统状态,但不能看到任何中间状态。

在这里,让我带您回到我们的增量示例,并解释我的意思。正如我们之前讨论的,增量是读-修改-写操作(显然不是原子操作)。这是当我们执行增量函数(没有锁)时幕后发生的事情。

int n = 0;

Thread 1 — t1:
   int tmp = n;     // 0
   ++tmp;                        // 1
   n = tmp;           // 1

Thread 2 — t2:
   int tmp = n;     // 0
   ++tmp;                        // 1
   n = tmp;           // 1!

如果从不同的线程访问单个内存位置的两次访问之间没有强制顺序,其中一次或两次访问都不是原子的,并且其中一次或两次是写入,那么这就是数据竞争

(不要因为我之前称之为竞态条件而责怪我。实际上两者都有。许多竞态条件实际上是由数据竞争引起的。将数据竞争视为原因,竞态条件视为结果。竞态条件发生在事件的时间或顺序影响程序正确性时,这也可以由上下文切换、多处理器上的内存操作(这里就是数据竞争)或硬件中断引起。我们已经讨论了如何修复竞态条件。在这里,我想讨论如何修复数据竞争。)

当一个程序涉及多个线程并发执行的两个或多个操作,目标是同一内存位置且未同步时,就会发生数据竞争。请注意,如果我们使程序线程安全(在线程之间引入同步),那么就可以避免数据竞争,从而解决由此引起的竞态条件。(就像我们之前使用锁所做的那样)但是,如果要调整一个简单值,例如递增或递减计数器,那么有更简单高效的方法可用。C++11 并发库引入了原子类型作为模板类:std::atomic。这些类型避免了未定义行为,并提供了线程之间的操作排序。操作会自动在变量上执行,可以称为变量同步

您可以使用任何您想要的类型作为模板参数,并且在该变量上的操作将是原子的,因此是线程安全的。

std::atomic<Type> object

让我们尝试使用原子操作来实现我们的增量函数。

class Atomic_counter
{
public:
       std::atomic<int> n;
       Atomic_counter () :n(0) {}
       void increment() {
              ++n;  
       }
};

int main()
{
       Atomic_counter counter;
       std::thread t1([&counter] {
              for (int i = 0; i < 10000; ++i) {
                     counter.increment();
              }
      
       });

       std::thread t2([&counter] {
              for (int i = 0; i < 10000; ++i) {
                     counter.increment();
              }
       });

       t1.join();
       t2.join();

       std::cout << counter.n << std::endl;
       return 0;
}

现在观察上述程序的输出。

// 20000
// 20000
// 20000

这清楚地表明我们已经修复了数据竞争,因为 ++n 现在是原子行为。您可能已经注意到上面的代码是无锁的,因此效率更高。嗯,这就是原子操作的全部意义。锁(和互斥量)实际上会暂停线程执行,并倾向于为其他任务释放资源,但显然代价是上下文切换开销(停止和重新启动线程会产生一些计算成本),而原子操作中的线程不会等待,而是尝试忙等待(尝试执行操作直到成功),并且只有实际操作的持续时间才被锁定。因此,它们不会产生上下文切换开销,也不会释放 CPU 资源,这使得它们更高效。此外,它们不会遭受死锁。

但它并非总是无锁。根据数据类型和大小应用不同的锁定技术。像 int、long、float 这样的整数类型意味着无锁技术,这比互斥量更快、更高效。对于大型类型,变量不能在单个时间刻度内修改,因此编译器会在变量前后插入线程守卫,这不会比互斥量带来任何性能优势。

那么哪些类型可以原子化?广义上讲,任何可平凡复制的类型(没有虚函数,无异常构造函数)都可以原子化。

std::atomic<T>

其中 T 代表所有类型,例如:

std::atomic<int> n;
std::atomic<double> d;
struct S {long a; long b;};
std::atomic<S> s;

其他操作

所有原子操作都可以在原子变量上执行,例如读取、写入、特殊原子变量或某些类型相关的操作。我们已经看到了前置增量;这里还有一些可以执行的操作。

  • 所有类型的赋值和复制(读写);内置类型和用户定义类型
  • 原始指针的增量和减量
  • 整数的加法、减法和位逻辑运算(++、+=、--、-=、|=、&=、^=)
  • std::atomic 有效,无特殊操作
  • std::atomic 有效,无特殊操作
  • 显式读写
std::atomic<T> a;

T b = a.load();       // Same as T b = a;
a.store(b);           // Same as a = b;
  • 原子交换
T c = a.exchange(b); // Atomically: c = a; a = b;
  • 对于整数 T,还有各种其他 RMW(读-修改-写)操作可用,例如 fetch_add(), fetch_sub(), fetch_and(), fetch_or(), fetch_xor()
std::atomic<int> a;
a.fetch_add(b);      // Same as a += b;

比较并交换

比较并交换(CAS)用于大多数无锁算法。在 C++ 中所有可用的 RMW 操作中,比较并交换是绝对必要的。例如:

bool success = a.compare_exchange_strong(b, c);

// If a==b, make a=c and return true
// Otherwise, set b=a and return false

关于 CAS 的重要一点是,原子增量只适用于 int,即使在这种情况下,原子乘法也不是合法操作。

n *= 2;    // 没有原子乘法!

但在这种情况下可以使用 CAS。例如,您可以使用 CAS 来实现整数增量、双精度数增量、整数乘法等。

以下是您如何进行整数增量的方法。

std::atomic<int> x{0};

int n = x;
while ( !x.compare_exchange_strong(n, n+1) ) {}

整数乘法呢?

while ( !n.compare_exchange_strong(n, n*2) ) {}

还有其他 CAS 操作可用,例如:

  • atomic_compare_exchange_weak,
  • atomic_compare_exchange_weak_explicit,
  • atomic_compare_exchange_strong_explicit

您可以在这里阅读更多关于它们的信息。

线程 vs. 任务

到目前为止,我们一直在讨论线程的低级管理,这被称为基于线程的并行(有人可能会争论它是并发还是并行,但这个争论超出了本文的范围)。您创建线程,启动它们,"加入"它们等等。尽管它们是操作系统抽象,但您此时最接近硬件。使用线程时,我们最好知道我们需要多少线程,它们在特定时间有多少正在运行,以及如何在它们之间平衡负载,否则很快就会变得一团糟。

但现在我们不再这样做。在如此低的层次处理机器是繁琐且容易出错的。有一个更高级别的抽象,称为基于任务的并行,您可以在其中管理“任务”。您定义整个工作块,然后库(或 API)管理线程。确保线程不会太少或太多,并且工作负载在线程之间合理平衡,无需您的干预。当然,您对低级系统的控制会少很多,但它更安全、更方便。此外,这里讨论了使用基于任务的并行的一些性能优势。尽管这是一个相当详细的讨论,但根据我的拙见,性能更多地取决于所执行任务的类型。

C++ 标准库确实提供了一些设施,允许在概念上以任务级别进行编程。以下是一些选择。

  • future 和 promise
  • packaged_task
  • async

std::future 和 std::promise

std::futurestd::promise 是类模板,用于从可能在单独线程上生成的任务返回值。它们使得在线程/任务之间传输值变得容易,而无需显式使用锁。其思想是承诺要传递的值,并且相应的future可以读取它。此图形说明将使该思想更清晰。

std::promise 提供了设置值的方法,该值稍后将通过关联的 std::future 读取,这意味着 std::future 充当将可用值的代理。如果 std::promise 对象在设置值之前被销毁,std::future 将抛出异常。每个 future 和 promise 都持有指向共享值的指针。这些指针是可移动的。让我们编写一个简单的程序来了解如何使用它们。

void func(std::promise<int> * p)
{
     int a = 10, b = 5;
     int result = a + b;
     std::cout << "From inside the Thread...." << std::endl;     p->set_value(result);
}

int main()
{
     std::promise<int> p;
     std::future<int> f = p.get_future();
     std::thread th(func, &p);
     std::cout << f.get() << std::endl;
     th.join();
     return 0;

}

我们在上面的程序中创建了 promise 和相应的 future 对象。std::promise 允许您显式设置值,就像我们在 p->set_value(result) 中所做的那样,该值稍后由 future 对象获取。如果我们想在不同时间点返回多个值,那么我们所需要做的就是将多个 promise 对象传递到线程中,并从它们相应的 future 对象中获取多个返回值。

std::packaged_task

std::promisestd::future 的情况下,值应该在 promise 对象中设置,然后线程函数才能在主线程中通过关联的 future 返回它,否则 future 将被阻塞直到值可用。另外需要注意的是,如果 promise 对象在返回值之前被销毁,则结果将是一个异常。C++ 提供了 std::packaged_task 来帮助解决上述情况。它是一个类模板包装器,意味着一个普通的函数可以作为异步运行。std::packaged_task 的构造函数接受一个函数作为参数,并创建一个 packaged_task 对象(关联的回调和返回的值或异常存储在其内部共享状态中)。返回的值可以通过关联的 future 对象在另一个线程或主函数中访问。这里还要提到的一点是,std::packaged_task 用于手动任务调用,并且主要用于线程池。

int func()
{
     int a = 10, b = 5;
     std::cout << "From inside the Thread...." << std::endl;    
     return a+b;
}

int main()
{
     std::packaged_task<int()> p(func);
     std::future<int> f = p.get_future();
     std::thread th(std::move(p));
     std::cout << f.get() << std::endl;
     th.join();
     return 0;
}

对我来说,packaged_task 最好的地方是**没有指针**。我希望我们都能同意指针很糟糕,或者至少我仍然不擅长处理它们,所以这是一种解脱。此外,由于它是一个包装器,因此可以使用任何可调用对象,即函数、函数对象或 lambda。请随意尝试使用其他方法。

std::async

std::async 是对 promise 的高层抽象,用于实现任务的异步执行。它是一个函数模板,将可调用对象(函数、函数对象或 lambda)作为参数并异步执行它。请看下面的代码。

int func()
{
     int a = 10, b = 5;
     std::cout << "From inside the Thread...." << std::endl;    
     return a+b;
}

int main()
{
     //auto f = std::async(func)
     // both would work

     std::future<int> f = std::async(func);
     std::cout << f.get() << std::endl;
     return 0;
}

请注意,std::async 消除了手动创建线程的部分。std::async 将自动为我们创建线程(或从内部线程池中选择一个线程)和一个 promise 对象。然后将此 std::promise 对象传递给线程函数,并返回关联的 std::future 对象。promise 对象中的值在传递的参数函数退出时设置,并最终在相应的 std::future 对象中返回。

std::async 充当任务调度器,其行为也可以通过参数进行配置。请观察以下代码。

int func(int)
{
     return 0;
}

int main()
{
     auto f = std::async(func, 1);
     int result1 = f.get();
     int result2 = func(2);
     return 0;
}

我们的函数将使用 std::async 异步启动。这就像我们在一个新线程中执行函数一样(或者至少尝试在一个新线程中异步执行它)。一旦与 func(由 std::async 执行)关联的 future 对象返回并存储在 f 中,我们的主线程将继续执行函数调用 func(2)。关于上述代码,另一点需要注意的是,您可以像我们使用 std::async(func, 1) 那样轻松地将参数传递给 async 接口。

通常,我们需要启动多个异步调用。现在,为了获取每个返回的 future,我们不得不给它们分别命名,这很痛苦,更不用说容易出错。一种方法是使用向量来存储 future。

std::vector<std::future<int>> f;

让我们在这里使用 lambda 让事情变得更有趣一点。让我们编写一个简单的程序。

int main() {
    std::vector<std::future<int>> f;
    for (int i = 0; i < 5; ++i)
    {
        f.push_back (std::async( [] (int x) {return x*x;}, i));
    }

    for (auto &result: f) {
        std::cout << result.get() << std::endl;
    }
    return 0;
}

传递参数

我们已经看到 std::async 允许我们传递参数。让我们看看它是否允许我们按引用传递参数。

std::string copy_func(std::string const &s) {
     return s;
}

int main() {
     std::string s = "some string";
     auto f = std::async(copy_func, std::ref(s));
     std::cout << f.get() << std::endl;
     return 0;
}

同样,您也可以使用 lambda 传递参数。

auto f = std::async([&s]() {return copy_func(s); });

但是,传递**异常**呢?std::async 允许这样做吗?让我们看看。

int func(){
     throw std::runtime_error("An error has occured");
}

int main() {
     auto f = std::async(func);
     try {
           std::cout << "From main..." << f.get() << std::endl;
     }
     catch (std::runtime_error const& error) {
           std::cout << "Caught Exception: " << error.what() << std::endl;
     }
}

您看,传递异常就像传递参数一样容易。您可以直接通过 std::async 接口传递参数,就像我们在之前的示例中所做的那样,这让程序员的生活变得轻松,您不必跳窗。

promise, packaged_task 和 async 之间的区别

到目前为止,我们已经看到了三种将任务与 future 关联起来的方法,即 promisepackaged_taskasync。让我再次强调它们之间的区别以及为什么某个方法会比其他方法更受青睐。

promise 允许您在不同的时间点将值传递给相应的 future,并且不限制返回值在函数调用结束时。此外,手动线程调用在较低级别为您提供了更好的控制。

packaged_task 将 future 的创建与任务的执行分离开来,这允许我们手动调用任务,在处理线程池时最有用。

async 是对 promise 和 future 的更高层抽象,省去了我们手动任务或线程调用的麻烦。它处理线程创建、线程数量和它们之间的负载平衡等较低层细节。

并发 != 并行

我们一直在讨论 std::async 如何帮助我们编写比 std::thread 更简洁、更小、更具可读性的多线程程序。这确实使我们不必重复造轮子并处理低级构造,但是 async 真的能实现真正的基于任务的并行吗?不幸的是,不能!

为了理解这一点,我们来谈谈启动策略。默认情况下,std::async 在运行时决定是并发执行任务还是等待请求结果。这些行为可以通过将启动标志作为第一个参数进行配置。

auto f = std::async( std::launch::async, //other arguments)

auto f = std::async( std::launch::deferred, //other arguments)

使用 std::launch::async 请求并发执行。要请求任务仅在需要结果时才执行,请使用 std::launch::deferred

std::launch::async

“仿佛”在一个新线程中

std::launch::deferred

按需执行

std::launch::async | std::launch::deferred

默认(运行时选择)

std::launch::async 是贪婪的,会立即启动一个新线程,但 deferred 在许多方面与 async 大不相同。如果我们将 asyncdeferred 策略一起启动,这意味着 future 实际上不会运行任务,只有 get() 才会运行。这意味着如果我们等待任务,我们将陷入无限循环,直到调用 get()。我一直使用默认值的原因是 async 会尝试在可能的情况下立即启动一个新线程来并发完成任务,否则会等待。当您处理更简单的任务时,这是一个好主意。

接下来是什么?

 这篇文章也只是出于介绍目的,完全基于我的理解。在此我也要感谢 CppCon,它真的帮助我理解了并发概念。还有很多东西有待发现,但这超出了本文的范围。如果您想深入研究并发,我推荐您阅读 Anthony A. Williams 的 C++ Concurrency in Action。欢迎您自由探索并给我留下任何建议。

 

© . All rights reserved.