65.9K
CodeProject 正在变化。 阅读更多。
Home

C++11 线程、互斥量和条件变量

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.94/5 (69投票s)

2013 年 5 月 27 日

CPOL

11分钟阅读

viewsIcon

301314

本文将介绍 C++11 对线程和同步机制(互斥量和条件变量)的支持。

线程

<thread> 头文件中的 std::thread 类代表一个执行线程。std::thread 可以与常规函数、Lambda 表达式和仿函数(实现 operator() 的类)一起使用。此外,它还允许您将任意数量的参数传递给线程函数。

#include <thread>
 
void func()
{
   // do some work
}
 
int main()
{
   std::thread t(func);
   t.join();
 
   return 0;
}

在此示例中,t 是一个 thread 对象,代表函数 func() 运行的线程。调用 join 会阻塞调用线程(在本例中为主线程),直到被连接的线程完成执行。如果线程函数返回值,则该返回值将被忽略。但是,函数可以接受任意数量的参数。

void func(int i, double d, const std::string& s)
{
    std::cout << i << ", " << d << ", " << s << std::endl;
}
 
int main()
{
   std::thread t(func, 1, 12.50, "sample");
   t.join();
 
   return 0;
}

尽管可以将任意数量的参数传递给 thread 函数,但所有参数都按值传递。如果函数需要按引用接受参数,则必须像下面的示例一样,将传递的参数包装在 std::ref 或 std::cref 中。

void func(int& a)
{
   a++;
}
 
int main()
{
   int a = 42;
   std::thread t(func, std::ref(a));
   t.join();
 
   std::cout << a << std::endl;
 
   return 0;
}

程序输出 43,但如果不将 a 包装在 std::ref 中,输出将是 42

除了 join 方法外,thread 类还提供了其他一些操作:

  • swap:交换两个线程对象的底层句柄。
  • detach:允许执行线程独立于 thread 对象继续执行。分离的线程不再可被连接(您无法等待它们)。
    int main()
    {
        std::thread t(funct);
        t.detach();
     
        return 0;
    }

需要注意的重要一点是,如果 thread 函数抛出异常,它将不会被常规的 try-catch 块捕获。换句话说,这样做无效:

try
{
    std::thread t1(func);
    std::thread t2(func);
 
    t1.join();
    t2.join();
}
catch(const std::exception& ex)
{
    std::cout << ex.what() << std::endl;
}

要在线程之间传播异常,您可以在 thread 函数中捕获它们,并将它们存储在稍后可以访问的位置。

std::mutex                       g_mutex;
std::vector<std::exception_ptr>  g_exceptions;

void throw_function()
{
   throw std::exception("something wrong happened");
}

void func()
{
   try
   {
      throw_function();
   }
   catch(...)
   {
      std::lock_guard<std::mutex> lock(g_mutex);
      g_exceptions.push_back(std::current_exception());
   }
}

int main()
{
   g_exceptions.clear();

   std::thread t(func);
   t.join();

   for(auto& e : g_exceptions)
   {
      try 
      {
         if(e != nullptr)
         {
            std::rethrow_exception(e);
         }
      }
      catch(const std::exception& e)
      {
         std::cout << e.what() << std::endl;
      }
   }

   return 0;
}

有关捕获和传播异常的更多信息,您可以阅读 Handling C++ exceptions thrown from worker thread in the main threadHow can I propagate exceptions between threads?

在继续之前,值得注意的是,<thread> 头文件在 std::this_thread 命名空间下提供了一些辅助函数:

  • get_id:返回当前线程的 ID。
  • yield:告诉调度程序运行其他线程,当您处于忙等待状态时可以使用此函数。
  • sleep_for:将当前线程的执行阻塞至少指定的时长。
  • sleep_until:将当前线程的执行阻塞,直到达到指定的时刻。

在最后一个示例中,我需要同步对 g_exceptions 向量的访问,以确保一次只有一个线程可以推送新元素。为此,我使用了一个互斥量和一个对该互斥量的锁定。互斥量是核心同步原语,在 C++11 中,它在 <mutex> 头文件中提供了四种风格:

以下是一个使用 std::mutex 的示例(请注意,这里使用了前面提到的 get_id()sleep_for() 辅助函数)。

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
 
std::mutex g_lock;
 
void func()
{
    g_lock.lock();
 
    std::cout << "entered thread " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(rand() % 10));
    std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;
 
    g_lock.unlock();
}
 
