65.9K
CodeProject 正在变化。 阅读更多。
Home

可扩展服务器与 IO 完成端口以及如何烹饪它们

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.88/5 (59投票s)

2007 年 9 月 19 日

CPOL

23分钟阅读

viewsIcon

634856

downloadIcon

4497

开发服务器应用程序的理论与实践。

1. 引言

我承诺撰写这篇文章已经很久了。终于,它准备好了。因此,本文将是 MP3 流媒体服务器项目(始于《声音录制和 MP3 格式编码》一文)的延续,并将特别致力于一个非常有趣的主题:使用 IO 完成端口(以下简称 IOCP)编写可扩展服务器应用程序。我还将描述一个处理所有相关复杂性的框架。

从技术上讲,整个主题可以分为三个子主题:线程管理、内存管理和客户端套接字处理机制。关于后者,我将重点关注 TCP 套接字(UDP 套接字也可以实现类似的功能)。让我们继续详细介绍……

2. 线程处理

通常,在编写服务器应用程序时,线程模型的使用可以分为两大类:

  • 易于实现,一个线程服务一个连接/套接字
  • 稍微复杂一些,使用线程池(也可以是:固定大小的线程池、基于超时的线程池、从最小值到最大值的可变大小线程池等)

虽然第一种模型通常是最容易实现的,但在服务大量连接时,它并不是最好的选择。这种模型有两个强有力的论据:

  • 上下文切换,这是在两个运行进程或同一或不同进程的线程之间切换的过程。它包括以这样一种方式保存和恢复运行线程/进程的上下文,仿佛每个线程/进程都在连续运行,而实际上它们可能共享同一个 CPU。显然,上下文切换需要时间,这个时间取决于操作系统实现。例如,在 Windows 操作系统中,它需要纳秒,而在 Linux 的一些旧版本/发行版(例如,Mandrake 版本 <= 7.2)中,它需要大约 2 毫秒。因此,2 毫秒乘以(例如)1000 个线程 = 2 秒才能将上下文返回到线程,如果只有 1000 个创建的线程,并且所有线程具有相同的优先级。这是不可接受的,显然需要不同的模型。
  • 通常,在一个线程服务一个连接模型中,线程大部分时间什么都不做。更详细地说,它们在对(例如)套接字执行阻塞 I/O 操作时处于冻结状态。因此,创建大量线程无异于浪费资源。

本文描述的 IOCP 框架使用了线程框架,特别是《几乎像 Java 线程一样》一文中所述的 CSimpleThreadPool 类。尽管该线程框架没有什么特别之处,但我将提供一些我使用它的原因:

  • 它是本项目的一部分。
  • 它非常轻巧,但仍然非常有用且易于使用。
  • CSimpleThreadPool 是一个固定大小的线程池。预先创建的线程数量有助于减少创建新线程所需的运行时时间,因为它们已经创建。请注意,创建新线程需要时间(例如,系统必须创建新的系统对象(线程本身),必须创建线程的堆栈等)。因此,这对性能有好处。
  • 最重要的原因是,CSimpleThreadPool 是一个具有优先级队列实现的线程池。预测下面描述的细节,IOCP 队列不对完成请求进行优先级排序;所有完成请求都具有相同的优先级。因此,要编写(并运行)一个简单的应用程序,它只是创建数千个连接并立即关闭所有连接,然后再次创建和关闭,如此循环……IOCP 将在队列中注册大量关闭请求,实际上,服务器将忙于处理关闭请求。这是一个拒绝服务攻击的示例。很遗憾 IOCP 没有使用优先级队列来对完成请求进行优先级排序,但我认为不使用这种队列存在技术原因。但是,从 IOCP 队列接收到的完成请求(都具有相同的优先级)会传递到 CSimpleThreadPool 队列中,并在那里进行优先级排序,因此,关闭连接请求的优先级将低于其他完成请求。这只是本文中描述的 IOCP 框架防御此类攻击的一种策略。

3. 内存管理

与编写服务器应用程序相关的另一个问题是正确的内存管理。虽然像 newdelete(或对应的 malloc()/free(),或任何其他低级内存 Windows SDK API)这样非常熟悉的运算符使用起来非常简单,但当谈到需要一年 365 天、每周 7 天、每天 24 小时运行的应用程序时,它们就存在一个问题。

问题就是所谓的内存碎片。这些运算符(API)旨在分配连续的内存块,如果应用程序频繁地以不同块大小执行 new/delete,它最终会遇到 new 返回 NULL(甚至可能是持续返回)的情况,尽管应用程序在堆中还有足够的空闲空间。这可能发生的原因是……没错……没有所需大小的连续空闲空间,换句话说:我们遇到了内存碎片,服务器迟早会崩溃。

有几种解决内存碎片问题的方法:

  • 实现垃圾回收,同时执行碎片整理。
  • 自定义内存管理,旨在避免碎片化,并可能重写 new/delete,只是为了使最终解决方案更加优雅。
  • 使用 new 运算符 (malloc()) 分配固定大小的块。好吧,内存仍然容易碎片化,但“孔洞”也是固定大小的(或者说是固定大小的倍数)。这是一种非常容易实现的机制。但是,如果实际内存需求从(例如)1KB 到 1MB 不等,那么即使只需要 1KB,也分配 1MB 的固定大小块无异于浪费内存。
  • 预分配内存区域。在可以估计服务单个连接所需内存的情况下,将该值乘以允许的最大连接数(这可以是一个配置参数)。用得到的值,分配一个巨大块的该大小,并在该区域内“工作”。另一种非常易于使用的机制,当……这种估计可以执行时,效果很好。
  • 重新使用已分配的块。这个想法非常简单;一旦分配了一个块,在不再需要时不要删除它,只需将其标记为未使用,然后……下次再使用它。这可能被视为内存泄漏,而实际上并非如此;这只是一种处理内存碎片的策略。

