C++11 线程、互斥量和条件变量






4.94/5 (69投票s)
本文将介绍 C++11 对线程和同步机制(互斥量和条件变量)的支持。
线程
<thread>
头文件中的 std::thread 类代表一个执行线程。std::thread
可以与常规函数、Lambda 表达式和仿函数(实现 operator()
的类)一起使用。此外,它还允许您将任意数量的参数传递给线程函数。
#include <thread>
void func()
{
// do some work
}
int main()
{
std::thread t(func);
t.join();
return 0;
}
在此示例中,t
是一个 thread
对象,代表函数 func()
运行的线程。调用 join 会阻塞调用线程(在本例中为主线程),直到被连接的线程完成执行。如果线程函数返回值,则该返回值将被忽略。但是,函数可以接受任意数量的参数。
void func(int i, double d, const std::string& s)
{
std::cout << i << ", " << d << ", " << s << std::endl;
}
int main()
{
std::thread t(func, 1, 12.50, "sample");
t.join();
return 0;
}
尽管可以将任意数量的参数传递给 thread
函数,但所有参数都按值传递。如果函数需要按引用接受参数,则必须像下面的示例一样,将传递的参数包装在 std::ref 或 std::cref 中。
void func(int& a)
{
a++;
}
int main()
{
int a = 42;
std::thread t(func, std::ref(a));
t.join();
std::cout << a << std::endl;
return 0;
}
程序输出 43
,但如果不将 a
包装在 std::ref
中,输出将是 42
。
除了 join
方法外,thread
类还提供了其他一些操作:
- swap:交换两个线程对象的底层句柄。
- detach:允许执行线程独立于
thread
对象继续执行。分离的线程不再可被连接(您无法等待它们)。int main() { std::thread t(funct); t.detach(); return 0; }
需要注意的重要一点是,如果 thread
函数抛出异常,它将不会被常规的 try
-catch
块捕获。换句话说,这样做无效:
try
{
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
}
catch(const std::exception& ex)
{
std::cout << ex.what() << std::endl;
}
要在线程之间传播异常,您可以在 thread
函数中捕获它们,并将它们存储在稍后可以访问的位置。
std::mutex g_mutex;
std::vector<std::exception_ptr> g_exceptions;
void throw_function()
{
throw std::exception("something wrong happened");
}
void func()
{
try
{
throw_function();
}
catch(...)
{
std::lock_guard<std::mutex> lock(g_mutex);
g_exceptions.push_back(std::current_exception());
}
}
int main()
{
g_exceptions.clear();
std::thread t(func);
t.join();
for(auto& e : g_exceptions)
{
try
{
if(e != nullptr)
{
std::rethrow_exception(e);
}
}
catch(const std::exception& e)
{
std::cout << e.what() << std::endl;
}
}
return 0;
}
有关捕获和传播异常的更多信息,您可以阅读 Handling C++ exceptions thrown from worker thread in the main thread 和 How can I propagate exceptions between threads?。
在继续之前,值得注意的是,<thread>
头文件在 std::this_thread
命名空间下提供了一些辅助函数:
- get_id:返回当前线程的 ID。
- yield:告诉调度程序运行其他线程,当您处于忙等待状态时可以使用此函数。
- sleep_for:将当前线程的执行阻塞至少指定的时长。
- sleep_until:将当前线程的执行阻塞,直到达到指定的时刻。
锁
在最后一个示例中,我需要同步对 g_exceptions
向量的访问,以确保一次只有一个线程可以推送新元素。为此,我使用了一个互斥量和一个对该互斥量的锁定。互斥量是核心同步原语,在 C++11 中,它在 <mutex>
头文件中提供了四种风格:
- mutex:提供核心函数 lock() 和 unlock(),以及非阻塞的 try_lock() 方法,如果互斥量不可用,该方法会返回。
- recursive_mutex:允许同一线程多次获取互斥量。
- timed_mutex:与
mutex
类似,但它还提供了两个额外的方法:try_lock_for() 和 try_lock_until(),它们尝试在一段时间内或直到某个时刻获取互斥量。 - recursive_timed_mutex:是
timed_mutex
和recursive_mutex
的组合。
以下是一个使用 std::mutex
的示例(请注意,这里使用了前面提到的 get_id()
和 sleep_for()
辅助函数)。
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex g_lock;
void func()
{
g_lock.lock();
std::cout << "entered thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(rand() % 10));
std::cout << "leaving thread " << std::this_thread::get_id() << std::endl;
g_lock.unlock();
}
int main()
{
srand((unsigned int)time(0));
std::thread t1(func);
std::thread t2(func);
std::thread t3(func);
t1.join();
t2.join();
t3.join();
return 0;
}
输出如下:
entered thread 10144
leaving thread 10144
entered thread 4188
leaving thread 4188
entered thread 3424
leaving thread 3424
lock()
和 unlock()
方法应该很简单明了。前者锁定互斥量,如果互斥量不可用则阻塞;后者解锁互斥量。
接下来的示例展示了一个简单的线程安全容器(内部使用 std::vector
)。该容器具有 add()
方法(添加单个元素)和 addrange()
方法(添加多个元素,该方法内部调用 add()
)。
注意:然而,正如下面的评论所示,这个容器并非线程安全,原因有几个,包括使用了 va_args
。此外,dump()
方法不应该属于容器,在实际实现中,它应该是一个辅助(独立)函数。此示例的目的仅仅是为了教授有关互斥量的一些概念,而不是创建一个功能齐全、无错误、线程安全的容器。
template <typename T>
class container
{
std::mutex _lock;
std::vector<T> _elements;
public:
void add(T element)
{
_lock.lock();
_elements.push_back(element);
_lock.unlock();
}
void addrange(int num, ...)
{
va_list arguments;
va_start(arguments, num);
for (int i = 0; i < num; i++)
{
_lock.lock();
add(va_arg(arguments, T));
_lock.unlock();
}
va_end(arguments);
}
void dump()
{
_lock.lock();
for(auto e : _elements)
std::cout << e << std::endl;
_lock.unlock();
}
};
void func(container<int>& cont)
{
cont.addrange(3, rand(), rand(), rand());
}
int main()
{
srand((unsigned int)time(0));
container<int> cont;
std::thread t1(func, std::ref(cont));
std::thread t2(func, std::ref(cont));
std::thread t3(func, std::ref(cont));
t1.join();
t2.join();
t3.join();
cont.dump();
return 0;
}
当您执行此程序时,它会遇到死锁。原因是容器在释放它之前多次尝试获取互斥量,这是不允许的。这就是 std::recursive_mutex 发挥作用的地方。它允许线程多次获取同一个互斥量。互斥量可以获取的最大次数未指定,但如果达到该次数,调用 lock
将抛出 std::system_error。因此,要修复上述代码中的问题(除了更改 addrange
的实现,使其不调用 lock
和 unlock
)之外,还可以将互斥量替换为 std::recursive_mutex。
template <typename T>
class container
{
std::recursive_mutex _lock;
// ...
};
然后,您将获得类似这样的输出:
6334
18467
41
6334
18467
41
6334
18467
41
细心的读者会注意到,在每次调用 func()
时生成的数字都是相同的。这是因为种子是线程局部的,而 srand()
的调用仅从主线程初始化种子。在其他工作线程中,它未被初始化,因此您每次都会得到相同的数字。
显式锁定和解锁可能导致问题,例如忘记解锁或锁获取顺序不正确导致死锁。标准库提供了一些类和函数来帮助解决这些问题。包装器类允许以 RAII 风格一致地使用互斥量,在作用域内自动锁定和解锁。这些包装器是:
- lock_guard:在对象构造时,它尝试获取互斥量的所有权(通过调用
lock()
);在对象析构时,它自动释放互斥量(通过调用unlock()
)。这是一个不可复制的类。 - unique_lock:一个通用的互斥量包装器,与
lock_guard
不同,它还支持延迟锁定、定时锁定、递归锁定、锁所有权的转移以及条件变量的使用。这也是一个不可复制的类,但它是可移动的。
使用这些包装器,我们可以像这样重写容器类:
template <typename T>
class container
{
std::recursive_mutex _lock;
std::vector<T> _elements;
public:
void add(T element)
{
std::lock_guard<std::recursive_mutex> locker(_lock);
_elements.push_back(element);
}
void addrange(int num, ...)
{
va_list arguments;
va_start(arguments, num);
for (int i = 0; i < num; i++)
{
std::lock_guard<std::recursive_mutex> locker(_lock);
add(va_arg(arguments, T));
}
va_end(arguments);
}
void dump()
{
std::lock_guard<std::recursive_mutex> locker(_lock);
for(auto e : _elements)
std::cout << e << std::endl;
}
};
不过,有人可能会争辩说,dump()
方法应该被声明为 const
,因为它不改变容器的状态。但如果您将该方法标记为 const
,您将收到以下编译器错误:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1
from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'
互斥量(无论使用哪种实现)都必须被获取和释放,这会涉及调用非 const
的 lock()
和 unlock()
方法。因此,传递给 lock_guard
的参数在逻辑上不能是 const
的(如果方法是 const
的,互斥量也会是 const
的)。解决此问题的方法是将互斥量声明为 mutable
。mutable
允许从 const
函数修改状态。但是,它应该只用于隐藏的或“元”状态(例如,缓存计算或查找的数据,以便下次调用可以立即完成,或者修改像互斥量这样的位,它们只补充对象的实际状态)。
template <typename T>
class container
{
mutable std::recursive_mutex _lock;
std::vector<T> _elements;
public:
void dump() const
{
std::lock_guard<std::recursive_mutex> locker(_lock);
for(auto e : _elements)
std::cout << e << std::endl;
}
};
这些包装器 guard 的构造函数有重载,可以接受一个参数来指示锁定策略。可用的策略是:
defer_lock
(类型为defer_lock_t
):不获取互斥量的所有权。try_to_lock
(类型为try_to_lock_t
):尝试在不阻塞的情况下获取互斥量的所有权。adopt_lock
(类型为adopt_lock_t
):假定调用线程已拥有互斥量。
这些策略声明如下:
struct defer_lock_t { };
struct try_to_lock_t { };
struct adopt_lock_t { };
constexpr std::defer_lock_t defer_lock = std::defer_lock_t();
constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t();
constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();
除了这些互斥量包装器之外,标准库还提供了一些用于锁定一个或多个互斥量的方法:
以下是一个死锁示例:我们有一个元素容器,还有一个函数 exchange()
,该函数将一个容器中的元素交换到另一个容器中。为了线程安全,此函数通过获取与每个容器关联的互斥量来同步对两个容器的访问。
template <typename T>
class container
{
public:
std::mutex _lock;
std::set<T> _elements;
void add(T element)
{
_elements.insert(element);
}
void remove(T element)
{
_elements.erase(element);
}
};
void exchange(container<int>& cont1, container<int>& cont2, int value)
{
cont1._lock.lock();
std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch
// to simulate the deadlock
cont2._lock.lock();
cont1.remove(value);
cont2.add(value);
cont1._lock.unlock();
cont2._lock.unlock();
}
假设此函数从两个不同的线程调用,第一个线程从容器 1 中移除元素并添加到容器 2,第二个线程从容器 2 中移除元素并添加到容器 1。这可能导致死锁(如果线程在获取第一个锁后立即上下文切换到另一个线程)。
int main()
{
srand((unsigned int)time(NULL));
container<int> cont1;
cont1.add(1);
cont1.add(2);
cont1.add(3);
container<int> cont2;
cont2.add(4);
cont2.add(5);
cont2.add(6);
std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3);
std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6);
t1.join();
t2.join();
return 0;
}
为了解决这个问题,您可以使用 std::lock
,它保证以无死锁的方式获取锁:
void exchange(container<int>& cont1, container<int>& cont2, int value)
{
std::lock(cont1._lock, cont2._lock);
cont1.remove(value);
cont2.add(value);
cont1._lock.unlock();
cont2._lock.unlock();
}
条件变量
C++11 支持的另一个同步原语是 条件变量,它允许阻塞一个或多个线程,直到收到另一个线程的通知、超时或发生 虚假唤醒。<condition_variable>
头文件提供了两种条件变量的实现:
- condition_variable:要求想要等待它的任何线程首先获取一个 std::unique_lock。
- condition_variable_any:是一个更通用的实现,它可以与满足基本锁条件的任何类型一起工作(即提供
lock()
和unlock()
方法的实现)。它可能更昂贵(在性能和操作系统资源方面),因此只有在需要其额外灵活性时才应优先使用。
以下描述了条件变量的工作原理:
- 至少有一个线程正在等待某个条件变为真。等待线程必须首先获取一个
unique_lock
。该锁被传递给wait()
方法,该方法会释放互斥量并暂停线程,直到条件变量被通知。发生这种情况时,线程会被唤醒并重新获取锁。 - 至少有一个线程正在发出某个条件变为
true
的信号。通知可以通过 notify_one() 来完成,它会解除等待该条件的某个线程(任意一个)的阻塞;或者通过 notify_all() 来完成,它会解除所有等待该条件的线程的阻塞。 - 由于在多处理器系统上使条件唤醒完全可预测存在一些复杂性,可能会发生虚假唤醒。这意味着即使没有人发出条件变量的信号,线程也会被唤醒。因此,在线程唤醒后,有必要检查条件是否仍然为真。由于虚假唤醒可能发生多次,因此必须在循环中进行此检查。
下面的代码展示了一个使用条件变量同步线程的示例:几个“工作”线程在其工作中可能产生错误,并将错误代码放入队列中。“日志记录”线程通过从队列中获取错误代码并打印它们来处理这些错误代码。工作线程在发生错误时向日志记录器发出信号。日志记录器正在等待条件变量被通知。为了避免虚假唤醒,等待操作发生在循环中,该循环会检查一个布尔条件。
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <queue>
#include <random>
std::mutex g_lockprint;
std::mutex g_lockqueue;
std::condition_variable g_queuecheck;
std::queue<int> g_codes;
bool g_done;
bool g_notified;
void workerfunc(int id, std::mt19937& generator)
{
// print a starting message
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[worker " << id << "]\trunning..." << std::endl;
}
// simulate work
std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
// simulate error
int errorcode = id*100+1;
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
}
// notify error to be logged
{
std::unique_lock<std::mutex> locker(g_lockqueue);
g_codes.push(errorcode);
g_notified = true;
g_queuecheck.notify_one();
}
}
void loggerfunc()
{
// print a starting message
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[logger]\trunning..." << std::endl;
}
// loop until end is signaled
while(!g_done)
{
std::unique_lock<std::mutex> locker(g_lockqueue);
while(!g_notified) // used to avoid spurious wakeups
{
g_queuecheck.wait(locker);
}
// if there are error codes in the queue process them
while(!g_codes.empty())
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl;
g_codes.pop();
}
g_notified = false;
}
}
int main()
{
// initialize a random generator
std::mt19937 generator((unsigned int)
std::chrono::system_clock::now().time_since_epoch().count());
// start the logger
std::thread loggerthread(loggerfunc);
// start the working threads
std::vector<std::thread> threads;
for(int i = 0; i < 5; ++i)
{
threads.push_back(std::thread(workerfunc, i+1, std::ref(generator)));
}
// work for the workers to finish
for(auto& t : threads)
t.join();
// notify the logger to finish and wait for it
g_done = true;
loggerthread.join();
return 0;
}
运行此代码会产生类似以下的输出(请注意,每次运行此输出都不同,因为每个工作线程的工作时长,即睡眠时长,都是随机的):
[logger] running...
[worker 1] running...
[worker 2] running...
[worker 3] running...
[worker 4] running...
[worker 5] running...
[worker 1] an error occurred: 101
[worker 2] an error occurred: 201
[logger] processing error: 101
[logger] processing error: 201
[worker 5] an error occurred: 501
[logger] processing error: 501
[worker 3] an error occurred: 301
[worker 4] an error occurred: 401
[logger] processing error: 301
[logger] processing error: 401
上面看到的 wait()
方法有两个重载:
- 一个只接受
unique_lock
;它会释放锁,阻塞线程,并将其添加到等待该条件变量的线程队列中;当条件变量被通知或发生虚假唤醒时,线程会醒来。当发生其中任何一种情况时,会重新获取锁并返回该函数。 - 另一个除了
unique_lock
外,还接受一个谓词,该谓词用于循环,直到其返回false
;此重载可用于避免虚假唤醒。它基本上等同于:while(!predicate()) wait(lock);
因此,可以通过使用接受谓词(该谓词验证队列的状态(空或非空))的 wait
重载来避免在上面的示例中使用布尔标志 g_notified
。
void workerfunc(int id, std::mt19937& generator)
{
// print a starting message
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[worker " << id << "]\trunning..." << std::endl;
}
// simulate work
std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
// simulate error
int errorcode = id*100+1;
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl;
}
// notify error to be logged
{
std::unique_lock<std::mutex> locker(g_lockqueue);
g_codes.push(errorcode);
g_queuecheck.notify_one();
}
}
void loggerfunc()
{
// print a starting message
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[logger]\trunning..." << std::endl;
}
// loop until end is signaled
while(!g_done)
{
std::unique_lock<std::mutex> locker(g_lockqueue);
g_queuecheck.wait(locker, [&](){return !g_codes.empty();});
// if there are error codes in the queue process them
while(!g_codes.empty())
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl;
g_codes.pop();
}
}
}
除了这个重载的 wait()
方法外,还有另外两个等待方法,它们都有类似的重载,接受一个谓词来避免虚假唤醒:
- wait_for:阻塞线程,直到条件变量被通知或指定的超时发生。
- wait_until:阻塞线程,直到条件变量被通知或指定的时刻已经到达。
这两个函数的无谓词重载会返回一个 cv_status,指示是发生了超时,还是因为条件变量被通知而唤醒,或者是因为虚假唤醒而唤醒。
标准库还提供了一个名为 notify_all_at_thread_exit 的函数,它实现了一种机制来通知其他线程某个线程已完成,包括销毁所有 thread_local
对象。这是因为使用 join()
以外的机制等待线程可能会在 thread_local
被使用时导致不正确和致命的行为,因为在等待线程恢复并可能也完成之后,它们的析构函数可能已经被调用(参见 N3070 和 N2880 以获取更多信息)。通常,此函数必须在线程退出之前调用。下面是一个示例,展示了如何将 notify_all_at_thread_exit
与 condition_variable
一起使用来同步两个线程:
std::mutex g_lockprint;
std::mutex g_lock;
std::condition_variable g_signal;
bool g_done;
void workerfunc(std::mt19937& generator)
{
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "worker running..." << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "worker finished..." << std::endl;
}
std::unique_lock<std::mutex> lock(g_lock);
g_done = true;
std::notify_all_at_thread_exit(g_signal, std::move(lock));
}
int main()
{
// initialize a random generator
std::mt19937 generator((unsigned int)
std::chrono::system_clock::now().time_since_epoch().count());
std::cout << "main running..." << std::endl;
std::thread worker(workerfunc, std::ref(generator));
worker.detach();
std::cout << "main crunching..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5));
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "main waiting for worker..." << std::endl;
}
std::unique_lock<std::mutex> lock(g_lock);
while(!g_done) // avoid spurious wake-ups
g_signal.wait(lock);
std::cout << "main finished..." << std::endl;
return 0;
}
如果工作线程在主线程之前完成工作,则输出将是:
main running...
worker running...
main crunching...
worker finished...
main waiting for worker...
main finished...
如果主线程在工作线程之前完成,则输出将是:
main running...
worker running...
main crunching...
main waiting for worker...
worker finished...
main finished...
结论
C++11 标准使 C++ 开发人员能够以标准、平台无关的方式编写多线程代码。本文介绍了对线程和同步机制的标准支持。<thread>
头文件提供了同名类(以及其他辅助函数),该类代表一个执行线程。<mutex>
头文件提供了几种互斥量和包装器的实现,用于同步对线程的访问。<condition_variable>
头文件提供了两种条件变量的实现,它们允许阻塞一个或多个线程,直到收到另一个线程的通知、超时或发生虚假唤醒。建议阅读更多相关内容以获取更多详细信息。
历史
- 2013 年 5 月 28 日:初始版本