int main()
{
    srand((unsigned int)time(0));
 
    std::thread t1(func);
    std::thread t2(func);
    std::thread t3(func);
 
    t1.join();
    t2.join();
    t3.join();
 
    return 0;
}

输出如下:

entered thread 10144
leaving thread 10144
entered thread 4188
leaving thread 4188
entered thread 3424
leaving thread 3424

lock()unlock() 方法应该很简单明了。前者锁定互斥量,如果互斥量不可用则阻塞;后者解锁互斥量。

接下来的示例展示了一个简单的线程安全容器(内部使用 std::vector)。该容器具有 add() 方法(添加单个元素)和 addrange() 方法(添加多个元素,该方法内部调用 add())。

注意:然而,正如下面的评论所示,这个容器并非线程安全,原因有几个,包括使用了 va_args。此外,dump() 方法不应该属于容器,在实际实现中,它应该是一个辅助(独立)函数。此示例的目的仅仅是为了教授有关互斥量的一些概念,而不是创建一个功能齐全、无错误、线程安全的容器。

template <typename T>
class container 
{
    std::mutex _lock;
    std::vector<T> _elements;
public:
    void add(T element) 
    {
        _lock.lock();
        _elements.push_back(element);
        _lock.unlock();
    }
 
    void addrange(int num, ...)
    {
        va_list arguments;
 
        va_start(arguments, num);
 
        for (int i = 0; i < num; i++)
        {
            _lock.lock();
            add(va_arg(arguments, T));
            _lock.unlock();
        }
 
        va_end(arguments); 
    }
 
    void dump()
    {
        _lock.lock();
        for(auto e : _elements)
            std::cout << e << std::endl;
        _lock.unlock();
    }
};
 
void func(container<int>& cont)
{
    cont.addrange(3, rand(), rand(), rand());
}
 
int main()
{
    srand((unsigned int)time(0));
 
    container<int> cont;
 
    std::thread t1(func, std::ref(cont));
    std::thread t2(func, std::ref(cont));
    std::thread t3(func, std::ref(cont));
 
    t1.join();
    t2.join();
    t3.join();
 
    cont.dump();
 
    return 0;
}

当您执行此程序时,它会遇到死锁。原因是容器在释放它之前多次尝试获取互斥量,这是不允许的。这就是 std::recursive_mutex 发挥作用的地方。它允许线程多次获取同一个互斥量。互斥量可以获取的最大次数未指定,但如果达到该次数,调用 lock 将抛出 std::system_error。因此,要修复上述代码中的问题(除了更改 addrange 的实现,使其不调用 lockunlock)之外,还可以将互斥量替换为 std::recursive_mutex

template <typename T>
class container 
{
    std::recursive_mutex _lock;
    // ...
};

然后,您将获得类似这样的输出:

6334
18467
41
6334
18467
41
6334
18467
41

细心的读者会注意到,在每次调用 func() 时生成的数字都是相同的。这是因为种子是线程局部的,而 srand() 的调用仅从主线程初始化种子。在其他工作线程中,它未被初始化,因此您每次都会得到相同的数字。

显式锁定和解锁可能导致问题,例如忘记解锁或锁获取顺序不正确导致死锁。标准库提供了一些类和函数来帮助解决这些问题。包装器类允许以 RAII 风格一致地使用互斥量,在作用域内自动锁定和解锁。这些包装器是:

  • lock_guard:在对象构造时,它尝试获取互斥量的所有权(通过调用 lock());在对象析构时,它自动释放互斥量(通过调用 unlock())。这是一个不可复制的类。
  • unique_lock:一个通用的互斥量包装器,与 lock_guard 不同,它还支持延迟锁定、定时锁定、递归锁定、锁所有权的转移以及条件变量的使用。这也是一个不可复制的类,但它是可移动的。

使用这些包装器,我们可以像这样重写容器类:

template <typename T>
class container 
{
    std::recursive_mutex _lock;
    std::vector<T> _elements;
public:
    void add(T element) 
    {
        std::lock_guard<std::recursive_mutex> locker(_lock);
        _elements.push_back(element);
    }
 
    void addrange(int num, ...)
    {
        va_list arguments;
 
        va_start(arguments, num);
 
        for (int i = 0; i < num; i++)
        {
            std::lock_guard<std::recursive_mutex> locker(_lock);
            add(va_arg(arguments, T));
        }
 
        va_end(arguments); 
    }
 
