65.9K
CodeProject 正在变化。 阅读更多。
Home

无锁单生产者-单消费者循环队列

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.84/5 (57投票s)

2009年11月4日

公共领域

20分钟阅读

viewsIcon

473708

downloadIcon

5716

如何使用C++11制作一个无等待、无锁的循环先进先出队列。

简介

无等待和无锁循环队列对于时间和内存敏感的系统来说是一种有用的技术。队列的无等待特性使得每个操作都有固定的步骤数。队列的无锁特性使得单个源线程(生产者)到单个目标线程(消费者)之间的两个线程通信无需使用任何锁。

无等待和无锁相结合的强大功能使这种类型的循环队列在从中断和信号处理程序到实时系统或其他时间敏感软件等一系列领域中具有吸引力。

背景:循环先进先出(FIFO)队列 

此循环FIFO队列可用于最多两个线程之间的通信。这种情况通常被称为单生产者 → 单消费者问题。

问题描述:一个高优先级任务,不能被长时间的操作延迟,并且要求数据处理以与开始时相同的顺序完成。例如:中断/信号处理程序、GUI事件、数据记录器等。

解决方案:将耗时的工作委托给另一个线程,该线程接收(即消费)生成的消息。使用FIFO队列可以为任务从生产者传递到消费者异步委托和处理提供可预测的行为。

只要队列未满,生产者就可以向队列中添加新任务。只要队列不为空,消费者就可以从队列中移除任务。在队列中,添加或检索数据无需等待。如果没有要处理的任务,消费者线程可以继续执行其他任务。如果队列中没有空间供生产者使用,它不需要等待直到有空间。

显然,对生产者来说,队列已满是一种不理想的情况。必须仔细考虑其最大大小以及与处理满队列的负面影响相关的因素。处理这些问题(例如旧项目的覆盖、多个优先级队列或处理此场景的其他方式)超出了本文的范围。

背景:文章理由

早在2009年,我发表了我的第一篇CodeProject文章,内容是关于无锁循环FIFO队列。在军用航空电子工业中,我曾看到在我看来是一种用于创建无锁循环FIFO的危险技术。该队列用于中断处理程序,因为它将来自中断处理的传入消息推送到实时系统。

如果阅读本文的原始版本(也在此处发布!),将会有一段描述和推理,说明这种危险技术如何、为何以及何时可能奏效以及何时不会奏效。旧文章的目的是试图清楚地理解这种不被鼓励的技术,但并没有推荐它。

旧文章描述了一种线程通信类型,它违反了良好的C++实践,使用了高度平台依赖的循环FIFO队列。它很容易崩溃,因为它基本上是一个丑陋的平台黑客,通过简单的幻觉看起来很有吸引力。它可能会因为编译器更新或在另一个硬件架构上使用而崩溃。

在旧文章中,我曾承诺发布一个C++11支持的队列,通过使用std::atomic实现无锁。现在是时候了C++11及其内存模型,在Linux和Windows上均可用。使用‹atomic›库中的std::atomic有效地消除了再次使用不被鼓励的平台依赖黑客的需要。

本文的目的不仅是展示两个无等待、无锁的循环FIFO队列。本文还解释了管理C++11内存模型的规则。阅读本文后,您应该对‹atomic›库背后的一些基本原理有所了解。

目录

对于那些想了解循环队列内部工作原理的人,我在第一部分提供了它的简要描述和我的实现。对于那些已经熟悉它的人,可以通过转到第二部分来缩短阅读时间。第二部分简要描述了C++11不同的内存模型,以及它们如何与std::atomic一起用于创建这些循环队列的无锁特性。

第一部分

第二部分

免责声明

无锁,真的吗?但是...
C++11标准保证std::atomic_flag是无锁的。事实上,std::atomic_flag唯一保证无锁的类型。某些平台/库实现可能会在内部为原生原子类型使用锁。然而,尽管这取决于系统,但许多原生原子类型很可能是无锁的。

