使用共享内存和 InterlockedCompareExchange 进行快速 IPC 通信(已更新!)






4.44/5 (45投票s)
2006年7月10日
18分钟阅读

505284

13554
编写一个非常快速的进程间通信类的指南。本文描述了一种不需要锁定或线程同步的 IPC 方法。
引言
IPC (进程间通信) 已经有很多文章介绍,所以“再来一篇”介绍如何实现 IPC 的文章可能没有太大必要。然而,关于如何实现一个快速 IPC 类的信息非常缺乏。这就是本文的目的。
鉴于关于 IPC 实现的信息已经很多,我将不会深入探讨 IPC 实现的内部工作原理,而是专注于如何使其非常快速。
实现 IPC 的方法有几种,这里只列举其中一些:
- 共享内存
- TCP
- 命名管道
- 文件映射
- 邮槽
- MSMQ (Microsoft 消息队列)
有这么多选择,我们怎么知道哪个最快?答案很简单,这个问题没有理想的最快解决方案。每种都有其优缺点,但有一种无疑脱颖而出……共享内存。
共享内存实现不仅最容易实现,而且也是最快的实现之一。您可能会问,为什么它会是其中最快的?开销最小。开销是在调用另一个函数时产生的。无论是内核函数还是库函数,如果您的 IPC 不调用任何其他函数,那么您就已经消除了一个巨大的瓶颈。共享内存 IPC 不需要第三方函数调用。
所以,我们现在知道了最好也是最广泛使用的 IPC 实现之一,现在我们只需要找到实现它的最快方法之一。
更新时间
本文已更新。最近的更改包括实现了带读写指针的循环链表。这些更改是为了解决 LIFO (后进先出) 问题。先前的实现由于单链表设计,只能提供一个 LIFO 的传输系统。这个问题现在已经得到解决,并带来了传输速率提高的好处。如果您想跳过初始文章,请跳转到下一个已更新的标题。
(感谢 Jean 的想法!)
背景
在任何 IPC 实现中,最好采用服务器/客户端通信方法。另外,为了简单起见,最好是单向通信,通常是发送给服务器。将 IPC 改成双向通信非常容易。只需在两个进程中创建两个 IPC 服务器。让您的 IPC 通信单向进行,可以让您专注于性能问题。
要编写一个快速的共享内存 IPC,您需要实现几件事情。
首先,您需要在分配的内存中有多个块。使用多个块的原因是,当一个线程获得 CPU 执行时间时,它能够将大量不同的数据块发布给其他线程,而不会被阻塞。最快的优化方法之一是批量处理请求。只发布一个块,然后等待上下文切换让服务器处理一个块,这将是极其低效的。我的实现使用以下块结构:
// Block that represents a piece of data to transmit between the // client and server struct Block { // Variables // Next block in the single linked list volatile LONG Next; // Amount of data help in this block DWORD Amount; // Data contained in this block BYTE Data[IPC_BLOCK_SIZE]; };
其次,您需要某种形式的进程间线程同步。没有它,多个线程可能会写入同一个块,导致最坏情况下的数据损坏,以及最糟糕的死锁 (100% CPU 使用率)。在我的实现中,我使用事件,因为它们非常快,比信号量或互斥体快得多。
// Handle to the mapped memory file HANDLE m_hMapFile; // Event used to signal when data exists HANDLE m_hSignal; // Event used to signal when some blocks become availa HANDLE m_hAvail;
最后,您需要一种方法来阻塞服务器线程的执行,使其在等待数据时处于阻塞状态。最快的方法之一是 `WaitForSingleObject` 函数调用。我的实现使用此函数并等待命名事件。
大多数共享内存 IPC 的实现与以下类似:
服务器
- 创建具有 N 个固定大小 X 块的命名共享内存
- 创建用于防止块同时访问和竞态条件的线程同步对象
- 等待一个信号,表示需要处理块
- 处理块并将它们标记为再次可用
- 转到步骤 3
客户端
- 打开命名共享内存
- 等待块可用
- 写入块
- 发出信号,表示需要处理块
- 转到步骤 2
快速同步
使用共享内存时最大的问题之一是防止多个线程同时访问块。因此,我们需要一种一次只允许一个线程访问块的方法。
您必须克服的第一个障碍是决定如何组织您的块。您必须有效地提供两组块。一组是可用于新数据的块,而第二组是需要处理的块。一种非常有效的方法是创建两个双向链表,一个包含可用于新数据的块,另一个包含需要处理的块。然后,程序员只需要在代码中编写临界区来保护这两个列表。这种方法效果很好,也是我早期实现 IPC 的方法之一。然而,在实现之后,我对速度/带宽非常失望。
使用临界区和双向链表,我在 3.2Ghz P4 上只能产生每秒 40-50k 的块传输,然后就会达到 100% 的 CPU 使用率。一点也不令人印象深刻。
那么,这里的瓶颈是什么?同步。浪费的大部分 CPU 时间都花在了进入和离开临界区、阻塞 CPU 执行以及线程上下文切换上。我读到的最好的建议之一是关于图形引擎设计,“最快的三角形是未绘制的三角形”。将其应用于当前上下文,最快的临界区就是根本没有临界区。但是如果我们不使用临界区,我们如何保护我们的列表呢?答案很简单,使用多线程安全的列表……单向链表。引入“`InterlockedCompareExchange`”。
最初,我使用 `InterlockedPopEntrySList` 和 `InterlockedPushEntrySList` 函数与单向链表。然而,我遇到了一个相当严重的问题。起初,当我在这同一个进程内测试 IPC 服务器和 IPC 客户端时,它似乎工作得很完美。然而,在两个独立进程上测试该类时,我遇到了一个很大的困境。虚拟内存。
单向链表中的每个块都必须有一个指向下一个块的指针。现在,如果我们深入研究 `SLIST_ENTRY`,我们会发现以下问题:
typedef struct _SLIST_ENTRY { struct _SLIST_ENTRY* Next; } SLIST_ENTRY, *PSLIST_ENTRY;
标准的 Windows 交错列表使用内存指针指向下一个块,但这在使用多个进程时是一个严重的问题,因为另一个进程拥有完全不同的内存地址空间。虽然内存块指针在一个线程上下文中有效,但一旦切换到另一个上下文,它就不再有效,从而导致访问冲突。
`InterlockedPopEntrySList` 不能使用。但这个概念可以,我们只需要重写函数,使其不使用指针。这时我的块结构就派上用场了。如果您回顾一下定义,您会注意到它有这个:
volatile LONG Next;
volatile 语法告诉编译器确保不要在此变量中使用 CPU 缓存。如果 CPU 使用缓存,它可能会假设 `Next` 指针是某个缓存值,但另一个线程可能在获取缓存和使用变量之间更改了它。另外,请注意变量类型是 `LONG`。这是因为它实际上表示了在整个映射内存中下一个块开始位置的字节距离。这使得下一个指针相对于当前地址空间是相对的。我们现在只需要为这个新的块结构编写我们自己的 `InterlockedPushEntrySList` 和 `InterlockedPopEntrySList` 函数。这是我的函数实现:
void osIPC::Client::postBlock(Block *pBlock) { // Loop attempting to add the block to // the singlely linked list LONG blockIndex = (PointerConverter(pBlock).ptrLong - PointerConverter(&m_pBuf->m_Blocks).ptrLong) / sizeof(Block); for (;;) { LONG iFirst = pBlock->Next = m_pBuf->m_Filled; if (InterlockedCompareExchange(&m_pBuf->m_Filled, blockIndex, iFirst) == iFirst) break; } // Signal the event SetEvent(m_hSignal); };
osIPC::Block* osIPC::Client::getBlock(void) { // Loop attempting to grab a block for (;;) { LONG blockIndex = m_pBuf->m_Available; if (blockIndex == 0) return NULL; Block *pBlock = m_pBuf->m_Blocks + blockIndex; if (InterlockedCompareExchange(&m_pBuf->m_Available, pBlock->Next, blockIndex) == blockIndex) return pBlock; } };
`m_pBuf` 是指向映射内存的指针,该内存具有以下结构:
struct MemBuff { // List of available blocks volatile LONG m_Available; // List of blocks that have been filled with data volatile LONG m_Filled; // Array of buffers that are used in the communication Block m_Blocks[IPC_BLOCK_COUNT]; };
服务器将使用非常相似的函数,但操作于相反的列表。
WaitForSingleObject 和信号
我们快完成了。我们现在有两个列表,客户端和服务器都可以同时操作,而不会出现多线程问题。我们现在只需要一种方法来告诉客户端块现在可供写入,并告诉服务器块可供处理。
这可以通过事件和 `WaitForSingleObject` 以及适当的超时来实现。一旦拥有了这些和列表,您就拥有了一个非常快速的 IPC,只要两个线程都能获得足够的 CPU 时间,它就不会进入阻塞状态或进行任何外部函数调用。
速度
到目前为止,您一定在想这个实现到底有多快。它很快,非常快!不仅每秒块数非常快,而且只要保持代码优化,带宽就可以非常巨大。
在我 3.2Ghz P4 1024 MB RAM 的笔记本电脑上,我最近测量了这些速度:
带宽测试 (包大小:3000 字节) -> 800,000,000 字节/秒
速率测试 (包大小:100 字节) -> 650,000 包/秒
这些数字确实不言而喻,我从未见过任何实现能与之相比。我已在双核计算机上进行了测试,客户端和服务器在不同的 CPU 上运行,带宽达到了 1,000,000,000 字节/秒,速率达到了 2,000,000 包/秒!!!!
当用多个客户端对其进行压力测试时,最有趣的性能提升显而易见。只要块计数与客户端数量成比例,总带宽对于中等数量的客户端几乎不受影响。
问题?
这个实现应该有一个明显的設計缺陷?FIFO (先进先出)。双向链表在其他实现中使用了很长一段时间,这是有原因的,它们允许程序员将块添加到列表的末尾。这确保了添加到列表中的第一个块将是第一个被处理的块。单向链表只允许将块添加到列表的开头,这意味着第一个添加的块将是最后一个被处理的块 (FILO)。如果块的添加速度比处理速度快,那么第一个添加的块可能会在列表中等待很长时间才能被处理,而新的块却被处理了。如果您知道并理解这个限制,那么没问题,但如果这个限制不可接受呢?例如,假设您正在尝试复制本地 TCP 连接的行为。TCP 连接必须保证 FIFO。
答案很简单。当服务器处理块时,只需创建另一个单向链表并先填充此列表。当服务器从“待处理”列表中弹出块时,它会在处理块之前将其添加到临时列表中。只要服务器确保在像往常一样处理临时列表之前,将“待处理”列表中的所有块都取出,它就能确保以 FIFO 方式处理块。在脑海中尝试推导一下,您就会明白它是如何工作的。
祝您编码愉快!
更新时间
已引入
对 FIFO 的需求导致了本文档的更新。如本文 Problems 部分所述,现有实现存在一个严重缺陷:LIFO。它只能读取添加到列表的最后一个块。它们不仅颠倒了,而且是混乱的。这意味着发送给 IPC 的所有数据都将失序,对于大多数程序来说,这是一个不可接受的限制。
我纠正这个问题的第一个计划是使用另一个单向链表,并在处理它们之前将所有块推入此列表。这将导致顺序颠倒。虽然这种方法在一定程度上可行,但仍然存在一个主要问题。如果一个块在第二个列表完全填充之前被添加到已填充的列表中,这个新块可能会出现在列表的开头和结尾之间的任何位置。这将导致既不是 FIFO 也不是 LIFO。
Jean 和其他人的评论引导我尝试了另一种实现方式。带读写游标的循环链表。IPC 类需要完全重写才能使用这种模型,所以我们直接进入概念。
概念
首先,让我解释一下提出的方案。双向循环链表等同于双向链表,但列表的开始和结束指向彼此。实际上,如果您试图遍历这样的列表,您会永远循环遍历每一个条目。例如:
现在我们引入一些读写指针。它们都必须遵循一些既定规则:
- 所有指针都不能越过彼此
- 结束指针可以等于开始指针(表示零数据)
- 读写指针不能相等
这种设计理念意味着我们可以同时向许多块写入数据,同时又从许多块读取数据。只要在写入开始游标前面有空间,写入就不会被阻塞。同样,只要在读取游标前面有空间,读取操作就不会被阻塞。
这种设计还处理了多重读取和写入。可以处理同一个 IPC 的线程数量等于其中块的数量,这包括读取和写入。正在进行读取和写入的区域(图中绿色和红色区域)只包含属于其他线程的块,这意味着它们是多线程安全的。一旦检索到块,该线程就有效地拥有该块。这种方法的主要优点是该系统现在以 FIFO 方式运行。
出于性能原因,我们不想使用任何临界区、互斥体或其他线程同步,就像现有实现一样。这意味着,移动指针的所有操作都必须原子地执行。即,从所有其他线程的角度来看,它们似乎在一个指令中完成。这确保了指针不会被多个线程尝试移动相同的读写游标而损坏。
好的。我们来深入代码。
首先,我们必须修改数据结构并设置循环链表。这都是在创建 IPC 服务器时完成的。
// Block that represents a piece of data to transmit between the // client and server struct Block { // Variables // Next block in the circular linked list LONG Next; // Previous block in the circular linked list LONG Prev; // Flag used to signal that this block has been read volatile LONG doneRead; // Flag used to signal that this block has been written volatile LONG doneWrite; // Amount of data held in this block DWORD Amount; // Padded used to ensure 64bit boundary DWORD _Padding; // Data contained in this block BYTE Data[IPC_BLOCK_SIZE]; };
我们对这个结构做了一些更改。首先,我们添加了一个 `previous` 指针,它将指向循环列表中前一个块。其次,我们添加了 `doneRead` 和 `doneWrite` 标志。稍后将详细介绍这些。我们还将所有内容都对齐到了 64 位边界。这是针对 64 位机器的优化。
// Shared memory buffer that contains everything required to transmit // data between the client and server struct MemBuff { // Block data, this is placed first // to remove the offset (optimisation) Block m_Blocks[IPC_BLOCK_COUNT]; // Array of buffers that are used in the communication // Cursors // End of the read cursor volatile LONG m_ReadEnd; // Start of read cursor volatile LONG m_ReadStart; // Pointer to the first write cursor, // i.e. where we are currently writting to volatile LONG m_WriteEnd; // Pointer in the list where we are currently writting volatile LONG m_WriteStart; };
我们将块数组移到了共享内存缓冲区的最前面。这是一个性能优化,可以消除每次引用块时需要添加的偏移量。
所有的单向链表都已被移除,并被游标(一个用于读取,一个用于写入)取代。
现在让我们构建循环链表并设置默认游标位置(服务器初始化代码)。
// Create a circular linked list int N = 1; m_pBuf->m_Blocks[0].Next = 1; m_pBuf->m_Blocks[0].Prev = (IPC_BLOCK_COUNT-1); for (;N < IPC_BLOCK_COUNT-1; N++) { // Add this block into the available list m_pBuf->m_Blocks[N].Next = (N+1); m_pBuf->m_Blocks[N].Prev = (N-1); } m_pBuf->m_Blocks[N].Next = 0; m_pBuf->m_Blocks[N].Prev = (IPC_BLOCK_COUNT-2); // Initialize the pointers m_pBuf->m_ReadEnd = 0; m_pBuf->m_ReadStart = 0; m_pBuf->m_WriteEnd = 1; m_pBuf->m_WriteStart = 1;
数组中的第一个块连接到第二个和最后一个块,同样,最后一个块连接到第一个和倒数第二个块。这就完成了循环。
然后,我们设置读写指针。开始和结束是相等的,因为当前没有进行读写操作,并且由于尚未添加数据,因此读游标正好在写游标后面。
现在,我们需要实现一个添加数据的方法。这应该将 `writeStart` 指针向前移动(但仅当有空间可以扩展时),从而增加 `writeStart` 和 `writeEnd` 游标之间的距离。一旦这个距离被线程扩展,就可以访问生成的块,允许将数据写入其中。
// Grab another block to write too // Enter a continous loop (this is to make // sure the operation is atomic) for (;;) { // Check if there is room to expand the write start cursor LONG blockIndex = m_pBuf->m_WriteStart; Block *pBlock = m_pBuf->m_Blocks + blockIndex; if (pBlock->Next == m_pBuf->m_ReadEnd) { // No room is available, wait for room to become available if (WaitForSingleObject(m_hAvail, dwTimeout) == WAIT_OBJECT_0) continue; // Timeout return NULL; } // Make sure the operation is atomic if (InterlockedCompareExchange(&m_pBuf->m_WriteStart, pBlock->Next, blockIndex) == blockIndex) return pBlock; // The operation was interrupted by another thread. // The other thread has already stolen this block, try again continue; }
首先,我们需要确保操作是原子的。为此,块索引被记录下来,以便稍后由 `InterlockedCompareExchange` 使用。
接下来,我们确保 `writeStart` 游标不会越过任何未完成的读取操作(`readEnd`)。如果会,那么我们需要等待一些空间可用。如果用户不想等待,则提供超时。如果 IPC 缓冲区已完全填满数据,就会发生这种情况。
然后使用 `InterlockedCompareExchange` 调用向前移动游标。这可以确保在所有检查进行期间游标没有被另一个线程修改,先前记录的值(`blockIndex`)被用作参考。如果调用成功,则返回块。
// Grab a block Block *pBlock = getBlock(dwTimeout); if (!pBlock) return 0; // Copy the data DWORD dwAmount = min(amount, IPC_BLOCK_SIZE); memcpy(pBlock->Data, pBuff, dwAmount); pBlock->Amount = dwAmount; // Post the block postBlock(pBlock); // Fail return 0;
这块代码负责实际的数据写入。一旦数据复制到块中,线程就通过调用 `postBlock()` 函数放弃对该块的所有权。这将向前移动 `writeEnd` 游标,允许对我们已发布的数据进行读取操作。
void osIPC::Client::postBlock(Block *pBlock) { // Set the done flag for this block pBlock->doneWrite = 1; // Move the write pointer as far forward as we can for (;;) { // Try and get the right to move the poitner DWORD blockIndex = m_pBuf->m_WriteEnd; pBlock = m_pBuf->m_Blocks + blockIndex; if (InterlockedCompareExchange(&pBlock->doneWrite, 0, 1) != 1) { // If we get here then another thread // has already moved the pointer // for us or we have reached as far // as we can possible move the pointer return; } // Move the pointer one forward (interlock protected) InterlockedCompareExchange(&m_pBuf->m_WriteEnd, pBlock->Next, blockIndex); // Signal availability of more data // but only if threads are waiting if (pBlock->Prev == m_pBuf->m_ReadStart) SetEvent(m_hSignal); } };
还记得我们在块结构中添加的标志吗?它们现在在 `postBlock()` 函数调用中使用,一个用于读取,一个用于写入。这个特定的情况使用了 `doneWrite` 标志。下面将详细解释使用这些标志的原因。
没有 `doneWrite` 标志,会有一个严重的问题。想象一下这种情况。一个属于线程的未完成块发布回一个块。这个块不是“写入”内存部分中的第一个块(即,它不等于 `m_WriteEnd`)。因此,我们不能向前移动 `writeEnd` 游标。这样做意味着实际上拥有第一个块的另一个线程已经失去了其独占所有权,更不用说如果这个其他线程试图发布这个已经被考虑在内的块时会发生什么问题了。因此,我们延迟移动 `writeEnd` 游标,将游标的责任留给拥有第一个块的线程。但是,为了表示我们已经放弃了对该块的所有权,我们设置了 `doneWrite` 标志。
如果没有这个 `doneWrite` 标志,当稍后实际向前移动第一个块时,无法知道其余块是否都已返回,因此需要这个标志。该标志初始化为零;一旦块被返回,它就被设置为一。当 `writeEnd` 指针向前移动使块再次可用时,该标志又被归零(`InterlockedCompareExchange`)。
`writeEnd` 指针将向前移动到最后一个已完成的块,即所有 `doneWrite` 标志设置为一的连续块。请注意,根据设计,这也确保了 `writeEnd` 游标永远不会越过 `writeStart` 指针。
最后,我们向所有等待读取数据的线程发出信号。
现在我们有了在缓冲区中填充块的方法,我们只需要一种方法来读取那些块。继续进行读取操作。
osIPC::Block* osIPC::Server::getBlock(DWORD dwTimeout) { // Grab another block to read from // Enter a continous loop (this is to // make sure the operation is atomic) for (;;) { // Check if there is room to expand the read start cursor LONG blockIndex = m_pBuf->m_ReadStart; Block *pBlock = m_pBuf->m_Blocks + blockIndex; if (pBlock->Next == m_pBuf->m_WriteEnd) { // No room is available, wait for room to become available if (WaitForSingleObject(m_hSignal, dwTimeout) == WAIT_OBJECT_0) continue; // Timeout return NULL; } // Make sure the operation is atomic if (InterlockedCompareExchange(&m_pBuf->m_ReadStart, pBlock->Next, blockIndex) == blockIndex) return pBlock; // The operation was interrupted by another thread. // The other thread has already stolen this block, try again continue; } }; void osIPC::Server::retBlock(osIPC::Block* pBlock) { // Set the done flag for this block pBlock->doneRead = 1; // Move the read pointer as far forward as we can for (;;) { // Try and get the right to move the poitner DWORD blockIndex = m_pBuf->m_ReadEnd; pBlock = m_pBuf->m_Blocks + blockIndex; if (InterlockedCompareExchange(&pBlock->doneRead, 0, 1) != 1) { // If we get here then another // thread has already moved the pointer // for us or we have reached as far // as we can possible move the pointer return; } // Move the pointer one forward (interlock protected) InterlockedCompareExchange(&m_pBuf->m_ReadEnd, pBlock->Next, blockIndex); // Signal availability of more data // but only if a thread is waiting if (pBlock->Prev == m_pBuf->m_WriteStart) SetEvent(m_hAvail); } }; DWORD osIPC::Server::read(void *pBuff, DWORD buffSize, DWORD dwTimeout) { // Grab a block Block *pBlock = getBlock(dwTimeout); if (!pBlock) return 0; // Copy the data DWORD dwAmount = min(pBlock->Amount, buffSize); memcpy(pBuff, pBlock->Data, dwAmount); // Return the block retBlock(pBlock); // Success return dwAmount; };
请注意,此实现也与写入块的操作相同,只是游标和标志不同。仔细想想,这是有道理的,因为我们实际上在做同样的事情,即按顺序移动一块区域。如果我们愿意,我们可以在读写游标之间添加另一个游标来执行其他操作。显然,这个特定的实现不需要这个,但它可能对其他人有用。
基本上就这些了。我们有写入块的能力,也有读取块的能力。所有函数都是多线程安全的,它们都使用最小的开销,并且除非绝对必要,否则不会阻塞/将 CPU 执行传递给其他线程。
现在我们拥有了执行 IPC 通信所需的所有功能,并且是以 FIFO 方式进行的。让我们看看它的表现!!!
性能
带宽测试
传输速率测试
所有数字都在上升,这是好事!!请特别注意传输速率的提高。每秒超过 200 万次块传输!这是巨大的传输潜力。带宽主要受限于数据传输到块和从块传输数据的速度,即完成 `memcpy` 调用所需的时间,但即使在进行大量内存传输时,IPC 的性能也远超 7.5 GBit/s。这已经非常接近物理 RAM 的写入速度了。为了更直观地理解,可以尝试在一个简单的循环中放置一个 `memcpy` 调用,看看您能获得什么样的传输速率。如果您将此 IPC 实现的开销与其他程序部分进行比较,您会发现它几乎没有开销。性能的提升空间仅受硬件本身的限制。
使用我能编写的最佳 BSP 树,我只能实现每秒两百万次 BSP 树查找,而使用我最好的哈希表,则观察到每秒四百万次查找。一个几乎什么都不做的简单循环每秒只能执行一千万次迭代。此 IPC 实现的使用可以被视为几乎不消耗 CPU。
在内存消耗方面,无论您使用 IPC 的多少,都只使用 2 MB RAM。通过降低块计数(`IPC_BLOCK_COUNT`)或块大小(`IPC_BLOCK_SIZE`),甚至可以减少内存使用量。内存碎片是不可能的,因为所有内存块都位于连续的内存区域中。(这对 CPU 缓存也有好处。)
所有这些额外的性能都是一个巨大的优势,考虑到我们最初只想让 IPC 以 FIFO 方式运行!
希望这些代码能对他人有所帮助。祝您编码愉快!