实现异步命名管道服务器 - 第 2 部分






4.76/5 (5投票s)
如何实现命名管道服务器与客户端应用程序通信
引言
在上一篇文章中,我描述了如何以一种所有连接处理都在专用线程中异步进行的方式来构建命名管道服务器。在本文中,我将描述如何在一个 IO 工作线程池中执行 IO。
背景
在上一篇文章中,我解释了命名管道、Windows 事件和等待操作的基础知识。不出所料,这些信息在这里也同样适用。特别是自动重置事件的使用。
如我所提到的,有几种方法可以执行异步 IO。Windows 提供了 ThreadPool
API、完成端口支持以及 C++ 并行化支持等。虽然所有这些方法都是完全有效的方法,但我决定采取一种略有不同的方法。在上一篇文章中,我使用了 OVERLAPPED
结构和事件来进行异步部分,并且我想重用这些概念。
我的代码将有一个线程池,所有线程都执行完全相同的操作:等待与在每个活动客户端连接上执行的 IO 读取操作关联的事件。这个概念的关键是,所有活动线程同时等待所有 IO 事件。这乍一看似乎违反直觉。然而,一个自动重置事件只会满足 1 个等待操作,而让其他线程可以自由地接收其他事件。即使一个线程被 IO 完成触发,涉及不同线程中相同事件句柄的等待操作也不会受到影响,并且可能会被将来的 IO 完成所满足。
概念上,最多有 X 个并发客户端连接,以及一个可以并发处理 Y 条消息的线程池。
IO 缓冲区
在执行管道 IO 时,我们必须使用 ReadFile
启动一个 IO 读取操作。不幸的是,我们事先不知道消息有多大。提供给管道的缓冲区大小仅与低级保留有关,并且不会限制或保证客户端将发送的消息长度,或服务器需要发送回的消息长度。
这意味着,虽然我们可以将一个缓冲区与一个 HANDLE
关联起来用于执行 ReadFile
,但并不能保证这足够。存在一个实际的可能性,即在某个时候需要扩展缓冲区。此外,当我们开始一个可能的后续 IO 请求来读取消息的其余部分时,数据必须填充在前一个操作结束的地方。因此,我们需要一个写指针/位置跟踪。幸运的是,我有一个 CBuffer<T>
缓冲区类,它是在早期文章中介绍过的,并且实现了动态内存分配。
基于此,我们可以轻松地构建 CIOBuffer
。
CIOBuffer::CIOBuffer(DWORD defaultSize) :
CBuffer<BYTE>(defaultSize),
m_DefaultSize(defaultSize),
m_WriteOffset(0) { }
CIOBuffer::CIOBuffer() :
CBuffer<BYTE>(0),
m_DefaultSize(0),
m_WriteOffset(0) { }
void CIOBuffer::Reset() {
Resize(m_DefaultSize);
m_WriteOffset = 0;
}
void CIOBuffer::Reset(DWORD defaultSize) {
m_DefaultSize = defaultSize;
Reset();
}
void CIOBuffer::AddSize(DWORD additionalSize) {
Resize(Size() + additionalSize);
}
DWORD CIOBuffer::Offset() {
return m_WriteOffset;
}
void CIOBuffer::SetOffset(DWORD newOffset) {
m_WriteOffset = newOffset;
}
void* CIOBuffer::WritePtr() {
return (BYTE*) m_buffer + m_WriteOffset;
}
DWORD CIOBuffer::SizeRemaining() {
return Size() - m_WriteOffset;
}
void* CIOBuffer::Ptr() {
return m_buffer;
}
DWORD CIOBuffer::Size() {
return CBuffer<BYTE>::Size();
}
正如您所看到的,它基本上是一个动态内存的非常薄的包装器,外加一个变量来跟踪写指针和缓冲区中剩余的可用大小。这还为数据消费者提供了一种了解实际接收了多少数据的方式。
客户端 IO 上下文
每个连接的客户端都有与之关联的一组信息:管道句柄、OVERLAPPED
结构、事件……因此,将这些东西组合在一个类中是有意义的,因为它们之间存在 1 对 1 的关系。
class CIOContext
{
public:
CIOContext();
CHandle IOHandle; //The handle for the object on which the IO is done
CAutoResetEvent IOEvent; //completion event for the object
OVERLAPPED Overlapped; //overlapped completion structure
CIOBuffer InputBuffer; //buffer for input IO
CIOBuffer OutputBuffer; //buffer for output IO
};
这个类没有关联的代码。它充当一个通用的 struct
。唯一有的代码是在构造函数中,它使用默认构造函数初始化成员。我们可以这样做,因为所有成员都可以初始化,而无需名称。
CNamedPipeWorkerPool 类
在展示代码之前,需要一些背景信息。
如我所提到的,此实现使用 OVERLAPPED
事件来接收通知。所有线程都等待所有事件,这意味着当我们使用 WaitForMultipleObjects
时,我们将受限于最大并发等待对象数量。该数量为 64。此外,1 个事件必须保留给内部的关闭事件,该事件的使用方式与连接处理程序中的关闭方式类似。因此,最大并发客户端数量为 63。
该池具有以下内部状态信息(为清晰起见,已移除方法)
class CNamedPipeWorkerPool
{
public:
static const DWORD MAX_INSTANCES = MAXIMUM_WAIT_OBJECTS - 1;
private:
static const DWORD NUM_WAIT_OBJECTS = MAXIMUM_WAIT_OBJECTS;
static const DWORD MAX_THREADS = 16;
static const DWORD SHUTDOWN_EVT_INDEX = MAX_INSTANCES;
CHandle m_ServerThread[MAX_THREADS];
CIOContext m_IOContext[MAX_INSTANCES];
CManualResetEvent m_ShutdownEvent;
HANDLE m_WaitObjects[NUM_WAIT_OBJECTS];
PipeWorkerMessageHandler m_MessageHandler;
void* m_Context;
DWORD m_NumThreads;
}
尽可能避免动态调整数组大小,原因很简单,为了节省几字节的内存优化,我们需要添加程序上的复杂性(范围检查、错误处理……),这不仅花费时间,还会增加可执行文件大小,所以并没有真正获得太多收益。尤其是在我们只谈论几百字节的情况下。
MAX_INSTANCES
的值比最大等待对象数量少一,因为每个实例都需要一个关联的事件,并且还保留一个额外的关闭事件。最大等待对象数量也是一个常量 NUM_WAIT_OBJECTS
。我做了这个决定,因为如果我更改异步操作的机制,MAXIMUM_WAIT_OBJECTS
宏可能不再适用。
有多种方法可以设置最大线程数。一种明智的方法是,线程数永远不能大于可用的 CPU 核心数。然而,这个数字在编译时是无法得知的。我本可以选择 63(与最大连接数相同),但这也不是一个明智的数字。16 似乎是一个合理的上限。如果需要,以后可以轻松增加。
每个可能的连接都有一个 IO 上下文。有一个单独的数组包含原始句柄,因为这正是 WaitForMultipleObjects
所期望的。
现在,让我们看一下构造函数
CNamedPipeWorkerPool::CNamedPipeWorkerPool(
DWORD numWorkerThreads,
PipeWorkerMessageHandler messageHandler,
void* context,
DWORD nInBufferSize,
DWORD nOutBufferSize) :
m_ShutdownEvent(),
m_MessageHandler(messageHandler),
m_NumThreads(numWorkerThreads),
m_Context(context),
m_IOContext()
{
m_WaitObjects[SHUTDOWN_EVT_INDEX] = m_ShutdownEvent;
for (DWORD i = 0; i < MAX_INSTANCES; i++)
{
m_WaitObjects[i] = m_IOContext[i].IOEvent;
m_IOContext[i].InputBuffer.Reset(nInBufferSize);
m_IOContext[i].OutputBuffer.Reset(nInBufferSize);
}
try
{
for (DWORD i = 0; i < m_NumThreads; i++) {
m_ServerThread[i] =
CreateThread(NULL, 0, WorkerThreadFunc, this, 0, NULL);
if (m_ServerThread == NULL) {
throw ExWin32Error
(L"CNamedPipeWorkerPool::CNamedPipeWorkerPool CreateThread");
}
}
}
catch (...)
{
for (DWORD i = 0; i < MAX_THREADS; i++) {
if (m_ServerThread[i].IsValid())
#pragma warning( push)
#pragma warning( disable : 6258)
TerminateThread(m_ServerThread[i], -1);
#pragma warning (pop)
}
throw;
}
}
IO 上下文值都在初始化列表中初始化,之后我们使用这些值手动初始化等待对象数组。HANDLE
值是托管值的未托管副本并不重要。它们将在对象销毁时被清理,届时将没有人再使用它们。
初始化线程会更复杂一些。我们需要创建一定数量的线程,这即使不太可能发生,也可能失败。假设我们需要创建 8 个线程,并且在第 5 个线程时出现错误。终止这些线程至关重要。然而,严格来说,我们无法使用与 CNamedPipeWorkerPool
对象绑定的任何东西来做到这一点。我们仍然在构造函数中。如果我们在这里抛出异常,析构函数将不会被执行。这意味着这些线程可能仍然存在,并在 CNamedPipeWorkerPool
对象被删除后访问它。
这意味着我们必须在那里采取任何行动。可以设想,我们可以触发关闭事件并等待活动的线程完成并等待它们的句柄。然而,在这种特定情况下,我们可以采取一种更简单的方法。我们知道 CNamedPipeWorkerPool
仍在构造中。可以保证这些线程还没有执行任何需要清理的操作。在这种特定场景下,使用 TerminateThread
来简单地将它们从存在中移除是安全的。
TerminateThread
并非总是安全的。这样终止的线程不会执行清理,也不会以 RAII 兼容的方式展开堆栈。因此,这会引起编译器警告。我显式地禁用该特定代码行的编译器警告,以表明我意识到该编译器警告并且忽略它是一个有意识的决定。通常忽略该警告是一个坏主意,因为其他阅读代码的人会感到困惑。
创建线程后,对象就可以开始提供 IO 并接收客户端请求了。
添加连接
我之前提到过,我想在不使用锁或其他同步机制的情况下实现这个类。起初,这似乎要求很高,因为我们谈论的是同时在多个线程中使用和更新的内容。然而,以下情况属实:
- 连接仅在一个线程中添加,即连接处理程序线程。
- 连接仅在 IO 工作池中删除。
- IO 完成事件(无论是成功还是失败)只会触发一个 IO 线程。
- 保存所有信息的数组是固定长度的,不需要在运行时调整大小。
那么,让我们来看看如何添加连接。
void CNamedPipeWorkerPool::AddClientConnection(CHandle connection)
{
CIOContext* ioContext = NULL;
for (DWORD i = 0; i < MAX_INSTANCES; i++)
{
if (!m_IOContext[i].IOHandle.IsValid()) {
ioContext = &m_IOContext[i];
break;
}
}
if (ioContext == NULL)
throw AppException
("CNamedPipeWorker::AddClientConnection no more empty slots");
ioContext->IOHandle = connection;
try
{
InitiateRead(connection, ioContext->InputBuffer, &ioContext->Overlapped);
}
catch (exception& ex)
{
DisconnectNamedPipe(ioContext->IOHandle);
ioContext->IOHandle = NULL;
throw;
}
}
首要任务是找到一个可用的 IO 上下文。请记住,OVERLAPPED
结构、事件和缓冲区都已预先分配。我们只需要找到一个没有关联连接句柄的插槽。有尽可能多的插槽,就有尽可能多的可能连接,所以一个连接的句柄意味着必须有一个可用的插槽。但是,我们有一个安全检查,以防万一。
我们首先将句柄分配给该插槽,然后启动异步读取操作以等待消息。如果由于任何原因失败,我们将断开客户端连接并关闭句柄。
IO 处理线程
IO 线程本身相对简单。它们都做同样的事情。为了清晰起见,我将此函数分块了。该线程是一个连续循环,只在发生 Windows 事件时做出反应。
DWORD CNamedPipeWorkerPool::WorkerThreadFunc(void* ptr)
{
CNamedPipeWorkerPool* worker = (CNamedPipeWorkerPool*)ptr;
while (!worker->IsShuttingDown())
{
DWORD eventIndex = WaitForMultipleObjects
(NUM_WAIT_OBJECTS, worker->m_WaitObjects, FALSE, INFINITE);
//If a shutdown was signalled, the thread must end
if (eventIndex == SHUTDOWN_EVT_INDEX)
return 0;
//return value if an event was triggered is the index of the event.
//If the return value is not a valid index,
//something is catastrophically wrong and we exit.
if (eventIndex >= MAX_INSTANCES) {
DWORD error = GetLastError();
return -1;
}
CIOContext* ioContext = &worker->m_IOContext[eventIndex];
WaitForMultipleObjects
仅在 IO 事件完成或触发关闭时结束。其返回值是满足等待的事件的索引。当需要关闭时,线程本身不会进行任何清理操作,因为它们会尝试做同样的事情,这需要一个锁定机制来安全地实现。
现在也显而易见,为什么关闭事件是等待事件数组中的最后一个。这样,m_WaitObjects
中的索引与 m_IOContext
数组中相应的索引是一一对应的。并不是说重新计算会是个大问题,但避免“差一”错误总是个好主意。请注意,这也是为什么 while
循环使用 IsShuttingDown
作为其循环变量的原因。如果大量消息充斥着 IO 线程,它们将继续处理 IO 完成事件,并且仅在 IO 停止时处理关闭事件。
一旦我们获得了与事件对应的 IOContext
*,我们就可以进入通信的核心部分了。
try
{
if (!ioContext->IOHandle.IsValid())
continue;
worker->ProcessReadCompletion(
ioContext->IOHandle,
ioContext->InputBuffer,
&ioContext->Overlapped);
worker->m_MessageHandler(
worker->m_Context,
ioContext->IOHandle,
ioContext->InputBuffer,
ioContext->OutputBuffer );
if (GetCurrentThreadToken() != NULL) {
if (!RevertToSelf()) {
SetEvent(worker->m_ShutdownEvent);
return GetLastError();
}
}
if (ioContext->OutputBuffer.Offset() > 0) {
worker->SendReturnMessage(
ioContext->IOHandle,
ioContext->OutputBuffer);
}
worker->InitiateRead(
ioContext->IOHandle,
ioContext->InputBuffer,
&ioContext->Overlapped);
}
catch (exception& ex)
{
//will cause the handle to be closed
ioContext->IOHandle = NULL;
}
}
return 0;
}
首先,我们检查句柄是否有效。它应该始终是有效的。我看不出有什么现实的原因会导致它无效,但防止无效句柄总是有益的。下一步是完成 IO(稍后将详细介绍)。
随着传入消息的完成,我们终于可以调用应用程序提供的消息处理程序了。这是我们试图实现的重点:在这里,应用程序说了“每当收到传入消息时,在此数据上执行此函数”。上下文参数是应用程序希望发送到消息处理程序的(类似于传递给新创建线程的参数)。提供句柄是因为消息处理程序可能需要连接信息,例如客户端的身份。最后,输入和输出缓冲区用于将数据接收到消息处理程序中,并准备要发送回的数据。
接下来的部分可能看起来很奇怪,但至关重要。命名管道是支持模拟的 IPC 机制之一。换句话说,消息处理程序可能调用 API 来将线程切换到调用者的身份,以便执行某些操作,例如数据库更新。完成后,它应该撤销模拟。但是,如果它不这样做,线程将继续以该身份运行,这是不好的。为了确保这种情况不会发生,我们检查模拟是否仍然处于活动状态(当前线程令牌不是 NULL
),如果是,则恢复为我们自己的身份。文档说,如果 RevertToSelf
失败,这是一个严重错误,进程应该结束。这有点极端,但关闭所有 IO 处理是一个好主意。如果需要,可以实现更详细的错误处理。
假设我们走到这一步,如果消息处理程序在缓冲区中组合了一个响应消息,我们就会发送它,最后,我们启动一个新的 IO 读取请求,该请求可能立即得到满足,也可能不立即得到满足。如果消息已经存在并且数据已读,那么当该线程返回等待时,IO 处理可能已经在另一个线程上开始了。
如果在这些活动中的任何一点发生错误,都会抛出异常。该异常表示正在处理的 IO 连接存在问题,而不是线程本身的问题。因此,正确的做法是断开客户端连接并关闭连接。实际上,通过关闭句柄就可以实现这两个操作。
启动读取请求
那么,读取请求包含什么?我们已经讨论了 IO 是异步执行的事实:我们等待它。然而,在这里,我们也要预料到竞态条件。即使在连接客户端和启动读取请求之间的时间非常短,但客户端发送了一条已经可用并等待我们的消息的可能性也非零。
如果发生这种情况,ReadFile
将成功。数据将同步检索。这不是问题。重叠事件仍将被触发,并且任何数据处理都将在线程池中进行,等待将落空。发生这种情况时,读取操作可能在与读取请求起源的线程不同的线程中完成。这就是为什么我们在这里不能做任何事情。
void CNamedPipeWorkerPool::InitiateRead
(HANDLE pipe, CIOBuffer& buffer, LPOVERLAPPED overlapped)
{
DWORD bytesRead = 0;
buffer.Reset();
if (ReadFile(pipe, buffer.WritePtr(), buffer.SizeRemaining(),
&bytesRead, overlapped))
{
; //Data already available. Event will be triggered
}
else {
if (GetLastError() == ERROR_IO_PENDING) {
//There was no data waiting for the read operation. This is normal
}
else if (GetLastError() == ERROR_MORE_DATA) {
//Data arrived synchronous but not all data was received.
}
else {
throw ExWin32Error(L"Read named pipe error");
}
}
}
如果数据尚未可用,则有两种正常的错误模式。第一种是读取操作没有等待数据。这是正常的。当数据到达时,它将触发重叠事件,数据将在工作线程中的一个线程中读取。第二种是数据已经可用,但没有读取完整的消息(例如,因为读取缓冲区不够大)。这仅仅意味着,当工作线程处理重叠事件时,它会检测到它需要检索更多数据。
任何其他错误都将被视为灾难性的。
完成读取请求
当 ReadFile
操作完成时,有三种可能性。
void CNamedPipeWorkerPool::ProcessReadCompletion
(HANDLE pipe, CIOBuffer & ioBuffer, LPOVERLAPPED overlapped)
{
DWORD bytesRead = 0;
if (GetOverlappedResult(pipe, overlapped, &bytesRead, FALSE))
{
ioBuffer.SetOffset(ioBuffer.Offset() + bytesRead);
}
else
{
//Failure may indicate that there is more data to read.
//The pipe can tell us how much data
//is waiting in the message that is currently being read.
if (GetLastError() == ERROR_MORE_DATA) {
ioBuffer.SetOffset(ioBuffer.Offset() + bytesRead);
DWORD bytesRemainingInMessage = 0;
if (!PeekNamedPipe(pipe, NULL, 0, NULL, NULL, &bytesRemainingInMessage))
{
throw ExWin32Error
("PeekNamedPipe failed after GetLastError() == ERROR_MORE_DATA");
}
bytesRead = 0;
ioBuffer.AddSize(bytesRemainingInMessage);
if (!ReadFile(pipe, ioBuffer.WritePtr(), ioBuffer.SizeRemaining(),
&bytesRead, NULL)) {
throw ExWin32Error("ReadFile failed for remaining bytes in pipe");
}
}
else {
throw ExWin32Error("Overlapped ReadFile failed.");
}
}
}
第一种可能性是操作成功完成。在这种情况下,数据已经存在于缓冲区中,我们唯一需要做的就是更新写指针的偏移量,以便处理数据的代码确切地知道接收了多少数据。
如果发生错误,这可能意味着缓冲区大小对于整个消息来说太小了。但是,由于我们使用的是基于消息的管道,我们保证完整的消息是可用的。我们可以使用 PeekNamedPipe
来查找消息中还剩多少字节,然后增长缓冲区并执行同步读取来获取消息的其余部分。由于我们没有执行重叠读取,因此不会有 IO 事件,而是我们立即在那里获取数据。等待是没有意义的,因为我们已经知道数据在那里。
如果存在任何其他错误,这仅仅意味着 IO 由于各种原因而失败,这些原因可能表明是良性的(客户端已关闭连接)或灾难性的。无论哪种情况,它都意味着连接不再被视为有效,我们抛出一个异常,让调用者处理。
发送响应消息
发送响应消息很简单,但为了完整起见,我还是会介绍一下。
void CNamedPipeWorkerPool::SendReturnMessage(HANDLE pipe, CIOBuffer& buffer)
{
if (!WriteFile(pipe, buffer.Ptr(), buffer.Offset(), NULL, NULL)) {
throw ExWin32Error("CNamedPipeWorker::SendReturnMessage WriteFile");
}
}
此操作也以同步方式处理。通过异步处理此操作没有实际的优化。
多线程注意事项
存在一些需要考虑的潜在线程竞争场景。
为什么总有一个空闲插槽?
为什么我说总应该有一个空闲插槽?难道不可能发生客户端断开连接,而一个新的客户端在前一个连接的完成事件触发并完成清理之前连接吗?结果是没有。即使客户端断开连接,也不会关闭我们句柄所代表的管道的服务器端。服务器端点的数量是有限的。断开连接的服务器端点仍然是一个命名管道实例。只有当最后一个句柄被关闭时,它才真正被关闭。
因此,考虑一下当有 63 个连接正在使用,而其中一个断开连接时会发生什么。从命名管道的角度来看,仍然有 63 个连接,并且在完成断开连接的客户端的清理并关闭句柄之前,不会创建新的连接。由于关闭句柄是最后完成的操作,因此这隐式意味着,如果我们正在寻找一个空闲插槽,那是因为命名管道被允许打开另一个实例,这意味着我们确信有一个空闲插槽。
当另一个线程正在清理时,我们寻找空闲插槽会发生什么?
那么,如果在此期间,另外两个线程经历了客户端断开连接并在其他线程中关闭,而我们正在寻找一个空闲插槽,会发生什么?答案仍然是:什么也没有。因为我们正在寻找一个句柄无效的插槽。另一个线程只能关闭一个有效句柄,所以它不会干扰。即使检查应该在句柄关闭时执行:关闭句柄的代码是这样的
void CHandle::CloseHandle() {
if (IsValid()) {
::CloseHandle(m_handle);
m_handle = INVALID_HANDLE_VALUE;
}
}
内部变量在句柄完全关闭后被设置为无效,这意味着虽然理论上一个句柄可能被认为是“正在使用”因为它即将被释放,但另一种情况永远不会发生,因为只有一个线程在寻找空闲插槽,并且只有一个线程在访问任何特定的句柄进行清理。
当关闭触发时,是否存在未完成的 IO 请求?
当我们启动关闭时,可能存在许多未完成的 IO 请求,并且可能有多个线程正在处理消息。虽然当时关闭所有这些请求很诱人,但我们不能这样做。能够安全地添加和删除连接而无需锁的整个前提是,我们的设计保证在任何给定时间只有一个线程修改连接插槽。从另一个线程执行任何此类操作都会违反此原则。
为了使其安全,我们需要实现锁定,而这正是我不想做的。相反,我们公开了一个终止所有连接的方法。此方法在所有 IO 工作线程结束时由连接处理程序线程调用。在这种情况下,我们可以保证没有其他线程在触碰这些插槽,并且关闭所有连接是安全的。
请注意,技术上我们甚至不需要这样做。技术上,当 IO 工作池对象被销毁并且所有 CHandle
实例被销毁时,所有这些句柄都会被关闭。但是,存在潜在的未完成 IO 请求。每个 ReadFile
操作都有一个关联的缓冲区。假设 IO 上下文的缓冲区在句柄关闭之前被释放,并且在这两个操作之间的微秒时间内,消息到达。如果发生这种情况,可能会崩溃,因为读取操作将消息写入一个不再存在的缓冲区。
为了防止这种情况发生,CIOContext
类有一个析构函数来关闭句柄。这确保了上述情况不会发生。
我们仍然会断开所有客户端连接并取消所有 IO,当 IO 线程关闭时。不是因为这对于避免上述问题是必需的,而是因为如果命名管道服务器是大型复杂应用程序的一部分,那么在 IO 关闭和析构函数调用(这将关闭连接)之间可能存在显著的时间间隔。在我们的 IO 关闭时断开客户端连接会更清晰。
当处理前一个 IO 请求时,是否会有 IO 传入?
假设一个线程被 IO 完成触发,并且在我们处理它的时候,客户端发送了另一个请求。会发生什么?什么都不会发生。要设置 IO 事件,必须有一个 IO 读取请求正在进行。否则,传入的消息会简单地被 Windows IO 子系统缓冲,直到我们启动一个读取请求来处理它。
并发处理消息
有一件事值得更多关注,那就是消息本身的 Ldng。例如,考虑一个通过命名管道接收查询的 SQL 服务器。我们的命名管道实现对于多线程使用是完全安全的。我们不能做的是使消息处理程序成为线程安全的。实际上,技术上我们可以通过在每次调用消息处理程序时围绕一个互斥锁来做到这一点,并阻止 IO。但从性能角度来看,这种方法非常糟糕。
这意味着提供消息处理程序函数指针的整体应用程序需要确保消息处理程序实现适当的锁定机制,以确保在同时处理 2 条或更多消息时(有 2 个或更多客户端发送消息),锁定是正确的,并且粒度足够,不会比绝对必要的情况下更多地阻碍性能。
在实际应用程序中使用 CNamedPipeServer
既然我们已经介绍了所有内容,现在是时候展示如何在实际管道服务器中使用该组件了。为了测试目的,我们有一个消息处理程序,它接收消息,将其打印到控制台,并编译一个返回消息。
如上一节所述,我们需要查看需要哪种类型的锁定。在这种情况下,唯一对线程竞态敏感的代码部分是消息的打印。如果我们不做任何事情,多个消息会互相干扰。为此,应用程序会传递一个指向互斥锁的指针。
如果应用程序像 SQL 服务器那样复杂,那么提供一个完整的状态机来处理所有可能的情况可能比一个简单的互斥锁更好。这是超出本文范围的主题。
void pipeWorkerMessageHandler(
void* context,
w32::CHandle& handle,
CIOBuffer& input,
CIOBuffer& output)
{
std::mutex* pmutex = (std::mutex*) context;
DWORD threadId = GetCurrentThreadId();
Sleep(1000); //simulate work
LPWSTR text = (LPWSTR)input.Ptr();
{
lock_guard< std::mutex> lock(*pmutex);
std::wcout << L"Thread " << threadId << L" relieved: " << text << std::endl;
}
wstringstream stream;
stream << L"Message answered by thread " << threadId;
wstring message = stream.str();
DWORD bytes = (message.size() + 1 ) * sizeof(wstring::value_type);
memcpy(
output.Ptr(),
(void*)message.c_str(),
bytes);
output.SetOffset(bytes);
}
服务器应用程序
我们的测试服务器非常简单。它创建一个管道服务器实例并分配一个消息处理程序函数,并带有一个互斥锁指针作为上下文。此时,它只等待用户启动连接处理或发送关闭命令。
try
{
std::mutex g_mutex;
wstring name(L"\\\\.\\Pipe\\NamedPipe");
CNamedPipeServer server(name,
pipeWorkerMessageHandler,
&g_mutex,
256,
256,
10);
cout << "Make a choice:" << endl;
cout << "==============" << endl;
cout << "s: start serving" << endl;
cout << "q: quit" << endl << endl;
cout << "Choice: ";
char choice = 0;
do
{
choice = getchar();
if (choice == 'q') {
cout << "Initiating shutdown" << endl;
server.Shutdown();
cout << "Waiting until everything is shutdown" << endl;
server.WaitUntilFinished(INFINITE);
cout << "Shutdown finished" << endl;
}
else if (choice == 's') {
cout << "Starting server connections" << endl;
server.StartServing();
}
} while (choice != 'q');
}
catch (exception& ex)
{
cout << ex.what() << endl;
}
客户端应用程序
客户端应用程序也同样简单。它打开一个命名管道,并将管道连接切换为基于消息的,正如本文第一部分中所解释的那样。
try
{
wstring name(L"\\\\.\\Pipe\\NamedPipe");
w32::CHandle pipeHandle = CreateFileW(name.c_str(),
GENERIC_READ | GENERIC_WRITE, 0, NULL,
OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
if (!pipeHandle.IsValid())
throw ExWin32Error(L"CreateFileW "+ name);
DWORD dwMode = PIPE_READMODE_MESSAGE;
if(!SetNamedPipeHandleState(pipeHandle, &dwMode, NULL, NULL))
throw ExWin32Error(L"SetNamedPipeHandleState " + name);
w32::CHandle readerThread =
CreateThread(NULL, 0, ReaderLoop, pipeHandle, 0, NULL);
cout << "Make a choice:" << endl;
cout << "==============" << endl;
cout << "q: quit" << endl;
cout << "1: send 1 message" << endl;
cout << "5: send 5 messages" << endl;
cout << "50: send 50 messages" << endl;
cout << "Choice: ";
string choice;
do
{
getline(cin, choice);
if (choice == "q") {
shutdowninitiated = true;
pipeHandle.CloseHandle();
WaitForSingleObject(readerThread, INFINITE);
}
else if (choice == "1") {
SendPipeMessage(pipeHandle);
}
else if (choice == "5") {
for (int i = 0; i < 5; i++) {
SendPipeMessage(pipeHandle);
}
}
else if (choice == "50") {
for (int i = 0; i < 50; i++) {
SendPipeMessage(pipeHandle);
}
}
} while (choice != "q");
}
catch (exception& ex)
{
cout << ex.what() << endl;
}
该界面允许您发送 1 条消息、5 条消息或 50 条消息。您可以并排运行多个客户端应用程序实例,以观察并发消息处理的行为。
测试应用程序
最后,是时候进行一些测试了。对于这个测试,我启动了 1 个管道服务器和 3 个客户端应用程序,它们都同时发送消息。
正如您所见,一切都按预期进行。源代码已包含在内,因此如果您想让测试应用程序更有趣并让它们执行其他操作,您可以进行修改。
结论
命名管道是一项极其有用且灵活的技术。将它们正确地集成到您的服务器中可能很棘手,但有了这个服务器实现,我已经消除了大部分困难。代码采用 MIT 许可,因此如果您想重用它,请随意使用。
本文花费了大量时间撰写,并涵盖了许多细节。希望我没有遗漏任何重要的内容,或者犯了任何错误。如果您认为我搞砸了什么或者有什么不清楚的地方,请发表评论,我会查看。
历史
- 2022年12月22日:文章初版,包含可用的测试应用程序和源代码