Push Framework - 用于高性能服务器开发的 C++ 工具包






4.96/5 (86投票s)
用几行代码编写异步、多线程服务器。通过一个仅部署的仪表板实时监控活动。
- 下载库源代码 (VC++ 2008) - 133 KB
- 下载库精要 (lib + public headers + DLL) - 70.9 KB
- 下载示例 1:聊天应用程序 (服务器 + 客户端 + XMLProtocol + TCPSocket 模块) - 292 KB
- 下载示例 2:Google ProtoBuf 集成示例 (服务器 + 客户端 + ProtoBufProtocol 模块) - 33.9 KB
- 下载 QoS 验证应用程序 (客户端 + 服务器 + Protocol 模块) - 20.5 KB
- 下载 ChatBots 模拟器 (用于测试目的的流量生成器) - 6.7 KB
目录
引言
除了网站之外,还有其他类型的应用程序,如果不是不可能,也无法轻松地使用 Web 构建。Web 作为一项技术,适用于我们需要发布内容并确保对其进行更改的即时和轻松部署的情况。用户只对内容做出反应。如果他们与其他用户互动,“反应”会编译到内容中,持久化,然后在其他人请求时显示给他们。需要实时广播信息或希望用户之间实时“互动”的应用程序需要专门的技术。
最简单的例子,可以想想聊天应用程序或媒体流服务器。您还可以考虑许多复杂的应用程序,这些应用程序从大量远程连接的设备收集信号并提供及时的决策。此类技术应该能够管理大量活动的双工连接。它应该能够即时地将信息从一个连接中继到另一个连接,并启用信息的有效和异步广播,以便客户端能够收到自发和及时的响应。我想在这里贡献我开发的 C++ 库,该库源自一个业余国际象棋服务器的服务器端代码的多次重构,我希望人们在其他领域也能找到它的用途。除了本文,您还可以在 Push Framework 主页找到文章、完整的文档指南和其他有用的材料:http://www.pushframework.com。
部署布局
让我们看一下这里的部署图。您的服务器逻辑将基于 Push Framework 库编写。如果您想部署应用程序,那么您需要一台专用的服务器机器。您应该设计/使用一种协议,以帮助您的客户端应用程序和服务器端之间进行高效通信。客户端将连接到您指定的 TCP 端口。您可能有使用不同协议与您的服务器通信的异构客户端。然而,该库的一个主要特性是其自我剖析并发送定期统计信息的能力。您只需要通过简单的 API 调用激活该功能,并将一个单独开发的 Applet——Dashboard,连接到服务器的“监控”端口,这样您就可以接收这些统计信息的“馈送”。
开发者视角
就我个人而言,作为一名开发者,当我发现一个新库并想评估它时,我急于查看一个示例用法代码。从那里,我可以看到全局。所以,在这里我将牺牲许多基本细节,只报告主要视图。单个实例封装了库的所有功能。
PushFramework::Server server;
通过调用成员方法配置这样的实例,然后调用 start
来运行您的服务器。当您完成时,围绕该实例的代码看起来是这样的。
PushFramework::Server server;
server.setClientFactory(pClientFactory);
server.setProtocol(pProtocol);
server.createListener(sPort);
server.registerService(myServiceId, pService, "myService";
//
server.start(false);
现在会产生一些问题。首先,pClientFactory
应该指向 PushFramework::ClientFactory
类的具体实现。这个类管理连接到服务器的客户端的生命周期。特别是,它决定何时将新接受的连接转换为合法的客户端(PhysicalConnection => LogicalConnection)。此外,它处理客户端重新连接的情况(可能会发生突然的 TCP 故障,然后同一个客户端重新连接),以及何时需要将新的 TCP 套接字绑定到现有的客户端信息和先前分配的资源。请注意,在服务代码中,您永远不会处理套接字,而是处理一个 PushFramework::LogicalConnection
对象(代表客户端)的引用。
协议是所有关键信息。您可以通过将 PushFramework::Protocol
类的具体实现提供给 ::setProtocol
方法,或者在 ::createListener
的 ListenerOptions
参数中提供(如果您打算支持多种协议),来传递此信息。通过重写其纯虚方法,您将告诉库如何反序列化接收到的信息以及如何序列化传出的信息。要将信息发送给特定客户端或特定客户端组(广播),PushFramework 期望以 PushFramework::OutgoingPacket
类型实例的形式输入。编码和定界会作用于此类。当数据接收到时,会触发解码和解定界,这被称为反序列化。最终,反序列化的数据会以 IncomingPacket
实例的形式传回。这些实例被分派到您的“服务代码”,您可能会发现将其组织成多个“服务”很有用(当存在多种类型的请求且每种类型都需要特殊处理时非常有用)。
服务通过 PushFramework::Service
子类的实例来实现。您通过重写其 handle
方法来设计业务逻辑,然后为其实例分配一个 ID。当然,您应该在反序列化时注意传递该 ID 信息,以便库知道如何路由数据包。
让我们看看如何处理传入的数据。在第一种情况下,我们将处理一个特定的请求并回显一个响应给请求者。
MyService::handle(LogicalConnection* pClient, IncomingPacket* pRequest)
{
/*See what client wants by examining pRequest (cast it first to your custom structure).
Then build and send the response.*/
OutgoingPacket response;
pClient->push(&response);
}
该库是多线程的。更准确地说,会启动多个线程来执行服务代码。因此,您服务类 handle
方法中的代码是并发的。让我们考虑这个例子,其中信息需要从一个连接中继到另一个连接。例如,假设我们在一个聊天应用程序的上下文中,并且我们想路由一条聊天消息。
MyService::handle(LogicalConnection* pClient, IncomingPacket* pRequest)
{
ClientKey recipientKey;// furnish this info from pRequest
char* msg// to be furnished from pRequest.
CMyClient* pRecipient = FindClient(recipientKey);
if(pRecipient)
{
OutgoingPacket response;
//build the chat response by putting the sender info and the msg
pRecipient->push(&response);
ReturnClient(pRecipient);
}
}
最后,让我们看看如何发送广播信息。
MyService: public PushFramework::Service
{
void handle(LogicalConnection* pClient, IncomingPacket* pRequest)
{
//Create the message :
PushFramework::OutgoingPacket* pResponse = new PushFramework::OutgoingPacket();
//Just publish it to the specific broadcast queue :
broadcastManager.PushPacket(pResponse, quot;broadcastGroup1");
}
}
技术架构
了解库的内部结构是很好的,尽管您只会发现自己处理少数封装了所有细节的公共类。让我通过列出要点来简化它。
- 为了有效地管理与系统的交互,该库对 Windows Server 使用完成端口 (IOCP),对 Linux 使用两个 Epoll 队列。
IOQueue
充当多生产者多消费者队列。Demux 模块负责建立消费者线程。这些线程接收系统 IO 事件(Windows 为完成事件,Linux 为就绪事件)并进行处理。 - 主线程负责管理系统的不同部分:它启动独立的监听线程、工作线程、流线程,并调度垃圾回收、审查非法连接、收集性能度量等定期任务。它还监听终止信号,以便可以停止所有线程并清理所有内存。
- 监听由一个单独的线程执行。传入的连接请求被传达给 Channels 工厂。在那里,决定将连接(PhysicalConnection)与新的 Client 对象(LogicalConnection)关联,或者将其附加到现有对象(客户端重新连接识别)。
- Demux 将发送/接收操作的完成状态传回 Dispatcher。当一个写操作完成时,会检查中间的 Send 缓冲区是否有任何待发送的数据。当一个读操作完成时,Dispatcher 会触发反序列化,然后反序列化调用协议定义的解定界和解码代码,然后将控制权交还给负责传入请求类型的 Service 对象的
handle
方法。 - 大部分执行时间都花在可用的
Service::handle
方法上。您可以在其中放置每种传入请求类型的处理逻辑。当然,您会在这里将数据推送到特定的客户端或广播频道,以便库将信息隐式地流式传输到已订阅的客户端列表中。 BroadcastStreamerManager
启动一系列线程,将广播消息从广播队列流式传输到已订阅的 LogicalConnections。因此,每个 streamer 都负责一组客户端。- 每个广播频道有四个属性:
requireSubscription
、maxPacket
、priority
和quota
。如果requireSubscription
为false
,则所有已登录的客户端都自动订阅它。否则,您必须显式注册每个您想要的客户端。 - 每个广播频道维护一个 FIFO 队列,用于放入其中的数据包。
maxPacket
是其最大大小。当队列已满时,会释放旧的数据包。避免广播放大现象(不可控的内存消耗增加)是以数据包丢失为代价的,对于那些 B(i) < F(j) 的客户端,其中 B(i) 是服务器和客户端 i 之间的可用带宽,F(j) 是客户端 i 所订阅的广播频道 j 的填充率。 - 当客户端 x 与服务器之间的带宽以及服务器的活动程度不足以流出所有数据时,客户端 x 所订阅的每个广播频道所占的份额取决于优先级和配额属性。
- 假设广播频道 {Ci} 的排序方式是:如果 i < j => (Pi > Pj 或(Pi = Pj 且 Qi >= Qj)),P 和 Q 分别是优先级和配额属性。
- 我们用 Fi 表示 Ci 被新数据包填充的速率。
- 假设 S 是发送到客户端 x 的广播数据的总速率。
- 进一步假设所有出站消息都具有相同的长度。
- 如果启用了远程监控,则会启动监控监听器。它有助于接受来自远程 Dashboard 的连接,以便发送分析信息。
那么,广播频道 i 的份额 Si 由下式给出:
教程和示例
聊天应用程序
了解 Push Framework 实操的最佳方法是指导您开发一个基于它的全新应用程序。这可以更好地评估库并同时权衡多个因素。
您可以在此处的分步教程中学习。源代码和二进制文件也可用。
开发的应用程序使用 XML 来编码请求和响应。服务器和客户端“看到”表示这些请求/响应的相同原型。如果收到直接聊天消息,服务器会知道目标客户端并将请求转发到相应的连接。启动时,聊天服务器会创建广播频道来表示“可用的聊天室”。如果收到房间聊天请求,房间聊天响应将被推送到相应的广播频道,以便房间里的每个参与者都能收到该响应。加入房间在技术上通过“订阅”客户端到广播频道来实现。同样,如何让聊天参与者互相看到:设置一个广播频道,并在每次客户端登录时填充信号数据包。
使用 Google Protobuf 协议的客户端-服务器
Google 表示,它创建了自己的协议,以使其异构应用程序能够交换数据。编码/解码速度极快,并且生成的消息很小。为了让希望与 Push Framework 一起使用此协议的用户受益,我们创建了一个客户端-服务器应用程序。您可以在此处的完整教程中学习。功能极其简单:客户端发送一个关于具有特定 ID 的某个项目的请求。预期响应将包含该项目的详细信息。您可以想象任何内容,这里的目的是仅仅让 ProtoBuf 和 Push Framework 之间的集成得到理解。
通信数据必须用特定语言设计,然后由 ProtoBuf 编译,以便我们可以生成可以使用的类。对于我们的示例,我们只需要设计这三个结构。
// content of loginrequest.proto :
package gprotoexample;
message LoginRequest {
}
//content of loginresponse.proto
package gprotoexample;
message LoginResponse {
required bool result = true;
}
//content of datainforrequest.proto
package gprotoexample;
message DataInfoRequest {
required int32 id = 2; // id for item.
}
//content of datainforesponse.proto
package gprotoexample;
message DataInfoResponse {
enum ResultType {
InfoFound = 0;
InfoNotFound = 1;
InfounspecifiedError = 2;
}
required ResultType result = 0;
required int32 id = 1; //
optional string description = 1;
}
//content of logoutrequest.proto
package gprotoexample;
message LoginResponse {
required bool result = true;
}
对于 C++,ProtoBuf 编译器会生成 C++ 类,我们在项目中包含这些类。为了适应 ProtoBuf 并允许在同一连接中发送多个消息(ProtoBuf 消息不是自定界的),我们创建了一个新的 PushFramework::Protocol
子类以及一个模板类,它们都派生自 PushFramework::OutgoingPacket
和 PushFramework::IncomingPacket
,这是 Push Framework 的流通货币。
class ProtobufPacketImpl : public PushFramework::IncomingPacket,
public PushFramework::OutgoingPacket
{
public:
ProtobufPacketImpl(int serviceId);
~ProtobufPacketImpl(void);
protected:
virtual bool Decode(char* pBuf, unsigned int nSize);
virtual bool Encode();
virtual google::protobuf::Message& getStructuredData() = 0;
private:
int serviceId;
std::string* pEncodedStream;
public:
std::string* getEncodedStream() const { return pEncodedStream; }
int getEncodedStreamSize();
int getServiceId() const { return serviceId; }
};
template<class T>
class ProtobufPacket : public ProtobufPacketImpl
{
public:
ProtobufPacket(int serviceId)
: ProtobufPacketImpl(serviceId)
{
//
}
~ProtobufPacket()
{
//
}
private:
T data;
public:
virtual google::protobuf::Message& getStructuredData()
{
return data;
}
};
这是当我们在接收到 datainforequest
时,服务代码的样子。
void CDataInfoRequestService::handle( LogicalConnection* _pClient, IncomingPacket* pRequest )
{
ExampleClient* pClient = (ExampleClient*) _pClient;
ProtobufPacket<DataInfoRequest>* pDataInfoRequest =
(ProtobufPacket<DataInfoRequest>*) pRequest;
ProtobufPacket<DataInfoResponse> response(DataInfoResponseID);
response.getData().set_id(pDataInfoRequest->getData().id());
response.getData().set_result(DataInfoResponse_ResultType_InfoFound);
response.getData().set_description("this is a description");
//
pClient->pushPacket(&response);
}
使用 Websocket 与 Web 客户端通信
您可以使用此库创建使用 Websocket 协议与 Web 应用程序通信的服务器。这使得创建显示实时信息的 Web 界面成为可能,方法是接收来自 Push Framework 的“流式数据”。此外,还可以创建一个通过 Web 进行实时互动的实时社区。
A C++ Websocket Server For realtime interaction with Web clients 是一个开源项目,其中包含一个完整的应用程序,该应用程序通过 Websocket 作为定界协议与基于 PF 的服务器进行通信。
基准测试
监控聊天应用程序
Push Framework 的一个有趣特性是它能够自我剖析并将实时报告发送到远程 Dashboard,在那里数字以定性图表和网格的形式显示。您可以通过调用服务器对象的简单方法来激活此功能。
//Assuming server is your Server instance
server.enableRemoteMonitor(monitoringPort, password);
//And to activate profiling, you do :
server.enableProfiling(samplingRate);
monitoringPort
是监控监听器将监听的端口。当然,您必须将此信息以及服务器 IP 和密码放入 Dashboard 登录信息中,以便您的连接成功。Dashboard 可用于执行远程命令。通过重写 Server::handleMonitorRequest
,您可以接收用户在 Dashboard 控制台中输入的任何内容。答案会以文本形式重定向到控制台。除非调用 Server::enableProfiling
,否则统计信息和性能信息无法在 Dashboard 中显示。这指示库开始收集性能值,并以 samplingRate
秒的固定间隔发送报告。
现在的目标是让您看到远程监控功能的使用。因此,让我们在之前开发的聊天服务器中启用分析,并尝试使用监控 Dashboard 连接到服务器。为了贴近现实场景,即大量用户连接到同一个聊天服务器并相互聊天,我制作了一个单独的多客户端模拟器,能够启动许多连接,并大致模拟发送聊天、回复接收到的聊天以及参与聊天室的人类参与者。这包含在 ChatRobots 项目中。说实话,模拟器不是非常优化,因为它为每个 Chat Robot 启动一个线程。因此,在启动许多客户端时可能会导致一些问题。
在一个测试中,启动了 100 个机器人来对之前的聊天服务器进行测试,该服务器以 10 秒的间隔报告分析信息。以下是 Dashboard“性能”选项卡的屏幕截图。
您可以在本文中找到其他屏幕截图并阅读详细信息:Bursting the Library。对“时间处理区域”的放大显示了
垂直线段代表每 10 秒发送的周期性报告。线段中心是特定观察间隔内接收到的所有请求的平均处理时间。垂直长度是服务器计算并发送的离散度。
当您设计应用程序并决定部署它时,此分析功能可以为您提供很大帮助。我认为即使在部署时,仍然可以保留分析功能,因为它成本不高,并且可以提供有关访问者行为、交换数据的传入/传出带宽以及服务器在吞吐量(每秒请求数)、处理时间等方面能够提供的性能的宝贵信息。
广播 QoS
本文的目标是验证 Push Framework 能够为每个广播频道提供不同优先级和配额的声明。此 QoS 功能在开发者指南页面(请参阅“广播数据”段落)中有解释。为此,将开发一个新的客户端-服务器应用程序。基于 Push Framework 的服务器将设置多个具有不同优先级/配额属性的广播频道,并不断用数据包填充它们。在客户端,我们收集接收到的数据包并尝试报告统计信息。用于演示的协议非常简单:我们只需要记录每个传入数据包所属的广播频道。然而,会添加一个 8K 的额外负载,以使我们处于现实应用程序协议中。广播频道(广播组)在调用 Server::start
之前进行设置。
broadcastManager.CreateQueue(
broadcastId, 100, false, uPriority, uQuota);
server.start();
对于每个广播组,会启动一个新线程来将数据包推入其中。您可以在 QoS 应用程序验证包中查看代码细节。我在本文中报告了更多结果和细节,我只在此报告一种场景,其中有三个具有相同优先级的广播频道。
1 => { priority = 5, quota = 10 }
2 => { priority = 5, quota = 20}
3 => { priority = 5, quota = 5}
配额参数的效果可以在以下客户端统计信息中看到。
参考文献
这项工作得以完成,离不开其他人的贡献,他们扫清了服务器开发的许多技术障碍。
- Ruslan Ciurca 的《使用 IO 完成端口构建可扩展服务器及烹饪指南》。
- 使用 IO 完成端口开发真正可扩展的 WinSock 服务器
- 简单易用的 IOCP 服务器框架
- Amin Gholiha 的《简单的 IOCP 服务器/客户端类》
历史
这是本文的第一个版本。此时,提供的材料可能已更改或过时,因此请访问库主页以获取最新更新,并在论坛空间参与,以帮助他人并获得帮助。