65.9K
CodeProject 正在变化。 阅读更多。
Home

无锁单生产者-单消费者循环队列

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.84/5 (57投票s)

2009年11月4日

公共领域

20分钟阅读

viewsIcon

473786

downloadIcon

5717

如何使用 C++11 制作一个免等待、无锁的循环 FIFO。

简介

免等待和无锁循环队列对于时间和内存敏感的系统来说是一种有用的技术。队列的免等待特性使得每个操作都有固定的步骤。队列的无锁特性使得单个源线程(生产者)到单个目标线程(消费者)之间的两个线程通信无需使用任何锁。

免等待和无锁的结合使得这种类型的循环队列在从中断和信号处理程序到实时系统或其他时间敏感软件等一系列领域中都具有吸引力。

背景:循环 FIFO 队列 

此循环 FIFO 队列可用于最多两个线程之间的通信。这种情况通常被称为单生产者 → 单消费者问题。

问题描述:一个高优先级任务,不能长时间延迟,并且要求数据处理按启动顺序完成。例如:中断/信号处理程序、GUI 事件、数据记录器等。

解决方案:将耗时的工作委托给另一个接收(即消费)已生产消息的线程。使用 FIFO 队列可以为从生产者消费者传递任务的异步委托和处理提供可预测的行为。

只要队列未满,生产者就可以向队列中添加新任务。只要队列不为空,消费者就可以从队列中移除任务。在队列中,添加或检索数据无需等待。如果没有要处理的任务,消费者线程可以继续执行其他任务。如果队列中没有空间供生产者使用,它不需要等待直到有空间。

显然,对于生产者来说,队列已满是一种不理想的情况。必须仔细考虑其最大大小以及其与处理满队列的负面影响的关系。处理这些问题(例如覆盖旧项目、多优先级队列或处理此场景的其他方式)超出了本文的范围。

背景:文章基本原理

早在 2009 年,我发表了 我的第一篇 CodeProject 文章,内容是关于无锁循环 FIFO 队列。在军用航空电子工业中,我曾见过一种在我看来是危险的技术来创建无锁循环 FIFO。该队列用于中断处理程序,因为它将来自中断处理的传入消息推送到实时系统。

如果阅读本文的原始 版本(也发布在 此处!),将会有一个描述和推理,关于何时、为何以及如何一种危险技术可能奏效而何时不会奏效。旧文章旨在尝试清晰地理解这种强烈不鼓励的技术,但并未推荐它。

旧文章描述了一种违背良好 C++ 实践的线程通信类型,它使用高度依赖于平台的循环 FIFO 队列。它很容易崩溃,因为它基本上是一个丑陋的平台 hack,通过简单的幻觉看起来很有吸引力。它可能会因为编译器更新或在另一个硬件架构上使用而崩溃。

在旧文章中,我曾承诺发表一个由 C++11 支持的队列,该队列通过使用 std::atomic 实现无锁。现在是时候了C++11 及其内存模型已在 Linux 和 Windows 上可用。使用 ‹atomic› 库中的 std::atomic 有效地消除了再次尝试使用不鼓励的 平台相关 hack 的需要。

本文的目的不仅是展示两个免等待、无锁的循环 FIFO 队列。文章还解释了管理 C++11 内存模型的规则。阅读本文后,您应该对 ‹atomic› 库背后的一些基本原理有所了解。

目录

对于那些想深入了解循环队列内部工作原理的人,我在 第一部分 提供了对其和我实现的简短描述。对于那些已经熟悉它的人,可以通过转到 第二部分 来缩短阅读时间。第二部分简要描述了 C++11 不同的内存模型以及它们如何与 std::atomic 一起用于创建这些循环队列的无锁方面。

第一部分

第二部分

免责声明

真的无锁吗?但是...
C++11 标准保证 std::atomic_flag 是无锁的。事实上,std::atomic_flag唯一保证无锁的类型。某些平台/库实现可能会在内部对原生原子类型使用锁。然而,尽管这取决于系统,但很可能许多原生原子类型都是无锁的。