下面是一个模板类(完整源代码请参阅附件源文件中的mem_manager.h文件),它处理了上面描述的“预分配内存区域”和“重新使用已分配块”(合二为一)机制

// A template class allowing re-using of the memory blocks.

template<class T>
class QueuedBlocks {
private:
    QMutex       m_qMutex;
    set< T* >    m_quBlocks;
    vector< T* > m_allBlocks;

public:
    QueuedBlocks(int nInitSize = 1): 
        m_qMutex(), m_quBlocks(), m_allBlocks() {...};

    // get a free block from the queue, if one cannot be found
    // then NULL is returned

    T* GetFromQueue() {...};

    // get a free block from the queue, if one cannot be found
    // then a new one is created

    T* Get() {...};

    // Release the used block, place it
    // back to the queue. For performance reason,
    // we assume that the block was previously taken
    // from the queue.

    void Release(T* t) {...};

    // Return all the blocks ever allocated.

    vector<T*> *GetBlocks() {...};

    ~QueuedBlocks() {...};
};

因此,通过构造函数,我们预分配了所需数量的对象,并且

  • 如果只需要“预分配内存区域”机制,则仅使用 GetFromQueue()Release() 方法,这样它将在“分配空间内”工作。
  • 如果需要“重新使用已分配块”(与“预分配内存区域”一起),则使用 Get()Release() 方法,这样如果预分配空间不足,它将得到扩展。

由于这是一个通用模板,它允许定义不同类/结构的排队块。这种实现唯一的问题是,每个特定类都必须有一个默认(或无参数)构造函数和一个 Clear() 方法。Clear() 的目标是在块即将被重用时,在最后执行内部清理。

另一个有用的模板类是(也在mem_manager.h中定义):

// A template class used for providing free blocks as well 
// as releasing unnecessary ones. Uses "QueuedBlocks" which 
// allows reusing of the blocks.

template<class T>
class StaticBlocks {
private:
    static QueuedBlocks<T> *blocks;

public:
    static void Init(int nSize = 1) {...};
    static T *Get() {...};
    static void Release(T *b) {...};
};

它表示 QueuedBlocks 的适配器,但所有方法都定义为 static,这在需要跨不同类共享块时非常有用。

4. 关于 IOCP 的一些事实

如果您正在阅读本文,您可能知道使用 IOCP 的好处。如果您不知道,那么……IOCP 是编写可扩展服务器应用程序的最佳机制,能够以少量线程处理数千个客户端连接。这是如何实现的?通过使用一个名为“I/O 完成端口”的系统对象,它“允许应用程序接收异步 I/O 操作完成的通知”。因此,每个 I/O 操作都是异步的(立即完成),应用程序(或其线程)会收到 I/O 操作完成的通知,因此,应用程序(或应用程序的线程)有“足够的时间”执行任何其他操作(而不是像“一个线程服务一个连接/套接字”模型中通常发生的那样等待 I/O 完成)。让我们数一数我们知道的关于 IOCP 的事实。

首先,为了让套接字受益于 IOCP,必须使用 Winsock API 的重叠 I/O 版本。

WSASend(...);
WSARecv(...);

其次,套接字必须配置为支持重叠 API。

nSock = WSASocket(..., ..., ..., ..., ..., WSA_FLAG_OVERLAPPED);

我们还需要创建 IOCP 句柄。

hIocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );

我们需要将套接字与 IOCP 关联。

CreateIoCompletionPort( (HANDLE) nSock, hIocp, ..., ... );

现在,当对套接字使用重叠 I/O 操作时,它们将被注册到 IOCP 的队列中。再次强调,此注册仅在执行 I/O 操作时发生(!!!);否则,即使套接字已与 IOCP 关联,此注册也不会执行。重叠 I/O 操作完成后,它将从 IOCP 队列返回(实际上,只返回操作的详细信息,但这取决于一些技巧,稍后详述),通过以下调用:

GetQueuedCompletionStatus( hIocp, &dwBytesTransferred, ..., lpOverlapped, dwTimeout );

因此,此时我们知道对套接字执行的 I/O 操作已成功完成。考虑到这些事实,让我们继续实现将支持所有这些的几个类。

5. 客户端/服务器套接字

首先,让我们分析一下 ClientSocket 模板类(完整的源代码,请参阅附件源文件中的socket_client.h文件)

// A class wrapping basic client socket's operations

template<class T>
class ClientSocket {
private:
    SOCKET                 m_ClientSock;
    volatile unsigned int  m_nSession;
    QMutex                 m_qMutex;
    struct sockaddr_in     m_psForeignAddIn;
    volatile bool          m_blnIsBusy;
    T                      m_objAttachment;

protected:
public:
    ClientSocket(): m_objAttachment(), m_qMutex() {...}
    ~ClientSocket() {...}

    // Is the object assigned a socket.

    bool IsBusy() {...};

    // Method is used by 'QueuedBlocks' from
    // "mem_manager.h".

    void Clear() {...}
    
    // Returns the socket associated with the object.

    SOCKET GetSocket() {...}

    // Returns the session of the current client socket.
    // Technically, this is the counter of how may times
    // current instance was pooled, but really, this is used
    // to check if session of operation (MYOVERLAPPED) 
    // is == with the session of the socket.

    unsigned int GetSession() {...}

    // Internally used by this implementation. Don't call it.

    void Lock() {...}

    // Internally used by this implementation. Don't call it.

    void UnLock() {...}

