无锁循环数组队列的又一个实现






4.97/5 (68投票s)
一个基于循环数组的无锁队列,
1. 引言
如今,在有高性能约束的应用中,改进应用的一个显而易见的选择就是:多线程。线程已经存在很久了。在过去,当大多数计算机只有一个处理器时,线程主要被用作将总工作量分解为更小的执行单元的一种方式,允许它们执行实际工作(即“处理”),而其他小的执行单元则在等待资源。一个简单的例子是网络应用程序,它监听 TCP 端口,并在收到来自该端口的请求后执行一些处理。在单线程方法中,应用程序在处理完每个请求之前将无法响应更多请求,因此潜在用户可能会认为应用程序已宕机,而实际上它只是在忙于工作。在多线程方法中,一个新线程可以负责处理部分工作,而主线程将始终准备好响应潜在用户。
在单处理器机器上,那些热衷于处理的多线程应用程序可能不会得到预期的结果。所有线程可能会“争抢”处理器来执行一些工作,整体性能可能与只让一个处理单元完成所有工作相同,甚至更差,因为通信和共享数据的开销很大。
然而,在对称多处理 (SMP) 机器上,前面的多线程应用程序将真正地同时执行多个任务(并行化)。每个线程将拥有一个真正的物理处理器,而不是共享唯一的可用资源。在一个 N 处理器的 SMP 系统中,一个 N 线程的应用程序理论上可以将这类应用程序所需的时间减少 N 倍(实际情况总是少一些,因为跨线程通信或数据共享仍有开销)。
SMP 机器过去价格昂贵,只有对这类软件有浓厚兴趣的公司才能负担得起,但如今多核处理器已经相当便宜(目前售卖的大多数计算机都有多个核心),因此由于其对性能的巨大影响,这类应用程序的并行化变得越来越受欢迎。
但是多线程并非易事。线程必须共享数据并相互通信,很快您就会发现自己面临着老问题:死锁、对共享数据的失控访问、跨线程的动态内存分配/释放等。此外,如果您幸运地在一个对性能有高要求的应用程序上工作,还会有一系列不同的问题严重影响您心爱的多线程系统的整体性能:
- 缓存抖动
- 同步机制上的争用。队列
- 动态内存分配
本文将介绍如何使用基于数组的无锁队列来最小化前面 3 个与性能相关的问题。特别是动态内存分配的使用,因为这在最初设计此无锁队列时是其主要目标。
2. 线程同步如何降低整体性能
2.1 缓存抖动
线程是(根据维基百科):“由操作系统调度的最小处理单元”。每个操作系统都有自己的线程实现,但基本上一个进程包含一组指令(代码)以及该进程的局部内存。线程运行一段代码,但与其所在的进程共享内存空间。在 Linux 中(本文中介绍的队列最初 intended 在此 OS 上工作),线程只是另一种“执行上下文”,在该 OS 中“没有线程的概念。Linux 将线程实现为标准进程。Linux 内核不提供任何特殊的调度语义或数据结构来表示线程。相反,线程只是一个与其他进程共享某些资源的进程。”[1]
这些运行的任务、线程、执行上下文,或者您可能想称呼它们的任何名称,都使用 CPU 的寄存器集来运行。它们包含任务的内部数据,例如当前正在运行的指令地址、某些操作的操作数和/或结果、堆栈指针等。这一组信息称为“上下文”。任何抢占式操作系统(大多数现代操作系统都是抢占式的)都必须能够在几乎任何时间停止一个正在运行的任务,将其上下文保存在某处,以便将来可以恢复(有一些例外,例如声明自己一段时间内不可抢占以执行某项操作的进程)。一旦任务恢复,它将继续进行它正在做的事情,就好像什么都没发生过一样。这是一件好事,处理器在任务之间共享,因此等待 I/O 的任务可以被抢占,从而允许另一个任务接管。单处理器系统可以表现得好像它们是多处理器一样,但正如生活中的一切一样,它有一个权衡:处理器是共享的,但每次任务被抢占时,都会有开销来保存/恢复退出任务和进入任务的上下文。
保存/恢复上下文还有一个额外的隐藏开销:保存在缓存中的数据对于新进程将毫无用处,因为这些数据是为前一个任务缓存的。重要的是要考虑到处理器比内存快几倍,因此在等待内存读写进出处理器的数据时会浪费大量处理时间。这就是为什么在标准 RAM 内存和处理器之间放置缓存内存。它们是更快的、更小的(也更昂贵的)内存槽,从中访问标准 RAM 的数据会被复制到其中,因为它们可能在不久的将来再次被访问。在处理密集型应用程序中,缓存未命中对性能非常重要,因为当数据已经在缓存中时,处理时间将快很多倍。
因此,任何时候任务被抢占,缓存很可能会被后续进程覆盖,这意味着当该进程恢复其操作时,需要一段时间才能使其恢复到被抢占之前的生产力。(某些操作系统——例如 Linux——会尝试将进程恢复到它们上次使用的最后一个处理器上,但根据该进程需要的内存量,缓存可能也无用)。当然,我并不是说抢占是坏事,抢占是 OS 正常工作所必需的,但根据您的 MT 系统的设计,某些线程可能会过于频繁地遭受抢占,由于缓存抖动而降低性能。
那么,任务何时会被抢占?这在很大程度上取决于您的 OS,但中断处理、定时器和系统调用非常可能导致 OS 抢占子系统决定将处理时间分配给系统中的其他进程。这实际上是 OS 难题的一个非常重要的一部分,因为没有人希望进程(饥饿)空闲太久。某些系统调用是“阻塞”的,这意味着任务向 OS 请求资源,然后等待直到资源可用,因为该任务需要资源才能继续执行。这是一个可抢占任务的好例子,因为它将无所事事直到资源可用,所以 OS 会将该任务置于等待状态,并将处理器交给其他任务来工作。
资源基本上是内存中的数据或硬盘、网络、外围设备,以及阻塞同步机制,如信号量或互斥锁。如果一个任务尝试进入一个已经被持有的互斥锁,它将被抢占,一旦互斥锁再次可用,该线程将被添加到“就绪运行”的任务队列中。所以,如果您担心您的进程被过于频繁地抢占(并导致缓存抖动),那么您应该尽可能避免使用阻塞同步机制。
但正如生活中的一切一样,事情并非如此简单。如果您使用比物理处理器数量更多的线程来执行处理密集型任务,而在避免阻塞同步机制时,系统的延迟可能会受到影响。OS 轮换任务的次数越少,当前不活动的进程就越要等到找到空闲处理器才能恢复。整个应用程序甚至可能受到影响,因为它正在等待一个饥饿的线程完成一些计算,然后系统才能向前推进做其他事情。没有有效的公式,这总是取决于您的应用程序、您的系统和您的 OS。例如,在一个处理密集型的实时应用程序中,我会选择使用非阻塞机制来同步线程,并使用比物理处理器少的线程,尽管这可能并非总是可行。在许多其他大部分时间空闲等待来自网络等数据的应用程序中,非阻塞同步可能是过度(可能导致致命)。这里没有秘诀,每种方法都有其优点和缺点,由您自己决定使用哪种方法。
2.2 同步机制上的争用。队列
队列可以轻松地应用于各种不同的多线程场景。如果两个或多个线程需要按顺序通信事件,我首先想到的就是队列。易于理解,易于使用,经过良好测试,并且易于教授(即便宜)。世界上每个程序员都不得不处理队列。它们无处不在。
在单线程应用程序中,队列易于使用,并且它们可以“轻松”地适应多线程系统。您只需要一个未受保护的队列(例如 C++ 中的 std::queue
)和一些阻塞同步机制(例如互斥锁和条件变量)。我在文章中上传了一个使用 glib 实现的阻塞队列的简单示例(这使其兼容 glib 已移植的广泛操作系统)。虽然没有必要真正地重新发明轮子,因为GAsyncQueue[7] 是 Glib 中已包含的线程安全队列的实现,但这段代码很好地展示了如何将标准队列转换为线程安全的队列。
让我们看一下队列中最常见的方法的实现:IsEmpty
、Push
和 Pop
。基本的未受保护队列是 std::queue
,它被声明为 std::queue
。我们将在此处看到的三个方法展示了如何使用 GLib 互斥锁和条件变量(声明为 GMutex* m_mutex
和 Cond* m_cond
)来包装此不安全的实现。可以从本文下载的实际队列还包含 TryPush
和 TryPop
,如果队列已满或为空,则它们不会阻塞调用线程。
template <typename T>
bool BlockingQueue<T>::IsEmpty()
{
bool rv;
g_mutex_lock(m_mutex);
rv = m_theQueue.empty();
g_mutex_unlock(m_mutex);
return rv;
}
IsEmpty
应该在队列没有元素时返回 true,但在任何线程可以访问队列的非安全实现之前,它必须受到保护。这意味着调用线程可能会被阻塞一段时间,直到互斥锁被释放。
template <typename T>
bool BlockingQueue<T>::Push(const T &a_elem)
{
g_mutex_lock(m_mutex);
while (m_theQueue.size() >= m_maximumSize)
{
g_cond_wait(m_cond, m_mutex);
}
bool queueEmpty = m_theQueue.empty();
m_theQueue.push(a_elem);
if (queueEmpty)
{
// wake up threads waiting for stuff
g_cond_broadcast(m_cond);
}
g_mutex_unlock(m_mutex);
return true;
}
Push
将一个元素插入队列。如果另一个线程拥有保护队列的锁,调用线程将被阻塞。如果队列已满,线程将在该调用中被阻塞,直到有人从队列中弹出元素为止;但是,当等待别人从队列中弹出一个元素时,调用线程不会使用任何 CPU 时间,因为它已经被 OS 睡眠了。
template <typename T>
void BlockingQueue<T>::Pop(T &out_data)
{
g_mutex_lock(m_mutex);
while (m_theQueue.empty())
{
g_cond_wait(m_cond, m_mutex);
}
bool queueFull = (m_theQueue.size() >= m_maximumSize) ? true : false;
out_data = m_theQueue.front();
m_theQueue.pop();
if (queueFull)
{
// wake up threads waiting for stuff
g_cond_broadcast(m_cond);
}
g_mutex_unlock(m_mutex);
}
Pop
从队列中提取一个元素(并将其从队列中移除)。如果另一个线程拥有保护队列的锁,调用线程将被阻塞。如果队列为空,线程将在该调用中被阻塞,直到有人向队列中推送一个元素为止;但是(正如 Push
的情况一样),当等待别人从队列中推送一个元素时,调用线程不会使用任何 CPU 时间,因为它已经被 OS 睡眠了。
正如我在上一节中所尝试解释的,阻塞并非易事。它涉及 OS 将当前任务“挂起”或睡眠(等待而不使用任何处理器)。一旦某个资源(例如互斥锁)可用,被阻塞的任务就可以被唤醒(唤醒),这也不是一件小事,因此它可以继续进入互斥锁。在负载很重的应用程序中,使用这些阻塞队列在线程之间传递消息可能会导致争用,即任务花费更长的时间来获取互斥锁(睡眠、等待、唤醒)来访问队列中的数据,而不是实际对数据执行“某些操作”。
在最简单的情况下,一个线程向队列插入数据(生产者),另一个线程从中移除数据(消费者),两个线程都在“争抢”保护队列的唯一互斥锁。如果我们选择自己编写队列的实现而不是简单地包装现有的队列,我们可以使用 2 个不同的互斥锁,一个用于插入,一个用于移除队列中的项目。在这种情况下,争用仅限于极端情况,即队列几乎为空或几乎满。现在,一旦我们需要多于 1 个线程插入或移除队列中的元素,我们的问题就回来了,消费者或生产者将争夺互斥锁。
这就是非阻塞机制的应用之处。任务“争抢”任何资源,而是“预留”队列中的一个位置,而无需被阻塞或唤醒,然后它们插入/移除数据。这些机制需要一种称为 CAS
(比较和交换)的特殊操作,它(根据维基百科)定义为“一个特殊指令,它原子地将内存位置的内容与给定值进行比较,只有当它们相同时,才将该内存位置的内容修改为给定的新值”。例如
volatile int a;
a = 1;
// this will loop while 'a' is not equal to 1. If it is equal to 1 the operation will atomically
// set a to 2 and return true
while (!CAS(&a, 1, 2))
{
;
}
使用 CAS
实现无锁队列并非全新的话题。有相当多的数据结构实现示例,其中大部分使用链表。请参见 [2] [3] 或 [4]。本文的目的不是描述什么是无锁队列,但基本原理是
- 要将新数据插入队列,会分配一个新的节点(使用 malloc),并通过
CAS
操作将其插入队列。 - 要移除队列中的元素,它使用 CAS 操作来移动链表的指针,然后检索被移除的节点以访问数据。
这是一个简单的基于链表的无锁队列实现的示例(摘自 [2],其基于 [5])
typedef struct _Node Node;
typedef struct _Queue Queue;
struct _Node {
void *data;
Node *next;
};
struct _Queue {
Node *head;
Node *tail;
};
Queue*
queue_new(void)
{
Queue *q = g_slice_new(sizeof(Queue));
q->head = q->tail = g_slice_new0(sizeof(Node));
return q;
}
void
queue_enqueue(Queue *q, gpointer data)
{
Node *node, *tail, *next;
node = g_slice_new(Node);
node->data = data;
node->next = NULL;
while (TRUE) {
tail = q->tail;
next = tail->next;
if (tail != q->tail)
continue;
if (next != NULL) {
CAS(&q->tail, tail, next);
continue;
}
if (CAS(&tail->next, null, node)
break;
}
CAS(&q->tail, tail, node);
}
gpointer
queue_dequeue(Queue *q)
{
Node *node, *tail, *next;
while (TRUE) {
head = q->head;
tail = q->tail;
next = head->next;
if (head != q->head)
continue;
if (next == NULL)
return NULL; // Empty
if (head == tail) {
CAS(&q->tail, tail, next);
continue;
}
data = next->data;
if (CAS(&q->head, head, next))
break;
}
g_slice_free(Node, head); // This isn't safe
return data;
}
在那些没有垃圾回收器的编程语言中(C++ 就是其中之一),最后一次调用 g_slice_free 由于所谓的ABA 问题而不安全。
- 线程 T1 读取要出队的值,并在第一次调用
queue_dequeue
中的CAS
之前停止。 - 线程 T1 被抢占。T2 尝试对 T1 即将出队相同的节点执行
CAS
操作。 - 它成功了,并释放了该节点分配的内存。
- 同一个线程(或一个新线程,例如 T3)将入队一个新节点。malloc 的调用返回的地址与步骤 2-3 中移除的节点所使用的地址相同。它将该节点添加到队列中。
- T1 重新获得处理器,
CAS
操作由于地址相同而错误地成功,但它不是同一个节点。T1 移除了错误的节点。
可以通过为每个节点添加引用计数器来解决ABA 问题。在假定 CAS 操作正确之前,必须检查此引用计数器以避免 ABA 问题。好消息是本文介绍的队列不存在 ABA 问题,因为它不使用动态内存分配。
2.3 动态内存分配
在多线程系统中,必须认真考虑内存分配。标准的内存分配机制会阻塞所有共享内存空间的任务(进程的所有线程)在分配堆空间时进行内存预留。这是一种简单的做法,而且有效,因为不可能有两个线程同时被分配相同的内存地址。然而,当线程频繁分配内存时(并且需要注意的是,诸如向标准队列或标准映射插入元素之类的小事都会在堆上分配内存),它会非常慢。
有一些库会覆盖标准的分配机制,提供无锁内存分配机制来减少对堆的争用,例如libhoard[6]。有很多这类库,如果您将标准 C++ 分配器切换到其中一个基于无锁的内存分配器,它们可能会对您的系统产生巨大影响。但有时它们可能不是系统所需的,软件必须付出额外的努力来更改其同步机制。
3. 基于循环数组的无锁队列
所以,最终,这就是基于循环数组的无锁队列,本文最初的目的是介绍这个东西。它被开发出来是为了降低上述 3 个问题的影响。其特性可以总结为以下列表:
- 作为一个无锁同步机制,它降低了进程被抢占的频率,从而减少了缓存抖动。
- 同样,作为任何无锁队列,线程之间的争用大大减少,因为没有锁来保护任何数据结构:线程基本上先声明空间,然后用数据填充它。
- 它不需要在堆上分配任何东西,而其他无锁队列实现则需要。
- 它也不会遭受 ABA 问题,尽管它在数组处理中增加了一些级别的开销。
3.1 它是如何工作的?
队列基于一个数组和 3 个不同的索引
writeIndex
:新元素将被插入的位置。readIndex
:下一个元素将被提取的位置。maximumReadIndex
:它指向最新“已提交”数据插入的位置。如果它与 writeIndex 不相同,则表示有待“提交”到队列的写入,即数据已预留空间(数组中的索引),但数据尚未进入队列,因此尝试读取的线程将不得不等待其他线程将数据保存到队列中。
值得一提的是,需要 3 个不同的索引是因为队列允许设置任意数量的生产者和消费者。已经有一篇关于单生产者单消费者配置队列的文章[11]。其简单的方法绝对值得阅读(我一直喜欢KISS 原则)。这里的情况变得更复杂,因为队列必须对所有类型的线程配置都是安全的。
3.1.1 CAS 操作
这个无锁队列的同步机制基于比较和交换(CAS)CPU 指令。CAS 操作已包含在 GCC 4.1.0 版本中。由于我使用 GCC 4.4 编译此算法,因此我决定利用 GCC 内置的 CAS 操作,称为 __sync_bool_compare_and_swap
(它在此处描述)。为了支持更多编译器,此操作使用 #define
在 atomic_ops.h
文件中“映射”到单词 CAS
。
/// @brief Compare And Swap
/// If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
/// @return true if the comparison is successful and a_newVal was written
#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)
如果您打算使用其他编译器编译此队列,您只需定义一个 CAS 操作,该操作以某种方式适用于您的编译器。它必须符合以下接口:
- 第一个参数是要更改的变量的地址。
- 第二个参数是旧值。
- 第三个参数是在第一个参数等于第二个参数时将保存到第一个参数中的值。
- 如果成功则返回 true(非零),否则返回 false。
3.1.2 将元素插入队列
这是负责将新元素插入队列的代码。
/* ... */
template <typename ELEM_T, uint32_t Q_SIZE>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Q_SIZE>::countToIndex(uint32_t a_count)
{
return (a_count % Q_SIZE);
}
/* ... */
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
{
uint32_t currentReadIndex;
uint32_t currentWriteIndex;
do
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if (countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex))
{
// the queue is full
return false;
}
} while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
// We know now that this index is reserved for us. Use it to save the data
m_theQueue[countToIndex(currentWriteIndex)] = a_data;
// update the maximum read index after saving the data. It wouldn't fail if there is only one thread
// inserting in the queue. It might fail if there are more than 1 producer threads because this
// operation has to be done in the same order as the previous CAS
while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
{
// this is a good place to yield the thread in case there are more
// software threads than hardware processors and you have more
// than 1 producer thread
// have a look at sched_yield (POSIX.1b)
sched_yield();
}
return true;
}
下图描述了队列的初始配置。每个方块描述队列中的一个位置。如果用大 X 标记,表示其中包含数据。空白方块表示为空。在此特定情况下,队列中当前有两个元素。WriteIndex
指向新数据将被插入的位置。ReadIndex
指向在下一次调用 pop
时将被清空的槽。
基本上,当一个新元素将要写入队列时,生产者线程会通过增加 WriteIndex
来“预留”队列中的空间。MaximumReadIndex
指向包含有效(已提交)数据的最后一个槽。
一旦预留了新空间,当前线程就可以花时间将数据复制到队列中。然后它会增加 MaximumReadIndex
。
现在队列中已完全插入 3 个元素。在下一步,另一个任务尝试向队列中插入新元素。
它已经为将要存储的数据预留了空间,但在该任务将新数据复制到预留的槽之前,另一个线程预留了一个新槽。有两个任务同时插入元素。
线程现在将它们的数据复制到它们预留的槽中,但必须按严格顺序进行:第一个生产者线程将增加 MaximumReadIndex
,然后是第二个生产者线程。严格的顺序限制很重要,因为我们必须确保保存在槽中的数据在允许消费者线程从队列中弹出它之前已完全提交。
第一个线程已将数据提交到槽中。现在允许第二个线程增加 MaximumReadIndex
。
第二个线程也增加了 MaximumReadIndex
。现在队列中有 5 个元素。
3.1.3 从队列中移除元素
这是移除队列中元素的代码。
/* ... */
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
{
uint32_t currentMaximumReadIndex;
uint32_t currentReadIndex;
do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;
if (countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex))
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_theQueue[countToIndex(currentReadIndex)];
// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
return true;
}
// it failed retrieving the element off the queue. Someone else must
// have read the element stored at countToIndex(currentReadIndex)
// before we could perform the CAS operation
} while(1); // keep looping to try again!
// Something went wrong. it shouldn't be possible to reach here
assert(0);
// Add this return statement to avoid compiler warnings
return false;
}
这是“将元素插入队列”部分中相同的起始配置。队列中当前有两个元素。WriteIndex
指向新数据将被插入的位置。ReadIndex
指向在下一次调用 pop 时将被清空的槽。
即将读取的消费者线程会复制 ReadIndex
指向的元素,并尝试对同一个 ReadIndex
执行 CAS
操作。如果 CAS
操作成功,则线程已从队列中检索到该元素,并且由于 CAS
操作是原子的,因此一次只有一个线程可以增加此 ReadIndex
。如果 CAS
操作不成功,它将使用 ReadIndex
指向的下一个槽重试。
另一个线程(或同一个线程)读取下一个元素。队列为空。
现在一个任务正在尝试向队列中添加一个新元素。它成功预留了该槽,但更改尚未提交。任何尝试弹出值的其他线程都知道队列不为空,因为另一个线程已经预留了一个队列槽(writeIndex
不等于 readIndex
),但它无法读取该槽中的值,因为 MaximumReadIndex
仍等于 readIndex
。这个尝试弹出值的线程将在 pop
调用中循环,直到正在将更改提交到队列的任务增加 MaximumReadIndex
,或者直到队列再次为空(这可能发生,如果一旦生产者线程增加了 MaximumReadIndex
,另一个消费者线程在我们线程尝试之前就将其弹出了,因此 writeIndex
和 readIndex
指向同一个槽)。
当生产者线程完全将值插入队列后,大小将为 1,消费者线程将能够读取该值。
3.1.4 关于在有多个生产者线程时让出处理器的必要性
读者此时可能会注意到,push
函数可能会调用一个函数(sched_yield()
)来让出处理器,这对于声称是无锁的算法来说似乎有点奇怪。正如我在本文开头所尝试解释的,多线程影响整体性能的一种方式是缓存抖动。缓存抖动的一种典型方式是,每当一个线程被抢占时,OS 必须将退出进程的状态(上下文)保存到内存中,并恢复进入进程的上下文。但是,除了浪费时间之外,保存在缓存中的数据对于新进程将毫无用处,因为这些数据是为前一个任务缓存的。
因此,当此算法调用 sched_yield()
时,它明确地告诉 OS:“请把别人放到这个处理器上,因为我需要等待某事发生?”无锁和阻塞同步机制之间的主要区别在于,在使用无锁算法时,我们不需要被阻塞来同步线程,那么为什么我们要请求 OS 抢占?这个问题的答案并不简单。它与生产者线程如何将新数据存储到队列中有关:它们必须执行2 次 FIFO 顺序的 CAS 操作,一次分配队列中的空间,另一次通知读取者可以读取数据,因为数据已经“提交”。
如果我们的应用程序只有一个生产者线程(这正是该队列最初设计时的情况),那么 sched_yield()
调用将永远不会发生,因为第二次 CAS 操作(“提交”队列中数据的那个)不会失败。当只有一个线程插入数据时(因此修改 writeIndex 和 maximumReadIndex),此操作不可能不是 FIFO 顺序的。
当有两个或更多线程向队列中插入元素时,问题就开始了。将新元素插入队列的整个过程在第 3.1.2 节中进行了说明,但基本上生产者线程执行 1 次 CAS 操作来“分配”将存储新数据的空间,然后一旦数据被复制到已分配的空间中,再执行第二次 CAS 操作来通知消费者线程新数据可供读取。第二次 CAS 操作必须按 FIFO 顺序执行,即按第一次 CAS 操作执行的顺序。问题就出在这里。让我们考虑以下场景,有 3 个生产者和 1 个消费者线程:
- 线程 1、2 和 3 按此顺序分配插入数据的空间。第二次 CAS 操作也必须按相同的顺序执行,即先线程 1,然后线程 2,最后线程 3。
- 但是,线程 2 最先到达有问题的第二次 CAS 操作,并且失败了,因为线程 1 尚未执行。线程 3 也发生了同样的事情。
- 两个线程(2 和 3)将一直循环尝试执行它们的第二次 CAS 操作,直到线程 1 先执行。
- 线程 1 最终执行了它。现在线程 3 必须等待线程 2 执行其 CAS 操作。
- 线程 2 现在执行成功了,因为线程 1 已经完成,它的数据已完全插入队列。线程 3 也停止循环,因为在线程 2 之后,它的 CAS 操作也成功了。
在上述场景中,生产者线程可能会花费相当长的时间尝试执行第二次 CAS 操作,同时等待另一个线程执行相同的操作(按顺序)。在有比使用队列的线程更多的空闲物理处理器的多处理器机器上,这可能不太重要:线程将被卡住,尝试在保存 maximumReadIndex 的易失性变量上执行 CAS 操作,但它们等待的线程有一个分配的物理处理器,因此最终它将在另一个物理处理器上运行时执行其第二次 CAS 操作,而其他正在旋转的线程在执行其第二次 CAS 操作时也将成功。总而言之,该算法可能会让线程保持循环,但这种行为是预期的(也是期望的),因为它以最快的速度工作。因此,没有必要使用 sched_yield()
。事实上,为了达到最大的性能,应该删除该调用,因为当没有真正需要时,我们不想告诉 OS 让出处理器(以及相关的开销)。
但是,当有多个生产者线程且物理处理器数量少于总线程数时,调用 sched_yield()
对于队列的性能非常重要。再考虑一下前面的场景,当时有 3 个线程试图向队列中插入新数据:如果线程 1 在分配了队列中的新空间之后,但在执行第二次 CAS 操作之前就被抢占,那么线程 2 和 3 将永远循环等待一个永远不会发生的事情,直到线程 1 被唤醒。这就是 sched_yield()
调用之所以必要的,OS 不能让线程 2 和 3 持续循环,它们必须尽快被阻塞,以尝试让线程 1 执行第二次 CAS 操作,以便 2 和 3 可以继续并将它们的数据提交到队列中。
4. 队列的已知问题
此无锁队列的主要目标是提供一种无需动态内存分配即可实现无锁队列的方法。它已成功实现,但该算法确实存在一些已知缺点,在使用于生产环境之前应予以考虑。
4.1 使用一个以上的生产者线程
正如第 3.1.4 节中所述(在使用此队列处理多个生产者线程之前,您必须仔细阅读该节),如果存在多个生产者线程,它们可能会花费过长时间来尝试更新 MaximumReadIndex
,因为它必须按 FIFO 顺序进行。该队列最初设计时的场景只有一个生产者,而且可以肯定地说,在该场景下(即多个生产者线程),该队列的性能会显著下降。
此外,如果您计划仅使用一个生产者线程来使用此队列,则不需要第二次 CAS 操作(用于将元素提交到队列的那个)。除了第二次 CAS 操作之外,还应移除易失性变量 m_maximumReadIndex
,并将所有对它的引用更改为 m_writeIndex。因此,push
和 pop
应被以下代码片段替换:
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
{
uint32_t currentReadIndex;
uint32_t currentWriteIndex;
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
if (countToIndex(currentWriteIndex + 1) ==
countToIndex(currentReadIndex))
{
// the queue is full
return false;
}
// save the date into the q
m_theQueue[countToIndex(currentWriteIndex)] = a_data;
// No need to increment write index atomically. It is a
// requierement of this queue that only one thred can push stuff in
m_writeIndex++;
return true;
}
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
{
uint32_t currentMaximumReadIndex;
uint32_t currentReadIndex;
do
{
// m_maximumReadIndex doesn't exist when the queue is set up as
// single-producer. The maximum read index is described by the current
// write index
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_writeIndex;
if (countToIndex(currentReadIndex) ==
countToIndex(currentMaximumReadIndex))
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
return false;
}
// retrieve the data from the queue
a_data = m_theQueue[countToIndex(currentReadIndex)];
// try to perfrom now the CAS operation on the read index. If we succeed
// a_data already contains what m_readIndex pointed to before we
// increased it
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
return true;
}
// it failed retrieving the element off the queue. Someone else must
// have read the element stored at countToIndex(currentReadIndex)
// before we could perform the CAS operation
} while(1); // keep looping to try again!
// Something went wrong. it shouldn't be possible to reach here
assert(0);
// Add this return statement to avoid compiler warnings
return false;
}
如果您计划在只有一个生产者和一个消费者线程的场景中使用此队列,那么绝对值得查看 [11],其中一个类似的设计用于此特定配置的循环队列已得到详细解释。
4.2 将队列与智能指针一起使用
如果队列被实例化以保存智能指针,请注意,智能指针保护的内存直到该元素所在的索引被新智能指针占用后,才会被完全删除(智能指针的引用计数等于 0)。在繁忙的队列中这不应该是一个问题,但是程序员应该考虑到,一旦队列第一次被完全填满,应用程序占用的内存量将不会减少,即使队列是空的。
4.3 计算队列大小
原始函数 size
可能会返回无效值。这是它的一个代码片段:
template <typename ELEM_T>
inline uint32_t ArrayLockFreeQueue<ELEM_T>::size()
{
uint32_t currentWriteIndex = m_writeIndex;
uint32_t currentReadIndex = m_readIndex;
if (currentWriteIndex >= currentReadIndex)
{
return (currentWriteIndex - currentReadIndex);
}
else
{
return (m_totalSize + currentWriteIndex - currentReadIndex);
}
}
以下场景描述了此函数返回无效数据的情况:
- 当语句
currentWriteIndex = m_writeIndex
被执行时,m_writeIndex
是 3,m_readIndex
是 2。实际大小是 1。 - 之后,该线程被抢占。在此线程不活动期间,2 个元素被插入并从队列中移除,因此
m_writeIndex
是 5,m_readIndex
是 4。实际大小仍然是 1。 - 现在当前线程从抢占中恢复,并读取
m_readIndex
。currentReadIndex
是 4。 currentReadIndex
大于currentWriteIndex
,因此返回m_totalSize + currentWriteIndex - currentReadIndex
,即它返回队列几乎已满,而实际上它几乎是空的。
本文中上传的队列包含了该问题的解决方案。它包括添加一个新的类成员,其中包含队列中元素的当前数量,可以使用 AtomicAdd
/AtomicSub
操作对其进行增量/减量。此解决方案会增加重要的开销,因为这些原子操作很昂贵,因为编译器无法轻易优化它们。
例如,在 Core 2 Duo E6400 2.13 Ghz(2 个硬件处理器)上的一次测试运行中,2 个线程(1 个生产者 + 1 个消费者)在初始化为 1k 个槽大小的无锁队列中插入 10,000k 个元素大约需要 2.64 秒,而如果该变量由队列维护,则大约需要 3.42 秒(增加约 22%)。在相同的环境下,使用 2 个消费者和 1 个生产者时,也需要 22% 的时间:不可靠大小版本队列需要 3.98 秒,而另一个版本需要 5.15 秒。
这就是为什么将激活大小变量的开销留给开发者决定。队列被用于何种应用程序总是取决于您决定该开销是否值得。在 array_lock_free_queue.h
中有一个编译器预处理变量,名为 ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE
。如果已定义,则“可靠大小”开销将被激活。如果未定义,则开销将禁用,并且 size 函数可能会返回无效值。
5. 编译代码
本文包含的无锁队列和 Glib 阻塞队列都是基于模板的 C++ 类。模板化代码必须放在头文件中,因此在 .cpp 文件中使用之前不会被编译。我已将两个队列包含在一个 .zip 文件中,其中包含每个队列的示例文件,以展示其用法,并测量每个队列完成多线程测试所需的时间。
测试代码使用 gomp 编写,它是 OpenMP 应用程序编程接口 (API) 的 GNU 实现,用于 C/C++ 的跨平台共享内存并行编程[9]。GCC 自 4.2 版本以来就包含了它。OpenMP 是开发跨平台并行应用程序的简单灵活的接口,并且是一种编写多线程代码的非常简单的方式。
因此,附属于本文的代码分为 3 部分,每个部分有不同的要求:
1. 基于数组的无锁队列
- 无锁队列有两种独立版本。一种适用于任何线程配置,另一种适用于只有 1 个生产者线程的环境。它们分别存储在
array_lock_free_queue.h
和array_lock_free_queue_single_producer.h
中。 - GCC 4.1.0 及更高版本用于原子操作(
CAS
、AtomicAdd
和AtomicSub
)。如果使用其他编译器,必须在 atomic_ops.h 中定义这些操作(它们可能取决于编译器或您的平台,或两者都取决于)。 - 如果
uint32_t
类型未包含在您环境的stdint.h
实现中,您可能还需要定义它。它包含在 GNU-Linux 中,但在 Windows 中不包含。在大多数现代环境中,您只需执行以下操作:typedef unsigned int uint32_t; // int is (normally) 32bit in both 32 and 64bit machines
在此还需要注意的是,此队列未在 64 位环境中进行测试。如果原子操作不支持 64 位类型变量,GCC 可能会出现编译时错误,这就是为什么选择 32 位类型变量来实现队列(在 32 位机器上可能没有 64 位原子操作)。如果您的机器支持对 64 位变量的原子操作,我不认为该队列在操作 64 位索引时会失败。 - 适用于所有线程配置的无锁队列(对每个
push
操作执行 2 次CAS
操作的那个)还使用对处理器让出的调用:sched_yield()
。根据文档,此调用是 POSIX [10] 的一部分,因此任何符合 POSIX 的 OS 都应该可以毫无问题地编译它。
2. 基于 Glib 的阻塞队列
- 您需要系统中有 glib。这在 GNU-Linux 系统上非常直接,但在其他平台系统上可能更复杂。这里有一个 GTK+ 整个库的现成软件包可供在 GNU-Linux、Windows 和 OSX 上安装:https://gtk.org.cn/download.html
- 它还使用 glib 对互斥锁和条件变量的实现,这些是 gthread 库的一部分,因此如果您决定包含此队列,您还需要在编译应用程序时链接到它。
3. 测试应用程序
- 满足无锁队列和 Glib 阻塞队列的所需要求。
- 用于处理 makefile 的 GNU make 应用程序。您可以将编译时选项传递给编译过程,例如:
make N_PRODUCERS=1 N_CONSUMERS=1 N_ITERATIONS=10000000 QUEUE_SIZE=1000
其中- N_PRODUCERS 是生产者线程的数量。
- N_CONSUMERS 是消费者线程的数量。
- N_ITERATIONS 是将被插入和移除队列的总项数。
- QUEUE_SIZE 是队列的最大槽数。
- GCC,至少是 4.2 版本,以便能够编译 Gomp。
- 在运行测试应用程序之前,还需要将
'OMP_NESTED=TRUE'
添加到命令行,例如`OMP_NESTED=TRUE ./test_lock_free_q`
。
6. 一些图表
以下图表显示了在具有不同设置和线程配置的 2 核机器(2 个硬件处理器)上运行本文包含的测试应用程序的结果。
6.1. 第二个 CAS 操作对性能的影响
该队列的一个已知问题是在只有一个生产者线程时多余的第二个 CAS 操作。下图显示了在 2 核机器上,当只有一个生产者线程时,移除它的影响(数值越低越好)。它显示了在同时插入和提取一百万个元素时,所需秒数提高了约 30%。
6.2. 无锁队列 vs. 阻塞队列。线程数
下图显示了在推入和弹出一百万个元素时所需时间的比较(数值越低越好),具体取决于线程的设置(队列大小设置为 16384)。当只有一个生产者线程时,无锁队列的性能优于标准的阻塞算法。当使用多个生产者线程时(所有生产者都同样贪婪地将元素推入队列),无锁队列的性能会迅速下降。
6.3. 使用 4 个线程的性能
下图显示了在不同线程配置下,当推入和弹出一百万个项目时,不同队列的整体性能。
6.3.1 单个生产者线程
6.3.2 两个生产者线程
6.3.3 三个生产者线程
6.3.1 单个消费者线程
6.3.2 两个消费者线程
6.3.3 三个消费者线程
6.4. 一台 4 核机器
最好在 4 核机器上运行相同的测试,以研究当有 4 个可用物理处理器时的整体性能。这些测试还可以用于研究 sched_yield()
调用在没有实际需要时产生的影响。
7. 结论
这种基于数组的无锁队列已被证明在其两个版本中都能正常工作:一个版本在多生产者线程配置中是线程安全的,另一个版本不是(类似于 [11] 但允许多个消费者线程)。这两个队列都可以安全地用作多线程应用程序中的同步机制,因为:
- 由于
CAS
操作是原子的,尝试并行推入或弹出元素到/从队列的线程不会发生死锁。 - 多个线程尝试同时推入元素到队列中无法写入数组的同一槽,覆盖彼此的数据。
- 多个线程尝试同时弹出队列中的元素无法重复移除同一项。
- 线程无法推入数据到已满队列或从空队列中弹出数据。
- 推入和弹出都不会遭受ABA问题。
但是,需要注意的是,即使算法是线程安全的,它在多生产者环境中的性能也逊于简单的基于块的队列。因此,选择此队列而不是标准块式队列只有在以下配置之一中才有意义:
- 只有 1 个生产者线程(单生产者版本更快)。
- 有 1 个繁忙的生产者线程,但我们仍然需要队列是线程安全的,因为在某些情况下,另一个线程可能会将东西推入队列。
8. 历史
2011年1月4日:初始版本
2011年4月27日:突出显示一些关键字。删除不必要的文件。修复了几个拼写错误。
2015年7月31日:根据 Artem Elkin (单生产者)和 Member 11590800 (溢出 bug)的评论,更新了代码。
9. 参考文献
[1] Love, Robert: "Linux Kernel Development Second Edition", Novell Press, 2005
[2] Introduction to lock-free/wait-free and the ABA problem
[3] Lock Free Queue implementation in C++ and C#
[4] High performance computing: Writing Lock-Free Code
[5] M.M. Michael and M.L. Scott, "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms," Proc. 15th Ann. ACM Symp. Principles of Distributed Computing, pp. 267-275, May 1996.
[6] The Hoard Memory Allocator
[8] boost::shared_ptr documentation
[9] GNU libgomp
[10] sched_yield documentation
[11] lock-free single producer - single consumer circular queue