65.9K
CodeProject 正在变化。 阅读更多。
Home

可重用的高性能套接字服务器类 - 第二部分

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.70/5 (31投票s)

2002年5月26日

CPOL

11分钟阅读

viewsIcon

1024297

downloadIcon

5975

为保持高性能,套接字服务器不应在其 IO 线程中进行阻塞调用。本文在前一篇文章的基础上,为我们的示例服务器添加了一个业务逻辑线程池。

以下源代码使用 Visual Studio 6.0 SP5 和 Visual Studio .NET 构建。您需要安装一个版本的 Microsoft Platform SDK

请注意,代码的调试版本由于调试跟踪输出会浪费大量 CPU 周期。只有进行发布版本的性能分析才值得。

概述

在前一篇文章 ,我们设计了一个可重用的套接字服务器类,以方便编写高性能的套接字服务器。我们展示了一系列简单的示例,从最基本的 echo 服务器到一些稍微更接近实际情况的包 echo 服务器和伪 POP3 服务器。本文将通过为服务器添加业务逻辑线程池,使示例服务器更具实际可用性,从而让消息由一个不属于 IO 线程池的线程进行处理。这有助于通过将潜在的阻塞工作转移到其自己的线程池中来保持服务器的可伸缩性和性能。

为什么我们需要另一个线程池?

为了能够处理可变负载,通常需要一个线程池,该线程池可以根据服务器的当前负载进行扩展和收缩。正如我们在上一篇文章中指出的,我们所有的异步套接字 IO 都由套接字服务器的线程池处理。这个线程池中的线程在有未完成 IO 操作时不能被终止,否则这些操作将被终止。这意味着,如果没有我们跟踪与特定工作线程关联的 IO 操作并仅在所有 IO 操作完成后才允许线程终止,套接字服务器的线程池就无法收缩。为了保持性能,我们需要确保套接字服务器线程池中的线程不阻塞,它们数量有限,如果它们全部阻塞,则在它们解除阻塞之前将不会发生任何套接字 IO。确保 IO 线程不阻塞的最简单方法是将业务逻辑处理移出 IO 线程池,移到一个新的线程池中。然后,IO 线程只需处理 IO,将数据流分块成消息,并将这些消息传递给业务逻辑线程池。

业务逻辑线程池

我们对业务逻辑线程池的要求是,它应该灵活,并能够根据服务器的负载情况增加和减少工作线程的数量。将工作项传递到线程池应该是一个非阻塞操作,以便 IO 线程能够以最大效率运行,但我们需要能够知道一个工作项是否在一定时间内没有被线程拾取,以便我们可以向池中添加更多线程。我们还需要跟踪我们有多少空闲线程,并时不时地减少池中的线程数量,以在服务器负载低时节省资源。

正如您可能预期的那样,线程池使用 IO 完成端口将工作项分派给工作线程。为了能够监控工作项的处理时间,从而能够计算出何时需要添加更多线程到池中,我们使用了一个事件。当我们向 IO 完成端口分派一个工作项时,我们在该事件上等待一个可配置的超时时间。当一个线程从完成端口拾取一个工作项时,它做的第一件事就是发出事件信号。如果我们分派工作项时所有线程都忙,我们的超时可能会在线程发出事件信号之前过期。在这种情况下,我们可能希望向池中添加另一个线程来处理工作负载。分派代码可能如下所示:

   void CThreadPool::HandleDispatch(
      ULONG_PTR completionKey, 
      DWORD dwNumBytes, 
      OVERLAPPED *pOverlapped) 
   {
      m_dispatchCompleteEvent.Reset();

      bool processed = false;
   
      m_workPort.PostStatus(completionKey, dwNumBytes, pOverlapped); 

      // wait for someone to toggle the 'got message' event?

      bool threadStarted = false;

      while (!processed)
      {
         DWORD result = m_dispatchCompleteEvent.Wait(m_timeoutMillis);

         if (result == WAIT_OBJECT_0)
         {
            processed = true;
         }
         else if (result == WAIT_TIMEOUT)
         {
            if (!threadStarted && m_processingThreads == m_activeThreads && 
                (size_t)m_activeThreads < m_maxThreads)
            {            
               StartWorkerThread();

               threadStarted = true;
            }
         }
         else
         {
            throw CWin32Exception(_T("CThreadPool::Dispatch()"),
                                  GetLastError());
         }
      }
   }

只要有可用线程处理工作项,我们就无需启动新线程。一旦池中的所有线程都处于活动状态,我们在分派时可能会超时,然后,如果我们还没有运行到我们配置的最大线程数,我们就启动一个新线程。实际代码更复杂,因为它处理关闭请求,并在我们已经达到最大线程数时调整超时。分派器需要知道池中有多少线程,以及其中有多少线程正在处理,因此每个工作线程都会回调到线程池,让池知道其状态。

这段工作项分派代码的问题在于它不满足我们在非阻塞方式下分派工作项到池中的要求。为了实现这一点,我们增加了另一个级别的间接传递,以及另一个 IO 完成端口。