    // Associate the object with a socket.

    int Associate(SOCKET sSocket, struct sockaddr_in *psForeignAddIn) {...}

    // Returns the attachment of the object.
    // Use the attachment to associate the socket with any other
    // additional information required.

    T* GetAttachment() {...}

    // Write data to the socket,
    // uses overlapped style.

    int WriteToSocket(char *pBuffer, DWORD buffSize) {...}

    // Read data from the socket,
    // uses overlapped style.

    int ReadFromSocket(char *pBuffer, DWORD buffSize) {...}

    // Closes the socket associated with this instance 
    // of the object and performs clean-up.

    void CloseSocket() {...}
};

关于这个类有几点需要提及:

  • 它被设计为仅与 ServerSocket 模板类(更多详细信息见下文)一起使用。单独使用它不会很有用。
  • 它封装了 Winsock API 的重叠 I/O 版本。
  • 它是一个通用模板类,用于支持“附件”(参见 GetAttachment() 方法)。虽然 ClientSocket 的基本功能非常标准,但附件允许扩展 ClientSocket 类的功能(例如,保持套接字的当前状态,包含读/写数据缓冲区等)。用作附件的类/结构的唯一限制是:它应该有一个默认(无参数)构造函数和一个 Clear() 方法(用于执行内部清理)。还有一些其他要求,但稍后会描述。
  • 它被设计为允许 ServerSocket 重用已创建的实例;正如我们稍后将看到的,ServerSocket 包含一个 QueuedBlocks<ClientSocket<T> >ClientSocket 必须实现 Clear())。这就是为什么物理套接字处理程序没有传递给构造函数,而是与一个空闲 (IsBusy()) 实例(通过 Associate())关联的原因。

其次,让我们分析 ServerSocket 模板类(完整的源代码,请参阅附件源文件中的socket_servert.h文件)

// A class for handling basic server socket operations.

template<class T>
class ServerSocket {
private:
    // Server socket.

    SOCKET    m_ServSock;

    // Pool of client sockets.

    QueuedBlocks<ClientSocket<T> > m_SockPool;

protected:
public:
    // Set the size of the socket pool to the maximum number of accepted
    // client connections.

    ServerSocket(unsigned int nPort, unsigned int nMaxClients,
              bool blnCreateAsync = false, 
              bool blnBindLocal = true): m_SockPool(nMaxClients) {...}
    
    // Returns a pointer to the ClientSocket instance, bind 
    // to the accepted client socket (incoming connection).

    ClientSocket<T>* Accept() {...}
    
    // Release the client socket, will try closing connection and 
    // will place it back to the pool.

    void Release(ClientSocket<T> *sock) {...}

    vector<ClientSocket<T> *> *GetPool() {...}

    ~ServerSocket() {...}
};

关于 ServerSocket 模板类的一些细节:

  • 它是一个通用模板类……仅仅因为它与通用 ClientSocket 协同工作(再次强调,重点在于“附件”)。
  • 它使用固定数量的 ClientSocket 实例。这就是为什么 Accept() 内部调用 m_SockPoolGetFromQueue() 方法(而不仅仅是 Get())。这模拟了服务器支持最大数量的同时活动连接的事实。最大活动连接数由传递给构造函数的 nMaxClients 参数决定。
  • Accept() 方法从 m_SockPool 返回一个可用的 ClientSocket 实例(或 NULL)。该实例与接受的传入连接的物理套接字句柄关联。Release() 方法用于关闭物理套接字句柄(如果尚未关闭)并将 ClientSocket 实例推回 m_SockPool,以便稍后重用。
  • GetPool() 方法封装了 m_SockPoolGetBlocks() 方法。它简单地返回池中注册的所有 ClientSocket 实例(绝对所有,甚至那些未分配物理套接字处理程序的实例)。ServerService 在将池传递给 CTimeOutChecker 时会调用此方法,但……稍后会详细介绍。
  • 该类的构造函数在内部执行所有必要的操作,以创建物理服务器套接字处理程序,将其绑定到指定端口(通过 nPort 传递给构造函数)和 IP 地址,并将套接字设置为侦听模式(请参阅代码中的详细信息)。如果其中一个操作失败,将抛出异常。

6. MYOVERLAPPED 结构

OVERLAPPED(不是 MYOVERLAPPED)是 Winsock API 和 IOCP 的重叠 I/O 版本的一个非常重要的结构。它被 IOCP 系统对象(不仅如此,但我将跳过不必要的细节)用于“跟踪”I/O 操作的状态。然而,OVERLAPPED 本身相当无用。这就是为什么需要一些更高级的东西,当然,带有一些小技巧(完整定义在socket_client.h中)。

// Overlapped structure used with this implementation of the IOCP.
// Only the "Overlapped" member is used in context with the
// standard IOCP API, anything else is just to help this particular
// implementation.

typedef struct {
    // It is very important "Overlapped" to be the very first 
    // member of the structure because pointer pointing to 
    // "this" is == to the address of the first member. 
    // This makes usage of the MYOVERLAPPED equivalent 
    // to the OVERLAPPED in context of the standard IOCP API (just a trick).

    OVERLAPPED     Overlapped;

    // A structure that points to the data packet and size of the data 
    // packet associated with current overlapped operation.

    WSABUF         DataBuf;

    // Operation type associated with current overlapped operation.

    IO_SOCK_MODES  OperationType;

    // Internally used. If nSession != ClientSocket::m_nSession
    // that means that socket was closed (recently) and
    // operation retrieved from IOCP queue or TaskPool queue 
    // is not relevant, to the current session of the socket.

    volatile unsigned int    nSession;

    // Method is used by 'QueuedBlocks' from
    // "mem_manager.h".

    void Clear() {...}
} MYOVERLAPPED;

