一个可重用、高性能的套接字服务器类 - 第一部分






4.83/5 (41投票s)
编写一个运行在Windows NT上并使用套接字与外界通信的高性能服务器并不难,只要你深入研究API文档。更重要的是,大多数代码对于你可能想编写的各种服务器来说都是通用的。
下面的源代码是使用Visual Studio 6.0 SP5和Visual Studio .NET构建的。你需要安装一个版本的Microsoft Platform SDK。
请注意,代码的调试版本会因为调试跟踪输出而浪费大量的CPU周期。只有在发布版本下进行性能分析才是有意义的。
- JBSocketServer1 - 又一个回显服务器 - 68 KB
- JBSocketServer2 - 一个基于数据包的回显服务器 - 70 KB
- JBSocketServer3 - 一个模拟的POP3服务器 - 70 KB
- JBServerShutdown - 服务器的关机开关 - 42 KB
概述
编写一个运行在Windows NT上并使用套接字与外界通信的高性能服务器并不难,只要你深入研究API文档。更重要的是,大多数代码对于你可能想编写的各种服务器来说都是通用的。应该可以将所有通用代码封装成易于重用的类。然而,当我寻找用于编写第一个套接字服务器的类时,我发现所有的示例和文章都要求用户几乎从头开始,或者在他们想在自己的服务器中使用代码时采用“复制粘贴重用”。此外,更复杂的示例,例如使用I/O完成端口的那些,往往止步于展示实际应用。毕竟,随便谁都能写一个回显服务器……本文的目的是解释我为编写套接字服务器设计的一组可重用类,并展示如何将它们用于比简单回显接收到的每个字节更复杂的服务器。请注意,我不会费心解释I/O完成端口等的原理和原因,因为有很多参考资料可用。
套接字服务器需要做什么?
套接字服务器需要能够监听特定端口,接受连接,并从套接字读取和写入数据。高性能、可扩展的套接字服务器应使用异步套接字I/O和I/O完成端口。由于我们使用I/O完成端口,我们需要维护一个线程池来处理I/O完成数据包。如果我们仅限于在Win2k及以上版本运行,我们可以使用QueueUserWorkItem
API来处理我们的线程需求,但为了使我们能够运行在最广泛的操作系统上,我们必须自己完成这项工作。
在开始接受连接之前,我们需要有一个监听套接字。由于设置这样的套接字有许多不同的方法,我们将允许派生类通过提供一个virtual
函数来创建此套接字,如下所示:
virtual SOCKET CreateListeningSocket(
unsigned long address,
unsigned short port);
服务器类提供了一个默认实现,在大多数情况下都足够了。它看起来是这样的:
SOCKET CSocketServer::CreateListeningSocket(
unsigned long address,
unsigned short port)
{
SOCKET s = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0,
WSA_FLAG_OVERLAPPED);
if (s == INVALID_SOCKET)
{
throw CWin32Exception(_T("CSocketServer::CreateListeningSocket()"),
::WSAGetLastError());
}
CSocket listeningSocket(s);
CSocket::InternetAddress localAddress(address, port);
listeningSocket.Bind(localAddress);
listeningSocket.Listen(5);
return listeningSocket.Detatch();
}
请注意,我们使用一个辅助类CSocket
来处理监听套接字的设置。这个类充当套接字的“智能指针”,在范围结束时自动关闭套接字以释放资源,并且还用引发异常的成员函数封装了标准的套接字API调用。
现在我们有了一个监听套接字,我们可以开始接收连接了。我们将使用WSAAccept()
函数来接受连接,因为它比高性能的AcceptEx()
更容易使用。我们将在后续文章中与AcceptEx()
进行性能比较。
当发生连接时,我们创建一个Socket
对象来封装SOCKET
句柄。我们将此对象与我们的I/O完成端口关联,以便为我们的异步I/O生成I/O完成数据包。然后,我们通过调用OnConnectionEstablished()
virtual
函数来通知派生类连接已建立。派生类随后可以对连接进行任何操作,但最常见的是在向客户端发送欢迎消息后,在套接字上发出读取请求。
void CSocketServer::OnConnectionEstablished(
Socket *pSocket,
CIOBuffer *pAddress)
{
const std::string welcomeMessage("+OK POP3 server ready\r\n");
pSocket->Write(welcomeMessage.c_str(), welcomeMessage.length());
pSocket->Read();
}
异步I/O
由于我们所有的I/O操作都是异步运行的,它们会立即返回到调用代码。这些操作的实际实现因一个事实而变得稍微复杂:任何未完成的I/O请求都会在发出这些请求的线程退出时被终止。由于我们希望确保我们的I/O请求不会被不当终止,我们将这些调用编组到我们套接字服务器的I/O线程池中,而不是从调用线程发出它们。这是通过向套接字服务器的I/O完成端口发布一个I/O完成数据包来实现的。服务器的工作线程知道如何处理4种操作:读取请求、读取完成、写入请求和写入完成。请求操作是通过调用PostQueuedCompletionStatus
生成的,而完成操作是在调用WSARecv
和WSASend
异步完成时生成的。
为了能够读写数据,我们需要一个存储空间,所以我们需要某种内存缓冲区。为了减少内存分配,我们可以池化这些缓冲区,这样我们就不在它们完成后删除它们,而是将它们保留在一个列表中以供重用。我们的数据缓冲区由一个分配器管理,该分配器通过向套接字服务器构造函数传递参数来配置。这允许用户设置I/O缓冲区的尺寸,以及控制列表中保留多少缓冲区以供重用。CIOBuffer
类作为我们的数据缓冲区,遵循标准的I/O完成端口模式,是一个扩展的“重叠”结构。
正如所有关于I/O完成端口的优秀参考资料所告诉你的,调用GetQueuedCompletionStatus
会阻塞你的线程,直到有完成数据包可用,并且当它可用时,返回一个完成键、传输的字节数和一个“重叠”结构。完成键代表“每个设备”数据,重叠结构代表“每个调用”数据。在我们的服务器中,我们使用完成键来传递我们的Socket
类,并使用重叠结构来传递我们的数据缓冲区。我们的Socket
类和数据缓冲区类都允许用户将“用户数据”与它们关联。这是以一个无符号长整型值的形式(可以始终用于存储指向更大结构的指针)。
套接字服务器的工作线程持续循环,阻塞在它们的完成端口上直到有工作可用,然后从完成数据中提取Socket
和CIOBuffer
并处理I/O请求。循环看起来是这样的:
int CSocketServer::WorkerThread::Run()
{
while (true)
{
DWORD dwIoSize = 0;
Socket *pSocket = 0;
OVERLAPPED *pOverlapped = 0;
m_iocp.GetStatus((PDWORD_PTR)&pSocket, &dwIoSize, &pOverlapped);
CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);
switch pBuffer->GetUserData()
{
case IO_Read_Request :
Read(pSocket, pBuffer);
break;
case IO_Read_Completed :
ReadCompleted(pSocket, pBuffer);
break;
case IO_Write_Request :
Write(pSocket, pBuffer);
break;
case IO_Write_Completed :
WriteCompleted(pSocket, pBuffer);
break;
}
}
}
读取和写入请求会导致在套接字上执行读取或写入。请注意,实际的读取/写入是由我们的I/O线程执行的,因此它们不会因为线程退出而被过早终止。当读取或写入实际完成时,会调用ReadCompleted()
和WriteCompleted()
方法。工作线程将这些调用编组到套接字服务器类中,套接字服务器提供两个虚拟函数,允许调用者的派生类处理这些情况。大多数时候,用户不会对写入完成感兴趣,但派生类是唯一可以处理读取完成的地方。
virtual void ReadCompleted(
Socket *pSocket,
CIOBuffer *pBuffer) = 0;
virtual void WriteCompleted(
Socket *pSocket,
CIOBuffer *pBuffer);
我们的客户端如果愿意,可以提供自己的工作线程,它应该派生自套接字服务器的工作线程。如果客户端决定这样做,那么我们需要一种方法让服务器配置为使用这个派生工作线程而不是默认线程。每当服务器创建一个工作线程(这只发生在服务器刚启动时,因为线程会运行直到服务器生命周期结束),它都会调用以下virtual
函数:
virtual WorkerThread *CreateWorkerThread(
CIOCompletionPort &iocp);
如果我们想提供自己的工作线程实现,那么我们应该覆盖这个函数并创建我们的线程对象并将其返回给调用者。
优雅关机
套接字可以独立关闭其发送和接收端。当客户端关闭其与服务器的连接的发送端时,服务器上该套接字上的任何挂起读取都将返回0
。派生类可以通过覆盖以下virtual
函数来选择在客户端关闭其连接的发送端时接收通知:
virtual void OnConnectionClientClose(
Socket *pSocket);
即使客户端关闭套接字时服务器有多个挂起的读取,服务器也只会收到一次此通知。服务器不会收到0长度的读取完成。客户端套接字发送端的关闭,也就是服务器的接收端,并不会阻止服务器向客户端发送更多数据,它仅仅意味着客户端没有更多数据要发送给服务器。
服务器可以使用Socket
类上的Shutdown()
方法关闭连接。与底层的WinSock2
函数一样,shutdown接受一个值,该值指示应关闭连接的哪些部分。如果服务器已完成发送数据,则可以调用Shutdown()
并使用SD_SEND
来终止连接的发送端,使用SD_RECEIVE
来终止接收端,或使用SD_BOTH
来终止两端。关闭服务器连接的发送端实际上只有在所有挂起的写入完成后才会发生。这使得服务器开发人员可以编写如下所示的代码,而无需担心I/O线程处理写入和关闭发生之间存在竞态条件。
pSocket->Write(pBuffer);
pSocket->Shutdown(SD_SEND);
套接字关闭
当套接字上没有挂起的读取或写入,并且没有持有对套接字的引用时,套接字将被关闭。此时,派生类将通过调用OnConnectionClosing()
virtual
函数收到通知。
virtual bool OnConnectionClosing(
Socket *pSocket);
服务器对该函数的默认实现仅仅返回false
,这意味着派生类不希望负责关闭套接字。然后,套接字服务器类在关闭套接字之前关闭套接字,并关闭linger选项。这会导致一个中止式关闭,并且如果发送的数据在关闭发生时尚未全部发送,则可能会丢失。派生类可以选择自行处理套接字关闭,如果是这样,则应覆盖OnConnectionClosing()
并返回true
以指示它已处理关闭。在返回之前,套接字应该已经由派生类显式关闭,或者应该调用AddRef()
并保持引用,以便稍后可以关闭套接字。派生类只有一次机会干预套接字关闭,如果它未能关闭套接字,则在释放套接字的最后一个引用时,套接字将被中止关闭,而不会有进一步的通知。
bool CSocketServer::OnConnectionClosing(
Socket *pSocket)
{
// we'll handle socket closure so that we can do a lingering close
// note that this is not ideal as this code is executed on the an
// IO thread. If we had a thread pool we could fire this off to the
// thread pool to handle.
pSocket->Close();
return true;
}
请注意,只有在连接未被服务器显式关闭时,才会调用OnConnectionClosing()
。如果服务器希望显式关闭连接,则可以通过简单地调用Socket
类的Close()
或AbortiveClose()
方法来实现。
一个简单的服务器
现在我们有了一个创建服务器的框架。用户只需提供一个派生自CSocketServer
的类,并处理OnConnectionEstablished()
和ReadCompleted()
情况。该类可能看起来像这样:
class CMySocketServer : CSocketServer
{
public :
CMySocketServer (
unsigned long addressToListenOn,
unsigned short portToListenOn);
private :
virtual void OnConnectionEstablished(
Socket *pSocket,
CIOBuffer *pAddress);
virtual bool OnConnectionClosing(
Socket *pSocket);
virtual void ReadCompleted(
Socket *pSocket,
CIOBuffer *pBuffer);
};
上面已经介绍了OnConnectionEstablished()
和OnConnectionClosing()
的实现。剩下的是我们的套接字服务器的ReadCompleted()
方法的实现。这是服务器处理传入数据的地方,对于简单的回显服务器来说,它可能非常简单;)
void CMySocketServer::ReadCompleted(
Socket *pSocket,
CIOBuffer *pBuffer)
{
pSocket->Write(pBuffer);
}
YAES - 又一个回显服务器
完整的回显服务器可在此处下载,文件名为JBSocketServer1.zip。服务器只需将传入的字节流回显给客户端。除了实现上述方法外,套接字服务器派生类还实现了服务器调用的几个“通知”方法,以告知派生类各种内部活动。回显服务器只是在这些通知发生时向屏幕(和日志文件)输出一条消息,但它们的目的是让派生类能够通过性能计数器或其他类似方式报告内部服务器状态。
您可以使用telnet来测试回显服务器。只需在端口5001(样本默认使用的端口)上telnet到localhost,然后键入内容并观察它被回显回来。服务器一直运行,直到设置了一个命名事件然后关闭。非常简单的服务器关机程序,此处可用,提供了一个服务器的关机开关。
一个更贴近实际的例子
除了回显字节流之外什么都不做的服务器很少见,除了作为糟糕的例子。通常,服务器会期望某种形式的消息,消息的确切格式取决于协议,但两种常见的格式是:带有某种消息长度指示器的二进制消息(在头部中)和带有预定义命令集和固定命令终止符(通常是“\r\n
”)的ASCII文本消息。一旦你开始处理实际数据,你就会遇到回显服务器根本不关心的一个实际问题。实际服务器需要能够将TCP/IP套接字接口提供的输入字节流分解为不同的命令。发出单个套接字读取的结果可以是任意数量的字节,最多到你提供的缓冲区大小。你可能只收到一个单独的、完整的消息,或者只收到消息的一半,或者3条消息,你无法确定。太多经验不足的套接字开发人员假设他们总是会收到一个完整、独立的的消息,而且他们的测试方法通常在开发过程中确保了这一点。
分块字节流
服务器可以实现的最简单的协议之一是基于数据包的协议,其中前X个字节是头部,头部包含完整数据包的长度信息。服务器可以读取头部,计算还需要多少数据,并继续读取直到获得完整的数据包。此时,它可以将数据包传递给知道如何处理它的业务逻辑。处理这种情况的代码可能看起来像这样:
void CMySocketServer::ReadCompleted(
Socket *pSocket,
CIOBuffer *pBuffer)
{
pBuffer = ProcessDataStream(pSocket, pBuffer);
pSocket->Read(pBuffer);
}
CIOBuffer *CMySocketServer::ProcessDataStream(
Socket *pSocket,
CIOBuffer *pBuffer)
{
bool done;
do
{
done = true;
const size_t used = pBuffer->GetUsed();
if (used >= GetMinimumMessageSize())
{
const size_t messageSize = GetMessageSize(pBuffer);
if (used == messageSize)
{
// we have a whole, distinct, message
EchoMessage(pSocket, pBuffer);
pBuffer = 0;
done = true;
}
else if (used > messageSize)
{
// we have a message, plus some more data
// allocate a new buffer, copy the extra data into it and try again...
CIOBuffer *pMessage = pBuffer->SplitBuffer(messageSize);
EchoMessage(pSocket, pMessage);
pMessage->Release();
// loop again, we may have another complete message in there...
done = false;
}
else if (messageSize > pBuffer->GetSize())
{
Output(_T("Error: Buffer too small\nExpecting: ") + ToString(messageSize) +
_T("Got: ") + ToString(pBuffer->GetUsed()) + _T("\nBuffer size = ") +
ToString(pBuffer->GetSize()) + _T("\nData = \n") +
DumpData(pBuffer->GetBuffer(), pBuffer->GetUsed(), 40));
pSocket->Shutdown();
// throw the rubbish away
pBuffer->Empty();
done = true;
}
}
}
while (!done);
// not enough data in the buffer, reissue a read into the same buffer
// to collect more data
return pBuffer;
}
上面代码的关键点在于我们需要知道我们是否有足够的数据来开始查看头部,如果有,那么我们可以以某种方式计算消息的大小。一旦我们知道我们拥有所需的最少数据量,我们就可以计算出我们是否拥有构成该消息的所有数据。如果有,那就好了,我们处理它。如果缓冲区只包含我们的消息,那么我们只需处理消息,由于处理仅仅包括我们为数据缓冲区发布一个写入请求,所以我们返回0
,以便下一次读取使用一个新的缓冲区。如果我们有一个完整的数据包和一些额外数据,那么我们将缓冲区分成两部分:一个新的缓冲区包含我们的完整数据包,旧的缓冲区将额外数据复制到缓冲区的前面。然后,我们将完整的数据包传递给业务逻辑进行处理,并循环处理我们剩余的数据。如果我们没有足够的数据,我们返回缓冲区,并且我们在ReadCompleted()
中发出的Read()
会读取更多数据到同一个缓冲区,从现在这个点开始。
由于我们是一个简单的服务器,我们有一个相当重要的限制,那就是我们的所有消息都必须适合我们服务器使用的I/O缓冲区大小。通常,这是一个实际的限制,最大消息大小可以提前知道,并且通过将我们的I/O缓冲区大小设置为至少等于我们的最大消息大小,我们可以避免复制数据。如果这对你的服务器来说不是一个可行的限制,那么你将需要一个替代策略,将数据从I/O缓冲区复制到足够大的东西来容纳你的整个消息,或者分块处理消息……
在我们的简单服务器中,如果消息太大,我们就简单地关闭套接字连接,丢弃垃圾数据,然后等待客户端离开……
那么我们如何实现GetMinimumMessageSize()
和GetMessageSize()
呢?嗯,这显然取决于协议,但对于我们的数据包回显服务器,我们这样做:
size_t CMySocketServer::GetMinimumMessageSize() const
{
return 1;
}
size_t CMySocketServer::GetMessageSize(CIOBuffer *pBuffer) const
{
size_t messageSize = *pBuffer->GetBuffer();
return messageSize;
}
计数引用缓冲区?
你可能已经注意到,在我们有消息和额外数据的情况下,我们调用了SplitBuffer()
将完整消息分解成自己的缓冲区,然后,一旦我们处理完它,我们就调用了Release()
。这是套接字服务器缓冲区分配器实现的一部分。缓冲区是引用计数的。我们唯一需要担心的是,如果我们使用SplitBuffer
创建一个新缓冲区,或者如果我们决定调用缓冲区的AddRef()
,因为我们希望将其传递给另一个线程进行处理。我们将在下一篇文章中更详细地介绍这一点,但其要点是:每次我们发布一个读取或写入,缓冲区的引用计数就会增加,而每次读取或写入完成时,计数就会减少,当没有挂起的引用时,缓冲区就会返回到池中以供重用。
数据包回显服务器
一个基于数据包的回显服务器可在此处下载,文件名为JBSocketServer2.zip。服务器期望接收最多256字节的数据包,这些数据包有一个1字节的头部。头部字节包含数据包的总长度(包括头部)。服务器读取完整的数据包并将其回显给客户端。如果你觉得自己很聪明的话,可以使用telnet来测试回显服务器;)只需在端口5001(样本默认使用的端口)上telnet到localhost,然后键入内容并观察它被回显回来。(提示:CTRL B是2,这是包含数据的最小数据包)。
真实的互联网RFC协议
一些常见的互联网协议,如RFC 1939(POP3),使用crlf终止的ASCII文本流命令结构。可以在JBSocketServer3.zip中找到一个使用此处提供的CSocketServer
类实现此类服务器的示例。
What Next?
这里提供的类提供了一种简单的方法来开发使用I/O完成和线程池的可扩展套接字服务器,这样类的用户就不需要关心这些低级问题。要创建自己的服务器,只需派生自CSocketServer
并处理连接建立、字节流分块和业务逻辑。您还可以选择处理任何您需要的通知。运行您的服务器就像这样简单:
CSocketServer server(
"+OK POP3 server ready\r\n",
INADDR_ANY, // address to listen on
5001, // port to listen on
10, // max number of sockets to keep in the pool
10, // max number of buffers to keep in the pool
1024); // buffer size
server.Start();
server.StartAcceptingConnections();
然后您的代码可以做任何它想做的事情,套接字服务器将在自己的线程上运行。当您完成时,只需调用:
server.WaitForShutdownToComplete();
然后套接字服务器将关闭。
在下一篇文章中,我们将讨论如何将业务逻辑从I/O线程池移到它自己的线程池中,以便长时间操作不会阻塞I/O线程。
修订历史
- 2002年5月21日 - 初始修订
- 2002年5月27日 - 为所有服务器和服务器关机程序添加了暂停/恢复功能。使用
CSocket
来防止在创建监听套接字时发生资源泄漏。重构了Socket
和CIOBuffer
类,使通用的列表管理代码现在位于CNodeList
中,通用的用户数据代码现在位于COpaqueUserData
中。 - 2002年5月29日 - Linting和一般代码清理
- 2002年6月18日 - 删除了创建监听套接字时的
ReuseAddress()
调用,因为它不是必需的 - 感谢Alun Jones向我指出这一点 - 2002年6月28日 - 调整了我们处理套接字关闭的方式,并向文章添加了优雅关机和套接字关闭部分
- 2002年6月30日 - 删除了用户子类化工作线程类的要求。现在可以通过简单地子类化套接字服务器类来完成所有工作。
- 2002年7月15日 - 当服务器在有活动连接时关闭时,套接字关闭通知现在会发生。现在可以设置
SocketServer
来确保读取和写入数据包的顺序。 - 2002年8月12日 - 删除了套接字关闭中的竞态条件 - 感谢David McConnell指出这一点。派生类可以接收连接重置和连接错误通知。Socket提供了一种确定发送/接收是否已连接的方法。通用代码清理和lint问题。