    void dump()
    {
        std::lock_guard<std::recursive_mutex> locker(_lock);
        for(auto e : _elements)
            std::cout << e << std::endl;
    }
};

不过,有人可能会争辩说,dump() 方法应该被声明为 const,因为它不改变容器的状态。但如果您将该方法标记为 const,您将收到以下编译器错误:

‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 
                from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'

互斥量(无论使用哪种实现)都必须被获取和释放,这会涉及调用非 constlock()unlock() 方法。因此,传递给 lock_guard 的参数在逻辑上不能是 const 的(如果方法是 const 的,互斥量也会是 const 的)。解决此问题的方法是将互斥量声明为 mutablemutable 允许从 const 函数修改状态。但是,它应该只用于隐藏的或“元”状态(例如,缓存计算或查找的数据,以便下次调用可以立即完成,或者修改像互斥量这样的位,它们只补充对象的实际状态)。

template <typename T>
class container 
{
   mutable std::recursive_mutex _lock;
   std::vector<T> _elements;
public:
   void dump() const
   {
      std::lock_guard<std::recursive_mutex> locker(_lock);
      for(auto e : _elements)
         std::cout << e << std::endl;
   }
};

这些包装器 guard 的构造函数有重载,可以接受一个参数来指示锁定策略。可用的策略是:

  • defer_lock(类型为 defer_lock_t):不获取互斥量的所有权。
  • try_to_lock(类型为 try_to_lock_t):尝试在不阻塞的情况下获取互斥量的所有权。
  • adopt_lock(类型为 adopt_lock_t):假定调用线程已拥有互斥量。

这些策略声明如下:

struct defer_lock_t { };
struct try_to_lock_t { };
struct adopt_lock_t { };
 
constexpr std::defer_lock_t defer_lock = std::defer_lock_t();
constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();
constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();

除了这些互斥量包装器之外,标准库还提供了一些用于锁定一个或多个互斥量的方法:

  • lock:使用避免死锁的算法锁定互斥量(通过调用 lock()try_lock()unlock())。
  • try_lock:尝试按指定顺序调用互斥量的 try_lock()

以下是一个死锁示例:我们有一个元素容器,还有一个函数 exchange(),该函数将一个容器中的元素交换到另一个容器中。为了线程安全,此函数通过获取与每个容器关联的互斥量来同步对两个容器的访问。

template <typename T>
class container 
{
public:
    std::mutex _lock;
    std::set<T> _elements;
 
    void add(T element) 
    {
        _elements.insert(element);
    }
 
    void remove(T element) 
    {
        _elements.erase(element);
    }
};
 
void exchange(container<int>& cont1, container<int>& cont2, int value)
{
    cont1._lock.lock();
    std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch 
                                                          // to simulate the deadlock
    cont2._lock.lock();    
 
    cont1.remove(value);
    cont2.add(value);
 
    cont1._lock.unlock();
    cont2._lock.unlock();
}

假设此函数从两个不同的线程调用,第一个线程从容器 1 中移除元素并添加到容器 2,第二个线程从容器 2 中移除元素并添加到容器 1。这可能导致死锁(如果线程在获取第一个锁后立即上下文切换到另一个线程)。

int main()
{
    srand((unsigned int)time(NULL));
 
    container<int> cont1; 
    cont1.add(1);
    cont1.add(2);
    cont1.add(3);
 
    container<int> cont2; 
    cont2.add(4);
    cont2.add(5);
    cont2.add(6);
 
    std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3);
    std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6);
 
    t1.join();
    t2.join();
 
    return 0;
}

为了解决这个问题,您可以使用 std::lock,它保证以无死锁的方式获取锁:

void exchange(container<int>& cont1, container<int>& cont2, int value)
{
    std::lock(cont1._lock, cont2._lock); 
 
    cont1.remove(value);
    cont2.add(value);
 
    cont1._lock.unlock();
    cont2._lock.unlock();
}

条件变量

C++11 支持的另一个同步原语是 条件变量,它允许阻塞一个或多个线程,直到收到另一个线程的通知、超时或发生 虚假唤醒<condition_variable> 头文件提供了两种条件变量的实现:

  • condition_variable:要求想要等待它的任何线程首先获取一个 std::unique_lock
  • condition_variable_any:是一个更通用的实现,它可以与满足基本锁条件的任何类型一起工作(即提供 lock()unlock() 方法的实现)。它可能更昂贵(在性能和操作系统资源方面),因此只有在需要其额外灵活性时才应优先使用。