那么,技巧在哪里呢?技巧在于 Overlapped 成员是 MYOVERLAPPED 结构中的第一个成员。因此,MYOVERLAPPED 实例的内存地址将始终等于结构的第一个成员(在此例中为 Overlapped)的地址。因此,传递 (WSASend()/WSARecv()) 或接收 (GetQueuedCompletionStatus()) 指向 MYOVERLAPPED 结构的指针(当然,通过正确的类型转换)与我们使用原生 OVERLAPPED 是相同的。

这只是一个小技巧,有几个好处:

  • 通过 MYOVERLAPPED,我们总是可以通过 OperationType 成员知道成功完成的操作类型(例如,读取或写入)。因此,我冒昧地将 MYOVERLAPPED 的实例定义为“操作”(为简单起见)。
  • 通过 DataBuf 成员,我们知道成功完成的操作数据缓冲区的详细信息(请参阅 ClientSocketWriteToSocket()ReadFromSocket() 方法)。
  • 嗯,nSession 成员是本文描述的 IOCP 框架使用的另一个技巧。ClientSocket 有一个会话(参见上面的第 5 节),MYOVERLAPPED 也有一个会话。每次调用 ServerSocket::Release() 时,ClientSocket 实例的会话都会增加(这意味着该实例已准备好重用)。当调用(例如)WriteToSocket()ReadFromSocket() 方法时,它们会在内部将 ClientSocket 实例的当前会话设置为用于该操作的 MYOVERLAPPED 实例的会话。当使用 GetQueuedCompletionStatus() 接收完成的操作时,如果返回操作的会话与启动该操作的 ClientSocket 实例的会话不同,那么该操作就不再相关,因为 ClientSocket 实例已成功关闭,并且可能在操作(例如)卡在 IOCP 队列中时被重新使用。因此,在整个代码中,您会看到一些这样的检查。

并且,仅仅因为 MYOVERLAPPED 结构在不同类之间共享,以下(如上面第 3 节所述)很重要:

// a pool of the MYOVERLAPPED structures

typedef StaticBlocks<MYOVERLAPPED> Overlapped;
QueuedBlocks<MYOVERLAPPED> *Overlapped::blocks = NULL;

7. IOCPSimple

本文所述框架的关键模板类之一是 IOCPSimple(完整源代码请参阅附件源文件中的iocp.h文件)。

// A template class (wrapper) to handle basic operations 
// related to IO Completion Ports (IOCP).

template<class T>
class IOCPSimple {
private:
    HANDLE  m_hIocp;
    DWORD   m_dwTimeout;

protected:
    // Useful method to queue a socket (handler in fact) in the IOCP's
    // queue, without requesting any 'physical' I/O operations. 

    BOOL PostQueuedCompletionStatus( ClientSocket<T> *sock, 
             MYOVERLAPPED *pMov ) {...}

public:
    ~IOCPSimple() {:}
    IOCPSimple( DWORD dwTimeout = 0 ) {...}

    // Method associates Socket handler with the handler
    // of the IOCP. It is important to do this association
    // first of all, once Socket is accepted and is valid.
    // In fact, this method registers socket with the IOCP, 
    // so IOCP will monitor every I/O operation happening 
    // with the socket.

    void AssociateSocket( ClientSocket<T> *sock ) {...}

    // Queue in the IOCP's queue with the status 'pending'.
    // This is a fictive status (not real). Method is useful in
    // cases when packets/data buffers to send via socket are yet
    // not ready (e.g. due some internal, time consuming, calculations) 
    // and rather than keeping socket somewhere, in separate structures, 
    // place it in the queue with status pending. It will be returned
    // shortly with the "GetQueuedCompletionStatus" call anyway.

    BOOL SetPendingMode( ClientSocket<T> *sock ) {...}

    // Queue in the IOCP's queue with the status 'close'.
    // This is a fictive status (not real). Useful only 
    // when close socket cases should be treated/handled 
    // within the routine, handling I/O completion statuses, 
    // and just to not spread the code across different 
    // modules/classes.

    BOOL SetCloseMode( ClientSocket<T> *sock, 
             MYOVERLAPPED *pMov = NULL ) {...}

    // Queue in the IOCP's queue with the status 'accept'.
    // This is a fictive status (not real). When a new
    // client connection is accepted by the server socket,
    // method is used to acknowledge the routine, handling 
    // I/O completion statuses, of this. Same reason as per 
    // 'close' status, just to have a unique piece of code 
    // handling all the possible statuses.

    BOOL SetAcceptMode( ClientSocket<T> *sock ) {...}

    // This method is just a wrapper. For more details, 
    // see MSDN for "GetQueuedCompletionStatus". The only
    // difference is, method is adjusted to handle 
    // 'ClientSocket<T>' (as registered with 'AssociateSocket(...)')
    // rather than pointer to the 'DWORD'.

    BOOL GetQueuedCompletionStatus( LPDWORD pdwBytesTransferred,
        ClientSocket<T> **sock, MYOVERLAPPED **lpOverlapped ) {...}
};

因此,该类实际上只是一个与 IOCP 相关的 API 函数的简单包装器,是的,它是一个通用模板类,因为 ClientSocket(“附件”之类的东西)。与 IOCP 逻辑相关的大部分细节,我们已经在第 4 节(上面)中捕获;但是,这里还有一些额外的内容要写。

以下函数的第三个参数

CreateIoCompletionPort( ..., ..., Key, ... );

API 中预期是一个 DWORD,名为 Key。当 I/O 操作完成时,此 Key 将返回到以下函数的第三个参数:

