65.9K
CodeProject 正在变化。 阅读更多。
Home

线程与对象

starIconstarIconstarIconstarIconstarIcon

5.00/5 (6投票s)

2022年6月17日

MIT

8分钟阅读

viewsIcon

10004

downloadIcon

182

C++ 线程对象及其用法

最周密的线程和对象的计划
往往会出错,
只留下悲伤和痛苦,
辜负了曾经许诺的喜悦!

(向R. Burns致歉)

背景 - 应许之地

多线程和面向对象语言,都承诺使复杂系统的创建者生活更简单。它们都提供了将复杂系统分解成可管理的小块,并定义它们之间清晰交互的方法。一方面,多线程试图将工作分解成小块,并将每一块分配给一个单独的处理器,无论是物理的还是虚拟的。然后,人们可以等待所有工作块完成,然后组装最终结果。

另一方面,面向对象语言则认为,只有重要的信息才应该对外界可见,而将实现细节隐藏在这些“对象”内部,并且更复杂的对象可以通过继承或组合从简单的对象创建而来。

如果我们能将这两种概念结合起来,并拥有一些能够完成工作并隐藏所有不必要实现细节的小线程对象,那该多好啊?正如我们将看到的,这确实是可能的,但并不那么容易。

例如,我们将研究如何找出小于某个值的素数,并且我们将坚持使用老式的 C++,因为它仍然被认为是最有效的语言之一。

使用 std::thread 的简单程序

自 2011 年以来,C++ 标准已经包含了 std::thread 对象。作为我们的第一个多线程程序,我们将使用这个

#include <thread>
bool is_prime (int n)
{
  for (auto i = n - 1; i > 1; --i)
    if (n % i == 0)
      return false;
  return true;
}

int main ()
{
  std::vector<int> primes;
  int n = 0;

  auto worker = [&]() {
    for (auto i = 2; i < 20; ++i)
    {
      if (is_prime (i))
        primes.at (n++) = i;
    }
  };

  std::thread th (worker);
  th.join ();
  std::cout << "Primes: ";
  for (auto val : primes ) 
    std::cout << val << ' ';
}

我们这里有什么:一个非常简单的 is_prime 函数被 worker 函数反复调用。然后它将素数放入一个向量中。主函数只是创建一个线程来运行 worker 函数,并在打印结果之前等待它完成。这算不上真正的多线程,因为除了主线程之外,我们只有一个单独的线程,但我们希望有所改进。

异常问题

令人惊讶或不令人惊讶的是,程序无法工作。它有一个显而易见的 bug:primes 向量是空的,并且设置一个不存在的元素

        primes.at (n++) = i;

会触发一个 std::out_of_range 异常。

我们可以很容易地通过修改代码来修复它

        primes.push_back (i);

但让我们看看我们是否可以进行一些异常处理。我们将把整个 main 函数包装在一个 try...catch 块中,并让它处理范围外异常。这是我们新的 main 函数

int main ()
{
  std::vector<int> primes;
  int n = 0;

  try {
    auto worker = [&]() {
      for (auto i = 2; i < 20; ++i)
      {
        if (is_prime (i))
          primes.at (n++) = i;
      }
    };

    std::thread th (worker);
    th.join ();
    std::cout << "Primes: ";
    for (auto val : primes)
      std::cout << val << ' ';
  }
  catch (std::exception& x) {
    std::cout << "Exception: " << x.what () << std::endl;
  }
}

异常处理程序没有被调用,我们最终得到了与之前完全相同的错误。

解释原因在于一个关于线程的非常重要的规则

每个线程都有自己的栈。

当发生异常时,C++ 运行时开始一个称为栈展开的过程,在这个过程中,它会遍历每个已调用函数的栈帧,寻找异常处理程序。然而,我们的异常处理程序位于主线程的栈上,因此从未被调用。异常不会在线程之间传播。

在继续进行其他操作之前,我们先修复我们的程序。我们将分两步进行。首先,我们将 try... catch 块移到线程函数中

auto worker = [&]() {
    try {
      for (auto i = 2; i < 20; ++i)
      {
        if (is_prime (i))
          primes.at (n++) = i;
      }
    }
    catch (std::exception& x)
    {
      std::cout << "Exception: " << x.what () << std::endl;
    }
  };

这次,它确实会捕获异常,并且程序输出是

Exception: invalid vector subscript
Primes:

作为最后一步,我们现在修复我们的小“bug”。完成的程序是

