SocketPro 高性能可扩展持久消息队列简介
连续的行内请求/结果批处理、实时流发送/处理、异步数据传输和并行计算,以实现最佳性能和可扩展性。
引言
持久消息队列允许运行在不同机器/进程上的应用程序以一种容错的方式进行通信。消息队列是一个临时的存储位置或文件,消息可以从中可靠地保存和读取,只要条件允许。与需要始终保持直接连接的套接字和其他常见通道不同,持久消息队列使得可能并非始终连接的应用程序之间能够进行通信。存在许多以自己方式实现的持久消息队列。SocketPro 提供了一个极高性能的持久消息队列供您自由重用。
SocketPro 的客户端和服务器核心库都内置了持久消息队列。其客户端队列用于备份请求,以便在服务器因任何原因(如服务器断电、服务器应用程序崩溃、网络中断等)无法访问时,所有请求都可以重新发送到服务器进行处理。本质上,客户端队列用作故障自动恢复的工具,以提高应用程序的稳定性和降低开发复杂性,如在克隆 https://github.com/udaparts/socketpro 的源代码后,在socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp)目录中所示。
本文档重点介绍 SocketPro 服务器端的持久消息队列。需要注意的是,持久消息队列的预编译 SocketPro 服务器端库是**完全免费**提供给您的,并且其开源代码极其简单易懂。您也可以依赖这些开源代码来扩展它们以满足您的复杂需求。
源代码和示例
所有相关的源代码和示例都位于 https://github.com/udaparts/socketpro。在通过 GIT 将其克隆到您的计算机后,请注意 `socketpro/samples/module_sample` 目录下的 `uasyncqueue` 子目录。您可以看到这些示例是由 .NET、C/C++、Java 和 Python 开发环境创建的。它们可以在 Linux 或 Windows 平台上编译和运行。SocketPro 提供了一个预编译的系统库 `uasyncqueue`,它分别位于 Windows 和 Linux 平台的 `socketpro/bin/win` 和 `socketpro/bin/linux` 目录中。此外,通过查看 `socketpro/tutorials/(cplusplus|csharp|vbnet|java/src)/all_servers` 目录下的教程示例 `all_servers`,您可以了解如何使用您熟悉的开发环境将 SocketPro 队列服务加载到服务器应用程序中。然而,在本文中,我们仅使用 C# 客户端代码(socketpro/samples/module_sample/uasyncqueue /test_csahrp)进行解释。
在运行这些示例应用程序之前,您应该将 `socketpro/bin` 目录中的系统库分发到您的系统目录中。关于 SocketPro 通信框架,您也可以参考其开发指南文档 `socketpro/doc/SocketPro development guide.pdf`。
主函数
SocketPro 的底层设计旨在通过使用一个或多个非阻塞套接字池来支持并行计算。每个池可能由一个或多个线程组成,每个线程在客户端托管一个或多个非阻塞套接字。为了提高可扩展性,您可以创建多个池,其中包含连接到不同队列服务器的多个非阻塞套接字,这样您就可以以并行方式发送消息进行排队。但是,为了演示的清晰起见,我们这里只使用了一个池。此外,在这个客户端示例中,该池仅由一个线程和一个套接字组成,如以下代码片段 1 所示。
static void Main(string[] args) {
Console.WriteLine("Remote host: "); string host = Console.ReadLine();
CConnectionContext cc = new CConnectionContext
(host, 20901, "async_queue_client", "pwd_for_async_queue");
using (CSocketPool<CAsyncQueue> spAq = new CSocketPool<CAsyncQueue>()) {
//spAq.QueueName = "aq_backup"; //uncomment for message no loss
//by use of local message queue
if (!spAq.StartSocketPool(cc, 1, 1)) {
Console.WriteLine("Failed in connecting to remote async queue server,
and press any key to close the application ......");
Console.Read(); return;
}
CAsyncQueue aq = spAq.Seek(); //CAsyncQueue aq = spAq.SeekByQueue();
//Optionally, you can enqueue messages with transaction style
//by calling the methods StartQueueTrans and EndQueueTrans in pair
aq.StartQueueTrans(TEST_QUEUE_KEY, (errCode) => {
//error code could be one of CAsyncQueue.QUEUE_OK,
//CAsyncQueue.QUEUE_TRANS_ALREADY_STARTED, ......
});
TestEnqueue(aq);
//test manual message batching
using (CScopeUQueue sb = new CScopeUQueue()) {
CUQueue q = sb.UQueue;
CAsyncQueue.BatchMessage(idMessage3, "Hello", "World", q);
CAsyncQueue.BatchMessage(idMessage4, true, 234.456, "MyTestWhatever", q);
aq.EnqueueBatch(TEST_QUEUE_KEY, q, (res) => {
System.Diagnostics.Debug.Assert(res == 2);
});
}
aq.EndQueueTrans(false);
TestDequeue(aq); aq.WaitAll();
//get a queue message count and queue file size with default option oMemoryCached
aq.FlushQueue(TEST_QUEUE_KEY, (messageCount, fileSize) => {
Console.WriteLine("Total message count={0},
queue file size={1}", messageCount, fileSize);
});
aq.GetKeys((keys) => {
});
aq.CloseQueue(TEST_QUEUE_KEY, (errCode) => {
//error code could be one of CAsyncQueue.QUEUE_OK,
//CAsyncQueue.QUEUE_TRANS_ALREADY_STARTED, ......
});
Console.WriteLine("Press any key to close the application ......"); Console.Read();
}
}
启动一个套接字池:上面的代码片段 1 启动了一个套接字池,该池只有一个工作线程,只托管一个非阻塞套接字(`if (!spAq.StartSocketPool(cc, 1, 1))`),以便使用一个连接上下文实例来清晰地演示。请注意,如果您需要,可以在一个客户端应用程序中创建多个池。之后,我们获取一个异步 `CAsyncQueue` 句柄(`CAsyncQueue aq = spAq.Seek();`)。
流式消息:我们可以将单个消息以流式方式发送到服务器进行保存,而无需在客户端进行批处理(`TestEnqueue(aq);`)。我们将在新的 `TestEnqueue` 部分详细讨论。
手动消息批处理:当有大量小消息需要发送保存时,由于线程同步、函数处理、SocketPro 内部行内批处理等原因,这些小消息将在客户端和服务器端都需要大量的 CPU 成本。为了降低这些成本,我们可以将这些小消息批处理成一个更大的块,并将它们作为一个更大的单元发送到服务器进行保存(`using (CScopeUQueue sb = new CScopeUQueue()) { ....}`)。这是提高消息入队性能的一种方法,但它也会增加延迟,因为它需要一个时间间隔(通常超过 1 毫秒)来收集足够多的小消息,然后进行手动批处理。此外,它还需要更多的代码。只要流式消息队列的性能满足您的需求,或者消息大小不是非常小,**不**推荐使用 SocketPro。
事务式消息保存:SocketPro 持久消息队列支持事务式消息保存。要使用此功能,您必须成对调用 `StartQueueTrans` 和 `EndQueueTrans` 方法,如上面的代码片段 1 所示。请注意,批处理消息的总大小不应超过四千兆字节。
从多个消费者读取队列文件中的消息:当然,您可以读取队列中的消息(`TestDequeue(aq);`)。我们将在接下来的 `TestDequeue` 部分更详细地阐述。请注意,一个 SocketPro 队列可以同时支持来自多个生产者写入消息和来自多个消费者读取消息。仅供参考,许多其他队列实现不支持在一个队列文件上使用多个消费者。
可扩展性:客户端能够创建一个拥有连接到不同服务器队列机的多个套接字的池。客户端能够使用池方法 `Seek` 或 `SeekByQueue`,并将消息分发到不同的服务器进行保存。不要被这个示例代码所迷惑,因为演示是为了清晰和初学者设计的。
无消息丢失:消息保存需要将消息从客户端或生产者传输到消息队列服务器。由于各种原因,服务器和网络可能会中断。因此,如果没有额外的代码处理,消息可能会丢失。您可以轻松地通过使用客户端或本地消息队列备份这些消息,然后再将其发送到网络来防止这种情况(`spAq.QueueName = "aq_backup";`)。如果服务器或网络中断,当队列服务器应用程序重新可访问时,SocketPro 可以重新发送已备份在本地或客户端消息队列文件中的消息。
其他功能:SocketPro 持久消息队列提供了其他方法来检查消息计数、队列文件大小和不同消息队列的键,以及关闭队列,如上面代码片段 1 结尾所示。
TestEnqueue
此函数是一个入队消息的示例,如下面的代码片段 2 所示,非常简单。
static bool TestEnqueue(CAsyncQueue aq) {
bool ok = true; Console.WriteLine("Going to enqueue 1024 messages ......");
for (int n = 0; n < 1024; ++n) {
string str = n + " Object test";
ushort idMessage;
switch (n % 3) {
case 0:
idMessage = idMessage0;
break;
case 1:
idMessage = idMessage1;
break;
default:
idMessage = idMessage2;
break;
}
//en-queue two unicode strings and one int
ok = aq.Enqueue(TEST_QUEUE_KEY, idMessage, "SampleName", str, n);
if (!ok) break;
}
return ok;
}
如上面代码片段 2 所示,我们可以以流式方式连续发送单个消息(`aq.Enqueue`)。您可以看到使用 SocketPro 入队消息非常容易。
TestDequeue
下面的代码片段 3 是一个批量出队消息的演示。
static void TestDequeue(CAsyncQueue aq) {
//prepare callback for parsing messages dequeued from server side
aq.ResultReturned += (sender, idReq, q) => {
bool processed = true;
switch (idReq) {
case idMessage0:
case idMessage1:
case idMessage2:
Console.Write("message id={0}", idReq);
{
string name, str; int index;
//parse a dequeued message which should be the same
//as the above enqueued message (two unicode strings and one int)
q.Load(out name).Load(out str).Load(out index);
Console.WriteLine(", name={0}, str={1}, index={2}", name, str, index);
}
break;
case idMessage3: {
string s1, s2;
q.Load(out s1).Load(out s2);
Console.WriteLine("{0} {1}", s1, s2);
}
break;
case idMessage4: {
bool b; double dbl; string s;
q.Load(out b).Load(out dbl).Load(out s);
Console.WriteLine("b= {0}, d= {1}, s= {2}", b, dbl, s);
}
break;
default:
processed = false;
break;
}
return processed;
};
//prepare a callback for processing returned result of dequeue request
CAsyncQueue.DDequeue d = (messageCount, fileSize, messages, bytes) => {
Console.WriteLine("Total message count={0}, queue file size={1},
messages dequeued={2}, message bytes dequeued={3}", messageCount, fileSize, messages, bytes);
if (messageCount > 0) {
//there are more messages left at server queue, we re-send a request to dequeue
aq.Dequeue(TEST_QUEUE_KEY, aq.LastDequeueCallback);
}
};
Console.WriteLine("Going to dequeue messages ......");
bool ok = aq.Dequeue(TEST_QUEUE_KEY, d);
//optionally, add one extra to improve processing concurrency at both client
//and server sides for better performance and through-output
ok = aq.Dequeue(TEST_QUEUE_KEY, d);
}
代码片段 3 中的回调函数(`aq.ResultReturned += (sender, idReq, q) => { .....};`)用于解析来自远程消息队列文件的消息。代码(`case idMessage0: case idMessage1: case idMessage2:`)用于解析来自前面代码片段 2 的消息。代码(`case idMessage3: case idMessage4:`)用于解析手动批处理的消息(`using (CScopeUQueue sb = new CScopeUQueue()) { ....}`),这些消息来自前面的代码片段 1。正如注释(`//prepare a callback for processing returned result of dequeue request`)所提示的,回调函数(`CAsyncQueue.DDequeue d = ......`)用于监控关键的消息队列数据,如消息计数(待出队的消息)、服务器队列文件大小、通过下面的 `Dequeue` 调用传输的消息以及消息大小(字节)。在回调函数内部,如果服务器队列文件中还有消息,则有必要递归调用 `Dequeue` 方法(`if (messageCount > 0)`)。
在准备好以上两个回调后,我们最终调用 `Dequeue` 方法,向服务器发送一个请求以批量读取消息。可选地,我们可以再调用一次或两次 `Dequeue` 方法,以提高出队吞吐量或性能,因为客户端消息解析和服务器消息读取在处理方面可以有更好的并发性。
性能研究
SocketPro 从一开始就旨在支持通过非阻塞套接字和内部算法进行流式请求,以实现最佳的网络和代码效率。性能研究示例,由 C++、Java 和 C# 编写,位于 `socketpro/samples/qperf` 目录。此外,我们还将 SocketPro 队列与流行的 Kafka 和 RabbitMQ 进行了比较,如两篇短文 `perf_comparison.pdf` 和 `sq_kafka_perf.pdf` 所示。
我们的结果表明,SocketPro 队列不仅比 RabbitMQ 快,而且比 Kafka 快得多,尤其是在写入大量小消息时。为便于理解,本文档重点将 SocketPro 队列与 Kafka 进行性能比较。
在本地区域网络(**LAN**)环境中比较 SocketPro 队列与 Kafka 最为重要,因为这种情况在大多数情况下更接近实际的队列应用。我们的测试结果列在下图 1 中。请注意,Kafka 性能测试脚本 `kafka_perf_test.txt` 位于 `../socketpro/samples/qperf` 目录。SocketPro 队列天生支持实时流式消息入队,通过持续发送消息。此外,SocketPro 还通过其手动批处理功能支持将所有批处理消息作为一条大消息发送,该功能旨在牺牲延迟以换取更好的入队速度或吞吐量。最后,需要注意的是,Kafka 的入队性能测试始终以批处理方式完成,通过设置配置属性 `batch.size`。
图 1:SocketPro 和 Kafka 在**LAN**上的队列性能比较
小消息:对于小消息(4 字节和 32 字节),如果**不**使用 SocketPro 手动批处理功能,Kafka 在入队([1,083,000 & 819,300]/[839,300 & 752,300])和出队([1,406,000 & 1,043,000]/[1,195,000 & 974,100])方面比 SocketPro 稍快(< 25%)。SocketPro 队列在入队时较慢,因为在没有手动批处理的情况下,通过网络传输小消息的 CPU 成本非常高。然而,如果 SocketPro 采用类似 Kafka 的手动批处理,SocketPro 队列的性能将远远优于 Kafka(5,988,000/1,083,00 = 5.53 或 450%)。
如图 1 所示,Kafka 在小消息的出队方面比 SocketPro 队列快约 15%。其解释是,SocketPro 出队总是自动从消费者向服务器发送出队确认。此外,出队确认还会在服务器端引起磁盘查找以标记所有已出队的消息。这对于降低消费者端的编码复杂性是默默进行的,但显然会稍微降低 SocketPro 的出队性能。Kafka 在出队消息方面非常简单,并且完全不支持类似的出队确认。
中等消息:考虑到中等大小的消息(200 字节),即使 SocketPro 不使用手动批处理,其消息入队速度也明显优于 Kafka。如果 SocketPro 采用类似 Kafka 的手动批处理,SocketPro 的性能将比 Kafka 快 90%(457,500/238,900 = 1.91 = 90%)。然而,在出队中等大小的消息方面,SocketPro 和 Kafka 的性能相似。
大消息:SocketPro 在入队大消息(1024 字节和 10240 字节)方面比 Kafka 快约 40%。在出队中等大小的消息方面,SocketPro 和 Kafka 没有性能差异。
最后,需要指出的是,只要消息入队速度满足您的需求,**不**推荐使用 SocketPro 手动批处理。原因是手动批处理会显着增加消息入队延迟,因为您必须有一个时间间隔来收集足够的消息,然后才能在实际应用程序中将消息发送到网络。只有当您的应用程序需要更高的入队速度来处理大量小消息时,才应使用 SocketPro 手动批处理。
广域网:现在我们来谈谈广域网(**WAN**)。WAN 挑战分布式应用程序的开发,因为它不仅带宽低,而且延迟非常大。我们想在 WAN 上比较 SocketPro 与 Kafka 的远程消息入队和出队性能。我们发现 SocketPro 在性能和稳定性方面都运行得非常好,如图 2 所示。不幸的是,我们无法完成 Kafka 在 WAN 上的性能测试,在尝试了几天并搜索了许多网站以寻找其可能的配置设置后,我们最终放弃了。在我们看来,Kafka 根本不支持远程消息入队或出队!
图 2:SocketPro 持久队列在 Google 云数据中心之间两台廉价虚拟机上的性能结果
上图 2 显示了在 Google 云数据中心之间两台廉价虚拟机上测得的 SocketPro 队列结果。网络带宽约为 40 Mbps,高延迟约 35 毫秒。客户端和服务器应用程序代码(sq_client 和 sq_server)都可以在 `../socketpro/samples/qperf/cperf` 目录中找到。通过 GIT 克隆 SocketPro 并检出 `linux_tests` 分支后,您将在 `../socketpro/test_apps` 目录中找到这两个预编译的应用程序。您可以自行使用这两个应用程序进行测试。
上述测试结果表明,无论是在入队还是出队消息时,网络带宽都得到了充分利用。WAN 带宽决定了消息入队和出队的速度。图 2 还表明,如果消息大小不超过 300 字节,入队和出队速度都可以轻松超过每秒 10,000 条消息。请注意,如果可以开启 SocketPro 的在线压缩和解压缩功能,性能数据可以翻倍。
如果消息大小小于 64 字节,您还可以使用 SocketPro 手动批处理功能来进一步提高性能。
最后,需要指出的是,据我们所知,目前市面上没有其他消息队列能够获得类似的 WAN 性能结果。如果您找到一个,请告知我们,以便我们尽快更改此声明。
SocketPro 持久消息队列的亮点
最后,值得强调 SocketPro 持久消息队列相比 Kafka 的优势。
- SocketPro 持久消息队列**无需复杂的配置设置**供您理解和配置。相反,Kafka 要求您必须预先理解许多配置设置。
- SocketPro 持久消息队列在**WAN**上运行良好,具有不错的性能和稳定性,而 Kafka 则不行。
- SocketPro 持久消息队列支持**手动事务**以提高稳定性,而 Kafka 不支持。
- 使用 SocketPro 队列,一个队列文件可以**同时被多个消费者共享**,而 Kafka 不具备此能力。
- 只要开启本地或客户端消息队列,SocketPro 队列就可以保证**无消息丢失**,而 Kafka 则无法做到。
- SocketPro 队列支持消息可用性,可以实时通知所有连接的消费者,以实现最短的延迟。其延迟**总是**是网络延迟的 1.5 倍,在本地区域网络上最低可达 0.3 毫秒。Kafka 的最低延迟在最佳情况下为 1 毫秒,前提是您必须为此配置特定设置。
- SocketPro 队列比 Kafka**快**得多,尤其是在开启手动批处理时,写入大量小消息。
- 您可以使用 SocketPro**选择性地**入队一部分消息,而 Kafka 则强制您入队所有消息。此外,您可以将消息队列与 SocketPro 的其他功能集成,例如在线消息总线、本地消息队列、客户端服务器通信等。
- SocketPro 持久消息队列的客户端和服务器代码都**极其简单**,您可以轻松地扩展和修改它们以满足您的复杂需求。对于 Kafka,您很难做到这一点。
- 您可以**更简单地分发并低依赖地**将 SocketPro 队列嵌入您的应用程序系统中。对于 Kafka,您很难做到这一点。
- 与 Kafka 一样,SocketPro 队列也具有极高的可扩展性。
历史
- 2018/02/14 ==> 初始发布
- 2018/04/08 ==> 添加性能测试结果并修正文档错误