GetQueuedCompletionStatus( ..., ..., &Key, ..., ... );

Key 可以是任何适合 DWORD 的内容(当然,需要进行类型转换)。例如,这可以是一个指向对象的指针。如果我们将 ClientSocket 实例的地址(指针)用作 Key,对其执行 I/O 操作呢?没错,这将使我们不仅可以知道已完成的操作,还可以知道应用了 I/O 操作的 ClientSocket 实例。这是另一个技巧,这也是 IOCPSimpleAssociateSocket()GetQueuedCompletionStatus() 方法所做的。

8. IOCPSimple 类的 SetPendingMode() 方法

我应该写一些关于 IOCPSimple 模板类的 SetPendingMode() 方法的内容。它同时是一个非常有用和危险的方法。考虑这样一个情况:您的应用程序正在分块从外部源读取数据并将其发送到 ClientSocket 的实例。如果外部源尚未准备好返回下一个块(出于任何原因),那么将不会有数据发送到 ClientSocket,因此,IOCP 中也不会排队任何 I/O 操作(如第 4 节所述)。因此,ClientSocket 的状态未定义,并且很可能“丢失”这样的套接字。与其实现一个缓存存储来“托管此类套接字并在数据可用时稍后处理它们”,不如使用 SetPendingMode() 方法。此方法将把套接字推送到 IOCP 队列,很快,套接字将通过 ISockEventOnPending() 事件返回(更多详细信息见下一节)。很可能在那个时候,外部源的下一个数据块将准备好发送(如果不是,请再次使用 SetPendingMode())。

这非常有用;但是,请注意 SetPendingMode() 只是将套接字推送到 IOCP 队列,而实际上并没有注册任何 I/O 操作。很可能,以这种方式将套接字推送到 IOCP 队列时,套接字可能会被远程端关闭。不幸的是,IOCP 不会处理这种“意外”关闭,直到对套接字执行物理 I/O 操作。这种情况通过 CTimeOutChecker(更多详细信息见下一节……所以,不要低估它)过滤得非常好,并且迟早(取决于传递的超时值),这样的套接字将通过 ISockEventOnClose() 事件“拒绝”,前提是超时值大于零(并且使用了 CTimeOutChecker)。请务必注意。

9. CTimeOutChecker

这个模板类(由于“附件”原因而成为模板),定义在 server_service.h 文件中,其作用是检查每个活动连接的超时情况。当自上次 I/O 操作以来经过的时间大于传递给构造函数的 nTimeOutValue 值时,就会发生超时情况。

// A template class designed for verifying client sockets
// against time-out cases. Time-out case = if no I/O actions
// happen with a socket during a configured number of
// seconds. This implies that the attachment of the
// client socket must implement "GetTimeElapsed()"
// and "ResetTime(...)" methods.

template<class T>
class CTimeOutChecker: public IRunnable {
private:
    unsigned int               m_nTimeOutValue;
    vector<ClientSocket<T> *>  *m_arrSocketPool;
    IOCPSimple<T>              *m_hIocp;

protected:
    // method checks sockets to detect time-out cases

    virtual void run() {...}

public:
    // Constructor of the template class. Requires:
    // - a pointer to a collection of pointers to client sockets.
    //   This is the collection of sockets to be checked against
    //   time-out cases.

    // - a pointer to the IO completion port object responsible
    //   for checking I/O events against passed collection of 
    //   client sockets.
    // - value of the time-out, in seconds.

    CTimeOutChecker( vector<ClientSocket<T> *> *arrSocketPool, 
                     IOCPSimple<T> *hIocp, unsigned int nTimeOutValue ) {...}

    // Nothing to destruct as inside the template class

    // we keep/use just pointers obtained from external

    // sources.

    ~CTimeOutChecker() {};
};

该类由 ServerService 模板类(下文描述)使用,因此,其 run() 方法将由与 ServerService 关联的线程池 (CSimpleThreadPool) 的线程执行。ServerService 还会负责将套接字池(通过 GetPool() 方法,来自关联的 ServerSocket)传递给 CTimeOutChecker(的实例),以便 CTimeOutChecker 知道要“检查”哪些套接字。

如果发生超时情况,则相关的 ClientSocket 实例将以“关闭”状态传递到 IOCP 队列。IOCP 迟早会从队列中返回它,并将其“传递”给 ServerServiceServerService 将创建一个“事件任务”,并将其传递给 CSimpleThreadPool。“事件任务”将由 CSimpleThreadPool 的线程执行,它将“触发”ISockEvent(的实现)的 OnClose()“事件”并关闭套接字。这就是它的工作原理。

此外,CTimeOutChecker 要求“附件”实现两个额外的方法:GetTimeElapsed()ResetTime( bool )。因此,从技术上讲,CTimeOutChecker 的行为由这两个方法的实现控制:

  • 如果 GetTimeElapsed() 对于特定的 ClientSocket(的附件)返回零,则超时情况不适用于该 ClientSocket。这是一种禁用特定套接字超时检查的方法。
  • 尽管 ResetTime( bool ) 的实现可以是任意的(取决于开发者,但必须遵循签名),但实际要求是:
    • 调用 ResetTime( true )(带 true!)必须强制任何后续的 GetTimeElapsed() 调用返回零。这允许在某个点(需要时)禁用特定套接字的超时检查。例如,在编写 Web (HTTP) 服务器时,只有在等待来自 ClientSocket 的完整 HTTP 请求时才启用超时检查才有意义。一旦收到完整的 HTTP 请求,禁用相关 ClientSocket 的超时检查就有意义。
    • 调用 ResetTime( false ) 应该强制任何后续的 GetTimeElapsed() 调用返回自上次调用 ResetTime( false ) 以来实际经过的时间(以秒为单位)。换句话说,它应该注册当前日期时间,并且 GetTimeElapsed() 应该返回当前日期时间与注册日期时间之间的差值(以秒为单位)。开发者还应在 ClientSocket::WriteToSocket()ClientSocket::ReadFromSocket() 成功完成时调用 ResetTime( false )(取决于正在开发的服务器的实现策略)。