CircularFifo 提供了一个函数 bool isLockFree() const {...},可以调用它来查看它是否真正无锁,即使是在幕后。´

但是...
就像在 旧的 CodeProject 文章中一样,本文旨在简单易懂。一个最佳解决方案可能拥有的所有细节都未涵盖。这些细节通常在以下领域:

  • 消费者/生产者处理空/满队列。
  • 处理可变大小的项目

多个生产者,多个消费者
此队列绝不能使用多个消费者或生产者。使用更多的生产者或消费者线程会破坏此设计并损坏队列。此队列必须仅在单个生产者单个消费者场景中使用。如果需要更多线程,则应使用不同的解决方案。

没什么“新”的!
本文不声称引入了一种新的概念,或者一种新的编写这种无锁循环队列的方式。相反,据我所知,本文介绍了一种众所周知且广为接受的编写此类队列的方式。最终的代码实际上与 旧代码惊人地相似,几乎相同。比较代码可能会给人一种只有微不足道的代码更改的错觉,但这些更改的影响是巨大的。真正的区别在于原子声明和处理。这当然是一个巨大的区别,因为幕后发生了很多事情。

旧代码应被视为损坏的,并且旧代码不保证符合任何标准,而这里提供的代码应根据新的 C++11 标准 [2] 工作。

公共领域
使用代码风险自负。我当然自己也在使用它,但由于其处于公共领域状态,我不会提供任何保证。我也不会对可能遗漏的错误做出任何承诺。使用与否是您自己的责任。可以自由使用代码,无需对我或对许可证承担任何义务。随意修改和使用代码。

尾部优先还是头部优先?
项目应该在尾部添加并在头部移除,还是相反更合乎逻辑?意见不一。本文采用“项目在尾部添加,从头部移除”的方法,以与旧文章保持一致,并使其与常见且众所周知的 pop_frontpush_back 队列操作相似。

内存模型:顺序一致还是宽松/获取/释放?

本文介绍了两个版本的免等待和无锁循环 FIFO。第一个,最直观的,使用 C++11 默认的内存顺序 memory_order_seq_cst。另一个,稍微复杂一些,使用 memory_order_relaxedmemory_order_acquirememory_order_release。内存顺序的使用将被解释,我还会尝试表明默认的 memory_order_seq_cst 不仅最容易推理,而且在 x86-x64 架构上具有非常好的性能。

第一部分

使用代码

请记住,只使用一个线程作为生产者,只使用一个线程作为消费者。如果任何一方涉及多个线程,将破坏线程安全设计并损坏生产者→消费者通信。

使用代码再简单不过了。

CircularFifo‹Message, 128› queue;     // 128 'Messages' will fit
  
/* Producer thread */
Message m = ...
if ( false == queue.push(m)) { /*false equals full queue */ }
  
  
/* Consumer thread */
Message m;
if ( false == queue.pop(m)) { /* false equals empty queue */ }

您可以在代码附带的单元测试中找到更多示例。

代码解释:Circularfifo‹类型,大小›

我们直接进入顺序一致代码,不再赘述。代码略微简化以提高文章可读性。如果您喜欢完整的语法,可以打开代码并并排查看。

template‹typename Element, size_t Size› 
class CircularFifo{
public:
  enum { Capacity = Size+1 };

  CircularFifo() : _tail(0), _head(0){}   
  virtual ~CircularFifo() {}

  bool push(const Element& item); 
  bool pop(Element& item);

  bool wasEmpty() const;
  bool wasFull() const;
  bool isLockFree() const;

private:
  size_t increment(size_t idx) const; 

  std::atomic‹size_t›  _tail;  
  Element              _array[Capacity];
  std::atomic‹size_t›  _head; 
};

工作原理

第一个描述的队列使用默认的、顺序的原子操作。顺序模型使得推理原子操作以及它们与其他线程中原子操作的关系变得容易。在 x86x64 等强大的硬件内存(例如处理器)架构上,与后面讨论的使用细粒度 std::memory_order_{relaxed/acquire/release} 的 CircularFifo 相比,使用顺序原子 CircularFifo 会带来一些性能开销。

在弱序内存架构上,队列之间的性能差异会更显著。

