处理多个待处理的套接字读写操作






4.90/5 (26投票s)
本文解释了在单个套接字上拥有多个待处理的接收调用可能存在的问题。
以下源代码使用 Visual Studio 6.0 SP5 和 Visual Studio .NET 构建。您需要安装一个版本的 Microsoft Platform SDK。
请注意,代码的调试版本由于调试跟踪输出会浪费大量 CPU 周期。仅在发布版本上进行性能分析才有意义。
概述
“如何处理多个待处理 WSARecv()
调用?”是 Winsock 新闻组上的一个常见问题。似乎每个人都知道在套接字上拥有多个待处理读取请求通常是个好主意,并且每个人同样意识到有时这样做代码会不正常工作。本文解释了多个待处理接收可能存在的问题,并在过去几篇文章中开发的、可重用的服务器框架中提供了一个解决方案。
顺序混乱
在使用 I/O 完成端口和多个线程时,存在一个微妙的问题。尽管使用 I/O 完成端口的操作总是会按照提交的顺序完成,但线程调度问题可能意味着与完成相关的实际工作是以未定义的顺序处理的。例如,如果我们向套接字提交三个 WSARecv
请求,那么它们保证会按照我们提交的顺序完成,但是如果我们有 2 个线程服务 I/O 完成端口,那么两个完成可能会同时处理。如果处理“第一个”WSARecv
完成的线程被中断,第二个完成可能会在第一个完成之前完全处理。在多处理器机器上,这种情况更有可能发生,因为两个线程可能确实是同时执行的,但在单处理器盒子上也可能发生。一如既往,这是那种微妙的问题,很可能直到软件发布到生产环境后才会显现……
上面的例子很容易避免,只需不在单个套接字上拥有多个待处理的 WSARecv
请求即可。这就是我们在前几篇文章中开发的示例服务器中一直所做的。然而,这会降低性能,因为当数据实际到达网络接口时,有一个挂起的接收总是比数据已到达后再发布接收更具性能。拥有多个待处理的 WSARecv
调用可确保始终有一个调用挂起。更重要的是,问题不仅限于拥有多个 WSARecvs
。在我们的服务器框架中,我们将所有套接字 I/O 调用从用户线程通过 I/O 完成端口 marshall 到我们的 I/O 线程池。这意味着用户线程有可能向套接字发出多个连续的写入操作,并且这些操作可能以未定义的顺序执行。
到目前为止,在我们示例服务器中的代码,例如
void CSocketServer::ReadCompleted(
Socket *pSocket,
CIOBuffer *pBuffer)
{
// do stuff...
pSocket->Write(pBuffer); // echo the command
pSocket->Write(pResponse1); // send part 1 response
pSocket->Write(pResponse2); // send part 2 response
}
存在潜在的不安全,因为写入操作在用户线程上不是同步发生的,它们被发布到 I/O 完成端口并在 I/O 线程池中执行。
保留 I/O 完成操作的顺序相对简单。您应该还记得,传递给所有使用 I/O 完成端口的调用的 overlapped 结构代表“每个调用”数据。我们可以,并且确实通过使用 CIOBuffer
类来扩展 overlapped 结构以包含我们自己的“每个调用”数据。如果我们向 CIOBuffer
添加一个序列号,我们可以将序列号设置为用户线程中的“next
”值,然后确保我们在 I/O 线程池中按顺序处理缓冲区。这个概念适用于任何 I/O 完成端口操作,并且每个不同的操作都需要自己的序列号。对于我们的服务器框架,这意味着我们的 Socket
类现在必须维护读写请求的独立序列号。
Socket
的 Write
方法中的序列号管理代码可能是这样的
pBuffer->SetSequenceNumber(m_writeSequenceNumber++);
为了确保序列号实际代表提交操作的顺序,需要将序列号的设置和操作的提交作为一个原子操作。对于我们的套接字写入,这不是问题,因为我们只保证在单个线程上执行的写入顺序;对于套接字读取,我们需要确保序列号的分配和 WSARecv()
的调用发生,而不会让另一个线程有机会同时执行读取。这涉及到使用关键代码段来锁定对套接字在序列号分配和 WSARecv()
调用期间的访问。在此区域未能锁定可能导致 WSARecv()
调用发生的实际顺序与分配的序列号的顺序不匹配。
有序处理
确保按顺序处理 I/O 完成的代码稍微复杂一些。对于每个不同的 I/O 操作,我们需要跟踪我们可以处理的下一个序列号。当 GetQueuedCompletionStatus()
调用返回时,我们需要将请求中的序列号与我们可以处理的下一个序列号进行比较。如果这些数字匹配,那么我们可以处理该请求。如果不匹配,则当前无法处理该请求。如果无法处理 I/O 操作,应将其存储以供以后处理。无序请求的存储需要以序列号为键。当 I/O 线程发现它无法处理当前请求时,它应该将当前请求添加到存储中,并检查存储中是否有可以处理的请求。当一个请求被处理时,I/O 线程应该做的最后一件事是原子地递增代表下一个要处理的序列号的值,并检查存储中是否有可以处理的 I/O 请求。
上述策略处理了多个 I/O 请求并发完成的情况。只有一个线程可以处理满足“下一个要处理”标准的 I/O 请求,所有其他线程都将把它们的请求添加到存储中。当处理请求的线程完成处理时,它可以检查存储中是否有其他现在可以处理的请求。如果一个线程需要存储其 I/O 请求,它可以这样做,然后以原子操作检查一个可以处理的请求。
实际上,它比看起来要复杂,按顺序处理操作的代码可能如下所示
pBuffer = pSocket->m_outOfSequenceWrites.GetNext(pBuffer);
while(pBuffer)
{
DWORD dwFlags = 0;
DWORD dwSendNumBytes = 0;
if (SOCKET_ERROR == ::WSASend(
pSocket->m_socket,
pBuffer->GetWSABUF(),
1,
&dwSendNumBytes,
dwFlags,
pBuffer,
NULL))
{
// handle errors etc.
}
pBuffer = pSocket->m_outOfSequenceWrites.ProcessAndGetNext();
}
存储本身需要将序列号映射到 CIOBuffers
。数据结构的明显选择是 std::map<>
,尽管您的性能要求和性能分析可能决定不同的选择。GetNext()
接收一个缓冲区,将其序列号与我们可以处理的下一个序列号进行比较,然后返回缓冲区或将缓冲区添加到映射,然后检查映射以查看映射中的第一个缓冲区是否是我们可以处理的。请记住,映射会根据其键的顺序存储其元素,并且我们将序列号用作键,因此 m_list.begin()
指的是映射中具有最小序列号的元素。如果此函数返回 null
,则我们仍在等待“下一个”缓冲区到达。
CIOBuffer *CIOBuffer::InOrderBufferList::GetNext(
CIOBuffer *pBuffer)
{
CCriticalSection::Owner lock(m_criticalSection);
if (m_next == pBuffer->GetSequenceNumber())
{
return pBuffer;
}
BS::value_type value(pBuffer->GetSequenceNumber(), pBuffer));
std::pair<BS::iterator, bool> result = m_list.insert(value);
if (result.second == false)
{
// handle error, element already in map
}
CIOBuffer *pNext = 0;
BufferSequence::iterator it;
it = m_list.begin();
if (it != m_list.end())
{
if (it->first == m_next)
{
pNext = it->second;
m_list.erase(it);
}
}
return pNext;
}
处理完缓冲区后,线程可以检查是否还有其他缓冲区可以处理。它需要原子地增加最后一个处理值并执行检查,因此需要锁定。
CIOBuffer *CIOBuffer::InOrderBufferList::ProcessAndGetNext()
{
CCriticalSection::Owner lock(m_criticalSection);
::InterlockedIncrement(&m_next);
CIOBuffer *pNext = 0;
BufferSequence::iterator it;
it = m_list.begin();
if (it != m_list.end())
{
if (it->first == m_next)
{
pNext = it->second;
m_list.erase(it);
}
}
return pNext;
}
处理读取
如果每个写入操作使用的 CIOBuffer
都包含一个序列号,那么可以使用类似的、代码来确保已完成的读取请求按正确的顺序处理。但是,将此代码放在服务器框架中意义不大,因为框架的不同用户可能需要不同的功能。CSocketServer
派生类可以使用 CIOBuffer::InOrderBufferList
类来维护处理顺序,或者它可以简单地将读取完成分派到另一个 I/O 完成端口,以将其传递给 业务逻辑线程池。在这种情况下,实际处理数据的代码位于业务逻辑线程池中,并且应在此处维护顺序。它甚至可能需要两者兼顾,在 CSocketServer
类本身中确保数据包顺序,以便能够成功地将字节流分解为消息,然后将消息分派到业务逻辑线程池,并确保这些完整消息也按正确的顺序处理。
锁定粒度
每个 Socket
现在必须跟踪独立的读写序列号,并维护一个无序写请求的映射。映射和相关的下一个序列号计数器的操作必须受到保护。我们使用关键代码段来保护此代码。请注意,为每个 Socket
连接分配一个关键代码段可能会消耗大量资源。相反,我们可以选择通过牺牲锁定粒度来换取性能。CSocketServer
类已经有一个关键代码段,它使用该关键代码段来保护其 Sockets
列表,我们可以将此关键代码段的引用传递给每个 Socket
,而不是让它们创建自己的关键代码段。这样做的问题是,我们将所有 Socket
的映射访问串行化。在关键代码段内执行的工作量很小,但一个更好的解决方案可能是为每 X 个套接字创建一个关键代码段,其中 X 是通过性能分析您的应用程序确定的值。
只为使用过的付费
在用于发送和接收的所有缓冲区中包含序列号并确保写入按顺序处理,会给 I/O 线程的工作带来一些开销。如果您确定您的服务器不需要此功能,也许是因为您知道由于您的协议设计,将只有一个读取或写入请求挂起,则可以通过在 CSocketServer
的构造函数中将 useSequenceNumbers
标志传递为 false
来选择不包含此功能。独立启用读写序列号留给读者练习。
一个示例
为了演示确保多个读写操作顺序的雹概念,我们提出了一个相当牵强的例子。我们在 上一篇文章中开发的包回显服务器已更改如下
- 它现在在一个业务逻辑线程池中工作,以便我们能够演示在套接字服务器和业务逻辑线程池中维护接收顺序,当套接字服务器的工作线程本身不进行处理时。
- 它处理更大的数据包;我们使用两字节的数据包头而不是一字节的数据包头。这个两字节的头表示数据包的长度,格式如下:
packetLength = byte1 + (byte2 * 256)
。数据包的长度包括两字节的头。 - 当客户端最初连接时,它会发布可配置数量的读取。每当一个读取完成时,它就会发布一个新的读取,从而保持待处理读取的数量。
- 它按顺序处理读取,并且由于我们现在有多个待处理读取,
CSocketServer::ProcessDataStream()
已更改为当需要更多数据时,我们不会简单地重新发布读取以将更多数据读入同一个缓冲区。 - 它通过发布多个写入请求将数据包分块回显给客户端。
大型数据包回显服务器可在此处下载 here,文件名为 SocketServer6.zip。可以使用 telnet 进行测试,尽管更复杂,您可能会发现使用我们在此处开发的 测试平台来测试它更容易。与之前的示例一样,服务器运行直到一个命名的事件被设置然后关闭。非常简单的服务器关闭程序,可在此处 here 获得,为服务器提供了关闭开关。
虽然服务器和线程池类都可以配置是否使用序列号来维护数据包顺序,但为了使服务器能够以测试平台期望的方式工作,这些设置只能以一种方式设置。所有数据包排序标志都必须设置为 true
。这些标志的目的是让您可以关闭各种所需的排序,并查看对测试的影响。服务器不打算在任何其他配置下可靠运行。
CThreadPool pool(
5, // initial number of threads to create
5, // minimum number of threads to keep in the pool
10, // maximum number of threads in the pool
5, // maximum number of "dormant" threads
5000, // pool maintenance period (millis)
100, // dispatch timeout (millis)
10000, // dispatch timeout for when pool is at max threads
20, // (1) number of reads to post
true, // (2) maintain packet order with sequence numbers
true); // (3) echo packets with multiple writes
pool.Start();
CSocketServer server(
INADDR_ANY, // address to listen on
5001, // port to listen on
10, // max number of sockets to keep in the pool
10, // max number of buffers to keep in the pool
1024, // buffer size
pool,
65536, // max message size
true, // (4) maintain read packet order with sequence numbers
true, // (5) maintain write packet order with sequence numbers
true); // (6) issue a new read before we've completely processed
// this one
可以调整配置标志以观察以下效果
- 上面的配置确保进入读取完成方法的包按顺序处理 - 这维护了传入数据包数据的有效性;进入工作线程的包按顺序维护 - 这维护了回显实际包的顺序;并且写入调用的顺序得到维护 - 这维护了传出数据包数据的有效性。
- 如果要发布的读取数量(1)减少到 1,则无需维护读取完成排序((4)可以设置为
false
),只要读取完成方法在完成当前读取的处理之前不发出另一个读取((6)应设置为false
)。 - 如果业务逻辑线程池不尝试维护数据包排序((2)设置为
false
),那么测试很可能会报告序列号不匹配 - 因为数据包按无序顺序回显,并且响应 != 消息错误,因为业务逻辑线程池中的多个线程尝试以不同步的方式写入消息片段,从而交错不同消息的部分。 - 如果(2)保持为
false
但(3)也设置为false
,那么测试只会因序列号不匹配而失败,因为线程现在将它们的数据包回显为单个写入,因此各个数据包中的数据不会因与其他数据包的部分交错而被损坏。 - 不幸的是,目前无法在套接字服务器(选项(5))处简单地关闭写入数据包排序,因为这样做也会关闭读取数据包排序,因此无法将有效数据传递给业务逻辑线程,以便它可以回显,我们也无法看到写入操作按无序执行。如果您有兴趣观看此操作,您可以修改套接字服务器代码。
修订历史
- 2002 年 7 月 15 日 - 初始修订
- 2002 年 8 月 12 日 - 删除了套接字关闭中的竞态条件 - 感谢 David McConnell 指出这一点。派生类可以接收连接重置和连接错误通知。套接字提供了一种确定发送/接收是否已连接的方法。分派到线程池现在使用共享
enum
而不是硬编码的常量值。通用代码清理和 lint 问题。调整代码和文章,使每个套接字都有自己的关键代码段,并建议资源利用优化,而不是强制执行。修复了一个错误,即用于保护每个套接字数据的关键代码段由工作线程拥有,而不是由每个套接字数据拥有。