一个使用 I/O 完成端口和 WinSock 的简单应用程序






4.42/5 (69投票s)
一篇关于使用 WinSock 实现 I/O 完成端口的文章。
- 下载一对一客户端和服务器 - 2.52 Kb
- 下载多线程服务器 - 2.89 Kb
- 下载使用 IOCP 的多线程服务器 - 10.7 Kb
- 下载 IOCP 客户端 - 7.1 Kb
- 下载 IOCP 客户端和服务器可执行文件 - 67.9 Kb
引言
此代码提交的主要目标是提供源代码,以演示使用 WinSock 的 IOCP。此代码提交试图通过非常易于理解的源代码来突出 IOCP 的使用:客户端和服务器执行非常简单的操作;基本上,它们发送和接收简单的字符串消息。IOCP 客户端将允许您对服务器进行压力测试。您可以发送一条消息并提供输入,说明该消息应该发送到服务器多少次。
此外,我还包含了 socket1.zip,此代码提交包含一对一客户端和服务器实现,并且不使用 IOCP。同样, socket2.zip 包含一个基于多线程套接字的服务器,它为每个客户端连接创建一个线程,这不是一个好的设计。我包含了这些额外的代码,以便读者可以将这些实现与 IOCP 实现进行比较。这将为理解使用 WinSock 的 IOCP 提供额外的见解。
背景
我正在考虑在我当前的项目中使用 IOCP。我发现使用 WinSock 的 IOCP 是一种非常有用、健壮且可扩展的机制。IOCP 允许应用程序使用线程池来处理异步 I/O 请求。这可以防止应用程序为每个客户端创建一个线程,这可能会导致严重的性能问题(socket2.zip 包含每个客户端一个线程的实现)。
使用代码
我提供了四个 zip 文件
- Socket1.zip 包含一个使用 WinSock 的简单一对一客户端和服务器实现。这是一个非常直接的实现;我不会讨论这段代码。
- Socket2.zip 包含一个多线程服务器,每个客户端一个线程;客户端代码保持不变,不作讨论。
- ServerIOCP.zip 包含一个使用 IOCP 的多线程服务器,这是本文的重点。
- ClientIOCP.zip 包含 IOCP 客户端;IOCP 客户端可以对 IOCP 服务器进行压力测试。
- IOCPExecutables.zip 包含 IOCP 客户端和服务器可执行文件。
所有代码提交都是基于控制台的应用程序。
不使用 IOCP 的多线程服务器
此服务器的实现可以在 socket2.zip 中找到。
一旦创建了侦听套接字,就会调用 AcceptConnections
()
函数,并将侦听套接字作为输入参数。
//Make the socket a listening socket if (SOCKET_ERROR == listen(ListenSocket,SOMAXCONN)) { closesocket(ListenSocket); printf("\nError occurred while listening."); goto error; } else { printf("\nlisten() successful."); } //This function will take are of multiple clients using threads AcceptConnections(ListenSocket);
AcceptConnections
()
函数将接受传入的客户端连接,并为每个新客户端连接生成一个线程。
//This function will loop on while creating //a new thread for each client connection void AcceptConnections(SOCKET ListenSocket) { sockaddr_in ClientAddress; int nClientLength = sizeof(ClientAddress); //Infinite, no graceful shutdown of server implemented, //preferably server should be implemented as a service //Events can also be used for graceful shutdown while (1) { //Accept remote connection attempt from the client SOCKET Socket = accept(ListenSocket, (sockaddr*)&ClientAddress, &nClientLength); if (INVALID_SOCKET == Socket) { printf("\nError occurred while accepting" " socket: %ld.", WSAGetLastError()); } //Display Client's IP printf("\nClient connected from: %s", inet_ntoa(ClientAddress.sin_addr)); DWORD nThreadID; //Spawn one thread for each client //connection, not a wise idea. //One should limit the number of threads //or use I/O completion port CreateThread(0, 0, AcceptHandler, (void*)Socket, 0, &nThreadID); } }
AcceptHandler()
函数是一个线程函数。它将接受的套接字作为输入,并对其执行与客户端相关的 I/O 操作。
//Thread procedure one thread will be created for each client. DWORD WINAPI AcceptHandler(void* Socket) { SOCKET RemoteSocket = (SOCKET)Socket; char szBuffer[256]; //Cleanup and Init with 0 the szBuffer ZeroMemory(szBuffer,256); int nBytesSent; int nBytesRecv; //Receive data from a connected or bound socket nBytesRecv = recv(RemoteSocket, szBuffer, 255, 0 ); if (SOCKET_ERROR == nBytesRecv) { closesocket(RemoteSocket); printf("\nError occurred while receiving from socket."); return 1; //error } else { printf("\nrecv() successful."); } //Display the message received on console printf("\nThe following message was received: %s", szBuffer); //Send data on a connected socket to the client nBytesSent = send(RemoteSocket, ACK_MESG_RECV , strlen(ACK_MESG_RECV), 0); if (SOCKET_ERROR == nBytesSent) { closesocket(RemoteSocket); printf("\nError occurred while writing to socket."); return 1; //error } else { printf("\nsend() successful."); } return 0; //success }
此设计不可扩展,并且可能会导致严重的性能问题。
使用 IOCP 的多线程服务器
此服务器的实现可以在 ServerIOCP.zip 中找到。
在 IOCP 的实现中使用了以下 API 集。快速阅读或查阅 MSDN 将有助于快速掌握以下代码。
CreateIoCompletionPort
()
GetQueuedCompletionStatus
()
PostQueuedCompletionStatus
()
InitializeIOCP
()
函数将初始化 IOCP,并创建一个工作线程池来处理 IOCP 请求。通常,每个处理器创建两个线程。许多程序会找出主机上的处理器数量,并创建 <处理器数量 * 2> 个工作线程。这可以设置为可配置参数。这将允许应用程序用户配置线程数量,以尝试微调应用程序。在我的代码中,主机上的每个处理器将创建两个线程。
///Function to Initialize IOCPbool InitializeIOCP() { //Create I/O completion port g_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 ); if ( NULL == g_hIOCompletionPort) { printf("\nError occurred while creating IOCP: %d.", WSAGetLastError()); return false; } DWORD nThreadID; //Create worker threads for (int ii = 0; ii < g_nThreads; ii++) { g_phWorkerThreads[ii] = CreateThread(0, 0, WorkerThread, (void *)(ii+1), 0, &nThreadID); } return true; }
IOCP 必须与重叠 I/O 一起使用。以下代码显示了如何创建重叠套接字。
//Overlapped I/O follows the model established //in Windows and can be performed only on //sockets created through the WSASocket function ListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (INVALID_SOCKET == ListenSocket) { printf("\nError occurred while opening socket: %ld.", WSAGetLastError()); goto error; } else { printf("\nWSASocket() successful."); }
为了优雅地关闭此服务器,我使用了 WSAEventSelect
()
。我们将处理 Accept 事件,而不是阻塞 accept()
调用。下面演示了 WSAEVENT
的创建。
g_hAcceptEvent = WSACreateEvent(); if (WSA_INVALID_EVENT == g_hAcceptEvent) { printf("\nError occurred while WSACreateEvent()."); goto error; } if (SOCKET_ERROR == WSAEventSelect(ListenSocket, g_hAcceptEvent, FD_ACCEPT)) { printf("\nError occurred while WSAEventSelect()."); WSACloseEvent(g_hAcceptEvent); goto error; }
AcceptThread
()
将不断查找 Accept 事件。
//This thread will look for accept event DWORD WINAPI AcceptThread(LPVOID lParam) { SOCKET ListenSocket = (SOCKET)lParam; WSANETWORKEVENTS WSAEvents; //Accept thread will be around to look for //accept event, until a Shutdown event is not Signaled. while(WAIT_OBJECT_0 != WaitForSingleObject(g_hShutdownEvent, 0)) { if (WSA_WAIT_TIMEOUT != WSAWaitForMultipleEvents(1, &g_hAcceptEvent, FALSE, WAIT_TIMEOUT_INTERVAL, FALSE)) { WSAEnumNetworkEvents(ListenSocket, g_hAcceptEvent, &WSAEvents); if ((WSAEvents.lNetworkEvents & FD_ACCEPT) && (0 == WSAEvents.iErrorCode[FD_ACCEPT_BIT])) { //Process it. AcceptConnection(ListenSocket); } } } return 0; }
AcceptConnection
()
将处理 Accept 事件。它还将套接字关联到 IOCP,并在套接字上发布 WSARecv
()
以接收传入的客户端数据。
//This function will process the accept event
void AcceptConnection(SOCKET ListenSocket)
{
sockaddr_in ClientAddress;
int nClientLength = sizeof(ClientAddress);
//Accept remote connection attempt from the client
SOCKET Socket = accept(ListenSocket,
(sockaddr*)&ClientAddress, &nClientLength);
if (INVALID_SOCKET == Socket)
{
WriteToConsole("\nError occurred while " +
"accepting socket: %ld.",
WSAGetLastError());
}
//Display Client's IP
WriteToConsole("\nClient connected from: %s",
inet_ntoa(ClientAddress.sin_addr));
//Create a new ClientContext for this newly accepted client
CClientContext *pClientContext = new CClientContext;
pClientContext->SetOpCode(OP_READ);
pClientContext->SetSocket(Socket);
//Store this object
AddToClientList(pClientContext);
if (true == AssociateWithIOCP(pClientContext))
{
//Once the data is successfully received, we will print it.
pClientContext->SetOpCode(OP_WRITE);
WSABUF *p_wbuf = pClientContext->GetWSABUFPtr();
OVERLAPPED *p_ol = pClientContext->GetOVERLAPPEDPtr();
//Get data.
DWORD dwFlags = 0;
DWORD dwBytes = 0;
//Post initial Recv
//This is a right place to post a initial Recv
//Posting a initial Recv in WorkerThread
//will create scalability issues.
int nBytesRecv = WSARecv(pClientContext->GetSocket(),
p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL);
if ((SOCKET_ERROR == nBytesRecv) &&
(WSA_IO_PENDING != WSAGetLastError()))
{
WriteToConsole("\nError in Initial Post.");
}
}
}
AssociateWithIOCP()
将接受的套接字关联到 IOCP。
bool AssociateWithIOCP(CClientContext *pClientContext)
{
//Associate the socket with IOCP
HANDLE hTemp = CreateIoCompletionPort((HANDLE)pClientContext->GetSocket(),
g_hIOCompletionPort, (DWORD)pClientContext, 0);
if (NULL == hTemp)
{
WriteToConsole("\nError occurred while" +
" executing CreateIoCompletionPort().");
//Let's not work with this client
RemoveFromClientListAndFreeMemory(pClientContext);
return false;
}
return true;
}
CClientContext
类用于存储客户端相关信息,例如客户端套接字,并且它有一个专门用于重叠 I/O 的缓冲区。我们需要确保在重叠 I/O 进行时,重叠 I/O 中使用的缓冲区不会被更新。
class CClientContext
//To store and manage client related information
{
private:
OVERLAPPED *m_pol;
WSABUF *m_pwbuf;
int m_nTotalBytes;
int m_nSentBytes;
//accepted socket
SOCKET m_Socket;
//will be used by the worker thread
//to decide what operation to perform
int m_nOpCode;
char m_szBuffer[MAX_BUFFER_LEN];
public:
//Get/Set calls
void SetOpCode(int n)
{
m_nOpCode = n;
}
int GetOpCode()
{
return m_nOpCode;
}
void SetTotalBytes(int n)
{
m_nTotalBytes = n;
}
int GetTotalBytes()
{
return m_nTotalBytes;
}
void SetSentBytes(int n)
{
m_nSentBytes = n;
}
void IncrSentBytes(int n)
{
m_nSentBytes += n;
}
int GetSentBytes()
{
return m_nSentBytes;
}
void SetSocket(SOCKET s)
{
m_Socket = s;
}
SOCKET GetSocket()
{
return m_Socket;
}
void SetBuffer(char *szBuffer)
{
strcpy(m_szBuffer, szBuffer);
}
void GetBuffer(char *szBuffer)
{
strcpy(szBuffer, m_szBuffer);
}
void ZeroBuffer()
{
ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
}
void SetWSABUFLength(int nLength)
{
m_pwbuf->len = nLength;
}
int GetWSABUFLength()
{
return m_pwbuf->len;
}
WSABUF* GetWSABUFPtr()
{
return m_pwbuf;
}
OVERLAPPED* GetOVERLAPPEDPtr()
{
return m_pol;
}
void ResetWSABUF()
{
ZeroBuffer();
m_pwbuf->buf = m_szBuffer;
m_pwbuf->len = MAX_BUFFER_LEN;
}
//Constructor
CClientContext()
{
m_pol = new OVERLAPPED;
m_pwbuf = new WSABUF;
ZeroMemory(m_pol, sizeof(OVERLAPPED));
m_Socket = SOCKET_ERROR;
ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
m_pwbuf->buf = m_szBuffer;
m_pwbuf->len = MAX_BUFFER_LEN;
m_nOpCode = 0;
m_nTotalBytes = 0;
m_nSentBytes = 0;
}
//destructor
~CClientContext()
{
//Wait for the pending operations to complete
while (!HasOverlappedIoCompleted(m_pol))
{
Sleep(0);
}
closesocket(m_Socket);
//Cleanup
delete m_pol;
delete m_pwbuf;
}
};
接下来是工作线程函数 WorkerThread()
。此函数将等待来自 IOCP 的请求,并处理它们。根据提供的操作码,它将执行适当的操作。WorkerThread()
反过来通过设置 CClientContext
的适当操作码向 IOCP 发出操作请求。这些请求将路由到其中一个工作线程,包括请求工作线程。
//Worker thread will service IOCP requests
DWORD WINAPI WorkerThread(LPVOID lpParam)
{
int nThreadNo = (int)lpParam;
void *lpContext = NULL;
OVERLAPPED *pOverlapped = NULL;
CClientContext *pClientContext = NULL;
DWORD dwBytesTransfered = 0;
int nBytesRecv = 0;
int nBytesSent = 0;
DWORD dwBytes = 0, dwFlags = 0;
//Worker thread will be around to process requests,
//until a Shutdown event is not Signaled.
while (WAIT_OBJECT_0 != WaitForSingleObject(g_hShutdownEvent, 0))
{
BOOL bReturn = GetQueuedCompletionStatus(
g_hIOCompletionPort,
&dwBytesTransfered,
(LPDWORD)&lpContext,
&pOverlapped,
INFINITE);
if (NULL == lpContext)
{
//We are shutting down
break;
}
//Get the client context
pClientContext = (CClientContext *)lpContext;
if ((FALSE == bReturn) || ((TRUE == bReturn) &&
(0 == dwBytesTransfered)))
{
//Client connection gone, remove it.
RemoveFromClientListAndFreeMemory(pClientContext);
continue;
}
WSABUF *p_wbuf = pClientContext->GetWSABUFPtr();
OVERLAPPED *p_ol = pClientContext->GetOVERLAPPEDPtr();
switch (pClientContext->GetOpCode())
{
case OP_READ:
pClientContext->IncrSentBytes(dwBytesTransfered);
//Write operation was finished, see if all the data was sent.
//Else post another write.
if(pClientContext->GetSentBytes() <
pClientContext->GetTotalBytes())
{
pClientContext->SetOpCode(OP_READ);
p_wbuf->buf += pClientContext->GetSentBytes();
p_wbuf->len = pClientContext->GetTotalBytes() -
pClientContext->GetSentBytes();
dwFlags = 0;
//Overlapped send
nBytesSent = WSASend(pClientContext->GetSocket(),
p_wbuf, 1, &dwBytes, dwFlags, p_ol, NULL);
if ((SOCKET_ERROR == nBytesSent) &&
(WSA_IO_PENDING != WSAGetLastError()))
{
//Let's not work with this client
RemoveFromClientListAndFreeMemory(pClientContext);
}
}
else
{
//Once the data is successfully received, we will print it.
pClientContext->SetOpCode(OP_WRITE);
pClientContext->ResetWSABUF();
dwFlags = 0;
//Get the data.
nBytesRecv = WSARecv(pClientContext->GetSocket(), p_wbuf, 1,
&dwBytes, &dwFlags, p_ol, NULL);
if ((SOCKET_ERROR == nBytesRecv) &&
(WSA_IO_PENDING != WSAGetLastError()))
{
WriteToConsole("\nThread %d: Error occurred"
" while executing WSARecv().", nThreadNo);
//Let's not work with this client
RemoveFromClientListAndFreeMemory(pClientContext);
}
}
break;
case OP_WRITE:
char szBuffer[MAX_BUFFER_LEN];
//Display the message we recevied
pClientContext->GetBuffer(szBuffer);
WriteToConsole("\nThread %d: The following message"
" was received: %s", nThreadNo, szBuffer);
//Send the message back to the client.
pClientContext->SetOpCode(OP_READ);
pClientContext->SetTotalBytes(dwBytesTransfered);
pClientContext->SetSentBytes(0);
p_wbuf->len = dwBytesTransfered;
dwFlags = 0;
//Overlapped send
nBytesSent = WSASend(pClientContext->GetSocket(), p_wbuf, 1,
&dwBytes, dwFlags, p_ol, NULL);
if ((SOCKET_ERROR == nBytesSent) &&
(WSA_IO_PENDING != WSAGetLastError()))
{
WriteToConsole("\nThread %d: Error "
"occurred while executing WSASend().", nThreadNo);
//Let's not work with this client
RemoveFromClientListAndFreeMemory(pClientContext);
}
break;
default:
//We should never be reaching here, under normal circumstances.
break;
} // switch
} // while
return 0;
}
下面显示了一些用于初始化和清理的函数。请注意,CleanUp()
中使用了 PostQueuedCompletionStatus()
,以帮助 WorkerThread()
摆脱对 GetQueuedCompletionStatus()
的阻塞调用。
bool Initialize()
{
//Find out number of processors and threads
g_nThreads = WORKER_THREADS_PER_PROCESSOR * GetNoOfProcessors();
printf("\nNumber of processors on host: %d", GetNoOfProcessors());
printf("\nThe following number of worker threads" +
" will be created: %d", g_nThreads);
//Allocate memory to store thread handless
g_phWorkerThreads = new HANDLE[g_nThreads];
//Initialize the Console Critical Section
InitializeCriticalSection(&g_csConsole);
//Initialize the Client List Critical Section
InitializeCriticalSection(&g_csClientList);
//Create shutdown event
g_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// Initialize Winsock
WSADATA wsaData;
int nResult;
nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
if (NO_ERROR != nResult)
{
printf("\nError occurred while executing WSAStartup().");
return false; //error
}
else
{
printf("\nWSAStartup() successful.");
}
if (false == InitializeIOCP())
{
printf("\nError occurred while initializing IOCP");
return false;
}
else
{
printf("\nIOCP initialization successful.");
}
return true;
}
//Function to Initialize IOCP
bool InitializeIOCP()
{
//Create I/O completion port
g_hIOCompletionPort =
CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
if ( NULL == g_hIOCompletionPort)
{
printf("\nError occurred while creating IOCP: %d.",
WSAGetLastError());
return false;
}
DWORD nThreadID;
//Create worker threads
for (int ii = 0; ii < g_nThreads; ii++)
{
g_phWorkerThreads[ii] = CreateThread(0, 0, WorkerThread,
(void *)(ii+1), 0, &nThreadID);
}
return true;
}
void CleanUp()
{
//Ask all threads to start shutting down
SetEvent(g_hShutdownEvent);
//Let Accept thread go down
WaitForSingleObject(g_hAcceptThread, INFINITE);
for (int i = 0; i < g_nThreads; i++)
{
//Help threads get out of blocking - GetQueuedCompletionStatus()
PostQueuedCompletionStatus(g_hIOCompletionPort, 0,
(DWORD) NULL, NULL);
}
//Let Worker Threads shutdown
WaitForMultipleObjects(g_nThreads, g_phWorkerThreads, TRUE, INFINITE);
//We are done with this event
WSACloseEvent(g_hAcceptEvent);
//Cleanup dynamic memory allocations, if there are any.
CleanClientList();
}
void DeInitialize()
{
//Delete the Console Critical Section.
DeleteCriticalSection(&g_csConsole);
//Delete the Client List Critical Section.
DeleteCriticalSection(&g_csClientList);
//Cleanup IOCP.
CloseHandle(g_hIOCompletionPort);
//Clean up the event.
CloseHandle(g_hShutdownEvent);
//Clean up memory allocated for the storage of thread handles
delete[] g_phWorkerThreads;
//Cleanup Winsock
WSACleanup();
}
为了存储客户端信息,我使用了一个向量;为了管理列表,我使用了以下一组调用。
//Store client related information in a vector
void AddToClientList(CClientContext *pClientContext)
{
EnterCriticalSection(&g_csClientList);
//Store these structures in vectors
g_ClientContext.push_back(pClientContext);
LeaveCriticalSection(&g_csClientList);
}
//This function will allow to remove one single client out of the list
void RemoveFromClientListAndFreeMemory(CClientContext *pClientContext)
{
EnterCriticalSection(&g_csClientList);
std::vector <cclientcontext>::iterator IterClientContext;
//Remove the supplied ClientContext
//from the list and release the memory
for (IterClientContext = g_ClientContext.begin();
IterClientContext != g_ClientContext.end();
IterClientContext++)
{
if (pClientContext == *IterClientContext)
{
g_ClientContext.erase(IterClientContext);
//i/o will be cancelled and socket
//will be closed by destructor.
delete pClientContext;
break;
}
}
LeaveCriticalSection(&g_csClientList);
}
//Clean up the list, this function
//will be executed at the time of shutdown
void CleanClientList()
{
EnterCriticalSection(&g_csClientList);
std::vector <cclientcontext>::iterator IterClientContext;
for (IterClientContext = g_ClientContext.begin();
IterClientContext != g_ClientContext.end( );
IterClientContext++)
{
//i/o will be cancelled and socket
//will be closed by destructor.
delete *IterClientContext;
}
g_ClientContext.clear();
LeaveCriticalSection(&g_csClientList);
}
我创建并使用了一个 WriteToConsole()
函数,它将同步工作线程发送的控制台输出;由于我使用 printf()
写入控制台,因此控制台将存在竞争条件。它使用临界区进行同步。
//Function to synchronize console output
//Threads need to be synchronized while they write to console.
//WriteConsole() API can be used, it is thread-safe, I think.
//I have created my own function.
void WriteToConsole(char *szFormat, ...)
{
EnterCriticalSection(&g_csConsole);
va_list args;
va_start(args, szFormat);
vprintf(szFormat, args );
va_end(args);
LeaveCriticalSection(&g_csConsole);
}
最后是 GetNoOfProcessors()
的代码,一个获取主机上处理器数量的函数。
//The use of static variable will ensure that
//we will make a call to GetSystemInfo()
//to find out number of processors,
//only if we don't have the information already.
//Repeated use of this function will be efficient.
int GetNoOfProcessors()
{
static int nProcessors = 0;
if (0 == nProcessors)
{
SYSTEM_INFO si;
GetSystemInfo(&si);
nProcessors = si.dwNumberOfProcessors;
}
return nProcessors;
}
IOCP 客户端
以下是一个 IOCP 客户端;它将允许我们对服务器进行压力测试。它使用传统的套接字调用。我使用线程来对服务器施加额外的压力。
#include "stdafx.h" #include <stdio.h> #include <stdlib.h> #include <conio.h> #include <string.h> #include <winsock2.h> #include "ClientIOCP.h" int main(int argc, char* argv[]) { //Validate the input if (argc < 3) { printf("\nUsage: %s hostname port.", argv[0]); return 1; //error } //Initialize Winsock WSADATA wsaData; int nResult = WSAStartup(MAKEWORD(2,2), &wsaData); if (NO_ERROR != nResult) { printf("\nError occurred while executing WSAStartup()."); return 1; //error } //Initialize the Console Critical Section InitializeCriticalSection(&g_csConsole); int nPortNo = atoi(argv[2]); char szBuffer[MAX_BUFFER_LEN]; int nNoOfThreads = 0; int nNoOfSends = 0; printf("\nPlease enter message to be sent to the server: "); //Read the message from server gets(szBuffer); printf("\nPlease enter number of threads to be created: "); //No. of times we will send the message to the server scanf("%d", &nNoOfThreads); printf("\nPlease enter number of times the" + " messages needs to be sent: "); //No. of times we will send the message to the server scanf("%d", &nNoOfSends); HANDLE *p_hThreads = new HANDLE[nNoOfThreads]; ThreadInfo *pThreadInfo = new ThreadInfo[nNoOfThreads]; bool bConnectedSocketCreated = false; DWORD nThreadID; for (int ii = 0; ii < nNoOfThreads; ii++) { bConnectedSocketCreated = CreateConnectedSocket(&(pThreadInfo[ii].m_Socket), argv[1], nPortNo); if (!bConnectedSocketCreated) { //Clean up memory delete[] p_hThreads; delete[] pThreadInfo; //failed in creating of connected socket, error out. return 1; } //Populate ThreadInfo pThreadInfo[ii].m_nNoOfSends = nNoOfSends; pThreadInfo[ii].m_nThreadNo = ii+1; sprintf(pThreadInfo[ii].m_szBuffer, "Thread %d - %s", ii+1, szBuffer); //Create thread and start banging the server p_hThreads[ii] = CreateThread(0, 0, WorkerThread, (void *)(&pThreadInfo[ii]), 0, &nThreadID); } //Let Worker Threads shutdown WaitForMultipleObjects(nNoOfThreads, p_hThreads, TRUE, INFINITE); //Close the sockets here for (ii = 0; ii < nNoOfThreads; ii++) { closesocket(pThreadInfo[ii].m_Socket); } //Clean up memory delete[] p_hThreads; delete[] pThreadInfo; //Delete the Console Critical Section. DeleteCriticalSection(&g_csConsole); //Cleanup Winsock WSACleanup(); return 0; } //vprintf() is not thread safe void WriteToConsole(char *szFormat, ...) { EnterCriticalSection(&g_csConsole); va_list args; va_start(args, szFormat); vprintf(szFormat, args ); va_end(args); LeaveCriticalSection(&g_csConsole); } bool CreateConnectedSocket(SOCKET *pSocket, char *szHost, int nPortNo) { struct sockaddr_in ServerAddress; struct hostent *Server; //Create a socket *pSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (INVALID_SOCKET == *pSocket) { WriteToConsole("\nError occurred while" + " opening socket: %d.", WSAGetLastError()); return false; //error } //Server name will be supplied as a commandline argument //Get the server details Server = gethostbyname(szHost); if (Server == NULL) { closesocket(*pSocket); WriteToConsole("\nError occurred no such host."); return false; //error } //Cleanup and Init with 0 the ServerAddress ZeroMemory((char *) &ServerAddress, sizeof(ServerAddress)); ServerAddress.sin_family = AF_INET; //Assign the information received from gethostbyname() CopyMemory((char *)&ServerAddress.sin_addr.s_addr, (char *)Server->h_addr, Server->h_length); ServerAddress.sin_port = htons(nPortNo); //Establish connection with the server if (SOCKET_ERROR == connect(*pSocket, reinterpret_cast<const>(&ServerAddress), sizeof(ServerAddress))) { closesocket(*pSocket); WriteToConsole("\nError occurred while connecting."); return false; //error } return true; } DWORD WINAPI WorkerThread(LPVOID lpParam) { ThreadInfo *pThreadInfo = (ThreadInfo*)lpParam; char szTemp[MAX_BUFFER_LEN]; int nBytesSent = 0; int nBytesRecv = 0; for (int ii = 0; ii < pThreadInfo->m_nNoOfSends; ii++) { sprintf(szTemp, "%d. %s", ii+1, pThreadInfo->m_szBuffer); //Send the message to the server, include the NULL as well nBytesSent = send(pThreadInfo->m_Socket, szTemp, strlen(szTemp), 0); if (SOCKET_ERROR == nBytesSent) { WriteToConsole("\nError occurred while " + "writing to socket %ld.", WSAGetLastError()); return 1; //error } //Get the message from the server nBytesRecv = recv(pThreadInfo->m_Socket, szTemp, 255, 0); if (SOCKET_ERROR == nBytesRecv) { WriteToConsole("\nError occurred while reading " + "from socket %ld.", WSAGetLastError()); return 1; //error } //Display the server message WriteToConsole("\nServer sent the following message: %s", szTemp); } return 0; //success }
关注点
I/O 完成端口是一种非常强大的机制,可以创建高度可扩展和健壮的服务器应用程序,需要与重叠 I/O 一起使用。它应该在服务多个客户端的基于套接字的服务器应用程序中与 WinSock 一起使用。IOCP 将为设计增添很多价值。
历史
- 2006 年 3 月 24 日 – 使用重叠 I/O 实现。
- 2006 年 4 月 – 花时间研究重叠 I/O。进行了与重叠 I/O 相关的更新。
- 2006 年 5 月 29 日 – 更改了 IOCP 实现,并实现了服务器的优雅关闭。
- 2006 年 6 月 9 日 – 添加了注释以便于理解代码,并进行了少量代码更新。
- 2006 年 8 月 19 日 – 更新了客户端和服务器以进行压力测试。
- 2006 年 8 月 22 日 – 客户端现在是多线程的。客户端获得了额外的冲击力。
- 2006 年 9 月 24 日 – IOCP 服务器上创建的工作线程数量将与主机上的处理器数量成比例。此外,客户端和服务器都使用新的改进的
WriteToConsole
()
。 - 2006 年 9 月 28 日 – 次要更新。
- 2007 年 2 月 3 日 - 根据 xircon (CodeProject) 的输入修复了一个缺陷,并进行了 Visual C++ 2005 迁移更新。
- 2007 年 3 月 - 通过更新 WSAWaitForMultipleEvents() 降低了 CPU 使用率。