CircularFifo提供了一个函数bool isLockFree() const {...},可以调用它来查看它是否真正无锁,即使是在幕后也是如此。

但是...
就像在旧的CodeProject文章中一样,本文力求简单易懂。最佳解决方案可能具备的所有细节均未涵盖。这些细节通常属于以下领域:

  • 消费者/生产者处理空/满队列。
  • 处理可变大小的项目

多个生产者,多个消费者
此队列切勿使用多个消费者或生产者。使用更多的生产者或消费者线程会破坏此设计并损坏队列。此队列只能在单个生产者单个消费者场景中使用。如果需要更多线程,则应使用不同的解决方案。

没有什么的!
本文不声称引入新概念,或编写此类无锁循环队列的新方法。相反,据我所知,本文介绍了一种众所周知、广为接受的编写此类队列的方法。最终代码实际上与旧代码惊人地相似,几乎相同。比较代码可能会给人一种只有微不足道的代码更改的错觉,但这些更改的影响是巨大的。真正的区别在于原子声明和处理。这当然是一个巨大的区别,因为幕后发生了许多事情。

旧代码应被视为损坏的,旧代码不被任何标准保证工作,而此处提供的代码应根据新的C++11标准[2]工作。

公共领域
使用代码的风险自负。我当然自己也在使用,但由于其“公共领域”性质,我不会提供任何保证。我也不会就可能出现的错误做出任何承诺。使用与否是您自己的责任。可以自由使用代码,无需对我或对许可证承担任何义务。随意修改和使用代码。

尾部优先还是头部优先?
项目应该添加到尾部并从头部移除,还是相反更合乎逻辑?观点不一。本文采用“项目添加到尾部,从头部移除”的方法,以与旧文章保持一致,并使其与常见且众所周知的pop_frontpush_back队列操作相似。

内存模型:顺序一致或宽松/获取/释放?

本文介绍了两种无等待无锁循环FIFO的实现版本。第一个版本,也是最直观的版本,使用了C++11默认的内存排序memory_order_seq_cst。另一个版本,稍微复杂一些,使用了memory_order_relaxedmemory_order_acquirememory_order_release。本文将解释这些内存排序的使用,我还会尝试证明默认的memory_order_seq_cst不仅最容易理解,而且在x86-x64架构上具有非常好的性能。

第一部分

使用代码

请记住只使用一个线程作为生产者,只使用一个线程作为消费者。如果任一方涉及多个线程,它将破坏线程安全设计并损坏生产者 → 消费者通信。

使用代码再简单不过了。

CircularFifo‹Message, 128› queue;     // 128 'Messages' will fit
  
/* Producer thread */
Message m = ...
if ( false == queue.push(m)) { /*false equals full queue */ }
  
  
/* Consumer thread */
Message m;
if ( false == queue.pop(m)) { /* false equals empty queue */ }

您可以在代码附带的单元测试中找到更多示例。

代码解释:Circularfifo‹Type, Size›

让我们直接进入顺序一致代码,不多费口舌。代码略有简化以提高文章可读性。如果您喜欢完整的语法,可以打开代码并并排查看。

template‹typename Element, size_t Size› 
class CircularFifo{
public:
  enum { Capacity = Size+1 };

  CircularFifo() : _tail(0), _head(0){}   
  virtual ~CircularFifo() {}

  bool push(const Element& item); 
  bool pop(Element& item);

  bool wasEmpty() const;
  bool wasFull() const;
  bool isLockFree() const;

private:
  size_t increment(size_t idx) const; 

  std::atomic‹size_t›  _tail;  
  Element              _array[Capacity];
  std::atomic‹size_t›  _head; 
};

工作原理

首先描述的队列使用默认的、顺序的原子操作。顺序模型使得推理原子操作以及它们与其他线程中的原子操作的关系变得容易。在x86x64等具有强大硬件内存(例如处理器)架构上,与后面讨论的使用细粒度std::memory_order_{relaxed/acquire/release}的CircularFifo相比,使用顺序原子CircularFifo时会产生一些性能开销。