//working version
int main ()
{
  std::vector<int> primes;

  auto worker = [&]() {
    try {
      for (auto i = 2; i < 20; ++i)
      {
        if (is_prime (i))
          primes.push_back(i);
      }
    }
    catch (std::exception& x)
    {
      std::cout << "Exception: " << x.what () << std::endl;
    }
  };

  std::thread th (worker);
  th.join ();
  std::cout << "Primes: ";
  for (auto val : primes)
    std::cout << val << ' ';
}

输出是:

Primes: 2 3 5 7 11 13 17 19 

线程封装

到目前为止,我们已经看到了如何使用 std::thread 对象来完成工作,但我们仍然需要弄清楚如何将一个线程及其私有数据打包到某种对象中。

假设我们的素数检查线程还需要保留它找到的素数数量的计数。此外,我们希望将结果向量以某种方式传递给线程。

一种解决方案是让一个对象 prime_finder 派生自 std::thread。类似于这样做

class prime_finder : public std::thread
{
public:
  prime_finder (std::vector<int>& v)
    : std::thread ([this] {this->worker (); })
    , count (0)
    , primes (v) {}

  int get_count () { return count; }
private:
  int count;
  inline
  void worker ()
  {
    try {
      for (auto i = 2; i < 20; ++i)
      {
        if (is_prime (i))
        {
          primes.push_back (i);
          count++;
        }
      }
    }
    catch (std::exception& x)
    {
      std::cout << "Exception: " << x.what () << std::endl;
    }
  };

  std::vector<int>& primes;
};

int main ()
{
  std::vector<int> results;
  prime_finder th (results);
  th.join ();
  std::cout << "Found " << th.get_count() << " primes: ";
  for (auto val : results)
    std::cout << val << ' ';
} 

您猜怎么着?它甚至能工作

Found 8 primes: 2 3 5 7 11 13 17 19 

但是,如果您珍视您的好睡眠,请不要使用这样的代码!除非您希望被愤怒的同事或客户整天叫醒,抱怨您的代码崩溃了,并让您发疯,因为您无法重现这些错误。

要找出这段代码的问题所在,让我们看看在 main 函数中实例化 prime_finder 对象时会发生什么。prime_finder 构造函数分配对象空间,然后调用任何基对象的构造函数,在本例中是 std::thread 构造函数。根据 C++ 标准关于 std::thread 构造函数的规定

创建一个新的 std::thread 对象,并将其与一个执行线程关联起来。新的执行线程开始执行 /*INVOKE*/(std::move(f_copy), std::move(args_copy)...)

关键在于,新线程开始执行,可能在 prime_finder 构造函数完成对象设置之前。现在取决于操作系统调度程序让主线程完成 prime_finder 对象的初始化(将 count 初始化为 0 并设置 primes 向量的地址),或者立即切换到新创建的线程。事情可能会顺利进行很长时间,直到操作系统调度程序在错误的一侧醒来,我们的线程过早开始运行,整个程序就会崩溃。

为了说明这个问题,我们可以在 prime_finder 构造函数中引入一个人工延迟

class prime_finder : public std::thread
{
public:
  prime_finder (std::vector<int>& v)
    : std::thread ([this] {this->worker (); })
    , primes (v)
  {
    std::this_thread::sleep_for (std::chrono::milliseconds (10));
    count = 0;
  }
//...

现在结果是

Found 0 primes: 2 3 5 7 11 13 17 19 

count 变量在 worker 函数完成很久之后才被初始化为 0

这里重要的教训是

不要std::thread 对象继承。

一个更好的线程类

我不得不承认,我对 std::thread 类的设计并不十分满意。虽然与异常处理有关的问题在某种程度上是不可避免的,但构造时运行新线程的想法似乎更像是一个失误。幸运的是,在 C++11 之前很久,我就在我设计的 mlib 库中设计了自己的线程类,所以不必忍受这个问题。

以下是相关部分

class thread : public syncbase
{
public:
  /// Thread state
  thread (std::function<int ()> func);
  virtual ~thread   ();

  virtual void start ();
//...
protected:
  thread (const char *name=0, bool inherit=false, 
          DWORD stack_size=0, PSECURITY_DESCRIPTOR sd=NULL);
  virtual void init ();
  virtual void run ();
//...
private:
  static unsigned int _stdcall entryProc (thread *ts);
//...
}; 

基类 syncbase 只是一个 Windows 同步对象(如信号量、互斥锁或事件)句柄的包装器。public 构造函数与 std::thread 构造函数非常相似。它创建一个将运行函数的线程对象。但是,新线程尚未启动。要启动它,用户必须调用 start 函数。还有一个受保护的构造函数,可供需要更精细控制线程堆栈大小和安全属性等方面的派生对象使用。

内部来看,启动一个新线程是一个相对复杂的过程,分阶段进行

