高级:优化线程以进行函数调用,包括返回值






3.76/5 (8投票s)
C++11 头文件类,在单独线程上执行函数,包括返回值。具有独特定制的、低级的、无锁的双缓冲队列;数据格式速度快,仅用6条CPU指令即可执行函数队列:lea,call,mov,add,cmp,jb
前言
Hello World
#include "CommandQueue.hpp" CommandQueue commandQ; void helloWorld() { printf( "Hello World" ); } int main() { commandQ( helloWorld ); commandQ( [] { printf( "Hello World" ); } ); commandQ( [] { printf( "Hello " ); } )( [] { printf( "World" ); } ); commandQ.join(); return 0; }
从线程返回值
FILE* f; commandQ.returns( fopen, &f, "image.png", "r" ); //... commandQ.join(); //... fclose( f );
代码的起源
该代码最初设计为一个高性能渲染引擎的一部分,其中绘图命令(命令队列)在一个专用的渲染线程上执行。发出渲染命令需要极度优化;高性能和FPS(每秒帧数)是一切!
它必须足够灵活,可以发出任何类型的图形调用;从加载和绘制模型、纹理和着色器,到进行状态更改和处理诸如视口更改等事件;允许我将多个函数调用排队以便在后台渲染,而其他代码则并行执行,例如碰撞检测。该设计最终变得如此有用和灵活,以至于我决定将其用于所有线程间通信,包括通用的事件/消息队列、网络、GUI输入(鼠标/键盘事件)、游戏逻辑、AI、音频流等。
在做一些无关的研究时,我注意到其他人使用std::queue/vector
、环形缓冲区甚至链表来做同样的事情(从线程执行函数);虽然所有这些技术都有效,但与此相比它们非常慢!我决定公开我的代码,因为它可能对某些人有用。
背景
我相信您在这里找到的可能是最易用、最灵活、强大和最快的线程通信代码,每秒执行高达 1000 万次函数调用,在一个单核i5线程上。这是我的呈现,但您在这里看到的甚至没有触及该代码全部潜力的皮毛!它仅受您的想象力的限制!
我从未想过与任何人分享这段代码,因为我非常喜欢其设计的优雅和简洁;这是 25 年经验的成果,其中 10 多年的经验专注于编写不同的多线程实现;从事件和消息,到高性能渲染和网络队列;多年来不断改进和完善设计、灵活性、速度和易用性。
我希望至少有 50% 的考虑多线程的人是为了性能原因,如果只有 1% 的人愿意阅读和考虑我的实现,我将感到高兴!如今关于这类底层技术的文章非常少,我认为我可以为任何愿意理解和欣赏此实现的人做出有意义的贡献。
但是请不要带着评判我的方法或代码的心态来阅读这篇文章。首先尝试完全理解它,我为什么要这样做,我试图解决什么问题;然后您可能会欣赏并喜爱其简洁、潜力和原始速度!要有耐心,花时间仔细阅读!如果您第一次不理解,请再读一遍,宝石隐藏在字里行间!要完全理解和欣赏该代码的全部潜力,您需要带着开放的心态阅读和思考!当您阅读我的代码和技术时,您应该思考“我如何将这个概念应用到我的项目、我的库、我的应用程序或我的游戏中”。在您抨击我使用函数指针、原始指针算术和非结构化缓冲区之前,请考虑这项技术可能对其他人有用。
这是关于一个想法、一个概念、一种方法论的演变,是关于您思考和实现线程通信的另一种方式。让线程以更具表现力、不受限制和强大的方式通信;并附带以光速完成它的额外好处!它是关于以一种能够释放您指尖上的多线程未开发原始潜力、力量和吞吐量的方式编写代码。
据我所知,这是唯一一篇讨论这项技术与多线程相关的文章。但我并没有真正呈现任何新颖或革命性的东西,尤其是对于底层程序员来说,因为这里的一切都已经被做过了!我想让人们重新思考,扩展他们解决线程通信问题的方法。我想向您展示您如何整合您已经拥有的所有知识;关于函数指针、跳转表、模板、指针的指针、指针解引用、单/双/多/环形缓冲区、线程同步、无锁线程、堆栈和队列;并结合所有这些知识来创建您自己的、强大、灵活、高性能、简单且易于使用的线程通信协议!
然而,关于这种方法,没有任何内容是从其他关于线程通信的文章中 übernommen 的。我实际上对看到其他程序员发布的关于这个主题的设计技术感到非常难过。事实上,我在世界上任何地方都没有找到一篇我认为可以勉强使用的文章或方法。因此,找不到关于这个主题的合适文章促使我重新考虑发布我的代码。
所以请坐好,放松,让我带您乘坐魔毯,深入探索我命令队列的奇妙世界!
这篇文章花了我一周多的时间(100+小时)来写。我不是作家,我只希望您能欣赏花费的时间!
目标受众
您正在寻找一种快速、灵活且简单的方法,在轻量级工作线程上执行多个函数。
您想在工作线程上执行带有返回值的函数;但您绝不会使用`futures`和`promises`!别担心,我也没有使用这些怪物!您的理智在我这里是安全的!
您对底层 C/C++ 编程技术感兴趣,或者您能欣赏它们!
这篇文章主要面向 C/C++ 程序员,也许还有汇编程序员,但我猜任何接受函数指针作为有效数据类型的语言都可以使用这项技术。
这篇文章不是关于线程间通信如何工作的;我假设您已经知道如何向线程发送消息;我也假设您已经成功实现了一种或多种有效的方法。如果您从未实现过多线程,那么这篇文章可能不适合您!
我个人的编码理念
在开始之前,我想谈谈我个人的编码风格,这样您就能理解我的立场!
无论是否同意以下任何一点都不是必需的,只要您接受这是我的权利!
我来自游戏引擎、渲染、网络背景,那里的代码性能应该受到赞赏。
我热爱优美的代码。不仅是代码的结构、布局和间距;还有其设计的优美!
我提倡使用正统 C++、理智 C++ 和名义 C++,或者大多数反现代 C++ 的风格!
我会适度使用 templates
和 operator
重载,尤其是在它们没有运行时开销的情况下。
我喜欢使用 nullptr
!auto
也可以很好。我永远、永远不会使用 std::shared_ptr
或 std::unique_ptr
。
我很少使用 virtual
函数。但没有什么能阻止您用 virtual
函数构建一个强大而灵活的命令队列!
我从不使用 RTTI、异常或 Boost!我憎恨它们!别问我为什么!接受它并继续前进!
我个人从不使用 STL 容器;std::string
/queue
/array
/vector
等。太慢了!这只是个人选择!
我通常不会使用 std::atomic
、std::mutex
、std::thread
,但这次我破例了!我其实并不介意它们,我只是更喜欢直接调用 API,这是我习惯的!而且它们更慢!是的,它们就是!
我从不使用 static_cast
/dynamic_cast
/reinterpret_cast
!太丑陋了!如果您想用,尽管用,但我不用!
我是一名高级汇编程序员;以及高级Intel intrinsics开发者(我精通 SSE/AVX intrinsics)!
我只在Intel/AMD 32位(x86)/ 64位(x64)上开发!我就是从没必要在其他平台上开发过!
我通常喜欢尽可能底层地开发代码,越底层越好!但要在合理的范围内!
我也有抽象层,但我通常在最低层之上构建它们,并且有充分的理由!
我通常更喜欢使用直接的内核 API 调用,但我更喜欢使用 malloc
/free
而不是 HeapAlloc
!
当我有一个工作要做时,使用我讨厌的东西,我首先会考虑它的使用影响,并可能会提出抗议,并提供有效的理由。如果我的抗议无人听取,我只会默默地使用它,但至少我尝试过了!我总能在家里按我想要的方式编程!
“如果我们从不重新发明轮子,我们是不是还在乘坐马车?” - 我
如果您热爱编写高性能代码,那么我们拥有共同的热情!
致谢
这段代码和技术是站在巨人的肩膀上的!
我将这篇文章和技术献给John Carmack、Mark J. Kilgard、Michael Abrash、Bob Jenkins 和 Agner Fog,你们在我眼中都是传奇!我们还会再见到你们这样的人吗?
引言
本文讨论的是 Command Queue 中使用的函数调用协议和双缓冲队列
Command Queue 是一个易于使用、轻量级、高性能、低开销、低延迟的头文件 C++11 类。实例化时,对象创建一个无锁双缓冲队列并生成一个线程(消费者线程);该线程和缓冲区在对象生命周期内保持活动状态。
术语“command queue”(命令队列)用于指代内部用于存储和执行函数调用的定制设计协议。双缓冲用于存储函数指针和函数(命令)所需的输入值,一个缓冲区由消费者线程(内部线程)用于执行命令,而另一个缓冲区可供生产者线程(您的线程)使用以添加新命令。当内部消费者线程完成执行内部缓冲区上的所有命令(函数调用)后,它会重置缓冲区并与生产者线程交换缓冲区;为他们提供一个新的/干净的/重置的缓冲区,同时它处理其他缓冲区上的任何命令。当没有更多命令可用时,内部消费者线程将进入等待状态(睡眠),直到添加新命令。
Command Queue 的设计目标是实现函数调用最大吞吐量,而不特别关注内存消耗。通过吞吐量和低延迟,我指的是从添加新命令到执行的耗时,以及/或执行队列上连续命令之间的耗时,这是两种不同的场景,两者都通过低级优化实现最大吞吐量。
如果生产者线程没有足够的空间容纳新命令,缓冲区将动态扩展。在典型的实际场景中,缓冲区不应超过几百字节或 KB,因为缓冲区执行和轮换速度很快。然而,在持续一万亿次函数调用(每秒 1000 万次函数调用对比 std::thread 的 200,000 次)的合成压力测试期间,内存消耗达到了 4MB x2 RAM(8MB)。内存消耗在很大程度上取决于添加/调用的函数数量以及函数调用完成所需的时间。内存使用量通常是每个函数调用 20 字节,这通常在函数调用后立即被缓冲区轮换释放(重用)。
注意:代码和示例已尽可能做到易于阅读!我知道阅读和理解别人的代码有时会很困难,需要一段时间才能整体理解。这里讨论的一些概念可能是`高级`的(完全取决于您的技能水平,我不知道您是谁或您了解什么),但我希望将其分解并尽可能保持简单和易懂!重要的方面是整体概念,而不是实际的代码/实现!您可以以任何您想要的方式实现它!
优点和缺点
优点
- 能够直接从另一个线程执行的函数返回!无需 futures 和 promises!
- 低开销:每个命令 8 字节(x86)/ 12 字节(x64)+ 数据,额外+8 字节用于
execute()
存根 - 低延迟内循环:执行队列中的每个命令需要 6 条指令,这代表函数调用之间`延迟`,从开始到结束:
lea,call,mov,add,cmp,jb
- 零执行开销:函数有机会产生无额外调用开销!例如,使用 VS2015 64 位 Release 版编译并禁用堆栈帧的
printf("Hello World")
在回调中产生的总指令数为 2 条:mov
、jmp
。这也完全取决于您的函数! - 使用内部分配器在通用自管理缓冲区上进行快速的连续内存分配。`
alloc
` 的调用只是在预分配的缓冲区上预留空间,只有在必要时才使用 `realloc()
` 来扩展缓冲区。缓冲区只是被重用和轮换。 - 函数调用所需的所有输入数据,包括函数指针,都存储在连续的位置,紧跟在下一个命令之后。
- 无内存碎片,无链表或单独分配,无
free()
语句,无间隙,无临时内存分配 - 可扩展:可以从多个生产者那里每秒执行数百万次函数调用,内存占用空间小
- 队列没有传统的 push/pop 操作,`pop` 实际上只是内循环中的 `
add
` 指令 - 仅头文件:编译器拥有做出所有期望优化所需的所有信息!在 Release 版本中,VS2015 将所有我期望它做的都内联了,即使是默认内联!模板都被内联和优化,生成了我认为非常可接受的代码
- 模板的编译器警告可以提示您大多数(或所有)错误,但它们很晦涩!
- 高效睡眠:线程在没有命令时进入高效等待状态(睡眠),并在向队列添加新命令时立即激活
- 无
switch-case
语句、enums
、函数类型或跳转表查找等。 - 合成基准测试:每秒 10,000,000 次函数调用,相比之下
std::thread
的 200,000 次
缺点
- 缓冲区上的分配未对齐,它们是打包的。未对齐的内存可能会影响一些旧架构。对所有内存进行对齐需要更多的时钟周期和更多的内存。
- 不支持具有无限数量输入值的函数,目前支持最多 6 个输入参数的函数。可以添加支持调用 6 个以上参数的函数!
- 一些编译器错误消息非常晦涩,通常看起来像 Command Queue 坏了,但实际上只是函数输入值或数据类型数量不匹配,通常不容易看出错误所在!
使用示例
请下载文件!这里放不下太多!文件中有一些注释!
下载中的文件
examples.cpp - 通用示例
returns.cpp - 几个返回值示例
benchmark.cpp - Command Queue 与 std::thread
之间的合成基准测试 -不公平,因为我没有不断生成新线程!仅作为不断生成线程与重用线程成本的示例,它归结为 Command Queue 可以在 std::thread
生成所需的时间内进行500 次函数调用!
下载 commandQ.zip
Lambda 表达式
让 lambda 表达式正常工作可能很麻烦。首先,lambda 表达式不能与模板一起使用,因为我需要 lambda 的`函数指针地址`,而它们不知道如何`转换为`函数指针!
commandQ( [] { printf( "Hello World" ); } ); // OK
这没关系,因为我没有使用任何模板来处理 void(*)()
函数!
void execute( void (*function)() ) // My code to handle the lambda above
但是对于其他函数我做不到!我需要模板,它们对我来说比 lambda 表达式更重要
Lambda 表达式的转换
方法 1
commandQ.returns( [] { return "Hello World"; }, &hw ); // Not Ok! commandQ.returns( (const char*(*)()) [] { return "Hello World"; }, &hw ); // Ok! Must be cast!
我知道!这是因为 lambda 表达式可以自动转换,但不能在您使用模板时!因为它们不知道要转换为哪种函数类型!我对此无能为力,除非我创建使用固定函数的函数。
方法 2(与上面相同!)
const char* hw = ""; // Crazy I know! It's a const! const char*( *test )( ) = [] { return "Hello World"; }; commandQ.returns( test, &hw ); // Yes the lambda will replace the const! commandQ.join(); printf( hw );
一些疯狂的事情!但是,这种方法稍微容易一些,因为编译器可以帮助您!在上面的方法(方法 1)中,编译器会给我关于我的模板的消息,这让我非常困惑。看起来我的模板坏了……问题在于`const
`,是的,即使在返回值中,我知道!
性能与内存使用
显然,这个主题有很多层次和方面。所以我会尝试将其分解为各个组成部分
总内存使用:368 字节(不包括双缓冲)
在 Windows 7, Visual Studio 2015 64 位
std::thread
- 16 字节 x1
std::condition_variable
= 72 字节 x2 (144 字节)
std::mutex
= 80 字节 x2 (160 字节)
std::atomic
= 8 字节 x2(每个缓冲区一个原子变量)
每个缓冲区16 字节(8 字节地址指针,4 字节总大小,4 字节已用大小)(双缓冲区共 32 字节)
bool
`shutdown` 变量
合成基准测试
在合成测试中,将 Command Queue 与使用 std::thread
对象创建和执行函数的性能进行比较,Command Queue 在 Core i5 上能够执行每秒 1000 万次函数调用,而 std::thread
只能执行200,000 次函数调用。此基准测试纯粹是合成测试,仅能有效地描述生成线程的`成本`,因为它表明 Command Queue 可以在 std::thread
生成线程所需的时间内执行多达 500 次(小型)函数调用。
由于其轻量级和极其高效的算法,通常在单个线程上执行连续函数调用比生成新线程甚至运行线程池更有效,因为上下文切换可能会损害性能!
创建对象
这相当昂贵,因为它必须生成 std::thread
和 malloc()
双缓冲。
我使用 malloc()
因为我使用 realloc()
在运行时动态扩展缓冲区,当缓冲区需要更多空间时
创建线程
在我最初的设计中,我使用了直接 API 调用,这显然更快!但是,对于这个组件,我只是使用 std::thread
,因为我通常不会生成很多这些对象,所以 std::thread
的成本是可以接受的。
16 字节,比原生 API 调用慢,但我只创建一个内部线程!所以是可以容忍的。
队列
我使用自定义的内部`queue`实现,每个大小为 16 字节。我不使用 std::queue
;我写了一个很长的故事解释为什么不使用它,但最终删掉了,因为我不想冒犯太多人!就说 std::queue
比较慢,我宁愿开枪打自己的腿也不用 std::queue
,就这样吧!
不,等等,让我补充一点,我毫无敬意地看待那些使用这种怪物(std::queue
)作为其工作线程`任务`或执行队列的人(能力上的,并非个人……好的……我闭嘴了!
std::condition_variable
72 字节……什么?我对此真的很抱歉!不是我的代码!这是 VS2015 C++11 的结果!条件变量用于在对象`等待状态`(睡眠)期间进行通知。我至少确保在进入等待状态前清空两个队列,因为它非常昂贵,但我尽量减小了影响!
std::mutex
80 字节……我的天!同样,不知道为什么它使用 80 字节!我也不敢相信!条件变量需要互斥锁,所以我无法避免!在原生 Windows API 中,我可以使用临界区来实现条件变量,它要轻量得多!std::mutex
和 std::condition_variable
的速度比原生 API 调用慢得多!但我需要它们来支持 C++11 的`跨平台`支持。我实际上宁愿实现原生 Windows API 和 Linux API 调用,也不愿使用这些怪物!
std::atomic
谢天谢地,至少这个编译成了一个 `xchg` 指令!我可以安然入睡了!
术语
command
一个命令是函数调用(指针地址)和执行它所需的输入值(数据)的组合!
command queue
命令队列是预分配缓冲区上一个二进制安全、顺序打包的 FIFO 命令队列。
在双缓冲配置中存在两个队列。命令由生产者线程添加到其中一个命令队列中,而命令由内部消费者线程在另一个队列上执行。
command queue 协议
命令队列协议是指命令在命令队列上的布局、存储和执行
stub function
存根函数是特殊的内部函数,用于`解包`来自队列的函数调用的输入值。`存根函数`还负责传递函数返回值,就像普通函数一样!
内部组件(高级)
本节可能只与对类似组件开发感兴趣的人,或对如何解决`返回值`等挑战感兴趣的人相关。这个级别的知识不是必需的,也不是使用组件所必需的!我建议大多数人直接跳过本节,直接阅读下面的`代码分析`!阅读/理解它不是必需的!
它被标记为`高级`,因为我认为大多数人不知道创建这样一个组件所涉及的挑战、难度级别或技能集(这不是一个普通的队列/缓冲区/线程!),所以除非您尝试过制作类似的东西,否则希望您不介意这个区别;而且因为我猜大多数程序员可能对底层技术细节不感兴趣。
所以,如果您仍在阅读……我将尝试尽可能`用户友好`地呈现细节,为高级受众!我尽量使事物简单,因为即使我有时也难以理解某些文章和概念,仅仅因为它们没有以简单的方式呈现。这里的概念并不那么难……
command
伪结构
struct command_t { void (*command)(void*); // function (pointer) to execute on remote thread uint size; // sizeof( command* ) + sizeof( uint ) + sizeof( data ) char data[0..n]; // (optional) input data for function call };
命令 = 命令处理程序(函数地址)+(可选)输入值
命令是函数(通过地址)和执行它所需的输入值的组合!
注意:我实际上不使用这个结构,我直接用指针编写这个结构!它很简单,而且只写了一次
x64 内存中的命令布局
命令处理程序(最低级别)
typedef void ( *PFNCommandHandler ) ( void* data );
从外部看它非常简单,它只是一个`函数指针`typedef,但它可能是我所说的`协议`的核心和灵魂。当我提到协议时,我实际上是指由消费者线程(内部线程)内循环执行这些函数。由消费者线程执行的所有函数都具有相同的结构;它们不返回任何内容,并且接受一个单一数据指针。
数据指针就是这样一个指针,指向命令在命令队列缓冲区上的`个人`数据存储。如果您查看上面的命令的内存结构布局,数据指针就是指向橙色部分的指针。如果命令需要无数据(零)分配,那么它实际上将是指向队列中下一个命令的指针。
这个指针让命令完全控制它在命令队列缓冲区上存储的内容。这实际上可以是任何东西,从接收的网络数据包、原始文本/字符串、加载的文件、原始模型/纹理/音频数据、对象和结构的副本等。
在这个级别上处理原始数据实际上有点令人困惑。它需要大量的指针的指针和指针解引用才能工作。此外,如果您的函数需要 2、3 或 4 个参数,提取参数会有点令人困惑且容易出错。因此,我创建了存根函数,下面将对此进行解释。
有一个完整的章节专门介绍存根函数,但我只是想在这里谈谈它们;因为它们实际上是命令处理程序,在这个级别处理原始数据。它们负责从此处传递的数据指针中提取参数并调用函数。如果函数有返回值,那么存根函数会从这个数据指针中提取返回地址,并将函数返回值写入您的地址。它们只是这个级别的包装器,用于自动化处理数据指针!
伪命令处理程序示例
void cmdPrintf( char** pstring ) { printf( *pstring ); } // Note: char**, and *pstring void cmdRender( Model** model ) { (*model)->render(); } void cmdSetEvent( HANDLE* ev ) { SetEvent( *ev ); } void cmdMouseMove( int* xy ) { state->mouse_move( xy[0], xy[1] ); } void cmdTranslate( float* vec3 ) { glTranslatef( vec3[0], vec3[1], vec3[2] ); } ... rawExecute( cmdPrintf, "Hello World" ); rawExecute( cmdRender, myModel ); rawExecute( cmdSetEvent, myEvent ); rawExecute( cmdMouseMove, x, y ); rawExecute( cmdTranslate, x, y, z );
如您所见,我确实让事情更容易处理,通过重新定义 `void* data` 为任何我想要的指针。这些是`简单的`例子,只有一个输入数据类型,当您有多个不同类型的参数时,情况会变得棘手。对于同一种情况,我可以使用数组语法,但对于各种情况,我必须进行手动指针算术。所以这时存根函数就派上用场了!但是,在这个级别上,我也可以通过 rawExecuteWithCopy()
将原始数据复制到缓冲区……如果您想了解更多信息,可以阅读代码!
command queue
command queue = 二进制安全、顺序打包的命令 FIFO 队列(带可选输入数据的函数)
命令由生产者线程(您的线程)添加到命令队列中,这些线程预先确定并设置内部线程上要调用的函数,并写入函数调用的可选输入值。
每个命令(函数调用)包含其自己的可选(私有)输入数据缓冲区,该缓冲区本质上是线程安全的,并且直接存储在命令队列上。
添加到队列后,生产者线程(您的)会通知内部线程队列上有新命令可用,然后命令按顺序执行。需要注意的是,内部线程实际上不需要通知,除非它已处于等待状态,因为它会先检查两个缓冲区以确保没有命令存在,然后才进入等待状态,因为进入等待状态是`昂贵的`!
命令及其数据不是以传统意义上的方式`压入`或`弹出`队列的。在队列中的所有命令执行完毕后,缓冲区将被重置(通过将`used`索引设置为零),并与提供给生产者线程的可用队列进行重用/轮换,并查找下一组命令;因此,此缓冲区上的数据分配永远不会被释放,它们只是临时预留空间,直到缓冲区被轮换。
我使用两个独立的缓冲区,一个供`生产者线程`(您的线程)添加命令,而消费者线程(内部)在内部缓冲区上执行命令(函数调用)。在内部缓冲区上的所有命令执行完毕后,缓冲区会被交换,然后过程重新开始。所有这一切都是无锁完成的!
command queue 内存布局
stub functions
存根函数是特殊的内部受保护函数,用于从内部命令队列协议中`解包`函数调用。当消费者线程准备调用您的函数时,它会调用这些函数。
我想解释一下它们,因为它们可能对想知道我如何解决从线程返回值的`问题`的人来说很有趣。它们实际上是一个非常强大的`隐形中间人`,在幕后做了大量工作;并本质上赋予 Command Queue 对象调用常规函数调用的`幻觉`;它们还负责写入返回值。
当您在生产者线程(您的线程)上调用 execute()
和 returns()
时,该过程开始,它们将调用您的函数所需的值写入命令队列缓冲区。写入的值取决于您的意图;例如:您的函数需要多少个参数,以及它是否返回值?这些函数还将特定存根函数的地址写入缓冲区,该函数在消费者线程端执行,然后在调用您的函数之前,将负责从命令队列缓冲区中提取参数。
目前有 14 个存根函数,7 个用于 execute()
,7 个用于 returns()
。每个存根函数处理不同的情况,具体取决于您的函数调用所需的参数数量,从零到六个参数(0-6 个输入值)。没有为需要 6 个以上参数的函数提供支持,我不认识需要 6 个以上参数的函数,但如果您确实需要,可以添加它们……
只有 execute()
和 returns()
在生产者线程上调用,它们在将值写入命令队列后返回,该队列最终将由消费者线程执行。所以,当消费者线程最终准备执行您的命令时,它首先调用存根函数!然后存根函数从命令队列中`解包`您的函数地址、任何可选参数以及可选的返回值地址。
然后存根函数调用您的函数,并可选地写入您的返回值!
有趣的是,它们可以做的所有事情都可以`手动`完成,通过调用 rawExecute()
,这让您可以完全控制添加到命令队列缓冲区的内容,例如直接在缓冲区上写入网络数据包或原始文件!存根函数实际上只是最常见任务的便捷内置包装器,它们展示了命令队列协议的灵活性!
命令队列上的 64 位存根函数布局
代码分析
command handler typedef
typedef void ( *PFNCommandHandler ) ( void* data );
仅在`内部`消费者线程的内循环中使用!
双缓冲command queue 结构
struct queue_buffer_t { char* commands; uint32_t size; uint32_t used; }; queue_buffer_t buffer[ 2 ]; std::atomic< queue_buffer_t* > primary = &buffer[ 0 ]; std::atomic< queue_buffer_t* > secondary = &buffer[ 1 ]; std::mutex mtxDequeue; std::condition_variable cvDequeue; std::mutex mtxJoin; std::condition_variable cvJoin; std::thread* hThread; bool volatile shutdown = false;
CommandQueue() { this->init( 256 ); } CommandQueue( const uint32_t size ) { this->init( size ); } void init( const uint32_t size ) { this->buffer[ 0 ].commands = ( char* ) ::malloc( size ); this->buffer[ 1 ].commands = ( char* ) ::malloc( size ); this->buffer[ 0 ].size = size; this->buffer[ 1 ].size = size; this->buffer[ 0 ].used = 0; this->buffer[ 1 ].used = 0; this->hThread = new std::thread( &CommandQueue::thread, this ); } ~CommandQueue() { this->shutdown = true; this->cvDequeue.notify_one(); this->hThread->join(); free( this->buffer[ 0 ].commands ); free( this->buffer[ 1 ].commands ); }
commands
在运行时动态扩展,我通常通过调用 realloc()
来使大小加倍!
每个缓冲区独立管理,没有限制您可以拥有一个缓冲区池!
生产者线程写入命令到一块缓冲区,而消费者线程在该缓冲区上执行命令
无锁
std::atomic< queue_buffer_t* > primary = &buffer[ 0 ]; std::atomic< queue_buffer_t* > secondary = &buffer[ 1 ];
我的方法非常简单和粗糙,可能不是最高效的实现方式!
生产者和消费者线程通过替换变量中的值(交换/切换)为 nullptr
(xchg
)来拥有 primary
/secondary
指针。当生产者线程完成将命令添加到缓冲区后,它会将地址放回 primary
!所以 `if( primary == nullptr ) then`…它正在被使用!
无等待
在单生产者场景下,这些方法是无等待的,有了我的双缓冲,生产者线程总是会有一个缓冲区可用用于写入新命令!
但是,如果您有多个生产者,两个缓冲区可能不足以实现无等待!您可能需要添加更多缓冲区,或者更改设计以允许多个生产者写入同一个缓冲区。留给您练习!
consumer thread
执行命令的`内部`线程
void thread() { queue_buffer_t* buffer = secondary.exchange( nullptr ); while ( true ) { buffer = primary.exchange( buffer ); while ( buffer == nullptr ) buffer = secondary.exchange( nullptr ); if ( buffer->used ) { char* base_addr = buffer->commands; const char* end = buffer->commands + buffer->used; do // the inner loop - 6 CPU instructions to process a queue { (*(PFNCommandHandler*)base_addr)(base_addr+sizeof(PFNCommandHandler*)+sizeof(uint32_t)); base_addr += ( *( uint32_t* ) ( base_addr + sizeof( PFNCommandHandler* ) ) ); } while ( base_addr < end ); buffer->used = 0; } else if ( this->shutdown ) break; else { std::unique_lock<std::mutex> lock( mtxDequeue ); cvDequeue.wait( lock ); lock.unlock(); } } }
queue_buffer_t* buffer = secondary.exchange( nullptr );
消费者线程首先拥有 secondary
缓冲区(其中将没有任何命令),然后再进入主循环。
while ( true ) { buffer = primary.exchange( buffer ); ... }
是的,我知道,无限循环……令人震惊,不是吗!而且我不会使用 while(!shutdown)
或类似的东西!
由于我们在进入循环之前获得的 secondary
缓冲区是空的,所以我们不必处理其中的任何命令。进入循环后,我们立即将其与 primary
缓冲区交换,检查是否有任何生产者线程向其中添加了命令。
在正常的循环过程中,在处理完当前缓冲区后,它会被重置/回收并与当前活动的 primary
缓冲区交换(我们给生产者线程一个干净的缓冲区,以换取他们`已使用`的缓冲区)。这样他们就可以从一个新的干净缓冲区开始,而消费者线程则处理添加到 primary
缓冲区中的任何命令。
注意:primary
变量实际上存在高争用!通常,生产者线程获取此缓冲区,写入数据,然后放回。但消费者线程也是如此!当消费者线程完成处理缓冲区中的命令时,它会重置缓冲区并将其与 primary
变量中的缓冲区进行交换。
while ( buffer == nullptr ) buffer = secondary.exchange( nullptr );
`while ( buffer == nullptr ) then`…等待生产者线程完成它的缓冲区……进入这个循环意味着我们在执行上一次交换时,primary
线程正忙于向缓冲区添加新命令。是的,这使用了 100% 的 CPU 使用率,这是我的渲染线程,它本来就占用了 100%!
这是放置某种可等待事件系统或仅 Sleep( 0 );
的地方
在 `while` 语句之前的行中,我们将一个新但干净/空的缓冲区放入 primary
变量。但当我们交换它时,我们得到一个 nullptr
值,这意味着一个生产者线程当前正在忙于处理本应在那里的缓冲区。当生产者线程完成写入缓冲区后,并尝试将其值写回变量时,它将检测到那里有一个非空值(新的干净/空的缓冲区),而会将它的缓冲区地址放入 secondary
。这就是我们正在等待它的!
更多澄清
有两个指针:primary
和 secondary
。每个都保存一个缓冲区的地址
primary
中存在 nullptr
意味着缓冲区正在被使用(锁定,是的,我知道我说过这是`无锁`,但它确实不相关)。变量中的地址意味着缓冲区可供使用!
通常,当调用线程(生产者)要向队列写入数据时,它会通过移除地址(xchg
)并将其与 nullptr
交换来拥有 primary
缓冲区;然后写入其数据到缓冲区,并将地址放回 primary
变量!当它拥有缓冲区时,primary
变量的值为 nullptr
。当它完成缓冲区的使用后,它只需将指针放回 primary
。
然而,消费者线程可能已经完成了读取/调用/处理它当前拥有的命令处理程序(回调)的所有工作。当它完成时,它会重置缓冲区,并将其与 primary
变量中的缓冲区进行交换,但如果 primary
变量中有一个 nullptr
,则意味着另一个线程正在写入新命令。
if ( buffer->used ) { ... } else if ( this->shutdown ) break;
如果命令队列缓冲区中没有数据,那么我们测试`退出线程`条件变量!我们只在`停机时间`测试它,当线程没有其他事情可做时!
char* base_addr = buffer->commands;
base_addr
是 `char*` 的原因是,它为指针算术提供了字节大小的灵活性!
例如,`base_addr + sizeof( PFNCommandHandler* )`…移动到下一个值!
const char* end = buffer->commands + buffer->used;
只存储循环的结束地址!将内循环的指令数减少 2 条!
内循环
do { (*(PFNCommandHandler*) base_addr)( base_addr + sizeof(PFNCommandHandler*) + sizeof(uint32_t)); base_addr += ( *( uint32_t* ) ( base_addr + sizeof( PFNCommandHandler* ) ) ); } while ( base_addr < end );
(*(PFNCommandHandler*) base_addr)(base_addr + sizeof( PFNCommandHandler* ) + sizeof( uint32_t ));
我们终于可以开始调用命令处理程序,并将指向其输入数据的指针传递给它们!
记住结构:[command*][uint32_t][data]
输入 void* data
地址 = base_addr + sizeof( command* ) + sizeof( uint32_t )
base_addr += ( *( uint32_t* ) ( base_addr + sizeof( PFNCommandHandler* ) ) );
移动到下一个地址。我们命令结构中的 `[uint32_t]` 值提供了:`data size + 12 bytes`(x64),这是到下一个命令的偏移量!
将 `+ 12 bytes`(8 字节回调* + sizeof(uint32_t))(32 位为 8 字节)直接存储在我们的偏移索引中,实际上减少了内循环的指令数 1 条!它由生产者线程预先计算,生产者线程在写入缓冲区结构时已经完成了这个计算,所以是双赢!
while ( base_addr < end );
end
将是最后一个命令之后的地址,如果我们没有获取缓冲区进行处理,那么这里将是`追加`新命令的下一个地址。
buffer->used = 0;
此时,我们已完成执行命令队列中的所有命令。
在这里我们实际上重置/回收了这个缓冲区,它只是一个非常简单的索引变量,我们设置它!
之后,这里的缓冲区被轮换/交换到 primary
变量中。
重复执行!
producer thread
任何添加命令到命令队列的线程。
command = command handler (函数指针) +(可选)函数所需的输入
低级队列管理函数
这些是生产者线程端添加命令到命令队列所需的唯一函数!
acquireBuffer()
获取命令队列缓冲区的所有权。
queue_buffer_t* acquireBuffer() { queue_buffer_t* result; while ( ( result = primary.exchange( nullptr ) ) == nullptr ) ; // nothing in loop return result; }
releaseBuffer()
void releaseBuffer( queue_buffer_t* buffer ) { queue_buffer_t* exp = nullptr; if ( !primary.compare_exchange_strong( exp, buffer ) ) secondary = buffer; this->cvDequeue.notify_one(); }
当生产者线程完成写入命令时调用。
场景:生产者线程正在忙着将命令写入命令队列缓冲区(最初在 primary
中);在此期间,消费者线程完成了队列的处理,因此它将一个已回收的队列放入 primary
变量。当我们尝试将我们的值写回 primary
时,我们检测到这种情况,因为 primary != nullptr
。所以现在我们需要将我们的值写入 secondary
变量,因为消费者线程在那里等待它!
同时,在另一边,消费者线程正在等待,这是消费者线程循环中的代码
消费者线程循环中的相应代码
buffer = primary.exchange( buffer ); while ( buffer == nullptr ) buffer = secondary.exchange( nullptr );
第一行将一个新值写入 primary
变量,这被上述场景检测到!
消费者线程还会检测到 primary
变量中的值为 nullptr
,并等待生产者线程执行 releaseBuffer()
,当它这样做时,它会将值写入 secondary
变量!
allocCommand()
template< typename TCB > char* allocCommand( queue_buffer_t* buffer, const TCB function, const uint32_t size ) { const uint32_t base = buffer->used; const uint32_t reserved = sizeof( TCB* ) + sizeof( uint32_t ) + size; buffer->used += reserved; if ( buffer->used > buffer->size ) { do buffer->size *= 2; while ( buffer->used > buffer->size ); buffer->commands = (char*) ::realloc( buffer->commands, buffer->size ); } char* command = &buffer->commands[ base ]; *( ( TCB* ) command ) = function; *( ( uint32_t* ) ( command + sizeof( TCB* ) ) ) = reserved; return command + sizeof( TCB* ) + sizeof( uint32_t ); }
这是 malloc()
的命令队列等价物,它返回一个指向`size`字节的预留空间的指针。
命令位于命令队列上,这是一个更大的预分配缓冲区。所以这些分配只是在这个动态缓冲区上预留空间。如果缓冲区空间不足,它会使用 realloc()
动态增长。
同样,与 malloc()
的内部操作类似,它还在返回地址之前的内存位置写入一些隐藏数据。它写入函数指针地址和命令使用的总大小,然后返回立即在 4 字节 uint32_t 之后的字节的地址。
通用结构:[function*][uint32 size][reserved data bytes][next function* ...
注意:每个命令只能调用此函数一次!因此必须预先确定所需大小!
execute()
这些命令有一个非常长的行,我无法很好地格式化,所以我将其分成 2 行。
共有 7 个这样的函数,我不会全部显示,它们支持从零到六个(0-6)参数
接受 2 个参数的 execute() 模板
template< typename TCB, typename T1, typename T2 > void execute( const TCB function, const T1 v1, const T2 v2 ) { queue_buffer_t* buffer = acquireBuffer(); char* data = allocCommand( buffer, executeStubV2< TCB, T1, T2 >, sizeof( PFNCommandHandler* ) + sizeof( TCB* ) + sizeof( T1 ) + sizeof( T2 ) ); *( ( TCB* ) data ) = function; *( ( T1* ) ( data + sizeof( TCB* ) ) ) = v1; *( ( T2* ) ( data + sizeof( TCB* ) + sizeof( T1 ) ) ) = v2; releaseBuffer( buffer ); }
template< typename TCB, typename T1, typename T2 >
TCB = Typename/Template Callback - 我用来表示函数调用的通用名称
void execute( const TCB function, const T1 v1, const T2 v2 )
还能说什么?
queue_buffer_t* buffer = acquireBuffer();
`获取`(获取所有权)一个缓冲区,供独占使用,该缓冲区`获取`的时间非常短,仅几条指令,除非缓冲区需要调整大小
char* data = allocCommand( buffer, executeStubV2< TCB, T1, T2 >,
第一部分:我们调用 allocCommand()
,这是`低级`调用之一,类似于 malloc()
的调用,因为它返回指向我们可以写入数据的*数据*部分的指针!
executeStubV2()
这是特殊内部处理程序函数的地址,它将在另一端从命令队列缓冲区中`解包`值,并实际进行函数调用
executeStubV2()
也是一个模板函数,接受相同的参数。此函数的地址直接写入命令队列缓冲区,通过调用 allocCommand()
完成,
sizeof( PFNCommandHandler* ) + sizeof( TCB* ) + sizeof( T1 ) + sizeof( T2 ) );
第二部分:我们在命令队列缓冲区中预留这么多空间。如果空间不足以满足此请求,则使用 realloc()
调整缓冲区大小
*( ( TCB* ) data ) = function;
您想要调用的`函数`实际上是写入缓冲区*数据*部分的第一项!
*( ( T1* ) ( data + sizeof( TCB* ) ) ) = v1; *( ( T2* ) ( data + sizeof( TCB* ) + sizeof( T1 ) ) ) = v2;
写入函数后,我们写入函数所需的输入值(参数)。
returns()
`returns()
`调用的结构与 execute()
几乎相同!
template< typename TCB, typename R, typename T1, typename T2 > void returns( const TCB function, const R ret, const T1 v1, const T2 v2 ) { queue_buffer_t* buffer = acquireBuffer(); char* data = allocCommand( buffer, returnStubV2< TCB, R, T1, T2 >, sizeof( PFNCommandHandler* ) + sizeof( TCB* ) + sizeof( R ) + sizeof(T1) + sizeof(T2) ); *( ( TCB* ) data ) = function; *( ( R* ) ( data + sizeof( TCB* ) ) ) = ret; // This line is the main difference *( ( T1* ) ( data + sizeof( TCB* ) + sizeof( R ) ) ) = v1; *( ( T2* ) ( data + sizeof( TCB* ) + sizeof( R ) + sizeof( T1 ) ) ) = v2; releaseBuffer( buffer ); }
主要区别在于使用`R
`来表示 ret
urn 地址类型。
另外,数据写入的顺序略有不同。我们在写入函数指针后,但在写入参数之前,将`R ret
`值写入缓冲区!
这迫使我们重新计算输入值的位置,注意写入 T1
和 T2
时 sizeof( R )
!
相应的另一端的 returnStubV2()
函数将`解包`这些值!
returnStubV#()
template< typename TCB, typename R, typename T1, typename T2 > static void returnStubV2( char* data ) { const TCB function = *( ( TCB* ) data ); const T1 v1 = *( ( T1* ) ( data + sizeof( TCB* ) + sizeof( R ) ) ); const T2 v2 = *( ( T2* ) ( data + sizeof( TCB* ) + sizeof( R ) + sizeof( T1 ) ) ); **( ( R* ) ( data + sizeof( TCB* ) ) ) = function( v1, v2 ); }
有 7 个这样的函数,我只展示对应于上面 returns()
的一个!
template< typename TCB, typename R, typename T1, typename T2 >
与 returns()
几乎相同的模板
static void returnStubV2( char* data )
`应该`是 static
,此函数将由内部线程调用
非常重要的一点是 char* data
由于这是一个`命令`处理程序,并且将由内部消费者线程调用,因此它需要一个特定的预定义结构,其中包括接受一个指针。我使用 char*
的原因是它使指针加法(sizeof()
)更容易!
const TCB function = *( ( TCB* ) data );
我们从数据指针中提取函数地址
const T1 v1 = *( ( T1* ) ( data + sizeof( TCB* ) + sizeof( R ) ) ); const T2 v2 = *( ( T2* ) ( data + sizeof( TCB* ) + sizeof( R ) + sizeof( T1 ) ) );
提取 2 个参数,因为这是`2 参数`版本
**( ( R* ) ( data + sizeof( TCB* ) ) ) = function( v1, v2 );
这是我们实际进行函数调用的地方,我们将返回值直接写入存储在缓冲区中的地址。这看起来可能有点复杂,但实际上并非如此。
在数据缓冲区上,sizeof( TCB*)
之后是返回值的地址(R
),我们存储在那里。
结束
抱怨
抱歉,我必须说出来!这可以写成一整篇文章!
免责声明:我只希望更多的程序员能被激励编写执行速度更快的代码,但我们并非都拥有相同的目标或相同的知识水平,而且我知道程序员面临着按时交付的外部压力,我只希望情况不是这样!
关于其他多线程代码
真正促使我发布代码的是看了几篇关于同一主题的文章和源代码;它们简直让我恶心!我很抱歉这么说,但没有礼貌的方式来解释我看到那些邪恶的怪物、排泄物和对如此神圣事物的亵渎时感受到的!不仅源代码看起来像地狱的绝对怪物,而且整个概念都是对我的信念的亵渎,尤其是关于高性能编程!当然,您想使用环形缓冲区来`提高性能`,并可能减少内存消耗,我明白了!但问题不仅仅是缓冲区,您整个实现都有缺陷且慢得要死?试图让某样东西运行得更快有什么意义,当您只处理一半问题时,因为您整个设计都有缺陷,并且遭受了巨大的概念瓶颈,规模巨大!如果您要花时间向读者展示环形缓冲区的工作原理,因为您认为有必要加快您的实现,但您完全不知道其余代码是多么的拙劣!看在老天的份上,学会如何优化其余部分!您不需要该死的环形缓冲区!您需要了解事物在较低级别上是如何实际工作的!对于那些为执行队列实现 std::queue
/std::vector
的人,我没什么好说的!当您想到多线程时,您想到的是`性能`,不是吗?这就是重点,不是吗?那么您为什么要把整个函数队列送进一个绞肉机,一堆垃圾,慢如骡子,像 std::queue
这样的东西,您难道不知道如何编写优化代码吗?您真的看过那个怪物内部的结构吗?您对您的代码有一点尊重吗?抱歉,这部分是针对其他撰写此类文章的人,`推广`我认为不赞成的概念、想法和代码!各有所好!不是个人攻击!
关于编程的总体状况
低级程序员都去哪儿了?为什么现在关于低级技术的文章这么少?它们都已经被发现了?为什么似乎每个人都在追逐下一个热门语言或 C++ 版本,它真的是圣杯吗?难道没有人停下来思考或审视他们在做什么或使用什么,还是他们只是盲目接受而不敢窥探那些邪恶的内部结构?“我需要做 xyz”,答案是:“用 Boost 吧”!您知道 std::string
为什么有 40 字节的开销吗?为什么我感觉有必要在标题中加上“低级”这个词,以便主流/`现代`程序员不会批评我使用函数指针和原始指针算术,它们真的那么该死吗?难道没人对这些技术感兴趣了吗?这些概念对我来说甚至不算低级,而且我认为每个专业的 C/C++ 程序员都应该熟练掌握它们!为什么我该死的 Firefox 浏览器目前需要80 个线程和2GB RAM才能打开6 个标签?Gmail 和 YouTube真的需要那么多才能工作吗?同样,为什么 Google Chrome 目前需要7 个进程,99 个线程和 700MB RAM来打开单个 Gmail 标签??我刚才数过的!而且我甚至无法判断我的 Gmail 标签是在拥有45 个线程的进程上运行,还是在那个使用 350MB RAM 的进程上运行!看在老天的份上,这只是 Gmail!您知道用99 个线程和700MB RAM 我能做什么吗?一个 &^$%@&*的 Gmail 标签真的需要这么多吗??99 个线程!!!如果 Firefox 或 V8 的任何人在真正关心性能,我的浏览体验会更快!我需要不断关闭标签页,因为 Chrome 和 Firefox 消耗了我全部 8GB RAM!为什么人们普遍认为摩尔定律仍然有效,但感觉不是这样,因为每个人都在不断添加更多的抽象层,抽象之上再抽象;“嘿,看这个,我的抽象使用了另一个抽象层!”这些概念真的那么难理解和处理吗?提高标准吧,让我们回到黑客的原始意义上!
好的……抱怨完毕!呼!
消费者线程循环的汇编输出
这是我的 Windows API 版本,我只是将其留作参考。C++11 版本类似。
while ( true ) { commandQ = (command_queue_t*) InterlockedExchangePointer( &primary, commandQ ); 13F9ECDE0 48 87 1D E9 B0 00 00 xchg rbx,qword ptr [primary (013F9F7ED0h)] while ( commandQ == nullptr ) 13F9ECDE7 48 85 DB test rbx,rbx 13F9ECDEA 75 13 jne (013F9ECDFFh) 13F9ECDEC 0F 1F 40 00 nop dword ptr [rax] { commandQ = (command_queue_t*) InterlockedExchangePointer( &secondary, nullptr ); 13F9ECDF0 48 8B DD mov rbx,rbp 13F9ECDF3 48 87 1D CE B0 00 00 xchg rbx,qword ptr [(013F9F7EC8h)] 13F9ECDFA 48 85 DB test rbx,rbx 13F9ECDFD 74 F1 je (013F9ECDF0h) } if ( commandQ->used ) 13F9ECDFF 8B 43 0C mov eax,dword ptr [rbx+0Ch] 13F9ECE02 85 C0 test eax,eax 13F9ECE04 74 20 je (013F9ECE26h) { char* base_addr = commandQ->commands; 13F9ECE06 48 8B 3B mov rdi,qword ptr [rbx] const char* end = commandQ->commands + commandQ->used; 13F9ECE09 48 8D 34 07 lea rsi,[rdi+rax] 13F9ECE0D 0F 1F 00 nop dword ptr [rax] do { ( *(PFNCommandCallback*) base_addr )( base_addr + sizeof(PFNCommandCallback*) + sizeof(UINT) ); 13F9ECE10 48 8D 4F 0C lea rcx,[rdi+0Ch] 13F9ECE14 FF 17 call qword ptr [rdi] base_addr += ( *( UINT* ) ( base_addr + sizeof( PFNCommandCallback* ) ) ); 13F9ECE16 8B 47 08 mov eax,dword ptr [rdi+8] 13F9ECE19 48 03 F8 add rdi,rax } while ( base_addr < end ); 13F9ECE1C 48 3B FE cmp rdi,rsi 13F9ECE1F 72 EF jb (013F9ECE10h) commandQ->used = 0; 13F9ECE21 89 6B 0C mov dword ptr [rbx+0Ch],ebp 13F9ECE24 EB BA jmp (013F9ECDE0h) } else if ( exitThread ) 13F9ECE26 8B 05 20 B3 00 00 mov eax,dword ptr [(013F9F814Ch)] 13F9ECE2C 85 C0 test eax,eax 13F9ECE2E 74 B0 je (013F9ECDE0h) break; }