在弱排序内存架构上,队列之间的性能差异会更显著。

压入和弹出
bool push(Item&)将由生产者线程完成。bool pop(Item&)将由消费者线程使用。

Empty

Empty Queue

当缓冲区为空时,两个索引将相同。消费者进行的任何读取都将失败。

bool wasEmpty() const
{
  return (_head.load() == _tail.load());
}

当队列为空时,消费者试图检索(bool pop(Item&))的任何尝试都将失败。使用wasEmpty()检查队列是否为空将提供队列状态的快照。当然,状态可能在读者对其采取行动之前就已改变。这没关系,应该将其视为快照,而不是其他。

生产者添加项目

生产者在由尾部索引的位置添加(push())一个新项目。写入后,尾部增加一步,如果到达队列末尾则循环回到开头。队列随着尾部的增长而增长。

Empty Queue

/* Producer only: updates tail index after setting the element in place */
bool push(Element& item_)
{	
  auto current_tail = _tail.load();            
  auto next_tail = increment(current_tail);    
  if(next_tail != _head.load())                         
  {
    _array[current_tail] = item;               
    _tail.store(next_tail);                    
    return true;
  }
  
  return false;  // full queue
}

生产者添加另一个项目,尾部再次递增。

Producer adds second item

消费者检索项目

消费者检索(pop())由头部索引的项目。头部向尾部移动,因为它增加了一步。队列随着头部的缩小而缩小。

Empty Queue

/* Consumer only: updates head index after retrieving the element */
bool pop(Element& item)
{
  const auto current_head = _head.load();  
  if(current_head == _tail.load())  
    return false;   // empty queue

  item = _array[current_head]; 
  _head.store(increment(current_head)); 
  return true;
}

如果生产者向队列中推送的项目多于消费者能够跟上的项目,队列最终将变满。

Full Queue

当队列满时,头部和尾部之间会有一个槽位差。此时,生产者进行的任何写入都将失败。是的,它必须失败,否则空队列条件,即head == tail,将变为真。

bool wasFull() const
{
  const auto next_tail = increment(_tail.load());
  return (next_tail == _head.load());
}


size_t increment(size_t idx) const
{
  return (idx + 1) % Capacity;
}

% 模运算符

如果之前没有遇到过,%模运算符可能会令人困惑。%运算符给出整数除法的余数。它可以用多种方式使用,这里它被用来计算索引何时应该以循环方式回绕到开头。

 // Example: A CircularFifo with Size of 99
 // Capacity is Size+1. The padding of +1 is used to simplify the logic
 increment(98) ---> will yield 99
 increment(99) ---> will yield 0 
}

push()中显示的next_tail比队列大小大1时,它会回绕到开头,即索引0

线程安全使用与实现

线程安全通过类设计及其预期用途得到保障。只有两个函数可以修改队列的状态:push()pop()。只允许一个线程接触push()。只允许一个线程接触pop()。当push()pop()最多由一个相应的线程使用时,线程安全得到保障。生产者线程只应访问push(),消费者线程只应访问pop()

设计可以通过接口访问进一步强调,但对于本文,我认为在这里和注释中澄清就足够了。对于一个实际项目,使用类似SingleConsumerInputQueueSingleProducerOutputQueue接口来降低因混淆而导致的错误的风险可能有意义。

按预期方式使用时的原子访问
生产者:push()只写入tail,但读取head快照以验证队列是否未满。
消费者:pop()只写入head,但读取tail快照以验证队列是否不为空。只有生产者线程会更新tail,只有消费者线程会更新head

然而,对tail和head的读写都必须是线程安全的,因为我们不希望读取旧的、缓存的值,并且读写都必须是原子的。如下所述,tailhead索引的更新是原子的。原子读/写以及此类的设计将确保最多一个生产者线程和一个消费者线程的线程安全。

重要:下文将进一步解释如何实现此循环队列的原子、不可重排的读写。这可能是本文最重要的部分。它解释了sequentialacquire-release/relaxed models

第二部分

使其工作

