基本无锁构建块 - 无锁堆栈






4.87/5 (13投票s)
基本无锁构建块 - 无锁堆栈
简介 - 灵感来源
本文涵盖的观点
那么,我(读者)为什么要使用这个呢?
我为什么要使用这样一个过时的容器?……以及无锁编程的好处是什么?
入门:看看堆栈如何工作!
一个例子 - 一个密集的多线程推入/弹出操作
一个简单的数据容器
首先:stack::push
那么,这里发生了什么? (stack::push)
stack::push
循环
之后:关于下一步的想法
简介 - 灵感来源
我坐在电脑前,寻找编写新代码的灵感。我发现自己在想:我到底想创造什么?
然后,我有了主意。我想编写一个跨平台的基础框架,适合任何人使用——从随便写命令行应用的个人到编写3A级视频游戏的大公司。小事一桩。我知道我在做什么。应该花不了多长时间,对吧?(观众笑声)
这个想法来自我的过去。我做过的最有趣的事情是什么?最困难的是什么?我真正喜欢创造的项目是什么?内存管理、多线程、跨独立平台编码。这些有时都有点令人沮丧。而且,在大多数情况下,我猜这些事情对每个人来说都令人沮丧。
我考虑过:这个项目会是什么样子?
我知道了!它必须快。非常快。而且是多线程的。我还需要一个自定义的内存分配器。因为天哪,有时候调用 new 和 delete 太慢了。获取内存必须要快!
于是我开始了。
我很快想起了过去编写这类代码时遇到的所有问题。我会开始编写一个线程池,然后我会思考如何在线程之间传递对象。我记得过去我分析代码时,发现内存分配很多时候都是瓶颈。
这里不能发生这种情况。当然,对于许多项目来说这没什么大不了的,但对于这个项目来说是不可接受的。如果我要分配 20 或 20,000 字节的内存来发送某些东西到另一个线程,它必须是瞬时的。我的意思是,一毫秒都太慢了,我追求的是大约十几个处理器周期的水平。我的梦想很大!
我越想越意识到,我想编写最快的代码。要做到这一点,我需要一个无锁容器。这简直是提高吞吐量和减少延迟的最佳方式。 (戏剧性的音乐响起)
本文涵盖的观点
多线程 - 你的计算机同时做不止一件事情的想法。
数据同步 - 讨论如何在多线程环境中维护数据完整性。
基于锁的同步 - 一种广泛使用的数据同步方法。常见的例子是临界区和互斥锁。
无锁同步 - 一种替代的数据同步方法,其中线程不会互相阻塞以继续执行它们的工作。技术上仍然涉及锁,但是锁的获取和释放由处理器在一条指令中完成。
ABA 问题 - 广泛可用的 compare_exchange
指令的一个典型缺点是无法验证我们认为正在交互的数据没有改变。本文中的无锁堆栈通过使用标识号解决了 ABA 问题。
C++ 中的 compare_exchange - 关于 compare_exchange_weak
和 compare_exchange_strong
的文档。这里有一篇文章 讨论了两者之间的区别。
那么,我(读者)为什么要使用这个呢?
好的,很好,我已经告诉了你为什么我需要写这个,但你用这样的东西有什么目的呢?
如果你正在编写任何想要在线程之间发送某种数据的多线程代码,这是值得考虑使用的东西。在你不关心容器中项目顺序的情况下,它最有用。这里没有排序。甚至,你都无法枚举所有的项目。你得到的只是一个推入(push)和一个弹出(pop)。
好吧,好吧。如果你觉得这听起来很荒谬,并且想知道为什么会想要使用这样一个过时的容器,那么……
我为什么要使用这样一个过时的容器?
……以及无锁编程的好处是什么?
话都让你说了。是的,这是个好问题。简单的答案是:在编写多线程代码时,你为一切付出代价。你那么习惯使用的 vector?是啊,同步那个东西是相当昂贵的。当然,你可以做到。但这就是我出现的原因,为你提供一个你以前可能没意识到的选择。
看,这个老旧的、过时的堆栈在多线程领域有一个很大的优势。它。很。简单。
我所说的简单是指其内部的运作方式。仅仅因为它是一个堆栈并不意味着什么。我可以写一个线程安全的堆栈,但性能会很糟糕。但你不想读那种东西,对吧?我想也是 :D
那么我们说到哪儿了?哦,对了。简单性。这个堆栈不使用任何锁机制来保护你的数据——至少不是传统意义上的。新的 C++11 标准带来了一些很酷的新玩具,比如,我们现在可以在 C++ 中使用原子性的 compare_exchange
指令,而无需使用特定平台的汇编语言!
这些 compare_exchange
的玩意儿真是太神奇了。我稍后会更详细地介绍,但它们允许你锁定一个内存位置,检查那里的值是否是你期望的,然后如果是,该内存位置就会被更新为一个新值。而且它还会返回一个 true/false 来表示一切是否顺利。你可以在一个原子指令中完成所有这些操作。
能够在一个指令中完成所有这些,是驱动无锁编程的关键。这意味着城里有了一种新的数据同步方式,你可以扔掉那些肮脏的内核互斥锁了。好吧,不完全是。但我们正在朝那个方向发展。(旁注:我认为内核互斥锁在可预见的未来仍有一席之地,但随着时间的推移,它们的位置会变得不那么普遍)
所以最终,我不能告诉你使用这个堆栈是否适合你正在做的项目。但如果适合,它有机会比传统的锁机制提供更高的吞吐量和更低的延迟。基于 compare_exchange
函数的原子自旋锁在基准测试中通常表现出与无锁堆栈大致相当的性能。它们可能会是另一篇文章的主题。
以下是无锁编程相对于传统方法的一些巨大优势
- 无锁同步绝不会导致死锁。这根本不可能,因为没有锁可以死锁。
- 线程将始终保持前进。即使在非常短的时间内使用最快的锁,也可能导致多个线程之间的高延迟阻塞。一个例子是当操作系统抢占性地将你的程序从正在做的事情中拉走,让别人在CPU上轮流执行。使用无锁编程,当发生这种情况时,它只会延迟当前活动的线程,而使用锁则会延迟活动线程以及所有也想要该锁的线程。
无锁编程这东西潜力巨大,坦白说,我觉得它超酷。这是下一个大趋势。而我发现其中最有趣的是,它需要一种与以往不同的思考和推理方式。
入门:看看堆栈如何工作!
一个例子 - 一个密集的多线程推入/弹出操作
这是激动人心的部分!好吧,至少对我来说是激动人心的……
这是一个普通容器无法做到的例子:在没有锁的情况下,跨多个线程同时安全地添加和删除项目!
// These are the testing parameters, you can change them
#define data_count 10
#define loop_count 1000000
#define thread_count 4
// This is the test function
// It uses those numbers set at the top
// Using new and delete is part of the test - for our new/delete calls to not crash,
// the data inside the stack must be handled properly. When we don't crash, and there is no
// "lost data", we know that everything went properly when playing with the stack.
void thread_test(stack *s, std::atomic *max_elapsed, std::atomic *empty_count, size_t index)
{
// Initialization - create the data we'll test with
data* d[data_count];
for (size_t i = 0; i < data_count; ++i)
d[i] = new data;
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
// This is the test loop
// Push and pop x number of times and see if everything comes out ok on the other side.
// Also a good working sample of moving data into and out of the stack.
for (size_t loop = 0; loop < loop_count; ++loop)
{
for (size_t i = 0; i < data_count; ++i)
{
if (d[i])
s->push(d[i]);
}
for (size_t i = 0; i < data_count; ++i)
s->pop((node*&)d[i]);
}
std::chrono::high_resolution_clock::time_point finish = std::chrono::high_resolution_clock::now();
std::chrono::milliseconds span = std::chrono::duration_cast(finish - start);
std::cout << index << " - thread completed : " << span.count() << "\r\n";
*max_elapsed = span.count();
// If the test is successful, every location will hold a valid pointer, and no pointers
// will be duplicated. We test for a valid pointer, and by using delete we ensure that we
// didn't have the same data in two places - deleting the same data twice would crash.
// - may not crash on all platforms, so this test isn't meant to have total cross-platform support
for (size_t i = 0; i < data_count; ++i)
{
if (d[i])
delete d[i];
else
(*empty_count)++;
}
}
// This is the place where we get stuff done. Start up the test threads, wait for them, and then
// check the results to display to the user afterwards. No big deal :)
int main(int argc, const char * argv[])
{
std::thread threads[thread_count];
std::atomic max_elapsed{ 0 };
std::atomic empty_count{ 0 };
stack s;
std::cout << R"_(///////////////////////////////////////////////////////////////////////////////
/// SIMPLE LOCK FREE STACK ///
/// Copyright (c) 2014 Michael Gazonda - http://mgaz.ca ///
/// CPOL Licensed ///
/// See CPOL.htm, or https://codeproject.org.cn/info/cpol10.aspx ///
/// https://codeproject.org.cn/Articles/801537/ ///
/// https://github.com/MGaz/lock_free_stack ///
///////////////////////////////////////////////////////////////////////////////)_";
std::cout << "\r\nstarting\r\n";
// Start threads
for (size_t i = 0; i < thread_count; ++i)
threads[i] = std::thread(thread_test, &s, &max_elapsed, &empty_count, i);
// Wait for them to all finish
for (size_t i = 0; i < thread_count; ++i)
threads[i].join();
// Output information
size_t operation_count = data_count * loop_count * thread_count * 2;
if (!empty_count)
std::cout << "no lost data\r\n";
else
std::cout << "___lost data count___: " << empty_count << " of ";
std::cout << data_count * thread_count << "\r\n";
std::cout << "thread count : " << thread_count << "\r\n";
std::cout << "target processor bits: " << PROCESSOR_BITS << "\r\n";
std::cout << "total pushes and pops: " << operation_count << "\r\n";
std::cout << "operations per second: ";
std::cout << operation_count / (max_elapsed.load() > 0 ? max_elapsed.load() : 1) * 1000 << "\r\n";
std::cout << "processing time : " << max_elapsed.load() << "ms\r\n";
std::cout << "press any key to exit\r\n";
std::getchar();
return 0;
}
所以要记住的酷炫之处在于,你可以从任何线程随心所欲地push
和pop
,而无需担心数据容器的完整性。请记住,你仍然需要保证数据本身的线程安全。
内部原理:堆栈如何工作
让我们看看 stack.h
#include <atomic>
#include "node.h"
class stack
{
public:
void push(node* n);
bool pop(node*& n);
protected:
std::atomic<node> head_;
};
里面没什么东西。我们有一个push
,一个pop
,还有一个head_
元素。它看起来不多,然而,正是因为缺少了我们在更高级容器中获得的那些有趣玩具,才使得这个容器能够极其高效。
既然你会想要将自己的数据附加到节点上,让我们快速看看如何做到这一点
一个简单的数据容器:data.h
class data : public node
{
// Put your data in here
// You can make your own data structure to throw in the stack, but it _must_ derive from node
// Just an example, serves no purpose other than that - you may remove
int x_;
char buffer_[10];
};
所以,实际上,你只需要创建一个 class
或node
。它必须像那样派生自 node 吗?不,还有其他方法可以做到,但它们超出了本文的范围。这是让你现在就能上手并运行的方法。好了,让我们来看看堆栈是如何工作的。
首先:stack::push
void stack::push(node* n)
{
node old_head, new_head{ n };
n->n_ = nullptr;
while (!head_.compare_exchange_weak(old_head, new_head))
{
n->n_ = old_head.n_;
new_head.create_id(old_head);
}
}
那么,这里发生了什么?
我想首先提请你注意的是,只有一个原子操作(尽管它在一个循环中)。另一种选择是先用 head_.load()
,但我没有这么做。主要原因是,为了可读性和推理目的,我更喜欢尽可能少的原子操作。其次,我认为在运行 compare_exchange
之前预加载头部值并不会带来任何性能上的好处,尽管我目前的测试还不够广泛。你的情况可能会有所不同(YMMV)。
node old_head, new_head{ n };
n->n_ = nullptr;
这是初始化。我们将新节点(n
)的下一个指针(n->n_
)设置为 nullptr
。此时我们不知道头元素里有什么,但我们假定堆栈是完全空的继续进行。为了处理空栈的情况,我们将新的头部指向我们的新节点,并将我们新节点的下一个指针设置为 nullptr
。
while (!head_.compare_exchange_weak(old_head, new_head))
这太棒了!这就是原子的 push
(和 pop
)操作! 这个语句的意思是
LOOP LOCK head_ IF head_ == old_head head_ = new_head success = true ELSE old_head = head_ success = false END IF RELEASE LOCK head_ IF success BREAK LOOP END IF { DO __loop_code__ } REPEAT LOOP
所以这就是你通过一个原子性的 compare_exchange
循环所得到的东西。内容很多,我觉得把它分解成一种更传统的代码阅读方式会很有帮助。
最令人惊奇的事情不是我们得到了所有这些酷炫的功能,而是这一切都作为一个不可中断的处理器指令发生。大多数时候,你并不关心所有这些事情是否同时发生,但这次不同。费尽周折能够在一个操作中做这么多事情的原因是,这样你就可以在多线程应用程序中执行这些操作,而不用担心否则可能出现的问题。
换一种方式来考虑:当你使用多个线程工作时,你必须考虑到从一条指令到下一条指令之间“任何事情”都可能发生。避免在我们代码的各个小部分之间发生我们不希望的事情的方法是使用锁,或者使用一个指令(compare_exchange
),它能做一堆有用的、不可分割的事情。
stack::push
循环
// loop
{
n->n_ = old_head.n_;
new_head.create_id(old_head);
}
这段代码在我们每次尝试 compare_exchange
操作并失败时运行。这里发生了一些非常重要的事情,为下一次 compare_exchange
尝试做准备。
- 首先,我们将我们节点的下一个指针(
n->n_
)指向当前堆栈头部的下一个指针(old_head.n_
)。 - 接下来,我们将即将成为堆栈头部的 new_head 项的标识号设置为比当前头部的 id 号大 1。(这是解决顶部“延伸阅读”部分描述的“ABA”问题所必需的)
这样做可以确保当 compare_exchange
成功时,堆栈之后处于一个有效的状态。
现在,来看看 stack::pop
!
bool stack::pop(node*& n)
{
node old_head, new_head;
n = nullptr;
while (!head_.compare_exchange_weak(old_head, new_head))
{
n = old_head.next_pointer();
if (!n)
break;
new_head = node { n->n_, old_head };
}
return n != nullptr;
}
这其中很多都与 push 操作非常相似。只是有一点点不同。
node old_head, new_head;
n = nullptr;
在 push
中,我们将 next 初始化为“n
”,而现在我们将该值初始化为 nullptr
。同样,在推入时,我们将 n->n_
初始化为 nullptr
,而现在我们将 n
初始化为 nullptr
。工作原理是相同的,我们只是在与之前相反的方向上操作。
while (!head_.compare_exchange_weak(old_head, new_head))
推入(push)和弹出(pop)操作中最重要的那行代码完全相同! 这很酷,也有助于理解代码。无论是推入还是弹出,每次我们运行 compare_exchange
时,都需要准备好“new_head
”。设置过程有点不同,但仅此而已。
{
n = old_head.next_pointer();
if (!n)
break;
new_head = node{ n->n_, old_head };
}
这是我们的 pop
循环。它比 push
循环多了一些东西,但思想是相同的。
当我们的 compare_exchange
成功时,我们希望返回的 node*
指向从堆栈中弹出的那个 node
。这就是 n = old_head.next_pointer();
正在做的事情。
接下来,我们要测试指针,看那里是否有东西。如果没有东西,即 (!n
) 的值为 true,那么我们退出循环并返回 false
。如果发生这种情况,意味着我们试图弹出堆栈,但堆栈里什么都没有。
最后,我们创建 new_head
。它将指向堆栈上的第二个项(如果没有的话,就是 nullptr
)。我们还需要给 new_head
一个 ID 号,所以我们也将 old_head
传递过去。new_head
的 ID 将比 old_head
的 ID 大一。
node.h
中的魔法
struct node
{
node *n_;
#if PROCESSOR_BITS == 64
using stack_id_t = uint16_t;
inline node() : n_{ nullptr } { }
inline node(node* n) : n_{ n } { }
inline void create_id(const node& nid)
{
((stack_id_t*)this)[3] = ((const stack_id_t*)&nid)[3] + 1;
}
inline node* next_pointer()
{
return (node*)((uint64_t)n_ & 0x0000ffffffffffff);
}
#elif PROCESSOR_BITS == 32
using stack_id_t = uint32_t;
stack_id_t t_;
inline node() : n_{ nullptr }, t_{ 0 } { }
inline node(node* n) : n_{ n }, t_{ 0 } { }
inline void create_id(const node& nid) { t_ = nid.t_ + 1; }
inline node* next_pointer() { return n_; }
#else
static_assert(false, "unknown pointer size");
#endif
inline node(node* n, const node& nid) : n_{ n }
{
if (n_)
create_id(nid);
else
create_id(*this);
}
};
对于一个简单的节点来说,内容很多,是吧?它可能看起来有点奇怪,让我来解释一下。
还记得堆栈中出现的那些 ID 号吗?嗯,这就是它的工作原理。在 32 位系统上,我们简单地给我们的节点附加另一个 32 位数字,然后我们在 64 位的节点及其标识号上运行我们的原子 compare_exchange
操作。
对于 64 位计算来说,至少目前,情况并不那么简单。当给节点附加第二个 64 位数字时,我尝试的当前 C++ 编译器(clang 和 Visual Studio 2013)不允许 128 位的无锁 compare_exchange
s。我可以编写代码并运行它,但在底层,编译器将我的无锁代码转换成了使用锁的代码。
幸运的是,在 x64 系统上,只有 48 位的指针用于内存寻址。其余 16 位未使用。这为我们在运行 compare_exchange
操作时,提供了与指针一起存储 ID 号的空间。上面的代码就是对此的实现。
就这样!一个无锁堆栈。
之后:关于下一步的想法
- 在你的代码中找到需要同步的信息,看看这个解决方案是否合适
- 创建一个不使用锁作为消息容器的线程池。
- 高级 - 创建一个无锁内存分配器
- 研究常见的库调用,以及它们是否可能是阻塞的。
new
和delete
是阻塞的吗?如果它们是阻塞的,你如何编写无锁代码?
历史
- 版本 1 - 2014年7月26日
- 版本 2 - 2014年7月30日 - 增加了更多背景信息,以及人们为何/如何选择此方案用于其项目。改进了格式。添加了用于在Linux(Code::Blocks)上编译的工作项目文件。将许可证更改为CPOL。
- 版本 3 - 2014年8月7日 - 添加了目录,细微的格式改进 + 修正了代码拼写错误。