  1. 构造函数调用 Windows _beginthreadex 函数来创建一个以 entryProc 为主体的新线程。新线程以挂起状态创建,因此保证不会开始运行。
  2. _beginthreadex 函数返回后,构造函数会恢复新创建的线程,并等待 created 信号量变为已发出信号状态。
  3. 现在 entryProc 函数可以运行了。它发出 created 信号量,并等待 started 信号量。
  4. 由于 created 信号量已发出信号,构造函数现在可以继续执行并返回。如果线程构造函数是在派生对象的构造函数中调用的,则可以继续构造过程的其余部分。

如前所述,要真正启动新线程,用户必须调用 start 函数。这将发出 started 信号量,entryProc 函数将首先调用一个虚拟的 init 函数(可以执行任何初始化工作),然后调用 run 函数,这是线程的实际运行循环。

请注意,这些线程对象并非轻量级的。每个对象都附带两个信号量,并且创建它们需要两次上下文切换。它们安全且功能强大,但需要付出代价。

这是我们使用 mlib::thread 对象重写后的程序

#include "mlib/thread.h"

class prime_finder : public mlib::thread
{
public:
  prime_finder (std::vector<int>& v)
    : primes (v) {
    std::this_thread::sleep_for (std::chrono::milliseconds (10));
    count = 0;
  }

  int get_count () { return count; }
private:
  int count;
  inline void run ()
  {
    for (auto i = 2; i < 20; ++i)
    {
      if (is_prime (i))
      {
        primes.push_back (i);
        count++;
      }
    }
  }

  std::vector<int>& primes;
};

int main ()
{
  std::vector<int> results;
  prime_finder th (results);
  try {
    th.start ();
    th.join ();
  }
  catch (std::exception& x)
  {
    std::cout << "Exception: " << x.what () << std::endl;
  }

  std::cout << "Found " << th.get_count () << " primes: ";
  for (auto val : results)
    std::cout << val << ' ';
} 

跨线程边界抛出异常

眼尖的读者会注意到,我已经将异常处理代码从工作线程移回了主线程。这是可能的,因为 thread::entryProc 函数有一个 try...catch 块,可以捕获所有异常。这些异常存储在线程内部的 std::exception_ptr 对象中。当主线程调用 thread::wait 函数时,如果存在异常,该异常将在主线程的上下文中重新抛出。为了验证,我们修改 run 函数以抛出异常

// ...
  inline void run ()
  {
    int t = std::vector<int> ().at (1); //this triggers an out of range exception
    for (auto i = 2; i < 20; ++i)
    {
      if (is_prime (i))
      {
        primes.push_back (i);
        count++;
      }
    }
  }
//... 

输出是:

Exception: invalid vector subscript
Found 0 primes: 

不必将异常处理代码移到主线程。如果程序逻辑更适合,您仍然可以在 run 函数中放置 try...catch 块,但如果您需要一个集中的错误处理,mlib::thread 可以跨线程边界传输错误。然而,这种传输是“延迟的”——异常将在调用 join 函数时重新抛出。

最后的想法

将线程封装到对象中并不那么简单,但它提供了明确的好处。它允许您区分需要从其他线程访问的代码和数据(我称之为外部),以及内部数据和函数(我称之为自有)。作为一般规则,自有数据和函数应保留为 privateprotected 成员,而外部函数构成 public 接口。构造函数和析构函数本质上是外部的,因此它们需要特别小心。对于其他外部函数,我倾向于一种模式,即调用者通过某个命令信号量或事件传输请求,然后等待结果

class cool_thread : public mlib::thread
{
public:
//....
  stuff do_domething_cool () //foregin function
  {
    //send command to thread
    thread_critical_section.enter();
    command = WHAT_TO_DO;
    commad_semaphore.signal ();
    thread_critical_section.leave ();

    //wait for results
    results_semaphore.wait ();
    thread_critical_section.enter();
    stuff s = get_results ();
    thread_critical_section.leave ();
    return s;
  }
//...
private:
  stuff& get_results () {//...} //own function
}

除了我讨论的两个问题(异常处理和构造危险)之外,还有一个问题我想提一下,但没有提供任何代码来演示。线程销毁也可能是一个危险的时刻。通常,它不应通过调用对象的析构函数来完成,因为您无法控制线程在被销毁时的状态。在上面的示例中,如果线程在调用者等待结果时被销毁,调用者将死锁。

历史

  • 2022年6月17日 - 初始版本
© . All rights reserved.