以下描述了条件变量的工作原理:

  • 至少有一个线程正在等待某个条件变为真。等待线程必须首先获取一个 unique_lock。该锁被传递给 wait() 方法,该方法会释放互斥量并暂停线程,直到条件变量被通知。发生这种情况时,线程会被唤醒并重新获取锁。
  • 至少有一个线程正在发出某个条件变为 true 的信号。通知可以通过 notify_one() 来完成,它会解除等待该条件的某个线程(任意一个)的阻塞;或者通过 notify_all() 来完成,它会解除所有等待该条件的线程的阻塞。
  • 由于在多处理器系统上使条件唤醒完全可预测存在一些复杂性,可能会发生虚假唤醒。这意味着即使没有人发出条件变量的信号,线程也会被唤醒。因此,在线程唤醒后,有必要检查条件是否仍然为真。由于虚假唤醒可能发生多次,因此必须在循环中进行此检查。

下面的代码展示了一个使用条件变量同步线程的示例:几个“工作”线程在其工作中可能产生错误,并将错误代码放入队列中。“日志记录”线程通过从队列中获取错误代码并打印它们来处理这些错误代码。工作线程在发生错误时向日志记录器发出信号。日志记录器正在等待条件变量被通知。为了避免虚假唤醒,等待操作发生在循环中,该循环会检查一个布尔条件。

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <queue>
#include <random>

std::mutex              g_lockprint;
std::mutex              g_lockqueue;
std::condition_variable g_queuecheck;
std::queue<int>         g_codes;
bool                    g_done;
bool                    g_notified;

void workerfunc(int id, std::mt19937& generator)
{
    // print a starting message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\trunning..." << std::endl;
    }

    // simulate work
    std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));

    // simulate error
    int errorcode = id*100+1;
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout  << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
    }

    // notify error to be logged
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);
        g_codes.push(errorcode);
        g_notified = true;
        g_queuecheck.notify_one();
    }
}

void loggerfunc()
{
    // print a starting message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[logger]\trunning..." << std::endl;
    }

    // loop until end is signaled
    while(!g_done)
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);

        while(!g_notified) // used to avoid spurious wakeups 
        {
            g_queuecheck.wait(locker);
        }

        // if there are error codes in the queue process them
        while(!g_codes.empty())
        {
            std::unique_lock<std::mutex> locker(g_lockprint);
            std::cout << "[logger]\tprocessing error:  " << g_codes.front()  << std::endl;
            g_codes.pop();
        }

        g_notified = false;
    }
}

int main()
{
    // initialize a random generator
    std::mt19937 generator((unsigned int)
                 std::chrono::system_clock::now().time_since_epoch().count());

    // start the logger
    std::thread loggerthread(loggerfunc);

    // start the working threads
    std::vector<std::thread> threads;
    for(int i = 0; i < 5; ++i)
    {
        threads.push_back(std::thread(workerfunc, i+1, std::ref(generator)));
    }

    // work for the workers to finish
    for(auto& t : threads)
        t.join();

    // notify the logger to finish and wait for it
    g_done = true;
    loggerthread.join();

    return 0;
}

运行此代码会产生类似以下的输出(请注意,每次运行此输出都不同,因为每个工作线程的工作时长,即睡眠时长,都是随机的):

[logger]        running...
[worker 1]      running...
[worker 2]      running...
[worker 3]      running...
[worker 4]      running...
[worker 5]      running...
[worker 1]      an error occurred: 101
[worker 2]      an error occurred: 201
[logger]        processing error:  101
[logger]        processing error:  201
[worker 5]      an error occurred: 501
[logger]        processing error:  501
[worker 3]      an error occurred: 301
[worker 4]      an error occurred: 401
[logger]        processing error:  301
[logger]        processing error:  401

上面看到的 wait() 方法有两个重载:

  • 一个只接受 unique_lock;它会释放锁,阻塞线程,并将其添加到等待该条件变量的线程队列中;当条件变量被通知或发生虚假唤醒时,线程会醒来。当发生其中任何一种情况时,会重新获取锁并返回该函数。
  • 另一个除了 unique_lock 外,还接受一个谓词,该谓词用于循环,直到其返回 false;此重载可用于避免虚假唤醒。它基本上等同于:
    while(!predicate()) 
       wait(lock);

