广播突发队列





5.00/5 (13投票s)
BBQ - 编写高度可扩展的 Windows Socket 服务器
引言
BBQ(Broadcast Burst Queue 的缩写,我们简称为“烧烤”)是一个小而实用的库,用于通过 Windows 套接字加速消息发送。它曾是我们通信库测试集的一部分,但我们最终决定不在 Safetica 的产品中使用它。因此,我们现在可以将其代码与大家分享。
背景
编写任何类型的 IPC 总是与需求、需要和设计有关,当然,它们之间也存在权衡。那么,让我们看看我们为 BBQ设定的主要需求是什么。
- 每个 PC 上有一个或多个多线程可扩展服务器,可处理“无限”数量的客户端。
- 通信是单向的,不需要回复(客户端 -> 服务器)。
- 我们不想使用任何可能导致死锁的同步机制,客户端和服务器都应该能够随时崩溃,而无需释放锁来保持系统稳定等。
- 我们希望尽可能多地使用系统缓冲区,以实现最大性能和响应能力。我们希望能够推送大量数据,并且系统能够处理这些数据而不会降低我们的性能。
- 我们希望能够根据当前工作负载动态更改重新连接参数(等待时间和重新连接尝试次数)。
- 我们希望客户端和服务器能够在不同的计算机上工作。发送的数据应通过网络集中处理。
您可以轻易地意识到,此项目的典型用法是某种进程间客户端/服务器日志记录,您是正确的。由于它使用 Windows 套接字,BBQ 可以在服务器和所有客户端都在不同计算机上运行的网络中工作。
有很多方法可以处理一个服务器具有多个客户端连接。问题是:如何强制操作系统在当前连接得到妥善处理之前一直保持所有其他客户端的连接?如何尽可能快地处理这些消息并尽可能多地使用 CPU 时间,以使其他客户端等待的时间尽可能短?许多有用的文章已经讨论了这个问题,我可以向您推荐这三篇,都来自同一位作者——Changs Hu Liu(谢谢!):
您可以在这里找到一个非常清新且详细的各种 I/O 方法的比较,我强烈推荐您阅读,其中包含说明性图片(感谢 Thomas Bleeker!)。
您可以在以下位置找到大量代码示例:
为了实现我们的目标,我们在 BBQ 中使用了以下各种方法:
-
重叠 I/O 操作
简单来说,重叠操作是一种异步 I/O 调用,允许您发起所需的操作,然后做其他事情,最后等待操作的结果。如果设计得当,这种方法可以充分利用您的处理器——最小化无用的阻塞。
-
ConnectEx()
和AcceptEx()
Socket 2 APIAcceptEx()
是一个有用的重叠函数,它允许您在后台接受连接,并在一步中获取通过ConnectEx()
API 发送的消息的大小。我们使用这些函数来优化代码,例如,我们可以在等待慢速连接的同时分配内存。同样,我们可以最小化代码中的无用阻塞和等待,同时让处理器为我们做更多的工作。 -
I/O 完成端口
I/O 完成端口是一种高效的机制,允许您在一个或多个工作线程中处理多个并发连接。它最小化了执行所有操作所需的上下文切换。这种方法在 Windows 平台上的可扩展性最佳,因此是首选,尽管它有点复杂。
-
工作线程池
为了在多处理器系统上尽快处理消息,BBQ 使用了多个预创建的工作线程。所有这些线程都已准备就绪,并随时等待处理消息。
极端情况
正如您所见,在操作系统及其提供的异步函数的强力帮助下,无用的阻塞被最小化了。该系统为套接字请求提供了大量缓冲区,因此我们可以将这些请求排队,并在获得所需资源后进行处理。但是,这些缓冲区不是无限的,所以在所有资源都耗尽的极端情况下,我们只能做两件事之一:1/ 从服务器拒绝客户端的连接,未能传递消息;或 2/ 等待直到一些资源被释放——这可能导致无限等待,或者最终再次导致失败。所以,最终没有一种方法可以最终解决资源有限和请求无限的问题,尽管我们尽力利用系统缓冲区,从而将不可避免的失败时刻推迟了。
设计
BBQ 服务器由几个部分组成:
-
监听线程
监听连接并将它们传递给处理线程
-
处理线程
发起消息接收,并将其传递给一个可用的工作线程
-
工作线程
检索消息并调用其相应的消息处理程序
您可能会问,为什么要有处理线程?难道没有更好的办法将所有东西都放在工作线程中吗?处理线程有其目的:由于大部分代码都在工作线程中执行(消息处理程序中可能会发生一些阻塞),并且这是异步发生的,因此可以在更短的时间内处理其他待处理的连接。简单来说,这种概念再次提高了服务器的整体可扩展性。
BBQ 客户端非常简单,只包含一个单线程的发送器,但有两点值得一提:
-
重新连接参数
如果客户端无法在预期时间内成功发送消息,它会降低重新连接尝试次数并延长重连之间的睡眠时间。否则,每次成功发送后,客户端会反向调整这些值,直到达到原始值。因此,如果服务器过载,客户端将在几次尝试失败后停止发送消息,延长它们之间的等待时间,从而使服务器能够恢复。一旦完成,客户端会相应地调整这些参数。这是一种自动负载平衡功能。
-
无等待发送
您可以告知客户端您不需要它们等待成功发送。这可以使 BBQ 客户端工作得更快,但您不会收到消息传递出现问题的通知。
代码示例
现在,让我们详细看看 BBQ 服务器代码 bbq_server.cpp!BBQServerListeningThread()
。我们将从监听线程开始。
--- WSACreateEvent() WSASocket() memory allocation for new item --- LOOP AcceptEx() - accepts connection --- WSACreateEvent() WSASocket() memory allocation for next item --- WSAWaitForMultipleEvents() - now the connection is really accepted --- add to queue for later processing replace old event, socket and queue item with the allocated ones END LOOP
您可以看到代码的设计有点奇怪。为什么我们不在需要时才分配事件、套接字和项内存?这是因为分配需要一些时间才能完成。监听线程预先分配了所需的资源,这些资源以后可以重复使用,这样我们在处理传入消息时就不会被它减慢。分配代码在我们实际接受网络连接时执行,这通常是一个耗时的操作,无论如何都必须完成。因此,这是重叠 I/O 操作的典型用法,这使得 BBQ 更具可扩展性。
现在让我们看看处理线程 bbq_server.cpp!BBQServerProcessingThread()
。
LOOP get queue item CreateIoCompletionPort(socket) memory allocation based on size we have received via AcceptEx() WSARecv() remove item from memory END LOOP
处理线程甚至比监听线程更简单。它将套接字添加到我们的 I/O 完成端口,为传入消息分配内存,开始接收,然后让一个可用的工作线程处理其余部分,因此它可以立即处理另一条消息。我们尽量使监听线程中的 AcceptEx()
循环尽可能短。
最后,我们在这里有工作线程的代码。
LOOP GetQueuedCompletionStatus() call message handler closesocket() release resources END LOOP
当 GetQueuedCompletionStatus()
API 成功时,消息就已送达。最终,这并不算太难,对吧?操作系统为我们完成了剩下的工作。可扩展性的关键在于让每一段代码都随时准备好立即执行任务,并且不做任何等待。
测试和衡量
我们在软件包中包含了服务器和客户端的性能/压力测试(test/test-server
和 test/test-client
项目和解决方案)。客户端向服务器发送 4kB 的消息,服务器在工作线程中处理它们。客户端比服务器快得多,因此服务器很快就会不堪重负。
在我连接到互联网的 Intel Core i5-2500 @ 3.30 GHz 上(因此,可能会有其他流量),拥有 4 个服务器工作线程,4 个客户端发送线程,运行时长 60 秒,结果如下:
SERVER setting up ctrl+c handler... initializing BBQ server... creating BBQ server... running BBQ server... creating mutex... waiting for termination (ctrl+c or 60000 miliseconds)... runtime: 60062 server accepts: 64324 (1070.96 per sec) server megabytes: 263.47 (4.39 per sec) server receives: 64324 (1070.96 per sec) max pending msgs: 1647 curr pending msgs: 0 Press any key to continue . . .
CLIENT waiting for server mutex... setting up ctrl+c handler... creating working threads... initializing BBQ client... resuming threads... waiting for termination... send failed #1, thread: 8320, type: 6, err: 10055 send failed #2, thread: 14456, type: 6, err: 10055 send failed #3, thread: 12344, type: 6, err: 10055 send failed #4, thread: 5144, type: 6, err: 10055 send failed #5, thread: 8320, type: 6, err: 10055 send failed #6, thread: 14456, type: 6, err: 10055 send failed #7, thread: 12344, type: 6, err: 10055 send failed #8, thread: 5144, type: 6, err: 10055 send failed #9, thread: 8320, type: 6, err: 10055 send failed #10, thread: 14456, type: 6, err: 10055 send failed #11, thread: 12344, type: 6, err: 10055 send failed #12, thread: 5144, type: 6, err: 10055 send failed #13, thread: 14456, type: 6, err: 10055 send failed #14, thread: 12344, type: 6, err: 10055 send failed #15, thread: 5144, type: 6, err: 10055 send failed #16, thread: 8320, type: 6, err: 10055 send failed #17, thread: 12344, type: 6, err: 10055 send failed #18, thread: 5144, type: 6, err: 10055 send failed #19, thread: 8320, type: 6, err: 10055 send failed #20, thread: 14456, type: 6, err: 10055 send failed #21, thread: 12344, type: 6, err: 10055 send failed #22, thread: 5144, type: 6, err: 10055 send failed #23, thread: 8320, type: 6, err: 10055 send failed #24, thread: 14456, type: 6, err: 10055 send failed #25, thread: 12344, type: 6, err: 10055 send failed #26, thread: 5144, type: 6, err: 10055 send failed #27, thread: 8320, type: 6, err: 10055 send failed #28, thread: 14456, type: 6, err: 10055 send failed #29, thread: 14456, type: 6, err: 10055 send failed #30, thread: 8320, type: 6, err: 10055 send failed #31, thread: 12344, type: 6, err: 10055 send failed #32, thread: 5144, type: 6, err: 10055 send failed #33, thread: 14456, type: 6, err: 10055 send failed #34, thread: 8320, type: 6, err: 10055 send failed #35, thread: 12344, type: 6, err: 10055 send failed #36, thread: 5144, type: 6, err: 10055 send failed #37, thread: 14456, type: 6, err: 10055 send failed #38, thread: 8320, type: 6, err: 10055 send failed #39, thread: 12344, type: 6, err: 10055 send failed #40, thread: 5144, type: 6, err: 10055 send failed #41, thread: 14456, type: 6, err: 10055 send failed #42, thread: 8320, type: 6, err: 10055 send failed #43, thread: 12344, type: 6, err: 10055 send failed #44, thread: 5144, type: 6, err: 10055 send failed #45, thread: 14456, type: 6, err: 10055 send failed #46, thread: 8320, type: 6, err: 10055 send failed #47, thread: 12344, type: 6, err: 10055 send failed #48, thread: 5144, type: 6, err: 10055 send failed #49, thread: 14456, type: 6, err: 10055 send failed #50, thread: 8320, type: 6, err: 10055 send failed #51, thread: 12344, type: 6, err: 10055 send failed #52, thread: 5144, type: 6, err: 10055 send failed #53, thread: 14456, type: 6, err: 10055 send failed #54, thread: 8320, type: 6, err: 10055 send failed #55, thread: 12344, type: 6, err: 10055 send failed #56, thread: 5144, type: 6, err: 10055 runtime: 63969 clients sends: 64324 (1005.55 per sec) clients megabytes: 263.47 (4.12 per sec) clients fails: 56 (0.88 per sec) connections fails: 184 (2.88 per sec) queued msgs: 64324 (1005.55 per sec) min sendtime: 0.00 secs max sendtime: 5.03 secs avg sendtime: 3.81 secs Press any key to continue . . .
您可以看到,在 60 秒内,客户端能够发送 64,324 条 4kB 的长消息(总计 263.47 MB),即 4.14 MB/s。有 56 次传递失败,错误代码为 10055 (WSAENOBUFS
),表示服务器缓冲区不足。峰值时,服务器不得不处理队列中存储的 1647 条消息。我们将套接字推向了极限。
许可证和联系方式
您可以在 BSD 许可证下使用和分发 BBQ。如果您有任何问题、建议或改进意见,您可以随时联系我或任何其他 Safetica 的同事。
Marek Strihavka,又名 Benny
marek.strihavka@safetica.com