压入和弹出
bool push(Item&) 将由生产者线程完成。bool pop(Item&) 将由消费者线程使用。

Empty

Empty Queue

当缓冲区为空时,两个索引将相同。消费者进行的任何读取都将失败。

bool wasEmpty() const
{
  return (_head.load() == _tail.load());
}

当队列为空时,消费者尝试检索 bool pop(Item&) 的任何尝试都将失败。使用 wasEmpty() 检查队列是否为空将提供队列状态的快照。当然,在读取器对其采取行动之前,状态可能会改变。这没问题,应该将其视为快照,而不是其他。

生产者添加项目

生产者在尾部索引的位置添加(push())新项目。写入后,尾部增加一步,如果到达队列末尾则循环到开头。队列随着尾部增长。

Empty Queue

/* Producer only: updates tail index after setting the element in place */
bool push(Element& item_)
{	
  auto current_tail = _tail.load();            
  auto next_tail = increment(current_tail);    
  if(next_tail != _head.load())                         
  {
    _array[current_tail] = item;               
    _tail.store(next_tail);                    
    return true;
  }
  
  return false;  // full queue
}

生产者添加另一个项目,尾部再次递增。

Producer adds second item

消费者检索项目

消费者检索(pop())头部索引的项目。头部随着递增一步而向尾部移动。队列随着头部缩小。

Empty Queue

/* Consumer only: updates head index after retrieving the element */
bool pop(Element& item)
{
  const auto current_head = _head.load();  
  if(current_head == _tail.load())  
    return false;   // empty queue

  item = _array[current_head]; 
  _head.store(increment(current_head)); 
  return true;
}

如果生产者向队列中推送的项目多于消费者能够跟上的,队列最终将变满。

Full Queue

当队列已满时,头部和尾部之间会有一个槽位差异。此时,生产者的任何写入都将失败。是的,它甚至必须失败,否则空队列条件,即 head == tail,将变为真。

bool wasFull() const
{
  const auto next_tail = increment(_tail.load());
  return (next_tail == _head.load());
}


size_t increment(size_t idx) const
{
  return (idx + 1) % Capacity;
}

% 模数运算符

% 模数运算符如果以前没有遇到过可能会让人感到困惑。% 运算符给出整数除法的余数。它可以用多种方式使用,这里它用于计算索引何时应该以循环方式返回到开头。

 // Example: A CircularFifo with Size of 99
 // Capacity is Size+1. The padding of +1 is used to simplify the logic
 increment(98) ---> will yield 99
 increment(99) ---> will yield 0 
}

push() 中显示的 next_tail 比队列大小大一时,它会循环回到开头,即 索引 0

线程安全使用和实现

线程安全通过类设计及其预期用途得到保障。只有两个函数可以修改队列的状态:push()pop()。只允许一个线程接触 push()。只允许一个线程接触 pop()。通过这种方式,当 push()pop() 由最多一个相应的线程使用时,线程安全得到保证。生产者线程只应访问 push(),消费者线程只应访问 pop()

通过接口访问可以进一步强调设计,但对于本文,我认为在这里和注释中澄清就足够了。对于实际项目,使用像 SingleConsumerInputQueueSingleProducerOutputQueue 这样的接口可能有助于减少因混淆而导致的错误的风险。

按预期方式使用时的原子访问
生产者:push() 仅写入 tail,但读取 head 快照以验证队列是否未满。
消费者:pop() 仅写入 head,但读取 tail 快照以验证队列是否不为空。只有生产者线程会更新 tail,只有消费者线程会更新 head

然而,对 tailhead 的读写都必须是线程安全的,因为我们不想读取旧的、缓存的值,并且读写都必须是原子的。如下所述,tailhead 索引的更新是原子的。原子读/写以及此类的设计将确保最多一个生产者线程和一个消费者线程的线程安全。

重要:下文解释了如何为这个循环队列实现原子、不重新排序的读写。这可能是本文最重要的部分。它解释了顺序获取-释放/宽松模型

第二部分

使其工作