非阻塞分派

为了确保希望将工作项分派到线程池的用户可以这样做而不阻塞,我们按如下方式实现用户级别分派函数:

   void CThreadPool::Dispatch(
      ULONG_PTR completionKey, 
      DWORD dwNumBytes /*= 0*/, 
      OVERLAPPED *pOverlapped /*= 0*/) 
   {
      if (completionKey == 0)
      {
         throw CException(_T("CThreadPool::Dispatch()"),
                       _T("0 is an invalid value for completionKey"));
      }

      m_dispatchPort.PostStatus(completionKey, dwNumBytes, pOverlapped); 
   }

完成键值为 0 的限制很不幸,但它允许我们通过向其完成端口发送 0 来关闭线程池的分派线程。线程池现在有两个 IO 完成端口。分派端口由单个维护线程服务,该线程执行 HandleDispatch() 方法将工作项分派给工作线程。用户进行分派时不会阻塞,维护线程会阻塞地进行分派,以便在需要时扩展线程池。工作项端口由可变数量的线程服务。我们已经看到了如何知道何时需要扩展线程数,现在我们将看看当工作负载较低时如何减少线程数。

关闭空闲线程

工作项通常成批到来,线程池变得繁忙,扩展,处理所有工作项,然后变得不那么繁忙。此时,池中包含未使用的线程,但它们仍在消耗资源。这些空闲线程可以安全地关闭,因为当负载增加时,池可以再次扩展。问题是,我们如何决定何时关闭一些线程?

处理我们阻塞式分派的维护线程也负责检查空闲线程。每隔一段时间(可配置),维护线程会使用一个算法来确定是否应关闭某些线程。当前算法如下:

   void CThreadPool::HandleDormantThreads()
   {
      if ((size_t)m_activeThreads > m_minThreads)
      {
         const size_t dormantThreads = m_activeThreads - m_processingThreads;

         if (dormantThreads > m_maxDormantThreads)
         {
            const size_t threadsToShutdown = 
                            (dormantThreads - m_maxDormantThreads) / 2 + 1;

            StopWorkerThreads(threadsToShutdown);
         }
      }
   }

如果我们拥有的线程多于我们允许的最少线程数,请找出当前有多少线程未在处理工作项,如果该数量超过我们允许的空闲线程数,则关闭其中一半(向上取整)。停止工作线程只是向工作端口发送一个 0 的 IO 完成键,以表示我们要关闭的每个工作线程。

执行工作

现在我们有了一个满足我们要求的线程池:根据负载自动扩展和收缩,以及为用户提供非阻塞分派。剩下的工作是允许派生类提供自己的 WorkerThread 类来执行工作。工作线程类必须实现以下接口:

   virtual bool Initialise();

   virtual void Process(
      ULONG_PTR completionKey,
      DWORD dwNumBytes,
      OVERLAPPED *pOverlapped) = 0;

   virtual void Shutdown();

Initialise() 在线程首次创建时调用,Shutdown() 在线程终止时调用,Process() 为每个工作项调用。

带有业务逻辑线程池的套接字服务器

现在我们有了合适的线程池,我们可以将其与我们的伪 POP3 套接字服务器集成,以便实际的命令处理可以在业务逻辑池中进行,而 IO 池则专注于 IO 操作。我们还可以将套接字关闭移至业务逻辑池,以避免 IO 线程被延迟的套接字关闭阻塞。

我们需要做的第一件事是创建和配置我们的线程池。然后,我们可以将它的引用传递给我们的套接字服务器类,以便它能将引用传递给我们的 IO 线程。

   CThreadPool pool(
      5,                    // initial number of threads to create
      5,                    // minimum number of threads to keep in the pool
      10,                   // maximum number of threads in the pool
      5,                    // maximum number of "dormant" threads
      5000,                 // pool maintenance period (millis)
      100,                  // dispatch timeout (millis)
      10000);               // dispatch timeout for when pool is at max threads

   pool.Start();

   CSocketServer server(
      INADDR_ANY,          // address to listen on
      5001,                // port to listen on
      10,                  // max number of sockets to keep in the pool
      10,                  // max number of buffers to keep in the pool
      1024,                // buffer size 
      pool);

   server.Start();

当我们的套接字服务器有一个完整、独立的消息需要处理时,它可以将其分派给线程池进行处理,而不是在一个 IO 线程上处理。

   void CSocketServer::ProcessCommand(
      CSocketServer::Socket *pSocket,
      CIOBuffer *pBuffer)
   {
      pSocket->AddRef();
      pBuffer->AddRef();

      m_pool.Dispatch(reinterpret_cast<ULONG_PTR>(pSocket), 
                      0, pBuffer->GetAsOverlapped());
   }

由于我们将套接字和 IO 缓冲区传递给另一个线程,我们必须增加它们的引用计数,以防止它们在我们不知情的情况下被清理掉。在我们业务逻辑线程中,我们可以最终处理消息,然后释放我们对套接字和 IO 缓冲区的引用。

