
A Simple Application Using I/O Completion Ports and WinSock


March 10, 2006

CPOL


An article on implementing I/O completion ports (IOCP) with WinSock.

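Introduction

The main goal of this code submission is to provide source code that demonstrates the use of IOCP with WinSock. It tries to highlight the use of IOCP through source code that is very easy to understand: the client and the server perform very simple operations; basically, they send and receive simple string messages. The IOCP client lets you stress test the server: you enter a message and specify how many times it should be sent to the server.

In addition, I have included socket1.zip, which contains a one-to-one client and server implementation that does not use IOCP. Similarly, socket2.zip contains a multi-threaded socket-based server that creates one thread per client connection, which is not a good design. I included this extra code so that readers can compare these implementations with the IOCP implementation, which should give additional insight into using IOCP with WinSock.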

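Background

I was considering using IOCP in my current project. I found IOCP with WinSock to be a very useful, robust, and scalable mechanism. IOCP allows an application to service asynchronous I/O requests with a pool of threads. This spares the application from creating one thread per client, which can cause serious performance problems (socket2.zip contains a one-thread-per-client implementation).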

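Using the code

I have provided five zip files:

  • Socket1.zip contains a simple one-to-one client and server implementation using WinSock. It is a very straightforward implementation; I will not discuss this code.
  • Socket2.zip contains a multi-threaded server with one thread per client; the client code is unchanged and is not discussed.
  • ServerIOCP.zip contains a multi-threaded server that uses IOCP; it is the focus of this article.
  • ClientIOCP.zip contains the IOCP client, which can stress test the IOCP server.
  • IOCPExecutables.zip contains the IOCP client and server executables.

All of the code submissions are console-based applications.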

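Multi-threaded server without IOCP

The implementation of this server can be found in socket2.zip.

Once the listening socket is created, the AcceptConnections() function is called with the listening socket as its input parameter.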

//Make the socket a listening socket
if (SOCKET_ERROR == listen(ListenSocket,SOMAXCONN))
{
    closesocket(ListenSocket);
    printf("\nError occurred while listening.");
    goto error;
}
else
{
    printf("\nlisten() successful.");
}
 
//This function will take care of multiple clients using threads
AcceptConnections(ListenSocket);

The AcceptConnections() function accepts incoming client connections and spawns a thread for each new client connection.

//This function will loop on while creating
//a new thread for each client connection
void AcceptConnections(SOCKET ListenSocket)
{
     sockaddr_in ClientAddress;
     int nClientLength = sizeof(ClientAddress);
     
     //Infinite, no graceful shutdown of server implemented, 
     //preferably server should be implemented as a service
     //Events can also be used for graceful shutdown
     while (1)
     {
          //Accept remote connection attempt from the client
          SOCKET Socket = accept(ListenSocket, 
                 (sockaddr*)&ClientAddress, &nClientLength);
          
          if (INVALID_SOCKET == Socket)
          {
               printf("\nError occurred while accepting" 
                      " socket: %d.", WSAGetLastError());
               
               //No usable socket; wait for the next client
               continue;
          }
          
          //Display Client's IP
          printf("\nClient connected from: %s", 
                 inet_ntoa(ClientAddress.sin_addr)); 
          
          DWORD nThreadID;
          
          //Spawn one thread for each client 
          //connection, not a wise idea.
          //One should limit the number of threads
          //or use I/O completion port
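          HANDLE hThread = CreateThread(0, 0, AcceptHandler, 
                 (void*)Socket, 0, &nThreadID);
          
          //The handle is not used again; close it so it does not leak
          if (NULL != hThread) CloseHandle(hThread);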
     }
}

AcceptHandler() is a thread function. It takes the accepted socket as input and performs the client-related I/O operations on it.

//Thread procedure one thread will be created for each client.
DWORD WINAPI AcceptHandler(void* Socket) 
{
     SOCKET RemoteSocket = (SOCKET)Socket;
     
     char szBuffer[256];
     
     //Cleanup and Init with 0 the szBuffer
     ZeroMemory(szBuffer,256);
     
     int nBytesSent;
     int nBytesRecv;
     
     //Receive data from a connected or bound socket
     nBytesRecv = recv(RemoteSocket, szBuffer, 255, 0 );
     
     if (SOCKET_ERROR == nBytesRecv)
     {
          closesocket(RemoteSocket);
          printf("\nError occurred while receiving from socket.");
          return 1; //error
     }
     else
     {
          printf("\nrecv() successful.");
     }
     
     //Display the message received on console
     printf("\nThe following message was received: %s", szBuffer);
     
     //Send data on a connected socket to the client
     nBytesSent = send(RemoteSocket, ACK_MESG_RECV , 
                                        strlen(ACK_MESG_RECV), 0);
     
     if (SOCKET_ERROR == nBytesSent) 
     {
          closesocket(RemoteSocket);
          printf("\nError occurred while writing to socket.");
          return 1; //error
     }
     else
     {
          printf("\nsend() successful.");
     }
     
     //Done with this client; release the socket
     closesocket(RemoteSocket);
     
     return 0; //success
}

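This design does not scale: every connection costs a dedicated thread with its own stack, and with many clients the context switching between those threads can cause serious performance problems.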

Multi-threaded server using IOCP

Screenshot - server1.JPG

Screenshot - server2.jpg

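The implementation of this server can be found in ServerIOCP.zip.

The following set of APIs is used in the IOCP implementation. A quick read through MSDN will help in grasping the code that follows.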

  • CreateIoCompletionPort()
  • GetQueuedCompletionStatus()
  • PostQueuedCompletionStatus()

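The InitializeIOCP() function initializes IOCP and creates a pool of worker threads to service IOCP requests. Commonly, two threads are created per processor: many programs find out the number of processors on the host and create <number of processors * 2> worker threads. This can be made a configurable parameter so that users of the application can tune the number of threads. In my code, two threads are created for every processor on the host.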

//Function to Initialize IOCP
bool InitializeIOCP()
{
     //Create I/O completion port
     g_hIOCompletionPort = 
       CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
     
     if ( NULL == g_hIOCompletionPort)
     {
          printf("\nError occurred while creating IOCP: %d.", 
                 WSAGetLastError());
          return false;
     }
     
     DWORD nThreadID;
     
     //Create worker threads
     for (int ii = 0; ii < g_nThreads; ii++)
     {
          g_phWorkerThreads[ii] = CreateThread(0, 0, 
              WorkerThread, (void *)(ii+1), 0, &nThreadID);
     }
     
     return true;
}
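
The sizing rule described above (two workers per processor, optionally overridden by configuration) can be sketched in portable C++. This is only an illustration under stated assumptions: `ComputeWorkerThreads` and `DefaultWorkerThreads` are hypothetical names that do not appear in ServerIOCP.zip, and `std::thread::hardware_concurrency()` stands in for the article's GetNoOfProcessors().

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper, not taken from ServerIOCP.zip.
// processors:   detected processor count (0 means "unknown")
// perProcessor: workers per processor (the article uses 2)
// configured:   user-supplied override; 0 means "not configured"
unsigned ComputeWorkerThreads(unsigned processors,
                              unsigned perProcessor,
                              unsigned configured)
{
    assert(perProcessor != 0); // a pool with no workers cannot serve anything

    // An explicit setting from the user wins over the heuristic.
    if (configured != 0)
        return configured;

    // hardware_concurrency() may legally return 0; assume one processor then.
    if (processors == 0)
        processors = 1;

    return processors * perProcessor;
}

// Convenience wrapper that queries the host and applies the 2x rule.
unsigned DefaultWorkerThreads(unsigned configured = 0)
{
    return ComputeWorkerThreads(std::thread::hardware_concurrency(), 2, configured);
}
```

Keeping the override separate from the heuristic lets a deployment tune the pool size without recompiling, which is the kind of configurability the paragraph above suggests.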

IOCP must be used with overlapped I/O. The following code shows how to create an overlapped socket.

//Overlapped I/O follows the model established
//in Windows and can be performed only on 
//sockets created through the WSASocket function 
ListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, 
                       NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == ListenSocket) 
{
     printf("\nError occurred while opening socket: %ld.", 
                                       WSAGetLastError());
     goto error;
}
else
{
     printf("\nWSASocket() successful.");
}

To shut this server down gracefully, I have used WSAEventSelect(). Instead of blocking in an accept() call, we handle the Accept event. The creation of the WSAEVENT is shown below.

g_hAcceptEvent = WSACreateEvent();
 
     if (WSA_INVALID_EVENT == g_hAcceptEvent)
     {
          printf("\nError occurred while WSACreateEvent().");
          goto error;
     }
    
     if (SOCKET_ERROR == WSAEventSelect(ListenSocket, 
                        g_hAcceptEvent, FD_ACCEPT))
     {
          printf("\nError occurred while WSAEventSelect().");
          WSACloseEvent(g_hAcceptEvent);
          goto error;
     }

AcceptThread() keeps looking for the Accept event.

//This thread will look for accept event
DWORD WINAPI AcceptThread(LPVOID lParam)
{
     SOCKET ListenSocket = (SOCKET)lParam;
     
     WSANETWORKEVENTS WSAEvents;
     
     //Accept thread will be around to look for 
     //accept event, until the Shutdown event is signaled.
     while(WAIT_OBJECT_0 != 
           WaitForSingleObject(g_hShutdownEvent, 0))
     {
          if (WSA_WAIT_TIMEOUT != WSAWaitForMultipleEvents(1, 
              &g_hAcceptEvent, FALSE, WAIT_TIMEOUT_INTERVAL, FALSE))
          {
               WSAEnumNetworkEvents(ListenSocket, 
                     g_hAcceptEvent, &WSAEvents);
               if ((WSAEvents.lNetworkEvents & FD_ACCEPT) && 
                   (0 == WSAEvents.iErrorCode[FD_ACCEPT_BIT]))
               {
                    //Process it.
                    AcceptConnection(ListenSocket);
               }
          }
     }
     
     return 0;
}

AcceptConnection() processes the Accept event. It also associates the socket with IOCP and posts a WSARecv() on the socket to receive the incoming client data.

//This function will process the accept event
void AcceptConnection(SOCKET ListenSocket)
{
     sockaddr_in ClientAddress;
     int nClientLength = sizeof(ClientAddress);
     
     //Accept remote connection attempt from the client
     SOCKET Socket = accept(ListenSocket, 
           (sockaddr*)&ClientAddress, &nClientLength);
     
     if (INVALID_SOCKET == Socket)
     {
          WriteToConsole("\nError occurred while " 
                         "accepting socket: %d.", 
                         WSAGetLastError());
          
          //Nothing to associate with IOCP
          return;
     }
     
     //Display Client's IP
     WriteToConsole("\nClient connected from: %s", 
                    inet_ntoa(ClientAddress.sin_addr));
     
     //Create a new ClientContext for this newly accepted client
     CClientContext   *pClientContext  = new CClientContext;
     
     pClientContext->SetOpCode(OP_READ);
     pClientContext->SetSocket(Socket);
     
     //Store this object
     AddToClientList(pClientContext);
     
     if (true == AssociateWithIOCP(pClientContext))
     {
          //Once the data is successfully received, we will print it.
          pClientContext->SetOpCode(OP_WRITE);
          
          WSABUF *p_wbuf = pClientContext->GetWSABUFPtr();
          OVERLAPPED *p_ol = pClientContext->GetOVERLAPPEDPtr();
          
          //Get data.
          DWORD dwFlags = 0;
          DWORD dwBytes = 0;
          
          //Post initial Recv
          //This is a right place to post a initial Recv
          //Posting a initial Recv in WorkerThread
          //will create scalability issues.
          int nBytesRecv = WSARecv(pClientContext->GetSocket(), 
                           p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL);
          
          if ((SOCKET_ERROR == nBytesRecv) && 
              (WSA_IO_PENDING != WSAGetLastError()))
          {
               WriteToConsole("\nError in Initial Post.");
          }
     }
} 

AssociateWithIOCP() associates the accepted socket with IOCP.

bool AssociateWithIOCP(CClientContext   *pClientContext)
{
    //Associate the socket with IOCP
    //The completion key is pointer-sized (ULONG_PTR), so this is 64-bit safe
    HANDLE hTemp = CreateIoCompletionPort((HANDLE)pClientContext->GetSocket(), 
        g_hIOCompletionPort, (ULONG_PTR)pClientContext, 0);

    if (NULL == hTemp)
    {
        WriteToConsole("\nError occurred while" 
            " executing CreateIoCompletionPort().");

        //Let's not work with this client
        RemoveFromClientListAndFreeMemory(pClientContext);

        return false;
    }

    return true;
}

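The CClientContext class stores client-related information, such as the client socket, and it has a buffer dedicated to overlapped I/O. We need to make sure that the buffer used for an overlapped I/O operation is not modified while that operation is in progress.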

class CClientContext
//To store and manage client related information
{
private:
     
     OVERLAPPED        *m_pol;
     WSABUF            *m_pwbuf;
     
     int               m_nTotalBytes;
     int               m_nSentBytes;
     
     //accepted socket
     SOCKET            m_Socket;
     //will be used by the worker thread 
     //to decide what operation to perform
     int               m_nOpCode;
     char              m_szBuffer[MAX_BUFFER_LEN];
     
public:
     
     //Get/Set calls
     void SetOpCode(int n)
     {
          m_nOpCode = n;
     }
     
     int GetOpCode()
     {
          return m_nOpCode;
     }
     
     void SetTotalBytes(int n)
     {
          m_nTotalBytes = n;
     }
     
     int GetTotalBytes()
     {
          return m_nTotalBytes;
     }
     
     void SetSentBytes(int n)
     {
          m_nSentBytes = n;
     }
     
     void IncrSentBytes(int n)
     {
          m_nSentBytes += n;
     }
     
     int GetSentBytes()
     {
          return m_nSentBytes;
     }
     
     void SetSocket(SOCKET s)
     {
          m_Socket = s;
     }
     
     SOCKET GetSocket()
     {
          return m_Socket;
     }
     
     void SetBuffer(char *szBuffer)
     {
          strcpy(m_szBuffer, szBuffer);
     }
     
     void GetBuffer(char *szBuffer)
     {
          strcpy(szBuffer, m_szBuffer);
     }
     
     void ZeroBuffer()
     {
          ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
     }
     
     void SetWSABUFLength(int nLength)
     {
          m_pwbuf->len = nLength;
     }
     
     int GetWSABUFLength()
     {
          return m_pwbuf->len;
     }
     
     WSABUF* GetWSABUFPtr()
     {
          return m_pwbuf;
     }
     
     OVERLAPPED* GetOVERLAPPEDPtr()
     {
          return m_pol;
     }
     
     void ResetWSABUF()
     {
          ZeroBuffer();
          m_pwbuf->buf = m_szBuffer;
          m_pwbuf->len = MAX_BUFFER_LEN;
     }
     
     //Constructor
     CClientContext()
     {
          m_pol = new OVERLAPPED;
          m_pwbuf = new WSABUF;
          
          ZeroMemory(m_pol, sizeof(OVERLAPPED));
          
          m_Socket =  INVALID_SOCKET;
          
          ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
          
          m_pwbuf->buf = m_szBuffer;
          m_pwbuf->len = MAX_BUFFER_LEN;
          
          m_nOpCode = 0;
          m_nTotalBytes = 0;
          m_nSentBytes = 0;
     }
     
     //destructor
     ~CClientContext()
     {
          //Wait for the pending operations to complete
          while (!HasOverlappedIoCompleted(m_pol)) 
          {
               Sleep(0);
          }
          
          closesocket(m_Socket);
          
          //Cleanup
          delete m_pol;
          delete m_pwbuf;
     }
};

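Next is the worker thread function, WorkerThread(). This function waits for requests from IOCP and services them. Depending on the operation code supplied, it performs the appropriate operation. WorkerThread(), in turn, issues further operation requests to IOCP by setting the appropriate operation code on the CClientContext. These requests are routed to one of the worker threads, possibly the requesting thread itself.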

//Worker thread will service IOCP requests
DWORD WINAPI WorkerThread(LPVOID lpParam)
{    
    int nThreadNo = (int)lpParam;

    void *lpContext = NULL;
    OVERLAPPED       *pOverlapped = NULL;
    CClientContext   *pClientContext = NULL;
    DWORD            dwBytesTransfered = 0;
    int nBytesRecv = 0;
    int nBytesSent = 0;
    DWORD             dwBytes = 0, dwFlags = 0;

    //Worker thread will be around to process requests,
    //until the Shutdown event is signaled.
    while (WAIT_OBJECT_0 != WaitForSingleObject(g_hShutdownEvent, 0))
    {
        BOOL bReturn = GetQueuedCompletionStatus(
            g_hIOCompletionPort,
            &dwBytesTransfered,
            (PULONG_PTR)&lpContext,
            &pOverlapped,
            INFINITE);

        if (NULL == lpContext)
        {
            //We are shutting down
            break;
        }

        //Get the client context
        pClientContext = (CClientContext *)lpContext;

        if ((FALSE == bReturn) || ((TRUE == bReturn) && 
        (0 == dwBytesTransfered)))
        {
            //Client connection gone, remove it.
            RemoveFromClientListAndFreeMemory(pClientContext);
            continue;
        }

        WSABUF *p_wbuf = pClientContext->GetWSABUFPtr();
        OVERLAPPED *p_ol = pClientContext->GetOVERLAPPEDPtr();

        switch (pClientContext->GetOpCode())
        {
        case OP_READ:

            //A send has just completed; account for the bytes that went out.
            pClientContext->IncrSentBytes(dwBytesTransfered);

            //See if all the data was sent.
            //Else post another write for the remainder.
            if(pClientContext->GetSentBytes() < 
            pClientContext->GetTotalBytes())
            {
                pClientContext->SetOpCode(OP_READ);

                //Advance only by the bytes of this completion;
                //GetSentBytes() is cumulative.
                p_wbuf->buf += dwBytesTransfered;
                p_wbuf->len = pClientContext->GetTotalBytes() - 
                    pClientContext->GetSentBytes();

                dwFlags = 0;

                //Overlapped send
                nBytesSent = WSASend(pClientContext->GetSocket(), 
                    p_wbuf, 1, &dwBytes, dwFlags, p_ol, NULL);

                if ((SOCKET_ERROR == nBytesSent) && 
                (WSA_IO_PENDING != WSAGetLastError()))
                {
                    //Let's not work with this client
                    RemoveFromClientListAndFreeMemory(pClientContext);
                }
            }
            else
            {
                //Once the data is successfully received, we will print it.
                pClientContext->SetOpCode(OP_WRITE);
                pClientContext->ResetWSABUF();

                dwFlags = 0;

                //Get the data.
                nBytesRecv = WSARecv(pClientContext->GetSocket(), p_wbuf, 1, 
                    &dwBytes, &dwFlags, p_ol, NULL);

                if ((SOCKET_ERROR == nBytesRecv) && 
                (WSA_IO_PENDING != WSAGetLastError()))
                {
                    WriteToConsole("\nThread %d: Error occurred" 
                    " while executing WSARecv().", nThreadNo);

                    //Let's not work with this client
                    RemoveFromClientListAndFreeMemory(pClientContext);
                }
            }

            break;

        case OP_WRITE:

            char szBuffer[MAX_BUFFER_LEN];

            //Display the message we received
            pClientContext->GetBuffer(szBuffer);

            WriteToConsole("\nThread %d: The following message" 
            " was received: %s", nThreadNo, szBuffer);

            //Send the message back to the client.
            pClientContext->SetOpCode(OP_READ);


            pClientContext->SetTotalBytes(dwBytesTransfered);
            pClientContext->SetSentBytes(0);

            p_wbuf->len  = dwBytesTransfered;

            dwFlags = 0;

            //Overlapped send
            nBytesSent = WSASend(pClientContext->GetSocket(), p_wbuf, 1, 
                &dwBytes, dwFlags, p_ol, NULL);

            if ((SOCKET_ERROR == nBytesSent) && 
            (WSA_IO_PENDING != WSAGetLastError()))
            {
                WriteToConsole("\nThread %d: Error " 
                "occurred while executing WSASend().", nThreadNo);

                //Let's not work with this client
                RemoveFromClientListAndFreeMemory(pClientContext);
            }

            break;

        default:
            //We should never be reaching here, under normal circumstances.
            break;
        } // switch
    } // while

    return 0;
}
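
The bookkeeping that the worker thread performs when a send completes only partially can be sketched without any socket. The following is a minimal illustration, not the article's code: `SendCursor` and its members are hypothetical names that mirror the roles of m_nTotalBytes, m_nSentBytes, and the WSABUF pointer and length in CClientContext.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical bookkeeping for one outgoing message.
struct SendCursor
{
    const char* base;   // start of the message buffer
    std::size_t total;  // bytes that must go out in total
    std::size_t sent;   // bytes confirmed sent so far

    // Record one completion of `n` bytes.
    // Returns true while more data remains to be sent.
    bool OnBytesSent(std::size_t n)
    {
        assert(n <= total - sent); // a completion cannot exceed what was posted
        sent += n;
        return sent < total;
    }

    // Where the next send must start: always base + cumulative sent bytes.
    const char* NextPtr() const { return base + sent; }

    // How many bytes the next send must cover.
    std::size_t NextLen() const { return total - sent; }
};
```

Deriving the next pointer from the fixed base each time avoids the drift that appears when a cumulative byte count is added to a pointer that has already been advanced.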

Some of the functions used for initialization and cleanup are shown below. Note that CleanUp() uses PostQueuedCompletionStatus() to help each WorkerThread() get out of its blocking call to GetQueuedCompletionStatus().

bool Initialize()
{
     //Find out number of processors and threads
     g_nThreads = WORKER_THREADS_PER_PROCESSOR * GetNoOfProcessors();
     
     printf("\nNumber of processors on host: %d", GetNoOfProcessors());
     
     printf("\nThe following number of worker threads" 
            " will be created: %d", g_nThreads);
     
     //Allocate memory to store thread handles
     g_phWorkerThreads = new HANDLE[g_nThreads];
     
     //Initialize the Console Critical Section
     InitializeCriticalSection(&g_csConsole);
     
     //Initialize the Client List Critical Section
     InitializeCriticalSection(&g_csClientList);
     
     //Create shutdown event
     g_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
     
     // Initialize Winsock
     WSADATA wsaData;
     
     int nResult;
     nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
     
     if (NO_ERROR != nResult)
     {
          printf("\nError occurred while executing WSAStartup().");
          return false; //error
     }
     else
     {
          printf("\nWSAStartup() successful.");
     }
     
     if (false == InitializeIOCP())
     {
          printf("\nError occurred while initializing IOCP");
          return false;
     }
     else
     {
          printf("\nIOCP initialization successful.");
     }
     
     return true;
}
 
//Function to Initialize IOCP
bool InitializeIOCP()
{
     //Create I/O completion port
     g_hIOCompletionPort = 
        CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
     
     if ( NULL == g_hIOCompletionPort)
     {
          printf("\nError occurred while creating IOCP: %d.", 
                                          WSAGetLastError());
          return false;
     }
     
     DWORD nThreadID;
     
     //Create worker threads
     for (int ii = 0; ii < g_nThreads; ii++)
     {
          g_phWorkerThreads[ii] = CreateThread(0, 0, WorkerThread, 
                                 (void *)(ii+1), 0, &nThreadID);
     }
     
     return true;
}
 
void CleanUp()
{
     //Ask all threads to start shutting down
     SetEvent(g_hShutdownEvent);
     
     //Let Accept thread go down
     WaitForSingleObject(g_hAcceptThread, INFINITE);
     
     for (int i = 0; i < g_nThreads; i++)
     {
          //Help threads get out of blocking - GetQueuedCompletionStatus()
          PostQueuedCompletionStatus(g_hIOCompletionPort, 0, 
                                    (ULONG_PTR)0, NULL);
     }
     
     //Let Worker Threads shutdown
     WaitForMultipleObjects(g_nThreads, g_phWorkerThreads, TRUE, INFINITE);
     
     //We are done with this event
     WSACloseEvent(g_hAcceptEvent);
     
     //Cleanup dynamic memory allocations, if there are any.
     CleanClientList();
}
 
void DeInitialize()
{
     //Delete the Console Critical Section.
     DeleteCriticalSection(&g_csConsole);
     
     //Delete the Client List Critical Section.
     DeleteCriticalSection(&g_csClientList);
     
     //Cleanup IOCP.
     CloseHandle(g_hIOCompletionPort);
     
     //Clean up the event.
     CloseHandle(g_hShutdownEvent);
     
     //Clean up memory allocated for the storage of thread handles
     delete[] g_phWorkerThreads;
     
     //Cleanup Winsock
     WSACleanup();
}
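
The shutdown trick used by CleanUp(), posting one empty completion per worker so that each blocked thread wakes up and exits, can be illustrated with a portable stand-in. This sketch is an analogy, not the Windows API: `CompletionQueue` and `RunPool` are hypothetical names, the queue is an ordinary mutex-protected FIFO, and a key of 0 plays the role of the NULL completion key.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Portable stand-in for a completion port: a blocking queue of keys.
class CompletionQueue
{
public:
    void Post(int key)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(key);
        }
        m_ready.notify_one();
    }

    // Blocks until an item is available, like a wait with INFINITE timeout.
    int Wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return !m_items.empty(); });
        int key = m_items.front();
        m_items.pop();
        return key;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<int> m_items;
};

// Starts `workers` threads, posts `jobs` real items, then one sentinel per
// worker. Returns how many real items were processed before shutdown.
int RunPool(int workers, int jobs)
{
    assert(workers > 0);

    CompletionQueue queue;
    std::atomic<int> processed(0);
    std::vector<std::thread> pool;

    for (int i = 0; i < workers; ++i)
    {
        pool.emplace_back([&queue, &processed] {
            for (;;)
            {
                int key = queue.Wait();
                if (key == 0)
                    break;      // sentinel: we are shutting down
                ++processed;    // "service" the request
            }
        });
    }

    for (int j = 1; j <= jobs; ++j)
        queue.Post(j);          // non-zero keys are real work

    for (int i = 0; i < workers; ++i)
        queue.Post(0);          // one wake-up per blocked worker

    for (std::thread& t : pool)
        t.join();

    return processed.load();
}
```

Because each worker consumes exactly one sentinel and then stops waiting, posting as many sentinels as there are workers is enough to release all of them.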

To store the client information I have used a vector; the following set of calls manages the list.

//Store client related information in a vector
void AddToClientList(CClientContext   *pClientContext)
{
     EnterCriticalSection(&g_csClientList);
     
     //Store these structures in vectors
     g_ClientContext.push_back(pClientContext);
     
     LeaveCriticalSection(&g_csClientList);
}
 
//This function will allow to remove one single client out of the list
void RemoveFromClientListAndFreeMemory(CClientContext   *pClientContext)
{
     EnterCriticalSection(&g_csClientList);
     
     std::vector<CClientContext *>::iterator IterClientContext;
     
     //Remove the supplied ClientContext 
     //from the list and release the memory
     for (IterClientContext = g_ClientContext.begin(); 
          IterClientContext != g_ClientContext.end(); 
          IterClientContext++)
     {
          if (pClientContext == *IterClientContext)
          {
               g_ClientContext.erase(IterClientContext);
               
               //i/o will be cancelled and socket 
               //will be closed by destructor.
               delete pClientContext;
               break;
          }
     }
 
     LeaveCriticalSection(&g_csClientList);
}
 
//Clean up the list, this function 
//will be executed at the time of shutdown
void CleanClientList()
{
     EnterCriticalSection(&g_csClientList);
     
     std::vector<CClientContext *>::iterator IterClientContext;
     
     for (IterClientContext = g_ClientContext.begin(); 
          IterClientContext != g_ClientContext.end( ); 
          IterClientContext++)
     {
          //i/o will be cancelled and socket 
          //will be closed by destructor.
          delete *IterClientContext;
     }
     
     g_ClientContext.clear();
     
     LeaveCriticalSection(&g_csClientList);
}
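
As a variation on the list management above, ownership of each context can be given to the container so that erasing an entry also frees it. The sketch below is a portable illustration under assumptions: `ClientContext` is a simplified stand-in for CClientContext, `ClientList` is a hypothetical class, and a std::mutex takes the place of g_csClientList.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for CClientContext; only an id is kept here.
struct ClientContext
{
    explicit ClientContext(int id) : m_id(id) {}
    int m_id;
};

// Owns the contexts; every access goes through one mutex.
class ClientList
{
public:
    // Takes ownership and returns a non-owning pointer for later lookup.
    ClientContext* Add(std::unique_ptr<ClientContext> ctx)
    {
        assert(ctx != nullptr);
        std::lock_guard<std::mutex> lock(m_mutex);
        m_clients.push_back(std::move(ctx));
        return m_clients.back().get();
    }

    // Erases the entry, which also destroys the context.
    // Returns false if the pointer is not in the list.
    bool Remove(const ClientContext* ctx)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        auto it = std::find_if(m_clients.begin(), m_clients.end(),
            [ctx](const std::unique_ptr<ClientContext>& p) { return p.get() == ctx; });
        if (it == m_clients.end())
            return false;
        m_clients.erase(it);
        return true;
    }

    std::size_t Size() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_clients.size();
    }

private:
    mutable std::mutex m_mutex;
    std::vector<std::unique_ptr<ClientContext>> m_clients;
};
```

With this arrangement there is no separate delete call to forget, and clearing the vector at shutdown releases every remaining context.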

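I have created and used a WriteToConsole() function that synchronizes the console output sent by the worker threads; because I write to the console with printf(), the threads would otherwise race for the console. It uses a critical section for synchronization.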

//Function to synchronize console output
//Threads need to be synchronized while they write to console.
//WriteConsole() API can be used, it is thread-safe, I think.
//I have created my own function.
void WriteToConsole(char *szFormat, ...)
{
     EnterCriticalSection(&g_csConsole);
     
     va_list args;
     va_start(args, szFormat);
     
     vprintf(szFormat, args );
     
     va_end(args);
     
     LeaveCriticalSection(&g_csConsole);
}
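
The same idea can be expressed in portable C++ with a std::mutex. This is a sketch under assumptions rather than the article's implementation: `SyncLog` is a hypothetical class, and it appends to an in-memory string instead of the console so that the result can be inspected.

```cpp
#include <cassert>
#include <cstdarg>
#include <cstdio>
#include <mutex>
#include <string>

// Hypothetical portable variant of WriteToConsole(): it formats into a local
// buffer first and holds the lock only while appending to the shared sink.
class SyncLog
{
public:
    void Write(const char* format, ...)
    {
        assert(format != nullptr);

        char line[512];
        va_list args;
        va_start(args, format);
        // vsnprintf never writes past the buffer; long output is truncated.
        int n = std::vsnprintf(line, sizeof(line), format, args);
        va_end(args);
        if (n < 0)
            return; // formatting error, nothing to log

        std::lock_guard<std::mutex> lock(m_mutex);
        m_text += line;
    }

    // Returns a copy of everything logged so far.
    std::string Text() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_text;
    }

private:
    mutable std::mutex m_mutex;
    std::string m_text;
};
```

Formatting outside the lock keeps the critical section short; to write to the console instead, the append could be replaced with a call such as `std::fputs(line, stdout)` made while the lock is held.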

Finally, here is the code for GetNoOfProcessors(), a function that gets the number of processors on the host.

//The use of static variable will ensure that
//we will make a call to GetSystemInfo() 
//to find out number of processors, 
//only if we don't have the information already.
//Repeated use of this function will be efficient.
int GetNoOfProcessors()
{
     static int nProcessors = 0;
     
     if (0 == nProcessors)
     {
          SYSTEM_INFO si;
          
          GetSystemInfo(&si);
          
          nProcessors = si.dwNumberOfProcessors;
     }
     
     return nProcessors;
}

IOCP client

Screenshot - client1.jpg

Screenshot - client2.jpg

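The following is the IOCP client; it lets us stress test the server. It uses conventional socket calls. I have used threads to put additional stress on the server.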

#include "stdafx.h"
 
#include <stdio.h>
#include <stdlib.h>
#include <conio.h>
#include <string.h>
#include <winsock2.h>
 
#include "ClientIOCP.h"
 
int main(int argc, char* argv[])
{
     //Validate the input
     if (argc < 3) 
     {
          printf("\nUsage: %s hostname port.", argv[0]);
          return 1; //error
     }
     
     //Initialize Winsock
     WSADATA wsaData;
     
     int nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
     
     if (NO_ERROR != nResult)
     {
          printf("\nError occurred while executing WSAStartup().");
          return 1; //error
     }
     
     //Initialize the Console Critical Section
     InitializeCriticalSection(&g_csConsole);
     
     int nPortNo = atoi(argv[2]);
     
     char szBuffer[MAX_BUFFER_LEN];
     int nNoOfThreads = 0;
     int nNoOfSends = 0;
     
     printf("\nPlease enter message to be sent to the server: ");
     
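     //Read the message from the user; fgets() bounds the read, unlike gets()
     if (NULL == fgets(szBuffer, sizeof(szBuffer), stdin))
     {
          szBuffer[0] = '\0';
     }
     szBuffer[strcspn(szBuffer, "\r\n")] = '\0'; //drop the trailing newline
     
     printf("\nPlease enter number of threads to be created: ");
     
     //No. of client threads that will talk to the server
     scanf("%d", &nNoOfThreads);
     
     printf("\nPlease enter number of times the" 
            " message needs to be sent: ");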
     
     //No. of times we will send the message to the server
     scanf("%d", &nNoOfSends);
     
     HANDLE *p_hThreads = new HANDLE[nNoOfThreads];
     ThreadInfo *pThreadInfo = new ThreadInfo[nNoOfThreads];
     
     bool bConnectedSocketCreated = false;
     
     DWORD nThreadID;
     
     for (int ii = 0; ii < nNoOfThreads; ii++)
     {
          bConnectedSocketCreated = 
            CreateConnectedSocket(&(pThreadInfo[ii].m_Socket), 
                                  argv[1], nPortNo);
          
          if (!bConnectedSocketCreated)
          {
               //Clean up memory
               delete[] p_hThreads;
               delete[] pThreadInfo;
               
               //failed in creating of connected socket, error out.
               return 1;
          }
          
          //Populate ThreadInfo
          pThreadInfo[ii].m_nNoOfSends = nNoOfSends;
          pThreadInfo[ii].m_nThreadNo = ii+1;
          sprintf(pThreadInfo[ii].m_szBuffer, 
                  "Thread %d - %s", ii+1, szBuffer);
          
          //Create thread and start banging the server
          p_hThreads[ii] = CreateThread(0, 0, WorkerThread, 
                           (void *)(&pThreadInfo[ii]), 0, &nThreadID);
     }
     
     //Let Worker Threads shutdown
     WaitForMultipleObjects(nNoOfThreads, p_hThreads, TRUE, INFINITE);
     
     //Close the sockets here
     for (int ii = 0; ii < nNoOfThreads; ii++)
     {
          closesocket(pThreadInfo[ii].m_Socket);
     }
     
     //Clean up memory
     delete[] p_hThreads;
     delete[] pThreadInfo;
     
     //Delete the Console Critical Section.
     DeleteCriticalSection(&g_csConsole);
     
     //Cleanup Winsock
     WSACleanup();
     return 0;
}
 
//vprintf() is not thread safe
void WriteToConsole(char *szFormat, ...)
{
     EnterCriticalSection(&g_csConsole);
     
     va_list args;
     va_start(args, szFormat);
     
     vprintf(szFormat, args );
     
     va_end(args);
     
     LeaveCriticalSection(&g_csConsole);
}
 
bool CreateConnectedSocket(SOCKET *pSocket, char *szHost, int nPortNo)
{
     struct sockaddr_in ServerAddress;
     struct hostent *Server;
     
     //Create a socket
     *pSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     
     if (INVALID_SOCKET == *pSocket) 
     {
          WriteToConsole("\nError occurred while" 
                         " opening socket: %d.", WSAGetLastError());
          return false; //error
     }
     
     //Server name will be supplied as a commandline argument
     //Get the server details
     Server = gethostbyname(szHost);
     
     if (Server == NULL) 
     {
          closesocket(*pSocket);
          WriteToConsole("\nError occurred no such host.");
          return false; //error
     }
     
     //Cleanup and Init with 0 the ServerAddress
     ZeroMemory((char *) &ServerAddress, sizeof(ServerAddress));
     
     ServerAddress.sin_family = AF_INET;
     
     //Assign the information received from gethostbyname()
     CopyMemory((char *)&ServerAddress.sin_addr.s_addr, 
          (char *)Server->h_addr,
          Server->h_length);
     
     ServerAddress.sin_port = htons(nPortNo);
     
     //Establish connection with the server
     if (SOCKET_ERROR == connect(*pSocket, 
         reinterpret_cast<const sockaddr*>(&ServerAddress), 
         sizeof(ServerAddress))) 
     {
          closesocket(*pSocket);
          WriteToConsole("\nError occurred while connecting.");
          return false; //error
     }
     
     return true;
}
 
DWORD WINAPI WorkerThread(LPVOID lpParam)
{
     ThreadInfo *pThreadInfo = (ThreadInfo*)lpParam;
     
     char szTemp[MAX_BUFFER_LEN];
     
     int nBytesSent = 0;
     int nBytesRecv = 0;
     
     for (int ii = 0; ii < pThreadInfo->m_nNoOfSends; ii++)
     {
          sprintf(szTemp, "%d. %s", ii+1, pThreadInfo->m_szBuffer);
          
          //Send the message to the server (the terminating NUL is not sent)
          nBytesSent = send(pThreadInfo->m_Socket, szTemp, 
                            (int)strlen(szTemp), 0);
          
          if (SOCKET_ERROR == nBytesSent) 
          {
               WriteToConsole("\nError occurred while " 
                              "writing to socket %d.", 
                              WSAGetLastError());
               return 1; //error
          }
          
          //Get the message from the server, leaving room for a terminator
          nBytesRecv = recv(pThreadInfo->m_Socket, szTemp, 
                            sizeof(szTemp) - 1, 0);
          
          if (SOCKET_ERROR == nBytesRecv) 
          {
               WriteToConsole("\nError occurred while reading " 
                              "from socket %d.", WSAGetLastError());
               return 1; //error
          }
          
          //recv() does not terminate the data; do it before printing
          szTemp[nBytesRecv] = '\0';
          
          //Display the server message
          WriteToConsole("\nServer sent the following message: %s", szTemp);
     }
     
     return 0; //success
}
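
The client builds its messages with sprintf() into fixed-size buffers, so a long user message could overflow them. A bounded alternative might look like the sketch below. `BuildClientMessage` is a hypothetical helper that is not part of ClientIOCP.zip, and the message layout simply combines the two format strings used by the client.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>

// Hypothetical helper: writes "<seq>. Thread <threadNo> - <text>" into `out`.
// Returns true if the whole message fit, false if it was truncated.
// `out` is always NUL-terminated because outSize must be greater than 0.
bool BuildClientMessage(char* out, std::size_t outSize,
                        int seq, int threadNo, const char* text)
{
    assert(out != nullptr && text != nullptr && outSize > 0);

    // snprintf returns the length the full message would have had.
    int needed = std::snprintf(out, outSize, "%d. Thread %d - %s",
                               seq, threadNo, text);

    return needed >= 0 && static_cast<std::size_t>(needed) < outSize;
}
```

A caller can then decide whether a truncated message should still be sent or reported as an input error, instead of writing past the end of the buffer.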

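Points of interest

I/O completion ports are a very powerful mechanism for building highly scalable and robust server applications, and they need to be used with overlapped I/O. They should be used with WinSock in socket-based server applications that serve multiple clients. IOCP adds a lot of value to the design.

History

  • March 24, 2006 - Implemented using overlapped I/O.
  • April 2006 - Spent time studying overlapped I/O. Made updates related to overlapped I/O.
  • May 29, 2006 - Changed the IOCP implementation and implemented graceful shutdown of the server.
  • June 9, 2006 - Added comments to make the code easier to understand, plus minor code updates.
  • August 19, 2006 - Updated the client and server for stress testing.
  • August 22, 2006 - The client is now multi-threaded, which gives it extra punch.
  • September 24, 2006 - The number of worker threads created on the IOCP server is now proportional to the number of processors on the host. Also, both the client and the server use the new, improved WriteToConsole().
  • September 28, 2006 - Minor updates.
  • February 3, 2007 - Fixed a defect based on input from xircon (CodeProject), and made updates for the Visual C++ 2005 migration.
  • March 2007 - Reduced CPU usage by updating WSAWaitForMultipleEvents().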