至关重要的是,头部和尾部索引必须仅在元素(pop()push())读取或写入之后更新。因此,对 headtail 的任何更新都将确保对元素的正确访问。为了实现这一点,对 headtail 的读写必须是原子的,并且不能重新排序。

原子操作和不同的内存模型

下面描述了您可以从不同的 C++11 内存模型和原子操作中获得的部分功能。它绝不是全面的,但包含了必要的部分来解释两种类型的 C++11 赋能的免等待、无锁循环队列。

C++11 中有三种不同的内存排序模型,以及六种排序选项。

  1. 顺序一致模型
    • memory_order_seq_cst
  2. 获取-释放模型
    • memory_order_consume
    • memory_order_acquire
    • memory_order_release
    • memory_order_acq_rel
  3. 宽松模型
    • memory_order_relaxed

顺序一致

上述代码示例中使用了默认的顺序内存排序。这种原子、顺序一致的内存模型可能是在推理线程原子操作的可能序列时我们大多数人感到舒适的。

顺序一致内存模型和操作的规则

  1. 顺序一致的存储将与对相同数据的顺序一致加载同步。这得益于相关线程之间的全局同步。

  2. 所有参与这些顺序原子操作的线程都将看到完全相同的操作顺序。

  3. 同一线程内的原子操作不能重新排序。

  4. 一个线程中的一系列原子操作将被其他线程以相同的顺序看到

顺序一致原子代码

这些规则确保了 push()pop() 中使用的顺序原子操作是安全的。让我们回顾一下 pop() 来演示这一点。

/* Consumer only: updates head index after retrieving the element */
bool pop(Element& item)
{
  const auto current_head = _head.load();  // 1. loads sequentially the head
  if(current_head == _tail.load())         // 2. compares head to snapshot of tail
    return false;   // empty queue

  item = _array[current_head];             // 3. retrievs the item pointed to by head
  _head.store(increment(current_head));    // 4. update the head index to new position
  return true;
}