CTimeOutChecker 有什么用?嗯,仅仅是为了关闭那些(在配置的时间内)“没有响应”的套接字,从而为其他传入连接腾出一些空间(注意 ServerSocker 类的 nMaxClients 参数!)。

但是,如果不需要此逻辑,有一种方法可以(完全)禁用 CTimeOutChecker。只需将 ServerService 构造函数的 timeout 参数设置为零即可。

10. ISockEventServerService

最后,让我们来看看框架的核心。如果您不记得上面描述的所有细节也没关系,因为它们都封装在 ServerService 模板类的逻辑中(原因仍然相同,“附件”之类的东西),尽管记住第 8 节和第 9 节会很棒。这两个类都定义在 server_service.h 文件中。让我们从 ServerService 开始:

// Well, finally, here is the template class joining all
// the stuff together. Considering the Aspect Oriented paradigm, 
// this template class may be seen as an Aspect. The "individualizations"
// of this aspect are "ISockEvent<T>" and "T" itself. "T" is nothing
// else but attachment of the client socket (see ClientSocket<T> template
// class for more details). Implementing "ISockEvent<T>" and "T" will
// define the individual behaviour of this aspect.
// It is a composition of the IOCP, server socket, time-out checker 
// and thread pool. Class implements the business logic that makes all 
// these entities working together.

template<class T>
class ServerService: public IRunnable {
private:
    ServerSocket<T>      m_ServerSocket;
    IOCPSimple<T>        m_hIocp;
    ISockEvent<T>        *m_pSEvent;
    CTimeOutChecker<T>   *m_TChecker;

    // thread pool which will execute the tasks

    CSimpleThreadPool    *m_ThPool;

    // a pool of the CSockEventTask<T> objects

    QueuedBlocks<CSockEventTask<T> > m_SockEventTaskPool;

protected:
    // This method will be executed by a thread
    // of the task pool.

    virtual void run() {...}

public:
    // Constructor or the class.

    // pSEvent   - pointer to an instance implementing 
    //                ISockEvent<T>. This instance will be used
    //                as a client socket event handler.

    // nPort        - port number to bind server socket to.

    // nMaxClients  - the maximum number of accepted (concurrent)
    //                client connections. To be passed to
    //                the server socket and also will be used 
    //                as the initial size for the pool of the 
    //                CSockEventTask<T> objects.

    // nNoThreads   - indicative (the real one is computed,
    //                see below) number of the threads
    //                to be created by the thread pool.

    // timeout      - the value of the time-out, in seconds.
    //                Will be passed to the time-out checker.
    //                If time-out is zero, time-out checker
    //                will not be created.

    // blnBindLocal - see ServerSocket<T> for more details.
    //                If "true" then server socket is bind 
    //                to localhost ("127.0.0.1"). 
    //                If "false" then server socket is bind 
    //                to INADDR_ANY ("0.0.0.0").

    ServerService( ISockEvent<T> *pSEvent, unsigned int nPort,
                unsigned int nMaxClients, unsigned int nNoThreads, 
                unsigned int timeout, bool blnBindLocal = true ): 
                    m_ServerSocket( nPort, nMaxClients, true, blnBindLocal ), 
                    m_hIocp( 200 ), m_SockEventTaskPool( nMaxClients ) 
    {...}

    virtual ~ServerService() {...}

    // Start the threat pool (== all the 

    // threads in the pool).

    void start() {...}
};

就是这样,没有更多的技巧。与代码相关的注释(以及代码本身)应该足以突出逻辑。但是,我将重点提供关于 ISockEvent 模板类(实际上是模板接口)的一个小细节,该类作为参数传递给 ServerService 的构造函数。

ISockEvent 是一个带有纯虚方法的模板接口:

// A template interface showing how the client socket event 

// handler should look like.

template<class T>
class ISockEvent {
public:
    // Client socket ("pSocket") is about to be closed.

    virtual void OnClose( ClientSocket<T> *pSocket, 
            MYOVERLAPPED *pOverlap,
            ServerSocket<T> *pServerSocket,
            IOCPSimple<T> *pHIocp
        ) = 0;

    // Client socket ("pSocket") was just accepted by
    // the server socket (new connection with a client
    // is created). 

    virtual void OnAccept( ClientSocket<T> *pSocket, 
            MYOVERLAPPED *pOverlap,
            ServerSocket<T> *pServerSocket,
            IOCPSimple<T> *pHIocp
        ) = 0;

    // Client socket ("pSocket") was returned from the IOCP
    // queue with status _PENDING. For more details see
    // "IOCPSimple<T>::SetPendingMode(...)". This method
    // is invoked only if there was a call to 
    // "IOCPSimple<T>::SetPendingMode(...)", performed by a 
    // user code, internally "SetPendingMode(...)"
    // is never called.

    virtual void OnPending( ClientSocket<T> *pSocket, 
            MYOVERLAPPED *pOverlap,
            ServerSocket<T> *pServerSocket,
            IOCPSimple<T> *pHIocp
        ) = 0;

    // Client socket ("pSocket") was returned from IOCP
    // queue with status _READ. This means that overlapped
    // reading operation, started previously with 
    // "ClientSocket<T>::ReadFromSocket(...)", was 
    // successfully finished. 