至关重要的是,头部和尾部索引必须在元素(pop()push())被读取或写入之后才能更新。因此,对headtail的任何更新都将确保对元素的正确访问。为此,headtail的读写必须是原子的,并且不得重新排序。

原子操作和不同的内存模型

下面描述了您可以从不同的C++11内存模型和原子操作中获得的功能的一个子集。它绝不是全面的,但包含了解释两种C++11支持的无等待、无锁循环队列类型所需的必要部分。

C++11中有三种不同的内存排序模型,以及六种排序选项。

  1. 顺序一致模型
    • memory_order_seq_cst
  2. 获取-释放模型
    • memory_order_consume
    • memory_order_acquire
    • memory_order_release
    • memory_order_acq_rel
  3. 宽松模型
    • memory_order_relaxed

顺序一致性

在上面的代码示例中使用了默认的顺序内存排序。这种原子、顺序一致的内存模型可能是我们大多数人在推理可能存在线程原子操作序列时最熟悉的。

顺序一致性内存模型和操作的规则

  1. 顺序一致的存储将与对相同数据的顺序一致的加载同步。这得益于参与线程之间的全局同步。

  2. 所有参与这些顺序原子操作的线程都将看到完全相同的操作顺序。

  3. 同一线程内的原子操作不能重新排序。

  4. 一个线程中的一系列原子操作将具有其他线程看到的相同顺序序列。

顺序一致原子代码

这些规则保证了push()pop()中使用的顺序原子操作是安全的。让我们回顾pop()来演示这一点。

/* Consumer only: updates head index after retrieving the element */
bool pop(Element& item)
{
  const auto current_head = _head.load();  // 1. loads sequentially the head
  if(current_head == _tail.load())         // 2. compares head to snapshot of tail
    return false;   // empty queue

  item = _array[current_head];             // 3. retrievs the item pointed to by head
  _head.store(increment(current_head));    // 4. update the head index to new position
  return true;
}

