用于与 Web 客户端进行实时交互的 C++ Websocket 服务器






4.81/5 (23投票s)
基于 Ush Framework 实时库的 WebSocket 协议实现,以及一个演示示例,展示了 HTML5 Web 客户端和服务器之间四种类型的通信工作流。
- 下载源代码 - 347 KB
- 下载 Push Framework - 187 KB
- 下载 Web 客户端 - 117 KB
- 下载 WebSocket 协议 - 22 KB
- 下载 WebSocket 服务器 - 9.66 KB
目录
引言
WebSocket 协议的引入标志着 Web 演进过程中的一个有趣里程碑。终于,网页可以与远程服务器建立全双工连接,并能异步接收数据,而无需轮询。这为许多想法的实现打开了大门:可以轻松实现一个部署到多种设备上的 Web 前端,以及一个能够处理数千个同时连接客户端的自定义服务器端应用程序,而该应用程序可以部署在一台低成本服务器上。
在本文中,我们开发了一个 WebSocket 服务器应用程序,并展示了它与网页的交互。该解决方案基于一个先前在 CodeProject: Push Framework 上发布的实时通信库。协议层设计在一个独立的库项目中,开发人员可以轻松复用。
协议扩展层
本文提出的解决方案基于 Push Framework,它为创建能够管理大量同时连接客户端的实时服务器提供了基础。Push Framework 是协议无关的:我们通过具体实现以下“抽象类”来提供协议细节和信息。
IncomingPacket
:传入消息的原型是什么?也就是说,客户端发送的消息,服务器需要对其做出反应?OutgoingPacket
:传出消息的原型是什么?大多数协议是对称的,所以它应该与IncomingPacket
相同。Protocol
:传入数据包如何反序列化,传出数据包如何序列化以便通过网络发送。
要使PushFramework::Protocol
成为一个具体类,必须重写以下虚方法:
encodeOutgoingPacket
:接收一个OutgoingPacket
并对其进行编码。frameOutgoingPacket
:接收一个已编码的OutgoingPacket
并将其插入输出套接字缓冲区。tryDeframeIncomingPacket
:提供对接收数据的引用。应该检查这些数据,并可能返回一个IncomingPacket
对象。decodeIncomingPacket
:如果tryDeframeIncomingPacket
成功创建一个IncomingPacket
,则此函数应解码其内容。
这些方法由 PF 在序列化和反序列化时内部调用,它们为大多数协议提供了足够的抽象。
处理 WebSocket 协议时,应该理解编码/解码部分是一个独立的实现:这是因为规范更像一个帧协议。它详细说明了有效载荷如何与头部信息一起封装到帧中以便在网络上传输。但它不规定有效载荷如何被“编码”。因此,与 WebSocket 真正相关的函数是frameOutgoingPacket
和tryDeframeIncomingPacket
。在我们的示例中,我们在编码阶段并没有做太多工作。开发人员可能会发现修改它很有用,例如添加一个 JSON 层。
然而,该规范提到了协议中的两个通信阶段,这促使我们创建两种类型的数据结构:
- 握手消息:当连接在传输层被接受时,开始一个进行一些协商的握手阶段。
- WebSocket 数据消息:这将代表握手阶段完成后交换的数据消息。
帧代码应该区分这两个阶段。
int WebsocketProtocol::tryDeframeIncomingPacket( PushFramework::DataBuffer& buffer,
PushFramework::IncomingPacket*& pPacket, int& serviceId,
unsigned int& nExtractedBytes, ConnectionContext* pContext )
{
if (buffer.GetDataSize() == 0)
return Protocol::eIncompletePacket;
WebsocketConnectionContext* pCxt = (WebsocketConnectionContext*) pContext;
if (pCxt->GetStage() == WebsocketConnectionContext::HandshakeStage)
{
WebsocketHandshakeMessage* pMessage =
new WebsocketHandshakeMessage(buffer.GetBuffer(), buffer.GetDataSize());
serviceId = 0;
nExtractedBytes = buffer.GetDataSize();
pPacket = pMessage;
return Protocol::Success;
}
//In the other cases, we should expect a data message :
int nMinExpectedSize = 6;
if (buffer.GetDataSize() < nMinExpectedSize)
return Protocol::eIncompletePacket;
BYTE payloadFlags = buffer.getAt(0);
if (payloadFlags != 129)
return Protocol::eUndefinedFailure;
BYTE basicSize = buffer.getAt(1) & 0x7F;
unsigned __int64 payloadSize;
int masksOffset;
if (basicSize <= 125)
{
payloadSize = basicSize;
masksOffset = 2;
}
else if (basicSize == 126)
{
nMinExpectedSize += 2;
if (buffer.GetDataSize() < nMinExpectedSize)
return Protocol::eIncompletePacket;
payloadSize = ntohs( *(u_short*) (buffer.GetBuffer() + 2) );
masksOffset = 4;
}
else if (basicSize == 127)
{
nMinExpectedSize += 8;
if (buffer.GetDataSize() < nMinExpectedSize)
return Protocol::eIncompletePacket;
payloadSize = ntohl( *(u_long*) (buffer.GetBuffer() + 2) );
masksOffset = 10;
}
else
return Protocol::eUndefinedFailure;
nMinExpectedSize += payloadSize;
if (buffer.GetDataSize() < nMinExpectedSize)
return Protocol::eIncompletePacket;
BYTE masks[4];
memcpy(masks, buffer.GetBuffer() + masksOffset, 4);
char* payload = new char[payloadSize + 1];
memcpy(payload, buffer.GetBuffer() + masksOffset + 4, payloadSize);
for (unsigned __int64 i = 0; i < payloadSize; i++) {
payload[i] = (payload[i] ^ masks[i%4]);
}
payload[payloadSize] = '\0';
WebsocketDataMessage* pMessage = new WebsocketDataMessage(payload);
serviceId = 1;
nExtractedBytes = nMinExpectedSize;
pPacket = pMessage;
delete payload;
return Protocol::Success;
}
WebSocket 服务器
在WebsocketServer
中,我们实例化一个派生自PushFramework::Server
的主对象,通过描述一个 Protocol 对象、一个 Service 对象和一个ClientFactory
对象来初始化它,然后通过调用::Start
成员函数来启动它。
调用此函数时,会配置许多资源:
- 一个监听线程
- 一个线程池(IO Workers)用于处理 IO 事件
- 一个主线程用于管理整体服务器结构
- 多个“流线程”,这些线程会将数据流式传输到广播队列以供订阅者使用。
提供的 Protocol 对象应该派生自设计在独立 DLL 项目中的WebsocketProtocol
类。至于ClientFactory
子类,它应该管理已连接客户端的生命周期。特别是,它决定何时将新接受的连接(PhysicalConnection
)转换为合法的客户端(LogicalConnection
)。在我们的例子中,这种转换依赖于两个验证:WebSocket 协议描述的握手验证,以及一个登录验证,其中我们只要求客户端发送一个唯一的昵称。
int WebsocketClientFactory::onFirstRequest( IncomingPacket& _request,
ConnectionContext* pConnectionContext, LogicalConnection*& lpClient,
OutgoingPacket*& lpPacket )
{
//received messages belong to a physical connection
//that still did not transform into a logical connection :
//understand in which stage we are :
WebsocketConnectionContext* pCxt = (WebsocketConnectionContext*) pConnectionContext;
if (pCxt->GetStage() == WebsocketConnectionContext::HandshakeStage)
{
WebsocketHandshakeMessage& request = (WebsocketHandshakeMessage&) _request;
if (!request.Parse())
{
return ClientFactory::RefuseAndClose;
}
WebsocketHandshakeMessage *pResponse = new WebsocketHandshakeMessage();
if (WebsocketProtocol::ProcessHandshake(request, *pResponse))
{
lpPacket = pResponse;
pCxt->SetStage(WebsocketConnectionContext::LoginStage);
}
return ClientFactory::RefuseRequest;
// Will not close the connection, but we still wait
// for login message to create a logical client.
}
if (pCxt->GetStage() == WebsocketConnectionContext::LoginStage)
{
WebsocketDataMessage& request = (WebsocketDataMessage&) _request;
WebsocketClient* pClient = new WebsocketClient(request.GetArg1());
lpClient = pClient;
WebsocketDataMessage *pResponse = new WebsocketDataMessage(LoginCommunication);
pResponse->SetArguments("Welcome " + request.GetArg1());
lpPacket = pResponse;
pCxt->SetStage(WebsocketConnectionContext::ConnectedStage);
return ClientFactory::CreateClient;
}
//Impossible to come here.
}
服务器业务代码组织在“Service”类中。每个类绑定到特定类型的请求。
WebsocketServer server;
server.registerService(EchoCommunication, new EchoService, "echo");
server.registerService(Routedcommunication, new RoutedCommunicationService, "routed");
server.registerService(GroupCommunication, new GroupCommunicationService, "grouped");
server.registerService(StreamedCommunication, new StreamedCommunicationService, "streamed");
让我们看一下其中两个类的源代码。
void RoutedCommunicationService::handle( LogicalConnection* pClient, IncomingPacket* pRequest )
{
WebsocketDataMessage& request = (WebsocketDataMessage&)(*pRequest);
WebsocketClient& client = (WebsocketClient&) (*pClient);
LogicalConnection* pRecipient = FindClient(request.GetArg1().c_str());
if (pRecipient)
{
WebsocketDataMessage response(Routedcommunication);
response.SetArguments(client.getKey(), request.GetArg2());
pRecipient->PushPacket(&response);
}
}
对于第四种情况,所有服务器都只关心处理订阅和取消订阅请求。
void StreamedCommunicationService::handle( LogicalConnection* pClient, IncomingPacket* pRequest )
{
WebsocketDataMessage& request = (WebsocketDataMessage&)(*pRequest);
WebsocketClient& client = (WebsocketClient&) (*pClient);
string opType = request.GetArg1();
if (opType == "subscribe")
{
broadcastManager.SubscribeConnectionToQueue(client.getKey(), "streamingQueue");
}
if (opType == "unsubscribe")
{
broadcastManager.UnsubscribeConnectionFromQueue(client.getKey(), "streamingQueue");
}
}
事实上,PF 已经有一个发布/订阅机制,所以我们只需要设置队列,将客户端订阅到这些队列,然后发布消息。消息发送者不知道接收者,接收者也不知道发送者。可用数据会持续流式传输给那些对此感兴趣的人。
客户端
我们的网页显示四个选项卡,每个选项卡都可以触发一种类型的操作。
- Echo 选项卡:发送一条消息,服务器只将其回显给客户端。
- 路由通信:将消息发送给特定客户端,服务器负责将其路由到目的地。
- 组通信:将消息发送给服务器,然后将其发布到广播队列。我们可以远程订阅该队列,开始接收所有内容。
- 流式通信:允许订阅/取消订阅一个广播队列,该队列的内容会自动发布。一个服务器线程将执行此发布,因此我们可以在客户端体验实时数据。
要登录,客户端输入一个昵称然后点击“连接”。然后服务器会回复。
您可以测试不同类型的通信工作流,例如回显通信和流式通信,在流式通信中,您将获得服务器自动创建并发送到网页的实时消息流。