    // - "pOverlap->DataBuf" structure points to the data 
    //   buffer and buffer's size that were passed to the
    //   "ClientSocket<T>::ReadFromSocket(...)".

    // - "dwBytesTransferred" will indicate how many 
    //   bytes were read.

    virtual void OnReadFinalized( ClientSocket<T> *pSocket, 
            MYOVERLAPPED *pOverlap,
            DWORD dwBytesTransferred,
            ServerSocket<T> *pServerSocket,
            IOCPSimple<T> *pHIocp
        ) = 0;

    // Client socket ("pSocket") was returned from IOCP
    // queue with status _WRITE. This means that overlapped
    // writting operation, started previously with 
    // "ClientSocket<T>::WriteToSocket(...)", was 
    // successfully finished. 

    // - "pOverlap->DataBuf" structure points to the data 
    //   buffer and buffer's size that were passed to the
    //   "ClientSocket<T>::WriteToSocket(...)".

    // - "dwBytesTransferred" will indicate how many 
    //   bytes were written.

    virtual void OnWriteFinalized( ClientSocket<T> *pSocket, 
            MYOVERLAPPED *pOverlap,
            DWORD dwBytesTransferred,
            ServerSocket<T> *pServerSocket,
            IOCPSimple<T> *pHIocp
        ) = 0;
};

实现这个接口(当然还有“附件”)将定义服务器应用程序。真的,没有更多需要做的了!ISockEvent 包含了所有必要的方法(我们称之为事件)来跟踪和处理套接字的所有可能状态。让我们看看:

  • ServerSocket 接受新连接时,ServerSevice 将确保调用 OnAccept() 事件。因此,如何处理此事件取决于开发人员。通常,通过此事件,您可能会根据待开发服务器的策略(服务器期望客户端发起“对话”反之亦然)发起 pSocket->ReadFromSocket()pSocket->WriteToSocket()
  • 如果一个套接字即将关闭(远程端关闭连接;CTimeOutChecker 启动此操作,或者您在事件代码中调用 pHIocp->SetCloseMode(pSocket)),则会调用 OnClose() 事件。使用此事件执行任何所需的清理。如果您在 OnClose() 中不关闭或忘记关闭套接字(使用 pServerSocket->Release( pSocket ))也没关系,因为在此事件完成后,套接字无论如何都会关闭。
  • 如果在事件代码中调用了 pHIocp->SetPendingMode(pSocket) ……是的,根据上面的第 8 节,请确保通过 OnPending() 事件处理它。
  • 如果在事件代码中(除了 OnClose() 之外),调用了 pSocket->ReadFromSocket()(并且返回值未指示错误),则一旦“读取”操作完成,将调用 OnReadFinalized() 事件,但不要期望传递的缓冲区将完全填充到缓冲区的大小。在此期间(从 ReadFromSocket()OnReadFinalized())避免使用传递的缓冲区。这是从 IOCP API 继承的要求,因此,我们在这里真的无能为力。
  • 如果在事件代码中(除了 OnClose() 之外),调用了 pSocket->WriteToSocket()(并且返回值未指示错误),则一旦“写入”操作完成,将调用 OnWriteFinalized() 事件。这将意味着整个缓冲区已发送。出于与上述相同的原因,在此期间(从 WriteToSocket()OnWriteFinalized())避免使用传递的缓冲区。
  • 最后,您不需要创建额外的线程,因为每个事件都是在与 ServerService 关联的 CSimpleThreadPool 的一个线程中调用的。如果您需要更多线程,请通过 ServerService 构造函数的 nNoThreads 参数传递适当的值。是的,请确保每个事件实现迟早会终止。我的意思是,尽量避免事件中的无限循环;否则,停止 ServerService 运行将很麻烦(尽管如果在循环中检查 CThread::currentThread().isInterrupted() 仍然可能)。

最后一点(主要是)针对那些计划为多 CPU 平台编写服务器应用程序的人。如果环境有 N 个 CPU,那么为了更好的性能(如 MSDN 推荐),CSimpleThreadPoolN 个线程将精确执行 ServerService::run() 方法。请确保不要在事件主体内多次调用 pSocket->ReadFromSocket()pSocket->WriteToSocket()(尝试将应用程序设计为只成功(无错误)调用一次)。这并不是说这个框架无法处理多次调用的场景。问题是,虽然发送/接收操作的顺序保证遵循调用顺序(在 IOCP 队列中),但报告操作完成的顺序不保证遵循调用顺序,因为 GetQueuedCompletionStatus() 将由多个线程调用。但是,如果您不关心正确的顺序,那么请忽略此说明。

11. 一个简单的回显服务器实现

作为“概念验证”,让我们实现一个简单的回显服务器,它从套接字读取 7 个字符,一旦缓冲区填满,就将它们发回。完整的代码随源文件提供。

A. 首先,让我们定义服务器参数和缓冲区大小:

#define BUFF_SIZE       8
#define MAX_CONNECTIONS 10
#define NO_THREADS      4
#define TIME_OUT        10
#define PORT            8080

B. 现在,让我们定义“附件”:

struct Attachment {
    volatile time_t    tmLastActionTime;
    char            sString<BUFF_SIZE>;
    DWORD           dwStringSize; // current string's size


    Attachment() { Clear(); };

    bool Commit( DWORD dwBytesTransferred ) {
        DWORD dwSize = dwStringSize + dwBytesTransferred;
    
        if ( dwBytesTransferred <= 0 ) return false;
        if ( dwSize >= BUFF_SIZE ) return false;

        dwStringSize = dwSize;
        sString[dwStringSize] = 0;
        return true;
    };

    // as requested by the API of the framework