因此,可以通过使用接受谓词(该谓词验证队列的状态(空或非空))的 wait 重载来避免在上面的示例中使用布尔标志 g_notified

void workerfunc(int id, std::mt19937& generator)
{
    // print a starting message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\trunning..." << std::endl;
    }

    // simulate work
    std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));

    // simulate error
    int errorcode = id*100+1;
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
    }

    // notify error to be logged
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);
        g_codes.push(errorcode);
        g_queuecheck.notify_one();
    }
}

void loggerfunc()
{
    // print a starting message
    {
        std::unique_lock<std::mutex> locker(g_lockprint);
        std::cout << "[logger]\trunning..." << std::endl;
    }

    // loop until end is signaled
    while(!g_done)
    {
        std::unique_lock<std::mutex> locker(g_lockqueue);

        g_queuecheck.wait(locker, [&](){return !g_codes.empty();});

        // if there are error codes in the queue process them
        while(!g_codes.empty())
        {
            std::unique_lock<std::mutex> locker(g_lockprint);
            std::cout << "[logger]\tprocessing error:  " << g_codes.front() << std::endl;
            g_codes.pop();
        }
    }
}

除了这个重载的 wait() 方法外,还有另外两个等待方法,它们都有类似的重载,接受一个谓词来避免虚假唤醒:

  • wait_for:阻塞线程,直到条件变量被通知或指定的超时发生。
  • wait_until:阻塞线程,直到条件变量被通知或指定的时刻已经到达。

这两个函数的无谓词重载会返回一个 cv_status,指示是发生了超时,还是因为条件变量被通知而唤醒,或者是因为虚假唤醒而唤醒。

标准库还提供了一个名为 notify_all_at_thread_exit 的函数,它实现了一种机制来通知其他线程某个线程已完成,包括销毁所有 thread_local 对象。这是因为使用 join() 以外的机制等待线程可能会在 thread_local 被使用时导致不正确和致命的行为,因为在等待线程恢复并可能也完成之后,它们的析构函数可能已经被调用(参见 N3070N2880 以获取更多信息)。通常,此函数必须在线程退出之前调用。下面是一个示例,展示了如何将 notify_all_at_thread_exitcondition_variable 一起使用来同步两个线程:

std::mutex              g_lockprint;
std::mutex              g_lock;
std::condition_variable g_signal;
bool                    g_done;

void workerfunc(std::mt19937& generator)
{
   {
      std::unique_lock<std::mutex> locker(g_lockprint);
      std::cout << "worker running..." << std::endl;
   }

   std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));

   {
      std::unique_lock<std::mutex> locker(g_lockprint);
      std::cout << "worker finished..." << std::endl;
   }

   std::unique_lock<std::mutex> lock(g_lock);
   g_done = true;
   std::notify_all_at_thread_exit(g_signal, std::move(lock));
}

int main()
{
   // initialize a random generator
   std::mt19937 generator((unsigned int)
        std::chrono::system_clock::now().time_since_epoch().count());

   std::cout << "main running..." << std::endl;

   std::thread worker(workerfunc, std::ref(generator));
   worker.detach();

   std::cout << "main crunching..." << std::endl;

   std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));

   {
      std::unique_lock<std::mutex> locker(g_lockprint);
      std::cout << "main waiting for worker..." << std::endl;
   }

   std::unique_lock<std::mutex> lock(g_lock);
   while(!g_done) // avoid spurious wake-ups
      g_signal.wait(lock);

   std::cout << "main finished..." << std::endl;

   return 0;
}

如果工作线程在主线程之前完成工作,则输出将是:

main running...
worker running...
main crunching...
worker finished...
main waiting for worker...
main finished...

如果主线程在工作线程之前完成,则输出将是:

main running...
worker running...
main crunching...
main waiting for worker...
worker finished...
main finished...

结论

C++11 标准使 C++ 开发人员能够以标准、平台无关的方式编写多线程代码。本文介绍了对线程和同步机制的标准支持。<thread> 头文件提供了同名类(以及其他辅助函数),该类代表一个执行线程。<mutex> 头文件提供了几种互斥量和包装器的实现,用于同步对线程的访问。<condition_variable> 头文件提供了两种条件变量的实现,它们允许阻塞一个或多个线程,直到收到另一个线程的通知、超时或发生虚假唤醒。建议阅读更多相关内容以获取更多详细信息。

历史

  • 2013 年 5 月 28 日:初始版本
© . All rights reserved.