原子顺序规则保证步骤3-4不会重新排序( (重新排序无论如何都不会有害)。原子顺序规则还保证消费者线程在用head.load()读取尾部快照时不会得到乱序更新。

正是有了这种顺序内存模型,您才能感到内心温暖而愉悦。推理起来似乎很容易,对吧?

顺序内存模型的注意事项

注意事项1:
非顺序的、宽松的内存原子操作可能不会看到与顺序的、原子操作相同的操作顺序。要获得易于推理的操作排序的强大功能,应将顺序一致的内存排序用于所有交互的原子操作。

注意事项2
在采用强硬件内存模型[6]的x86-x64处理器架构上,顺序一致模型相对便宜。在弱序硬件内存架构[7]上,顺序一致模型更昂贵,因为它需要更多的内部同步操作。在这种弱序硬件内存架构上,顺序一致模型将由于更高的同步开销而比获取-释放内存模型遭受显著的性能损失。

简化的原子顺序操作

关于顺序原子操作的最后一点:它们不仅易于理解,也更容易编写。由于它们是默认选择,因此无需显式写入内存操作类型。

bool CircularFifo::wasFull() const
{
  auto index = _tail.load();
  auto next_tail = (index + 1) % Capacity;
  return (next_tail == _head.load());
}

使用默认值意味着您可以编写_tail.load()而无需指定内存顺序。您当然可以显式写入它正在使用顺序一致内存顺序std::memory_order_seq_cst

bool CircularFifo::wasFull() const
{
  auto index = _tail.load(std::memory_order_seq_cst);
  auto next_tail = (index + 1) % Capacity;
  return (next_tail == _head.load(std::memory_order_seq_cst));
}

精炼内存顺序

结合使用细粒度内存模型“获取-释放”和“宽松”,可以由于较少的同步开销而获得更好的性能。顺序一致模型所提供的“全局同步保证”这种易于理解的优势不复存在。

线程现在可能会看到交互原子操作的不同视图。是时候非常小心地进行线程操作了(双关语)。引用安东尼·威廉姆斯的话:[1,第5.3.3章 原子操作的内存排序]

"在没有其他排序约束的情况下,唯一的要求是所有线程就每个独立变量的修改顺序达成一致"

下面是两个细粒度内存模型——宽松和获取-释放的简短简化示例。该领域比这里解释的更高级,但足以解释这个无锁代码,并可作为细粒度原子操作的初步介绍。

宽松内存模型

宽松的原子操作如果与获取-释放操作一起使用,可能会非常强大。但其本身,宽松操作可能会给粗心的人带来不少困惑。

宽松内存模型和操作的规则

  1. 在同一线程上,对同一变量进行宽松原子操作,保证该线程内部的happens-before排序。

  2. 同一线程上,对同一变量的操作不会被重新排序。

  3. 对于线程间的同步排序,必须有一对获取 - 释放

  4. 只有对变量的原子操作的修改顺序才传达给其他线程,但由于宽松内存操作不使用同步-with,结果可能会让你感到惊讶...

宽松内存操作示例

示例灵感来自C++11标准 [2, §29.3 原子操作库:“顺序和一致性”]。

// A possible thread interleaving snapshot, i.e. below is an 
// possible 'chronological' order of events between two threads
// as they actually "happen"  

// x, y are initially zero

// thread_1
auto r1 = y.load(std::memory_order_relaxed);    // t1: 1
x.store(r1, std::memory_order_relaxed);         // t1: 2

// thread_2
auto r2 = x.load(std:memory_order_relaxed);     // t2: 1
y.store(123, std::memory_order_relaxed);        // t2: 2

这些是内存无序的线程操作。即使线程交错快照以上述时间顺序发生,事件也可能以不同的序列影响

即,以下影响序列是可能的

y.store(123, std::memory_order_relaxed);        // t2: 2
auto r1 = y.load(std::memory_order_relaxed);    // t1: 1
x.store(r1, std::memory_order_relaxed);         // t1: 2
auto r2 = x.load(std:memory_order_relaxed);     // t2: 1

结果是r1 = r2 = 123;

获取-释放内存模型

获取-释放内存模型为线程同步提供一些保证

获取-释放内存模型和操作的规则

  1. 不保证所有原子操作的顺序

  2. 尽管如此,它仍然比宽松的同步具有更强的同步顺序。

  3. 保证线程对排序同步。即,在执行释放的线程和执行获取的线程之间。

  4. 重排序受到限制,但仍非顺序...

    // thread_1: 
    y.store(true, std::memory_order_release);
     
    //thread_2:
    while(!y.load(std::memory_order_acquire));

    保证thread_2在某个时刻会看到y为真并退出while循环memory_order_releasememory_order_acquire同步。

  5. 宽松操作遵循一个happens-before规则,可以与获取-释放操作一起使用...

    // thread_1
    x.store(123, std::memory_order_relaxed); // relaxed X happens-before Y store
    y.store(true, std::memory_order_release);
    
    // thread_2
    while(!y.load(std::memory_order_acquire));
    assert(123 == x.load(std::memory_order_relaxed));

    happens-before关系保证assert(123 == x.load(relaxed))中不会出现断言失败

    由于thread_1: x.store(123,relaxed)y.store(true, release)happens-before,因此保证当thread_2: 读取true == y.load(acquire)时,x的值必须已经为123。

代码解释:获取-释放/宽松循环先进先出队列

上述细粒度内存模型和操作可用于创建更有效的push()pop()操作。

请记住,生产者只能更新一个变量(tail),消费者只能更新一个成员变量(head)。

宽松内存操作仅用于在该负责更新变量的线程中加载原子变量。在同一线程内,宽松操作不能对该变量进行重新排序。

bool push(const Element& item)

bool push(const Element& item)
{       
  const auto current_tail = _tail.load(std::memory_order_relaxed);  // 1
  const auto next_tail = increment(current_tail);                   
  if(next_tail != _head.load(std::memory_order_acquire))            // 2               
  {     
    _array[current_tail] = item;                                    // 3
    _tail.store(next_tail, std::memory_order_release);              // 4
    return true;
  }
  return false; // full queue
}
  1. tail值进行宽松加载。由于生产者线程是唯一调用push()的线程,因此保证tail值将是最新值。加载不需要跨线程同步。
  2. next_tailhead值快照进行比较。head值是成对的获取-释放同步的。它保证永远不会与随后的head更新乱序
  3. 项目保存在由尚未同步next_tail指向的位置。这happens-before(4)中的release存储,并保证在(4)之前看到保存。
  4. tailrelease顺序更新。与消费者线程的成对同步通过消费者对tailacquire读取得到保证。

bool pop(const Element& item)

bool pop(Element& item)
{
  const auto current_head = _head.load(std::memory_order_relaxed);    // 1
  if(current_head == _tail.load(std::memory_order_acquire))           // 2
    return false; // empty queue

  item = _array[current_head];                                       // 3
  _head.store(increment(current_head), std::memory_order_release);   // 4
  return true;
}
  1. head值进行宽松加载。由于消费者线程是唯一调用pop()的线程,因此保证head值将是最新值。加载不需要跨线程同步。
  2. current_headtail值快照进行比较。tail值是成对的获取-释放同步的。它保证永远不会与随后的tail更新乱序
  3. current_head指向的位置检索项目。这happens-before(4)中的release存储。
  4. headrelease顺序更新。与生产者线程的成对同步通过生产者对tailacquire读取得到保证。

摘要
生产者和消费者之间对tailhead存在线程对排序同步。生产者绝不会看到消费者更新的head的乱序更新,反之亦然,对于tail也是如此。

x86-x64对比:顺序一致与精炼内存顺序

那么,使用获取-释放-宽松类型的CircularFifo与顺序一致型相比,真的有区别吗?

在Windows (VS2012) 和 Linux (gcc 4.7.2) 上进行的一项非常快速且非科学的测试显示

  • Windows (x64):获取-释放比顺序一致快约23%。
  • Windows (x86):获取-释放比顺序一致快约22%。
  • Linux (x64):获取-释放比顺序一致快约44%。
  • Linux (x86):获取-释放比顺序一致快约36%。

结论

当我写原始文章时,它是一个平台黑客。这样的黑客不再是必需的。有了C++11,std::atomic可以提供所有必要的功能。最大的问题是,除了顺序一致模型之外,很难推理操作如何交互。

顺序一致模型非常容易理解和实现在像CircularFifo这样的简单结构中。你是否使用它或者获取-释放类型取决于你。

最后,我终于写完了这篇长文。感谢您的阅读。任何评论、改进建议或问题都非常欢迎。

历史

初始版本  

  • 2009年11月3日:初始版本
  • 2009年11月4日:修正句子并突出显示关键词
  • 2009年11月12日:根据读者反馈修正文本。修复图片和代码显示问题
  • 2011年6月10日:根据读者反馈,更新“满队列”图片及相应文本。

为C++11原子操作带来的安全使用而进行的完全重写 

  • 2012年11月27日:完全重写。使用C++11原子操作,展示顺序一致和获取-释放、宽松有序的原子操作来构建循环先进先出队列。
  • 2012年12月2日:拼写检查修正,文章分为第一部分和第二部分,以便于阅读。
  • 2014年12月29日。更新了原子顺序文本中关于队列重排序和完整性的部分。修正了错别字。

参考文献

  1. 安东尼·威廉姆斯的书《C++并发编程实战:实用多线程》
  2. C++11 ISO IEC_14882:2011 (收费)”。
    一个免费的,稍新的草案 (n3337)版本(但包含的内容比标准更多)。
    一个较旧的免费草案版本 (n3242)
  3. 本文的原始版本警告:高度依赖平台
  4. 维基百科关于FIFO队列
  5. std::atomic_flag 类
  6. 解释:弱内存模型与强内存模型 (Preshing谈编程)
  7. 弱序硬件内存模型的示例 (Preshing谈编程)
© . All rights reserved.