原子顺序规则保证步骤 3-4 不会重新排序( (重新排序无论如何都不会有害)。原子顺序规则还保证消费者线程在用 head.load() 读取尾部快照时不会得到乱序更新。

正是有了这种顺序内存模型,您才能感到内心温暖。推理起来似乎很容易,对吗?

顺序内存模型的注意事项

注意事项 1:
非顺序的、宽松内存原子操作可能无法看到与顺序原子操作相同的操作顺序。为了获得易于推理的操作排序能力,所有相互作用的原子操作都应该使用顺序一致内存排序。

注意事项 2
在采用强硬件内存模型 [6] 的 x86-x64 处理器架构上,顺序一致模型相对便宜 。在弱序硬件内存架构 [7] 上,顺序一致模型更昂贵,因为它需要更多的内部同步操作。在这种弱序硬件内存架构上,顺序一致模型将由于同步开销远高于获取-释放内存模型而导致显著的性能损失。

简化的原子顺序操作

关于顺序原子操作的最后一点:它们不仅易于推理,而且编写起来也更简单。由于它们是默认选择,因此无需显式写入内存操作类型。

bool CircularFifo::wasFull() const
{
  auto index = _tail.load();
  auto next_tail = (index + 1) % Capacity;
  return (next_tail == _head.load());
}

使用默认值意味着您可以编写 _tail.load() 而无需指定内存顺序。您当然可以显式写入它正在使用顺序一致内存顺序 std::memory_order_seq_cst

bool CircularFifo::wasFull() const
{
  auto index = _tail.load(std::memory_order_seq_cst);
  auto next_tail = (index + 1) % Capacity;
  return (next_tail == _head.load(std::memory_order_seq_cst));
}

精细内存顺序

使用细粒度内存模型获取-释放宽松,可以共同挤出更好的性能,这得益于更少的同步开销。顺序一致模型提供的易于推理的全局同步保证不复存在。

线程现在可能会看到相互作用的原子操作的不同视图。现在是时候非常小心了 (双关语)。引用 Anthony Williams 的话:[1,第 5.3.3 章 原子操作的内存排序]

“在没有其他排序约束的情况下,唯一的要求是所有线程就每个独立变量的修改顺序达成一致”

下面写了两个细粒度内存模型(宽松和获取-释放)的简短简化示例。该领域比此处解释的更高级,但足以解释此无锁代码,并可作为细粒度原子操作的初步介绍。

宽松内存模型

如果与获取-释放操作一起使用,宽松原子操作可能功能强大。但其本身,宽松操作可能会给粗心大意的人带来相当多的困惑。

宽松内存模型和操作的规则

  1. 同一线程上、同一变量的宽松原子操作保证在该线程内“先发生于”排序。

  2. 同一线程、同一变量上的操作不会重新排序

  3. 为了在线程之间实现同步排序,必须有一对获取 - 释放

  4. 只有变量上原子操作的修改顺序才会被传达给其他线程,但由于宽松内存操作不使用同步-与,结果可能会让你感到惊讶...

宽松内存操作示例

示例灵感来自 C++11 标准 [2, §29.3 原子操作库:“顺序和一致性”]。

// A possible thread interleaving snapshot, i.e. below is an 
// possible 'chronological' order of events between two threads
// as they actually "happen"  

// x, y are initially zero

// thread_1
auto r1 = y.load(std::memory_order_relaxed);    // t1: 1
x.store(r1, std::memory_order_relaxed);         // t1: 2

// thread_2
auto r2 = x.load(std:memory_order_relaxed);     // t2: 1
y.store(123, std::memory_order_relaxed);        // t2: 2

这些是内存无序的线程操作。即使线程交错快照以以上按时间顺序发生,事件也可能以不同的序列影响

即,以下影响序列是可能的

y.store(123, std::memory_order_relaxed);        // t2: 2
auto r1 = y.load(std::memory_order_relaxed);    // t1: 1
x.store(r1, std::memory_order_relaxed);         // t1: 2
auto r2 = x.load(std:memory_order_relaxed);     // t2: 1

导致 r1 = r2 = 123;

获取-释放内存模型

获取-释放内存模型为线程同步提供一些保证

获取-释放内存模型和操作的规则

  1. 不能保证所有原子操作的顺序

  2. 尽管如此,它仍然具有比宽松模型更强的同步顺序

  3. 保证线程对排序同步。即,在执行释放的线程和执行获取的线程之间

  4. 重排序受到限制,但仍不是顺序一致的...

    // thread_1: 
    y.store(true, std::memory_order_release);
     
    //thread_2:
    while(!y.load(std::memory_order_acquire));

    保证 thread_2在某个时刻看到 y 为真并退出 while-loopmemory_order_releasememory_order_acquire 同步。

  5. 宽松操作遵循“先发生于”规则,该规则可与获取-释放操作一起使用...

    // thread_1
    x.store(123, std::memory_order_relaxed); // relaxed X happens-before Y store
    y.store(true, std::memory_order_release);
    
    // thread_2
    while(!y.load(std::memory_order_acquire));
    assert(123 == x.load(std::memory_order_relaxed));

    “先发生于”关系保证assert(123 == x.load(relaxed))不会发生断言失败

    由于 thread_1: x.store(123,relaxed) 先发生于 y.store(true, release),因此保证当 thread_2: 读取 true == y.load(acquire) 时,x 的值必须已经为 123。

代码解释:获取-释放/宽松循环 FIFO

上述精细内存模型和操作可用于创建更有效的 push()pop() 操作。

请记住,生产者只能更新一个变量 (tail),消费者只能更新一个成员变量 (head)。

宽松内存操作仅用于在负责更新变量的那个线程中加载原子变量。在同一线程内,宽松操作不能对该变量进行重新排序。

bool push(const Element& item)

bool push(const Element& item)
{       
  const auto current_tail = _tail.load(std::memory_order_relaxed);  // 1
  const auto next_tail = increment(current_tail);                   
  if(next_tail != _head.load(std::memory_order_acquire))            // 2               
  {     
    _array[current_tail] = item;                                    // 3
    _tail.store(next_tail, std::memory_order_release);              // 4
    return true;
  }
  return false; // full queue
}
  1. tail 值的宽松加载。由于生产者线程是唯一调用 push() 的线程,因此保证 tail 值为最新。加载不需要跨线程同步。
  2. next_tailhead 值快照进行比较。head 值是成对的获取-释放同步的。它保证永远不会与后续的 head 更新乱序
  3. 该项存储在尚未同步next_tail 所指向的位置。这先于 (4) 中的 release 存储,并保证在 (4) 之前看到该存储。
  4. tailrelease 顺序更新。与消费者线程的成对同步通过消费者对 tailacquire 读取来保证。

bool pop(const Element& item)

bool pop(Element& item)
{
  const auto current_head = _head.load(std::memory_order_relaxed);    // 1
  if(current_head == _tail.load(std::memory_order_acquire))           // 2
    return false; // empty queue

  item = _array[current_head];                                       // 3
  _head.store(increment(current_head), std::memory_order_release);   // 4
  return true;
}
  1. head 值的宽松加载。由于消费者线程是唯一调用 pop() 的线程,因此保证 head 值为最新。加载不需要跨线程同步。
  2. current_headtail 值快照进行比较。tail 值是成对的获取-释放同步的。它保证永远不会与后续的 tail 更新乱序
  3. 该项目从 current_head 指向的位置检索。这先于 (4) 中的 release 存储。
  4. headrelease 顺序更新。与生产者线程的成对同步通过生产者对 tailacquire 读取来保证。

摘要
生产者和消费者之间存在针对 tailhead线程对排序同步。生产者永远不会看到消费者更新的 head 的乱序更新,反之亦然,对于 tail 也是如此。

x86-x64 比较:顺序一致 vs. 精细内存顺序

那么,使用获取-释放-宽松类型的 CircularFifo 与顺序一致的 CircularFifo 相比真的有关系吗?

在 Windows (VS2012) 和 Linux (gcc 4.7.2) 上进行的一项非常快速且不那么科学的测试显示

  • Windows (x64):获取-释放比顺序一致快约 23%。
  • Windows (x86):获取-释放比顺序一致快约 22%。
  • Linux (x64):获取-释放比顺序一致快约 44%。
  • Linux (x86):获取-释放比顺序一致快约 36%。

结论

当我撰写原始文章时,它是一个平台 hack。这种 hack 不再是必需的。借助 C++11 std::atomic 可以提供所有所需的功能。最大的问题是,除了顺序一致模型之外,很难推理操作是如何相互作用的。

顺序一致模型非常容易推理和在像 CircularFifo 这样的简单结构中实现。您是否使用它或获取-释放类型取决于您。

最后,我终于读完了这篇长文章。感谢您的阅读。欢迎提出任何评论、改进建议或问题。

历史

初始版本   

  • 2009 年 11 月 3 日:初始版本
  • 2009 年 11 月 4 日:修正了句子并突出显示了关键词
  • 2009 年 11 月 12 日:根据读者反馈修正了文本。修复了图片和代码显示问题
  • 2011 年 6 月 10 日:根据读者的反馈,更新了“满队列”图片和相应文本

为安全使用而进行的完全重写,得益于 C++11 原子操作 

  • 2012 年 11 月 27 日:完全重写。使用 C++11 原子操作,展示顺序一致和获取-释放、宽松有序原子操作来构建循环 FIFO。
  • 2012 年 12 月 2 日:拼写检查更正,将文章分为第一部分和第二部分,以便于阅读。
  • 2014 年 12 月 29 日。更新了有关重排序和队列完整性的原子顺序文本。更正了拼写错误。 

参考文献

  1. Anthony Williams 的书 C++ 并发编程实战:实用多线程
  2. C++11 ISO IEC_14882:2011 (收费)
    一个免费的、稍微新一点的 草案 (n3337) 版本(但包含的内容比标准多)。
    一个较旧的免费草案 版本 (n3242)
  3. 这篇文章原始版本。警告:高度依赖于平台
  4. 维基百科关于 FIFO 队列
  5. std::atomic_flag 类
  6. 解释:弱内存模型与强内存模型 (Preshing on programming)
  7. 弱序硬件内存模型的示例 (Preshing on programming)
© . All rights reserved.