C++ 中的自旋锁
一个可用于通用锁定的自旋锁实现。
引言
如果我们使用常见的同步原语,如互斥锁和临界区,那么两个试图获取锁的线程之间会发生以下事件序列
- 线程 1 获取锁 L 并执行。
- T2 尝试获取锁 L,但它已被持有,因此阻塞并导致上下文切换。
- T1 释放锁 L。这会通知 T2,在较低级别,这涉及某种内核转换。
- T2 唤醒并获取锁 L,导致另一次上下文切换。
因此,当使用原始同步对象时,总会有至少两次上下文切换。自旋锁可以避免昂贵的上下文切换和内核转换。
大多数现代硬件支持原子指令,其中之一是“比较并交换”(CAS)。在 Win32 系统上,它们被称为互锁操作。使用这些互锁函数,应用程序可以在一个原子不可中断的操作中比较和存储一个值。通过互锁函数,可以实现无锁,从而节省昂贵的上下文切换和内核转换,这在低延迟应用程序中可能会成为瓶颈。在多处理器机器上,自旋锁(一种忙等待)可以避免上述两个问题,从而在上下文切换中节省数千个 CPU 周期。然而,使用自旋锁的缺点是,如果它们被持有时间过长,就会变得浪费,在这种情况下,它们会阻止其他线程获取锁并继续执行。本文中所示的实现旨在开发一种通用自旋锁。
算法
典型的(或基本的)自旋锁获取和释放函数如下所示
// acquire the lock
class Lock
{
volatile int dest = 0;
int exchange = 100;
int compare = 0;
void acquire()
{
While(true)
{
if(interlockedCompareExchange(&dest, exchange, compare) == 0)
{
// lock acquired
break;
}
}
}
// release the lock
Void release()
{
// lock released
dest = 0;
}
};
.......
在这里,线程 T1 通过调用 acquire()
函数获取锁。在这种情况下,dest
的值将变为 100。当线程 T2 尝试获取锁时,它将连续循环(即忙等待),因为 dest
和 compare
的值不同,因此 InterlockedCompareExchange
函数将失败。当 T1 调用 release()
时,它将 dest
的值设置为 0,从而允许 T2 获取锁。因为只有那些 acquire()
的线程才会调用 release()
,所以互斥得以保证。
以上是自旋锁的简单实现。然而,仅此实现不足以用于生产环境,因为自旋会消耗 CPU 周期而没有任何有用的工作,这意味着自旋的线程仍将在处理器上调度,直到被抢占。自旋的另一个缺点是它会不断访问内存以重新评估 Interlockedxxx
函数中 dest
的值,这也给总线通信带来了压力。
在单处理器机器上,自旋等待将完全浪费 CPU,因为在自旋线程被内核切换之前,另一个线程 T2 甚至不会被调度。
到目前为止,这个实现还不够好。一个通用自旋锁需要更多的工作,以防在最坏的情况下,当它自旋时间过长时,回退到真正的等待。这里有一些必须考虑的要点
让出处理器
Win32 函数 YieldProcessor()
在处理器上发出“无操作”指令。这使处理器知道代码当前正在执行自旋等待,并使处理器可用于超线程处理器中的其他逻辑处理器,以便其他逻辑处理器可以继续执行。
切换到另一个线程
当一个自旋线程已经消耗了足够的时间自旋,相当于内核分配给它的线程时间片时,强制进行上下文切换有时会很有用。在这里,允许另一个线程做有用的工作更有意义。函数 SwitchToThread()
放弃调用线程的时间片,并运行就绪状态的另一个线程。如果发生切换,它返回 true
,否则返回 false
。
睡眠
SwitchToThread()
可能不会考虑系统中所有线程的执行,因此有时调用 Sleep()
或 Sleepex()
可能是明智之举。以参数 0 调用 Sleep()
是一种很好的方法,因为它在就绪状态中没有相同优先级的线程时不会导致上下文切换。如果优先级更高的线程处于就绪状态,Sleep(0)
将导致上下文切换。
其他注意事项
纯自旋锁仅在锁被持有的时间非常短时才足够好。这里的临界区可能不超过 10 条指令,实际上即使是简单的内存分配、虚函数调用或文件 I/O 也可能需要超过 10 条指令。
其次,如上所述,当应用程序在单处理器上运行时,使用自旋锁是浪费的。
示例项目和实现
C++ 中的示例项目包含一个自旋锁实现,其中考虑了上述几点。它还包含 Stack、Queue 和一个精简的 Producer-Consumer 类的实现。我在这里只关注自旋锁的实现,其余部分很容易理解。
文件 SpinLock.h 定义了这些常量
YIELD_ITERATION
设置为 30 - 这意味着自旋线程将自旋 30 次,等待获取锁,然后调用sleep(0)
以给其他线程提供进展的机会。MAX_SLEEP_ITERATION
设置为 40 - 这意味着当总迭代(或自旋)计数达到 40 时,它将强制使用SwitchToThread()
函数进行上下文切换,以防另一个线程处于就绪状态。
结构体 tSpinLock
充当锁对象,它在声明其对象被同步的类中声明。然后将此对象传递给 tScopedLock
对象的构造函数,该构造函数初始化(引用)传递给它的锁对象。tScopedLock()
构造函数使用 tSpinWait
类的成员函数锁定对象。析构函数 ~tScopedLock()
释放锁。
tSpinWait
类中的 Lock()
函数有一个嵌套的 while
循环。这是有意为之的。因此,如果一个线程正在自旋以获取锁,它不会在每次迭代中都调用 interlockedxxx()
,而是在内部的 while
循环中循环。这种技巧避免了系统内存总线因连续调用 interlockedxx
函数而过度繁忙。
// spin wait to acquire
while(LockObj.dest != LockObj.compare) {
if(HasThreasholdReached())
{
if(m_iterations + YIELD_ITERATION >= MAX_SLEEP_ITERATION)
Sleep(0);
if(m_iterations >= YIELD_ITERATION && m_iterations < MAX_SLEEP_ITERATION)
SwitchToThread();
}
// Yield processor on multi-processor but if on single processor
// then give other thread the CPU
m_iterations++; if(Helper::GetNumberOfProcessors() > 1)
{
YieldProcessor(/*no op*/);
}
else { SwitchToThread(); }
}
内部的 while
循环只比较 dest
和 compare
的值,如果它们不相等,则尝试使用 interlockedxxx
获取它们。根据迭代次数,线程要么被置于睡眠状态,要么被切换。当应用程序在单 CPU 上运行时,它总是强制进行上下文切换。
测试结果
我通过从多个线程(每个线程向队列插入 10000 个整数)向队列插入 10000 个整数来测试此自旋锁实现的性能。然后,我在代码中将 SpinLock
替换为临界区同步原语,并运行了相同的测试。我在 Intel Core DUO CPU T9600 @ 2.80 GHz 上运行了所有测试。
x 轴是线程数,y 轴是所需时间(毫秒)。当线程数为 2 和 4 时,两种同步方法(自旋锁和 CS)表现出接近的性能。随着线程数的增加,临界区锁定所需时间是自旋锁的两倍多。当由于大量线程导致争用增加时,自旋锁似乎扩展得更好。所需时间使用 QueryPerformanceCounter
Win32 方法计算。但是,我建议您在使用平台上进行自己的测试。
以下是结果表
线程数 | 所需时间(自旋锁) | 所需时间(临界区) | |
2 | 6.6 | 7.14 | |
4 | 12.81 | 14.09 | |
6 | 16.01 | 46.37 | |
8 | 23.32 | 54.34 | |
10 | 26.21 | 74.76 | |
15 | 41.17 | 89.05 | |
20 | 47.63 | 116.82 | |
25 | 62.25 | 147.68 | |
30 | 64.37 | 169.17 | |
35 | 88.02 | 210.07 | |
40 | 93.99 | 296.32 |
未来工作
- 在不同平台上分析代码。
- 向项目中添加更多数据结构,如关联数组和哈希表。
结论
这是开发通用自旋锁实现的一次尝试。纯自旋锁在所有情况下都不是一个好的选择,因此需要一种允许自旋线程被内核挂起的实现。
历史
- 初稿。
- 修订 1 - 修正了一些错别字。
- 修订 2 - 代码现在是可重入安全的。
- 修订 3 - 锁释放现在使用
interlockedexchange
。 - 修订 4 - 添加了测试结果。