可扩展服务器与 IO 完成端口以及如何烹饪它们
开发服务器应用程序的理论与实践。
1. 引言
我承诺撰写这篇文章已经很久了。终于,它准备好了。因此,本文将是 MP3 流媒体服务器项目(始于《声音录制和 MP3 格式编码》一文)的延续,并将特别致力于一个非常有趣的主题:使用 IO 完成端口(以下简称 IOCP)编写可扩展服务器应用程序。我还将描述一个处理所有相关复杂性的框架。
从技术上讲,整个主题可以分为三个子主题:线程管理、内存管理和客户端套接字处理机制。关于后者,我将重点关注 TCP 套接字(UDP 套接字也可以实现类似的功能)。让我们继续详细介绍……
2. 线程处理
通常,在编写服务器应用程序时,线程模型的使用可以分为两大类:
- 易于实现,一个线程服务一个连接/套接字
- 稍微复杂一些,使用线程池(也可以是:固定大小的线程池、基于超时的线程池、从最小值到最大值的可变大小线程池等)
虽然第一种模型通常是最容易实现的,但在服务大量连接时,它并不是最好的选择。这种模型有两个强有力的论据:
- 上下文切换,这是在两个运行进程或同一或不同进程的线程之间切换的过程。它包括以这样一种方式保存和恢复运行线程/进程的上下文,仿佛每个线程/进程都在连续运行,而实际上它们可能共享同一个 CPU。显然,上下文切换需要时间,这个时间取决于操作系统实现。例如,在 Windows 操作系统中,它需要纳秒,而在 Linux 的一些旧版本/发行版(例如,Mandrake 版本 <= 7.2)中,它需要大约 2 毫秒。因此,2 毫秒乘以(例如)1000 个线程 = 2 秒才能将上下文返回到线程,如果只有 1000 个创建的线程,并且所有线程具有相同的优先级。这是不可接受的,显然需要不同的模型。
- 通常,在一个线程服务一个连接模型中,线程大部分时间什么都不做。更详细地说,它们在对(例如)套接字执行阻塞 I/O 操作时处于冻结状态。因此,创建大量线程无异于浪费资源。
本文描述的 IOCP 框架使用了线程框架,特别是《几乎像 Java 线程一样》一文中所述的 CSimpleThreadPool
类。尽管该线程框架没有什么特别之处,但我将提供一些我使用它的原因:
- 它是本项目的一部分。
- 它非常轻巧,但仍然非常有用且易于使用。
CSimpleThreadPool
是一个固定大小的线程池。预先创建的线程数量有助于减少创建新线程所需的运行时时间,因为它们已经创建。请注意,创建新线程需要时间(例如,系统必须创建新的系统对象(线程本身),必须创建线程的堆栈等)。因此,这对性能有好处。- 最重要的原因是,
CSimpleThreadPool
是一个具有优先级队列实现的线程池。预测下面描述的细节,IOCP 队列不对完成请求进行优先级排序;所有完成请求都具有相同的优先级。因此,要编写(并运行)一个简单的应用程序,它只是创建数千个连接并立即关闭所有连接,然后再次创建和关闭,如此循环……IOCP 将在队列中注册大量关闭请求,实际上,服务器将忙于处理关闭请求。这是一个拒绝服务攻击的示例。很遗憾 IOCP 没有使用优先级队列来对完成请求进行优先级排序,但我认为不使用这种队列存在技术原因。但是,从 IOCP 队列接收到的完成请求(都具有相同的优先级)会传递到CSimpleThreadPool
队列中,并在那里进行优先级排序,因此,关闭连接请求的优先级将低于其他完成请求。这只是本文中描述的 IOCP 框架防御此类攻击的一种策略。
3. 内存管理
与编写服务器应用程序相关的另一个问题是正确的内存管理。虽然像 new
和 delete
(或对应的 malloc()
/free()
,或任何其他低级内存 Windows SDK API)这样非常熟悉的运算符使用起来非常简单,但当谈到需要一年 365 天、每周 7 天、每天 24 小时运行的应用程序时,它们就存在一个问题。
问题就是所谓的内存碎片。这些运算符(API)旨在分配连续的内存块,如果应用程序频繁地以不同块大小执行 new
/delete
,它最终会遇到 new
返回 NULL
(甚至可能是持续返回)的情况,尽管应用程序在堆中还有足够的空闲空间。这可能发生的原因是……没错……没有所需大小的连续空闲空间,换句话说:我们遇到了内存碎片,服务器迟早会崩溃。
有几种解决内存碎片问题的方法:
- 实现垃圾回收,同时执行碎片整理。
- 自定义内存管理,旨在避免碎片化,并可能重写
new
/delete
,只是为了使最终解决方案更加优雅。 - 使用
new
运算符 (malloc()
) 分配固定大小的块。好吧,内存仍然容易碎片化,但“孔洞”也是固定大小的(或者说是固定大小的倍数)。这是一种非常容易实现的机制。但是,如果实际内存需求从(例如)1KB 到 1MB 不等,那么即使只需要 1KB,也分配 1MB 的固定大小块无异于浪费内存。 - 预分配内存区域。在可以估计服务单个连接所需内存的情况下,将该值乘以允许的最大连接数(这可以是一个配置参数)。用得到的值,分配一个巨大块的该大小,并在该区域内“工作”。另一种非常易于使用的机制,当……这种估计可以执行时,效果很好。
- 重新使用已分配的块。这个想法非常简单;一旦分配了一个块,在不再需要时不要删除它,只需将其标记为未使用,然后……下次再使用它。这可能被视为内存泄漏,而实际上并非如此;这只是一种处理内存碎片的策略。
下面是一个模板类(完整源代码请参阅附件源文件中的mem_manager.h文件),它处理了上面描述的“预分配内存区域”和“重新使用已分配块”(合二为一)机制
// A template class allowing re-using of the memory blocks.
template<class T>
class QueuedBlocks {
private:
QMutex m_qMutex;
set< T* > m_quBlocks;
vector< T* > m_allBlocks;
public:
QueuedBlocks(int nInitSize = 1):
m_qMutex(), m_quBlocks(), m_allBlocks() {...};
// get a free block from the queue, if one cannot be found
// then NULL is returned
T* GetFromQueue() {...};
// get a free block from the queue, if one cannot be found
// then a new one is created
T* Get() {...};
// Release the used block, place it
// back to the queue. For performance reason,
// we assume that the block was previously taken
// from the queue.
void Release(T* t) {...};
// Return all the blocks ever allocated.
vector<T*> *GetBlocks() {...};
~QueuedBlocks() {...};
};
因此,通过构造函数,我们预分配了所需数量的对象,并且
- 如果只需要“预分配内存区域”机制,则仅使用
GetFromQueue()
和Release()
方法,这样它将在“分配空间内”工作。 - 如果需要“重新使用已分配块”(与“预分配内存区域”一起),则使用
Get()
和Release()
方法,这样如果预分配空间不足,它将得到扩展。
由于这是一个通用模板,它允许定义不同类/结构的排队块。这种实现唯一的问题是,每个特定类都必须有一个默认(或无参数)构造函数和一个 Clear()
方法。Clear()
的目标是在块即将被重用时,在最后执行内部清理。
另一个有用的模板类是(也在mem_manager.h中定义):
// A template class used for providing free blocks as well
// as releasing unnecessary ones. Uses "QueuedBlocks" which
// allows reusing of the blocks.
template<class T>
class StaticBlocks {
private:
static QueuedBlocks<T> *blocks;
public:
static void Init(int nSize = 1) {...};
static T *Get() {...};
static void Release(T *b) {...};
};
它表示 QueuedBlocks
的适配器,但所有方法都定义为 static
,这在需要跨不同类共享块时非常有用。
4. 关于 IOCP 的一些事实
如果您正在阅读本文,您可能知道使用 IOCP 的好处。如果您不知道,那么……IOCP 是编写可扩展服务器应用程序的最佳机制,能够以少量线程处理数千个客户端连接。这是如何实现的?通过使用一个名为“I/O 完成端口”的系统对象,它“允许应用程序接收异步 I/O 操作完成的通知”。因此,每个 I/O 操作都是异步的(立即完成),应用程序(或其线程)会收到 I/O 操作完成的通知,因此,应用程序(或应用程序的线程)有“足够的时间”执行任何其他操作(而不是像“一个线程服务一个连接/套接字”模型中通常发生的那样等待 I/O 完成)。让我们数一数我们知道的关于 IOCP 的事实。
首先,为了让套接字受益于 IOCP,必须使用 Winsock API 的重叠 I/O 版本。
WSASend(...);
WSARecv(...);
其次,套接字必须配置为支持重叠 API。
nSock = WSASocket(..., ..., ..., ..., ..., WSA_FLAG_OVERLAPPED);
我们还需要创建 IOCP 句柄。
hIocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
我们需要将套接字与 IOCP 关联。
CreateIoCompletionPort( (HANDLE) nSock, hIocp, ..., ... );
现在,当对套接字使用重叠 I/O 操作时,它们将被注册到 IOCP 的队列中。再次强调,此注册仅在执行 I/O 操作时发生(!!!);否则,即使套接字已与 IOCP 关联,此注册也不会执行。重叠 I/O 操作完成后,它将从 IOCP 队列返回(实际上,只返回操作的详细信息,但这取决于一些技巧,稍后详述),通过以下调用:
GetQueuedCompletionStatus( hIocp, &dwBytesTransferred, ..., lpOverlapped, dwTimeout );
因此,此时我们知道对套接字执行的 I/O 操作已成功完成。考虑到这些事实,让我们继续实现将支持所有这些的几个类。
5. 客户端/服务器套接字
首先,让我们分析一下 ClientSocket
模板类(完整的源代码,请参阅附件源文件中的socket_client.h文件)
// A class wrapping basic client socket's operations
template<class T>
class ClientSocket {
private:
SOCKET m_ClientSock;
volatile unsigned int m_nSession;
QMutex m_qMutex;
struct sockaddr_in m_psForeignAddIn;
volatile bool m_blnIsBusy;
T m_objAttachment;
protected:
public:
ClientSocket(): m_objAttachment(), m_qMutex() {...}
~ClientSocket() {...}
// Is the object assigned a socket.
bool IsBusy() {...};
// Method is used by 'QueuedBlocks' from
// "mem_manager.h".
void Clear() {...}
// Returns the socket associated with the object.
SOCKET GetSocket() {...}
// Returns the session of the current client socket.
// Technically, this is the counter of how may times
// current instance was pooled, but really, this is used
// to check if session of operation (MYOVERLAPPED)
// is == with the session of the socket.
unsigned int GetSession() {...}
// Internally used by this implementation. Don't call it.
void Lock() {...}
// Internally used by this implementation. Don't call it.
void UnLock() {...}
// Associate the object with a socket.
int Associate(SOCKET sSocket, struct sockaddr_in *psForeignAddIn) {...}
// Returns the attachment of the object.
// Use the attachment to associate the socket with any other
// additional information required.
T* GetAttachment() {...}
// Write data to the socket,
// uses overlapped style.
int WriteToSocket(char *pBuffer, DWORD buffSize) {...}
// Read data from the socket,
// uses overlapped style.
int ReadFromSocket(char *pBuffer, DWORD buffSize) {...}
// Closes the socket associated with this instance
// of the object and performs clean-up.
void CloseSocket() {...}
};
关于这个类有几点需要提及:
- 它被设计为仅与
ServerSocket
模板类(更多详细信息见下文)一起使用。单独使用它不会很有用。 - 它封装了 Winsock API 的重叠 I/O 版本。
- 它是一个通用模板类,用于支持“附件”(参见
GetAttachment()
方法)。虽然ClientSocket
的基本功能非常标准,但附件允许扩展ClientSocket
类的功能(例如,保持套接字的当前状态,包含读/写数据缓冲区等)。用作附件的类/结构的唯一限制是:它应该有一个默认(无参数)构造函数和一个Clear()
方法(用于执行内部清理)。还有一些其他要求,但稍后会描述。 - 它被设计为允许
ServerSocket
重用已创建的实例;正如我们稍后将看到的,ServerSocket
包含一个QueuedBlocks<ClientSocket<T> >
(ClientSocket
必须实现Clear()
)。这就是为什么物理套接字处理程序没有传递给构造函数,而是与一个空闲 (IsBusy()
) 实例(通过Associate()
)关联的原因。
其次,让我们分析 ServerSocket
模板类(完整的源代码,请参阅附件源文件中的socket_servert.h文件)
// A class for handling basic server socket operations.
template<class T>
class ServerSocket {
private:
// Server socket.
SOCKET m_ServSock;
// Pool of client sockets.
QueuedBlocks<ClientSocket<T> > m_SockPool;
protected:
public:
// Set the size of the socket pool to the maximum number of accepted
// client connections.
ServerSocket(unsigned int nPort, unsigned int nMaxClients,
bool blnCreateAsync = false,
bool blnBindLocal = true): m_SockPool(nMaxClients) {...}
// Returns a pointer to the ClientSocket instance, bind
// to the accepted client socket (incoming connection).
ClientSocket<T>* Accept() {...}
// Release the client socket, will try closing connection and
// will place it back to the pool.
void Release(ClientSocket<T> *sock) {...}
vector<ClientSocket<T> *> *GetPool() {...}
~ServerSocket() {...}
};
关于 ServerSocket
模板类的一些细节:
- 它是一个通用模板类……仅仅因为它与通用
ClientSocket
协同工作(再次强调,重点在于“附件”)。 - 它使用固定数量的
ClientSocket
实例。这就是为什么Accept()
内部调用m_SockPool
的GetFromQueue()
方法(而不仅仅是Get()
)。这模拟了服务器支持最大数量的同时活动连接的事实。最大活动连接数由传递给构造函数的nMaxClients
参数决定。 Accept()
方法从m_SockPool
返回一个可用的ClientSocket
实例(或NULL
)。该实例与接受的传入连接的物理套接字句柄关联。Release()
方法用于关闭物理套接字句柄(如果尚未关闭)并将ClientSocket
实例推回m_SockPool
,以便稍后重用。GetPool()
方法封装了m_SockPool
的GetBlocks()
方法。它简单地返回池中注册的所有ClientSocket
实例(绝对所有,甚至那些未分配物理套接字处理程序的实例)。ServerService
在将池传递给CTimeOutChecker
时会调用此方法,但……稍后会详细介绍。- 该类的构造函数在内部执行所有必要的操作,以创建物理服务器套接字处理程序,将其绑定到指定端口(通过
nPort
传递给构造函数)和 IP 地址,并将套接字设置为侦听模式(请参阅代码中的详细信息)。如果其中一个操作失败,将抛出异常。
6. MYOVERLAPPED 结构
OVERLAPPED
(不是 MYOVERLAPPED
)是 Winsock API 和 IOCP 的重叠 I/O 版本的一个非常重要的结构。它被 IOCP 系统对象(不仅如此,但我将跳过不必要的细节)用于“跟踪”I/O 操作的状态。然而,OVERLAPPED
本身相当无用。这就是为什么需要一些更高级的东西,当然,带有一些小技巧(完整定义在socket_client.h中)。
// Overlapped structure used with this implementation of the IOCP.
// Only the "Overlapped" member is used in context with the
// standard IOCP API, anything else is just to help this particular
// implementation.
typedef struct {
// It is very important "Overlapped" to be the very first
// member of the structure because pointer pointing to
// "this" is == to the address of the first member.
// This makes usage of the MYOVERLAPPED equivalent
// to the OVERLAPPED in context of the standard IOCP API (just a trick).
OVERLAPPED Overlapped;
// A structure that points to the data packet and size of the data
// packet associated with current overlapped operation.
WSABUF DataBuf;
// Operation type associated with current overlapped operation.
IO_SOCK_MODES OperationType;
// Internally used. If nSession != ClientSocket::m_nSession
// that means that socket was closed (recently) and
// operation retrieved from IOCP queue or TaskPool queue
// is not relevant, to the current session of the socket.
volatile unsigned int nSession;
// Method is used by 'QueuedBlocks' from
// "mem_manager.h".
void Clear() {...}
} MYOVERLAPPED;
那么,技巧在哪里呢?技巧在于 Overlapped
成员是 MYOVERLAPPED
结构中的第一个成员。因此,MYOVERLAPPED
实例的内存地址将始终等于结构的第一个成员(在此例中为 Overlapped
)的地址。因此,传递 (WSASend()
/WSARecv()
) 或接收 (GetQueuedCompletionStatus()
) 指向 MYOVERLAPPED
结构的指针(当然,通过正确的类型转换)与我们使用原生 OVERLAPPED
是相同的。
这只是一个小技巧,有几个好处:
- 通过
MYOVERLAPPED
,我们总是可以通过OperationType
成员知道成功完成的操作类型(例如,读取或写入)。因此,我冒昧地将MYOVERLAPPED
的实例定义为“操作”(为简单起见)。 - 通过
DataBuf
成员,我们知道成功完成的操作数据缓冲区的详细信息(请参阅ClientSocket
的WriteToSocket()
和ReadFromSocket()
方法)。 - 嗯,
nSession
成员是本文描述的 IOCP 框架使用的另一个技巧。ClientSocket
有一个会话(参见上面的第 5 节),MYOVERLAPPED
也有一个会话。每次调用ServerSocket::Release()
时,ClientSocket
实例的会话都会增加(这意味着该实例已准备好重用)。当调用(例如)WriteToSocket()
或ReadFromSocket()
方法时,它们会在内部将ClientSocket
实例的当前会话设置为用于该操作的MYOVERLAPPED
实例的会话。当使用GetQueuedCompletionStatus()
接收完成的操作时,如果返回操作的会话与启动该操作的ClientSocket
实例的会话不同,那么该操作就不再相关,因为ClientSocket
实例已成功关闭,并且可能在操作(例如)卡在 IOCP 队列中时被重新使用。因此,在整个代码中,您会看到一些这样的检查。
并且,仅仅因为 MYOVERLAPPED
结构在不同类之间共享,以下(如上面第 3 节所述)很重要:
// a pool of the MYOVERLAPPED structures
typedef StaticBlocks<MYOVERLAPPED> Overlapped;
QueuedBlocks<MYOVERLAPPED> *Overlapped::blocks = NULL;
7. IOCPSimple 类
本文所述框架的关键模板类之一是 IOCPSimple
(完整源代码请参阅附件源文件中的iocp.h文件)。
// A template class (wrapper) to handle basic operations
// related to IO Completion Ports (IOCP).
template<class T>
class IOCPSimple {
private:
HANDLE m_hIocp;
DWORD m_dwTimeout;
protected:
// Useful method to queue a socket (handler in fact) in the IOCP's
// queue, without requesting any 'physical' I/O operations.
BOOL PostQueuedCompletionStatus( ClientSocket<T> *sock,
MYOVERLAPPED *pMov ) {...}
public:
~IOCPSimple() {:}
IOCPSimple( DWORD dwTimeout = 0 ) {...}
// Method associates Socket handler with the handler
// of the IOCP. It is important to do this association
// first of all, once Socket is accepted and is valid.
// In fact, this method registers socket with the IOCP,
// so IOCP will monitor every I/O operation happening
// with the socket.
void AssociateSocket( ClientSocket<T> *sock ) {...}
// Queue in the IOCP's queue with the status 'pending'.
// This is a fictive status (not real). Method is useful in
// cases when packets/data buffers to send via socket are yet
// not ready (e.g. due some internal, time consuming, calculations)
// and rather than keeping socket somewhere, in separate structures,
// place it in the queue with status pending. It will be returned
// shortly with the "GetQueuedCompletionStatus" call anyway.
BOOL SetPendingMode( ClientSocket<T> *sock ) {...}
// Queue in the IOCP's queue with the status 'close'.
// This is a fictive status (not real). Useful only
// when close socket cases should be treated/handled
// within the routine, handling I/O completion statuses,
// and just to not spread the code across different
// modules/classes.
BOOL SetCloseMode( ClientSocket<T> *sock,
MYOVERLAPPED *pMov = NULL ) {...}
// Queue in the IOCP's queue with the status 'accept'.
// This is a fictive status (not real). When a new
// client connection is accepted by the server socket,
// method is used to acknowledge the routine, handling
// I/O completion statuses, of this. Same reason as per
// 'close' status, just to have a unique piece of code
// handling all the possible statuses.
BOOL SetAcceptMode( ClientSocket<T> *sock ) {...}
// This method is just a wrapper. For more details,
// see MSDN for "GetQueuedCompletionStatus". The only
// difference is, method is adjusted to handle
// 'ClientSocket<T>' (as registered with 'AssociateSocket(...)')
// rather than pointer to the 'DWORD'.
BOOL GetQueuedCompletionStatus( LPDWORD pdwBytesTransferred,
ClientSocket<T> **sock, MYOVERLAPPED **lpOverlapped ) {...}
};
因此,该类实际上只是一个与 IOCP 相关的 API 函数的简单包装器,是的,它是一个通用模板类,因为 ClientSocket
(“附件”之类的东西)。与 IOCP 逻辑相关的大部分细节,我们已经在第 4 节(上面)中捕获;但是,这里还有一些额外的内容要写。
以下函数的第三个参数
CreateIoCompletionPort( ..., ..., Key, ... );
API 中预期是一个 DWORD
,名为 Key。当 I/O 操作完成时,此 Key 将返回到以下函数的第三个参数:
GetQueuedCompletionStatus( ..., ..., &Key, ..., ... );
Key 可以是任何适合 DWORD
的内容(当然,需要进行类型转换)。例如,这可以是一个指向对象的指针。如果我们将 ClientSocket
实例的地址(指针)用作 Key,对其执行 I/O 操作呢?没错,这将使我们不仅可以知道已完成的操作,还可以知道应用了 I/O 操作的 ClientSocket
实例。这是另一个技巧,这也是 IOCPSimple
的 AssociateSocket()
和 GetQueuedCompletionStatus()
方法所做的。
8. IOCPSimple 类的 SetPendingMode() 方法
我应该写一些关于 IOCPSimple
模板类的 SetPendingMode()
方法的内容。它同时是一个非常有用和危险的方法。考虑这样一个情况:您的应用程序正在分块从外部源读取数据并将其发送到 ClientSocket
的实例。如果外部源尚未准备好返回下一个块(出于任何原因),那么将不会有数据发送到 ClientSocket
,因此,IOCP 中也不会排队任何 I/O 操作(如第 4 节所述)。因此,ClientSocket
的状态未定义,并且很可能“丢失”这样的套接字。与其实现一个缓存存储来“托管此类套接字并在数据可用时稍后处理它们”,不如使用 SetPendingMode()
方法。此方法将把套接字推送到 IOCP 队列,很快,套接字将通过 ISockEvent
的 OnPending()
事件返回(更多详细信息见下一节)。很可能在那个时候,外部源的下一个数据块将准备好发送(如果不是,请再次使用 SetPendingMode()
)。
这非常有用;但是,请注意 SetPendingMode()
只是将套接字推送到 IOCP 队列,而实际上并没有注册任何 I/O 操作。很可能,以这种方式将套接字推送到 IOCP 队列时,套接字可能会被远程端关闭。不幸的是,IOCP 不会处理这种“意外”关闭,直到对套接字执行物理 I/O 操作。这种情况通过 CTimeOutChecker
(更多详细信息见下一节……所以,不要低估它)过滤得非常好,并且迟早(取决于传递的超时值),这样的套接字将通过 ISockEvent
的 OnClose()
事件“拒绝”,前提是超时值大于零(并且使用了 CTimeOutChecker
)。请务必注意。
9. CTimeOutChecker 类
这个模板类(由于“附件”原因而成为模板),定义在 server_service.h 文件中,其作用是检查每个活动连接的超时情况。当自上次 I/O 操作以来经过的时间大于传递给构造函数的 nTimeOutValue
值时,就会发生超时情况。
// A template class designed for verifying client sockets
// against time-out cases. Time-out case = if no I/O actions
// happen with a socket during a configured number of
// seconds. This implies that the attachment of the
// client socket must implement "GetTimeElapsed()"
// and "ResetTime(...)" methods.
template<class T>
class CTimeOutChecker: public IRunnable {
private:
unsigned int m_nTimeOutValue;
vector<ClientSocket<T> *> *m_arrSocketPool;
IOCPSimple<T> *m_hIocp;
protected:
// method checks sockets to detect time-out cases
virtual void run() {...}
public:
// Constructor of the template class. Requires:
// - a pointer to a collection of pointers to client sockets.
// This is the collection of sockets to be checked against
// time-out cases.
// - a pointer to the IO completion port object responsible
// for checking I/O events against passed collection of
// client sockets.
// - value of the time-out, in seconds.
CTimeOutChecker( vector<ClientSocket<T> *> *arrSocketPool,
IOCPSimple<T> *hIocp, unsigned int nTimeOutValue ) {...}
// Nothing to destruct as inside the template class
// we keep/use just pointers obtained from external
// sources.
~CTimeOutChecker() {};
};
该类由 ServerService
模板类(下文描述)使用,因此,其 run()
方法将由与 ServerService
关联的线程池 (CSimpleThreadPool
) 的线程执行。ServerService
还会负责将套接字池(通过 GetPool()
方法,来自关联的 ServerSocket
)传递给 CTimeOutChecker
(的实例),以便 CTimeOutChecker
知道要“检查”哪些套接字。
如果发生超时情况,则相关的 ClientSocket
实例将以“关闭”状态传递到 IOCP 队列。IOCP 迟早会从队列中返回它,并将其“传递”给 ServerService
。ServerService
将创建一个“事件任务”,并将其传递给 CSimpleThreadPool
。“事件任务”将由 CSimpleThreadPool
的线程执行,它将“触发”ISockEvent
(的实现)的 OnClose()
“事件”并关闭套接字。这就是它的工作原理。
此外,CTimeOutChecker
要求“附件”实现两个额外的方法:GetTimeElapsed()
和 ResetTime( bool )
。因此,从技术上讲,CTimeOutChecker
的行为由这两个方法的实现控制:
- 如果
GetTimeElapsed()
对于特定的ClientSocket
(的附件)返回零,则超时情况不适用于该ClientSocket
。这是一种禁用特定套接字超时检查的方法。 - 尽管
ResetTime( bool )
的实现可以是任意的(取决于开发者,但必须遵循签名),但实际要求是: - 调用
ResetTime( true )
(带true
!)必须强制任何后续的GetTimeElapsed()
调用返回零。这允许在某个点(需要时)禁用特定套接字的超时检查。例如,在编写 Web (HTTP) 服务器时,只有在等待来自ClientSocket
的完整 HTTP 请求时才启用超时检查才有意义。一旦收到完整的 HTTP 请求,禁用相关ClientSocket
的超时检查就有意义。 - 调用
ResetTime( false )
应该强制任何后续的GetTimeElapsed()
调用返回自上次调用ResetTime( false )
以来实际经过的时间(以秒为单位)。换句话说,它应该注册当前日期时间,并且GetTimeElapsed()
应该返回当前日期时间与注册日期时间之间的差值(以秒为单位)。开发者还应在ClientSocket::WriteToSocket()
或ClientSocket::ReadFromSocket()
成功完成时调用ResetTime( false )
(取决于正在开发的服务器的实现策略)。
CTimeOutChecker
有什么用?嗯,仅仅是为了关闭那些(在配置的时间内)“没有响应”的套接字,从而为其他传入连接腾出一些空间(注意 ServerSocker
类的 nMaxClients
参数!)。
但是,如果不需要此逻辑,有一种方法可以(完全)禁用 CTimeOutChecker
。只需将 ServerService
构造函数的 timeout
参数设置为零即可。
10. ISockEvent 和 ServerService 类
最后,让我们来看看框架的核心。如果您不记得上面描述的所有细节也没关系,因为它们都封装在 ServerService
模板类的逻辑中(原因仍然相同,“附件”之类的东西),尽管记住第 8 节和第 9 节会很棒。这两个类都定义在 server_service.h 文件中。让我们从 ServerService
开始:
// Well, finally, here is the template class joining all
// the stuff together. Considering the Aspect Oriented paradigm,
// this template class may be seen as an Aspect. The "individualizations"
// of this aspect are "ISockEvent<T>" and "T" itself. "T" is nothing
// else but attachment of the client socket (see ClientSocket<T> template
// class for more details). Implementing "ISockEvent<T>" and "T" will
// define the individual behaviour of this aspect.
// It is a composition of the IOCP, server socket, time-out checker
// and thread pool. Class implements the business logic that makes all
// these entities working together.
template<class T>
class ServerService: public IRunnable {
private:
ServerSocket<T> m_ServerSocket;
IOCPSimple<T> m_hIocp;
ISockEvent<T> *m_pSEvent;
CTimeOutChecker<T> *m_TChecker;
// thread pool which will execute the tasks
CSimpleThreadPool *m_ThPool;
// a pool of the CSockEventTask<T> objects
QueuedBlocks<CSockEventTask<T> > m_SockEventTaskPool;
protected:
// This method will be executed by a thread
// of the task pool.
virtual void run() {...}
public:
// Constructor or the class.
// pSEvent - pointer to an instance implementing
// ISockEvent<T>. This instance will be used
// as a client socket event handler.
// nPort - port number to bind server socket to.
// nMaxClients - the maximum number of accepted (concurrent)
// client connections. To be passed to
// the server socket and also will be used
// as the initial size for the pool of the
// CSockEventTask<T> objects.
// nNoThreads - indicative (the real one is computed,
// see below) number of the threads
// to be created by the thread pool.
// timeout - the value of the time-out, in seconds.
// Will be passed to the time-out checker.
// If time-out is zero, time-out checker
// will not be created.
// blnBindLocal - see ServerSocket<T> for more details.
// If "true" then server socket is bind
// to localhost ("127.0.0.1").
// If "false" then server socket is bind
// to INADDR_ANY ("0.0.0.0").
ServerService( ISockEvent<T> *pSEvent, unsigned int nPort,
unsigned int nMaxClients, unsigned int nNoThreads,
unsigned int timeout, bool blnBindLocal = true ):
m_ServerSocket( nPort, nMaxClients, true, blnBindLocal ),
m_hIocp( 200 ), m_SockEventTaskPool( nMaxClients )
{...}
virtual ~ServerService() {...}
// Start the threat pool (== all the
// threads in the pool).
void start() {...}
};
就是这样,没有更多的技巧。与代码相关的注释(以及代码本身)应该足以突出逻辑。但是,我将重点提供关于 ISockEvent
模板类(实际上是模板接口)的一个小细节,该类作为参数传递给 ServerService
的构造函数。
ISockEvent
是一个带有纯虚方法的模板接口:
// A template interface showing how the client socket event
// handler should look like.
template<class T>
class ISockEvent {
public:
// Client socket ("pSocket") is about to be closed.
virtual void OnClose( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
// Client socket ("pSocket") was just accepted by
// the server socket (new connection with a client
// is created).
virtual void OnAccept( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
// Client socket ("pSocket") was returned from the IOCP
// queue with status _PENDING. For more details see
// "IOCPSimple<T>::SetPendingMode(...)". This method
// is invoked only if there was a call to
// "IOCPSimple<T>::SetPendingMode(...)", performed by a
// user code, internally "SetPendingMode(...)"
// is never called.
virtual void OnPending( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
// Client socket ("pSocket") was returned from IOCP
// queue with status _READ. This means that overlapped
// reading operation, started previously with
// "ClientSocket<T>::ReadFromSocket(...)", was
// successfully finished.
// - "pOverlap->DataBuf" structure points to the data
// buffer and buffer's size that were passed to the
// "ClientSocket<T>::ReadFromSocket(...)".
// - "dwBytesTransferred" will indicate how many
// bytes were read.
virtual void OnReadFinalized( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
DWORD dwBytesTransferred,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
// Client socket ("pSocket") was returned from IOCP
// queue with status _WRITE. This means that overlapped
// writting operation, started previously with
// "ClientSocket<T>::WriteToSocket(...)", was
// successfully finished.
// - "pOverlap->DataBuf" structure points to the data
// buffer and buffer's size that were passed to the
// "ClientSocket<T>::WriteToSocket(...)".
// - "dwBytesTransferred" will indicate how many
// bytes were written.
virtual void OnWriteFinalized( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
DWORD dwBytesTransferred,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
};
实现这个接口(当然还有“附件”)将定义服务器应用程序。真的,没有更多需要做的了!ISockEvent
包含了所有必要的方法(我们称之为事件)来跟踪和处理套接字的所有可能状态。让我们看看:
- 当
ServerSocket
接受新连接时,ServerSevice
将确保调用OnAccept()
事件。因此,如何处理此事件取决于开发人员。通常,通过此事件,您可能会根据待开发服务器的策略(服务器期望客户端发起“对话”反之亦然)发起pSocket->ReadFromSocket()
或pSocket->WriteToSocket()
。 - 如果一个套接字即将关闭(远程端关闭连接;
CTimeOutChecker
启动此操作,或者您在事件代码中调用pHIocp->SetCloseMode(pSocket)
),则会调用OnClose()
事件。使用此事件执行任何所需的清理。如果您在OnClose()
中不关闭或忘记关闭套接字(使用pServerSocket->Release( pSocket )
)也没关系,因为在此事件完成后,套接字无论如何都会关闭。 - 如果在事件代码中调用了
pHIocp->SetPendingMode(pSocket)
……是的,根据上面的第 8 节,请确保通过OnPending()
事件处理它。 - 如果在事件代码中(除了
OnClose()
之外),调用了pSocket->ReadFromSocket()
(并且返回值未指示错误),则一旦“读取”操作完成,将调用OnReadFinalized()
事件,但不要期望传递的缓冲区将完全填充到缓冲区的大小。在此期间(从ReadFromSocket()
到OnReadFinalized()
)避免使用传递的缓冲区。这是从 IOCP API 继承的要求,因此,我们在这里真的无能为力。 - 如果在事件代码中(除了
OnClose()
之外),调用了pSocket->WriteToSocket()
(并且返回值未指示错误),则一旦“写入”操作完成,将调用OnWriteFinalized()
事件。这将意味着整个缓冲区已发送。出于与上述相同的原因,在此期间(从WriteToSocket()
到OnWriteFinalized()
)避免使用传递的缓冲区。 - 最后,您不需要创建额外的线程,因为每个事件都是在与
ServerService
关联的CSimpleThreadPool
的一个线程中调用的。如果您需要更多线程,请通过ServerService
构造函数的nNoThreads
参数传递适当的值。是的,请确保每个事件实现迟早会终止。我的意思是,尽量避免事件中的无限循环;否则,停止ServerService
运行将很麻烦(尽管如果在循环中检查CThread::currentThread().isInterrupted()
仍然可能)。
最后一点(主要是)针对那些计划为多 CPU 平台编写服务器应用程序的人。如果环境有 N 个 CPU,那么为了更好的性能(如 MSDN 推荐),CSimpleThreadPool
的 N 个线程将精确执行 ServerService::run()
方法。请确保不要在事件主体内多次调用 pSocket->ReadFromSocket()
或 pSocket->WriteToSocket()
(尝试将应用程序设计为只成功(无错误)调用一次)。这并不是说这个框架无法处理多次调用的场景。问题是,虽然发送/接收操作的顺序保证遵循调用顺序(在 IOCP 队列中),但报告操作完成的顺序不保证遵循调用顺序,因为 GetQueuedCompletionStatus()
将由多个线程调用。但是,如果您不关心正确的顺序,那么请忽略此说明。
11. 一个简单的回显服务器实现
作为“概念验证”,让我们实现一个简单的回显服务器,它从套接字读取 7 个字符,一旦缓冲区填满,就将它们发回。完整的代码随源文件提供。
A. 首先,让我们定义服务器参数和缓冲区大小:
#define BUFF_SIZE 8
#define MAX_CONNECTIONS 10
#define NO_THREADS 4
#define TIME_OUT 10
#define PORT 8080
B. 现在,让我们定义“附件”:
struct Attachment {
volatile time_t tmLastActionTime;
char sString<BUFF_SIZE>;
DWORD dwStringSize; // current string's size
Attachment() { Clear(); };
bool Commit( DWORD dwBytesTransferred ) {
DWORD dwSize = dwStringSize + dwBytesTransferred;
if ( dwBytesTransferred <= 0 ) return false;
if ( dwSize >= BUFF_SIZE ) return false;
dwStringSize = dwSize;
sString[dwStringSize] = 0;
return true;
};
// as requested by the API of the framework
void Clear() { memset(this, 0, sizeof(Attachment) ); };
// as requested by the API of the framework
void ResetTime( bool toZero ) {
if (toZero) tmLastActionTime = 0;
else {
time_t lLastActionTime;
time(&lLastActionTime);
tmLastActionTime = lLastActionTime;
}
};
// as requested by the API of the framework
long GetTimeElapsed() {
time_t tmCurrentTime;
if (0 == tmLastActionTime) return 0;
time(&tmCurrentTime);
return (tmCurrentTime - tmLastActionTime);
};
};
C. 现在,我们需要将模板类实例化为实际类:
typedef ClientSocket<Attachment> MyCSocket;
typedef ServerSocket<Attachment> MySSocket;
typedef IOCPSimple<Attachment> MyIOCPSimple;
typedef ISockEvent<Attachment> MyISockEvent;
typedef ServerService<Attachment> MyServerService;
D. 现在,我们实现 ISockEvent<Attachment>
:
class MyISockEventHandler: public MyISockEvent {
public:
MyISockEventHandler() {};
~MyISockEventHandler() {};
// empty method, not used
virtual void OnClose( MyCSocket *pSocket, MYOVERLAPPED *pOverlap,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {};
// empty method, not used
virtual void OnPending( MyCSocket *pSocket, MYOVERLAPPED *pOverlap,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {};
virtual void OnAccept( MyCSocket *pSocket, MYOVERLAPPED *pOverlap,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
int nRet;
DWORD dwSize;
char *temp;
dwSize = BUFF_SIZE - 1;
temp = pSocket->GetAttachment()->sString;
// initiate the reading with OnAccept
nRet = pSocket->ReadFromSocket( temp, dwSize );
pSocket->GetAttachment()->ResetTime( false );
if ( nRet == SOCKET_ERROR ) {
pServerSocket->Release( pSocket );
}
};
virtual void OnReadFinalized( MyCSocket *pSocket,
MYOVERLAPPED *pOverlap, DWORD dwBytesTransferred,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
int nRet;
DWORD dwSize, dwPos;
char *temp;
// finalize the filling of the buffer
pSocket->GetAttachment()->Commit( dwBytesTransferred );
dwSize = BUFF_SIZE - 1;
dwPos = pSocket->GetAttachment()->dwStringSize;
temp = pSocket->GetAttachment()->sString;
nRet = pSocket->ReadFromSocket( temp + dwPos, dwSize - dwPos );
pSocket->GetAttachment()->ResetTime( false );
if ( nRet == SOCKET_ERROR ) {
pServerSocket->Release( pSocket );
}
else if ( nRet == RECV_BUFFER_EMPTY ) {
// means that dwSize - dwPos == 0, so send the data
// back to the socket
nRet = pSocket->WriteToSocket( temp, dwSize );
if ( nRet == SOCKET_ERROR ) {
pServerSocket->Release( pSocket );
}
}
};
virtual void OnWriteFinalized( MyCSocket *pSocket,
MYOVERLAPPED *pOverlap, DWORD dwBytesTransferred,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
// clean the attachment
pSocket->GetAttachment()->Clear();
// and once again
OnAccept(pSocket, NULL,pServerSocket, NULL);
};
};
E. 最后:
int main(int argc, char* argv[])
{
int nRet;
MyServerService *sService;
MyISockEventHandler *mSockHndl;
WSAData wsData;
nRet = WSAStartup(MAKEWORD(2,2),&wsData);
if ( nRet < 0 ) {
Log::LogMessage(L"Can't load winsock.dll.\n");
return -1;
}
try {
Overlapped::Init( MAX_CONNECTIONS );
mSockHndl = new MyISockEventHandler();
sService = new MyServerService((MyISockEvent *) mSockHndl,
PORT, MAX_CONNECTIONS, NO_THREADS, TIME_OUT, false);
sService->start();
printf("hit <enter> to stop ...\n");
while( !_kbhit() ) ::Sleep(100);
delete sService;
delete mSockHndl;
}
catch (const char *err) {
printf("%s\n", err);
}
catch (const wchar_t *err) {
wprintf(L"%ls\n", err);
}
WSACleanup();
return 0;
}
12. Microsoft VC++ 6.0 和在多处理器环境中使用 STL
您可能已经注意到,在代码中,我滥用了不同的 STL 模板类,如 vector
、set
、queue
等。当使用“默认”编译选项编译应用程序并在单 CPU 环境中运行时,它们运行良好。但是,该代码在多 CPU 环境中将无法正常工作。为了解决这个问题(正如 Microsoft 在此链接中建议的那样),必须执行以下操作:
- 打开项目。
- 在项目菜单上,单击设置。
- 在配置列表中,单击发布。
- 单击C/C++选项卡,然后在类别列表中单击代码生成。
- 在运行时库列表中,单击多线程 (/MT)。
- 在配置列表中,单击调试。
- 在运行时库列表中,单击多线程调试 (/MTd)。
- 如果在配置列表中还有其他配置,也请为它们设置适当的运行时库选项。
- 单击确定,然后重新生成项目。
13. 结论
嗯,现在看起来很简单,不是吗?在下一篇文章中,我将描述更多类的实现,包括专门定制的 Attachment
和 ISockEvent<Attachment>
类,以处理 MP3 流媒体。那将是本系列的最后一篇文章,很快再见……