    void Clear() { memset(this, 0, sizeof(Attachment) ); };

    // as requested by the API of the framework

    void ResetTime( bool toZero ) { 
        if (toZero) tmLastActionTime = 0;
        else {
            time_t    lLastActionTime;
            time(&lLastActionTime); 
            tmLastActionTime = lLastActionTime;
        }
    };

    // as requested by the API of the framework

    long GetTimeElapsed() {
        time_t tmCurrentTime;

        if (0 == tmLastActionTime) return 0;

        time(&tmCurrentTime);
        return (tmCurrentTime - tmLastActionTime);
    };
};

C. 现在,我们需要将模板类实例化为实际类:

typedef ClientSocket<Attachment> MyCSocket;
typedef ServerSocket<Attachment> MySSocket;
typedef IOCPSimple<Attachment> MyIOCPSimple;
typedef ISockEvent<Attachment> MyISockEvent;
typedef ServerService<Attachment> MyServerService;

D. 现在,我们实现 ISockEvent<Attachment>

class MyISockEventHandler: public MyISockEvent {
public:
    MyISockEventHandler() {};
    ~MyISockEventHandler() {};

    // empty method, not used

    virtual void OnClose( MyCSocket *pSocket, MYOVERLAPPED *pOverlap, 
            MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {};

    // empty method, not used

    virtual void OnPending( MyCSocket *pSocket, MYOVERLAPPED *pOverlap, 
            MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {};

    virtual void OnAccept( MyCSocket *pSocket, MYOVERLAPPED *pOverlap, 
        MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
        int nRet;
        DWORD dwSize;
        char *temp;

        dwSize = BUFF_SIZE - 1;
        temp = pSocket->GetAttachment()->sString;

        // initiate the reading with OnAccept

        nRet = pSocket->ReadFromSocket( temp, dwSize );
        pSocket->GetAttachment()->ResetTime( false );

        if ( nRet == SOCKET_ERROR ) {
            pServerSocket->Release( pSocket );
        }
    };

    virtual void OnReadFinalized( MyCSocket *pSocket, 
                MYOVERLAPPED *pOverlap, DWORD dwBytesTransferred, 
                MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
        int nRet;
        DWORD dwSize, dwPos;
        char *temp;

        // finalize the filling of the buffer

        pSocket->GetAttachment()->Commit( dwBytesTransferred );

        dwSize = BUFF_SIZE - 1;
        dwPos = pSocket->GetAttachment()->dwStringSize;
        temp = pSocket->GetAttachment()->sString;

        nRet = pSocket->ReadFromSocket(    temp + dwPos, dwSize - dwPos );
        pSocket->GetAttachment()->ResetTime( false );

        if ( nRet == SOCKET_ERROR ) {
            pServerSocket->Release( pSocket );
        }
        else if ( nRet == RECV_BUFFER_EMPTY ) {
            // means that dwSize - dwPos == 0, so send the data 

            // back to the socket


            nRet = pSocket->WriteToSocket( temp, dwSize );
            if ( nRet == SOCKET_ERROR ) {
                pServerSocket->Release( pSocket );
            }
        }
    };

    virtual void OnWriteFinalized( MyCSocket *pSocket, 
                MYOVERLAPPED *pOverlap, DWORD dwBytesTransferred, 
                MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
        // clean the attachment

        pSocket->GetAttachment()->Clear();

        // and once again

        OnAccept(pSocket, NULL,pServerSocket, NULL);
    };
};

E. 最后:

int main(int argc, char* argv[])
{
    int    nRet;
    MyServerService *sService;
    MyISockEventHandler *mSockHndl;
    WSAData    wsData;

    nRet = WSAStartup(MAKEWORD(2,2),&wsData);
    if ( nRet < 0 ) {
        Log::LogMessage(L"Can't load winsock.dll.\n");
        return -1;
    }

    try {
        Overlapped::Init( MAX_CONNECTIONS );
        mSockHndl = new MyISockEventHandler();

        sService = new MyServerService((MyISockEvent *) mSockHndl,
                   PORT, MAX_CONNECTIONS, NO_THREADS, TIME_OUT, false);
        sService->start();

        printf("hit <enter> to stop ...\n");
        while( !_kbhit() ) ::Sleep(100);
        
        delete sService;
        delete mSockHndl;
    }
    catch (const char *err) {
        printf("%s\n", err);
    }
    catch (const wchar_t *err) {
        wprintf(L"%ls\n", err);
    }

    WSACleanup();
    return 0;
}

12. Microsoft VC++ 6.0 和在多处理器环境中使用 STL

您可能已经注意到,在代码中,我滥用了不同的 STL 模板类,如 vectorsetqueue 等。当使用“默认”编译选项编译应用程序并在单 CPU 环境中运行时,它们运行良好。但是,该代码在多 CPU 环境中将无法正常工作。为了解决这个问题(正如 Microsoft 在此链接中建议的那样),必须执行以下操作:

  • 打开项目。
  • 项目菜单上,单击设置
  • 配置列表中,单击发布
  • 单击C/C++选项卡,然后在类别列表中单击代码生成
  • 运行时库列表中,单击多线程 (/MT)
  • 配置列表中,单击调试
  • 运行时库列表中,单击多线程调试 (/MTd)
  • 如果在配置列表中还有其他配置,也请为它们设置适当的运行时库选项。
  • 单击确定,然后重新生成项目。

13. 结论

嗯,现在看起来很简单,不是吗?在下一篇文章中,我将描述更多类的实现,包括专门定制的 AttachmentISockEvent<Attachment> 类,以处理 MP3 流媒体。那将是本系列的最后一篇文章,很快再见……

© . All rights reserved.