void CThreadPoolWorkerThread::Process(
   ULONG_PTR completionKey,
   DWORD operation,
   OVERLAPPED *pOverlapped)
{
   Socket *pSocket = reinterpret_cast<Socket *>(completionKey);
   CIOBuffer *pBuffer = CIOBuffer::FromOverlapped(pOverlapped);
   
   ProcessMessage(pSocket, pBuffer);

   pSocket->Release();
   pBuffer->Release();
}

由于套接字类将所有 IO 请求都 marshalled 回 IO 线程池,因此我们可以在业务逻辑线程中安全地进行读写请求,即使该线程在 IO 请求完成之前可能已被终止。

维护每个连接的状态

我们的服务器可能需要做的最后一件事是将一些内部服务器状态与特定的套接字连接关联起来。Socket 类使这特别容易,因为它提供了以下成员函数:

   void *GetUserPtr() const;

   void SetUserPtr(void *pData);

   unsigned long GetUserData() const;

   void SetUserData(unsigned long data);

这些函数提供了对存储在 Socket 中的单个 void * 用户数据指针的访问。这个用户数据的常见用法模式如下:当连接建立时,套接字服务器会通过 OnConnectionEstablished() 收到通知,服务器可以分配一个新的每个连接的数据结构,并通过调用 SetUserPtr() 将其与传递给 OnConnectionEstablished() 的套接字关联起来。在后续的读写完成时,可以通过 GetUserPtr() 提取指向每个连接数据结构的用户数据的指针。当连接终止时,服务器会通过 OnConnectionClosed 收到通知,然后可以检索和删除每个连接的用户数据。

尽管有两个版本的用户数据访问函数,一个用于 void *,一个用于 unsigned long,但只有一个存储位置。这两个版本仅仅是为了方便,并在用户数据只是指向内部服务器结构索引而不是指针时减少类型转换。

示例服务器会将 OnConnectionEstablished()OnConnectionClosed() 调用 marshalled 到业务逻辑线程池,并在那里维护一些相当琐碎的每个连接的用户数据。我们维护的数据是客户端连接的地址(从传递到 OnConnectionEstablished() 的缓冲区中获取)以及在此特定连接上已处理的消息数量。

完整示例

一个在独立的线程池中执行业务逻辑处理而不是 IO 的 POP3 服务器的框架可以从 此处 下载。服务器在其消息处理代码中有一个对 ::Sleep() 的调用,以使处理花费一些时间并阻塞。请注意,其他连接上的 IO 如何不受影响,如果您愿意,可以向我们在上一篇文章末尾开发的服务器添加一个类似的调用,并比较行为。

与其他示例一样,只需 telnet 到 localhost 5001 即可测试服务器。服务器运行直到一个命名事件被设置然后关闭。非常简单的服务器关闭程序,可在 此处 获取,提供了关闭开关。

修订历史

  • 2002年5月21日 - 初始版本。
  • 2002年5月27日 - 为所有服务器和服务器关闭程序添加了暂停/恢复功能。使用 CSocket 来防止创建监听套接字时资源泄漏。重构了 Socket 和 CIOBuffer 类,以便通用的列表管理代码现在位于 CNodeList 中,通用的用户数据代码现在位于 COpaqueUserData 中。
  • 2002年5月29日 - Linting 和一般代码清理
  • 2002年6月18日 - 移除了创建监听套接字期间对 ReuseAddress() 的调用,因为它不是必需的 - 感谢 Alun Jones 指出这一点。
  • 2002年6月28日 - 调整了我们处理套接字关闭的方式。
  • 2002年6月30日 - 移除了用户子类化套接字服务器工作线程类的要求。现在可以通过简单地子类化套接字服务器类来完成所有工作。
  • 2002年7月15日 - 套接字关闭通知现在在服务器在有活动连接时关闭时发生。SocketServer 现在可以设置为确保读写包序列。
  • 2002年7月23日 - 修正了 CSocketServer::ProcessDataStream() 中的 bug。我们不应该重用缓冲区。代码在 6 月 30 日的更改之前是好的,现在也是好的。感谢一位匿名的 CodeProject 读者指出这一点。
  • 2002年8月12日 - 移除了套接字关闭中的竞争条件 - 感谢 David McConnell 指出这一点。派生类可以接收连接重置和连接错误通知。Socket 提供了确定发送/接收是否已连接的手段。分派到线程池现在使用共享枚举而不是硬编码的常量值。一般的代码清理和 Lint 问题。

系列中的其他文章

  1. 一个可重用的套接字服务器类
  2. 套接字服务器中的业务逻辑处理
  3. 使用AcceptEx加速套接字服务器连接
  4. 处理多个挂起的套接字读写操作
  5. 使用 C# 和 .Net 测试套接字服务器
  6. 用于VB的、高性能的TCP/IP套接字服务器COM组件
© . All rights reserved.