Windows 上使用 IPC 进行高级数据 I/O






4.55/5 (10投票s)
和我一起为 Windows 创建最先进且最易于使用的全双工 MMF 基于 IPC
将日志工具转换为全双工 IPC
在上一篇文章(Windows 中使用 IPC 进行数据日志记录)中,我们介绍了在 Windows 中进行数据日志记录的基本 IPC。本项目展示了我们在此扩展的主要工具,使其能够进行全双工 IPC,用于在 Windows 中的两个应用程序之间发送和接收数据。为了保持简单,我们将逻辑分为客户端和服务器,但这不会是限制,稍后我们将看到原因。
在本文中,我们定义了一个结构,其中包含两个事件 - 一个用于读取,通知已发生读取;另一个用于通知已发生写入 - 以及一个内存映射文件,让所有日志应用程序将日志数据发送到正在监听的服务器。在这里,我们将此逻辑称为“通道”,并基于此,我们理解我们需要两个通道:一个用于所有发往其他应用程序的数据,另一个用于其他应用程序将发送到此应用程序的所有数据。
到目前为止,我们遵循上一篇文章,我们要做的就是在这里和那里更改一些内容,然后就完成了。小菜一碟。但这并不是一个日志工具,所以我们必须克服一些限制。其中之一是,我们只能发送文本,其大小最多略小于内存映射文件的大小。在这里,我们必须能够发送任意数量的数据,没有限制。如果您连接的是数据库提供商和数据处理工具,那么限制返回记录的大小是没有意义的。为了实现这一点,我们将依赖发送方锁定发送通道的事实,这样它就可以将数据包拆分成任意数量的块,同时确保它不会与其他线程的包混合,并且会按顺序接收。因此,我们无需在通道中发送包大小,只需发送数据块,并告知哪个块是新包的开头,以及何时完成。
BOOLEAN cs_ipc::send_data(void* buffer, int size)
{
EnterCriticalSection(&m_send_buffer_cs);
int offset = 0, pending_data = size;
uint8_t flags_val = IPC_FLAG_DATA_BEGIN;
PIO_PACKAGE_DATA header = (PIO_PACKAGE_DATA)m_send_buffer;
header->header.application = APPLICATION_DATA;
while (offset < size)
{
header->header.size = (UINT16)min(pending_data +sizeof(IO_PACKAGE_DATA), m_max_data_size);
int data_send = header->header.size - sizeof(IO_PACKAGE_DATA);
if (offset+data_send == size) flags_val |= IPC_FLAG_DATA_END;
header->flags = flags_val;
memcpy(m_send_buffer+ sizeof(IO_PACKAGE_DATA), ((char*)buffer)+offset,
header->header.size- sizeof(IO_PACKAGE_DATA));
internal_send_data(m_send_buffer, header->header.size);
flags_val = 0;
offset += data_send;
pending_data -= data_send;
}
LeaveCriticalSection(&m_send_buffer_cs);
return true;
}
应用程序连接感知
另一个重要功能是我们必须感知到另一个进程是否已关闭。我们在此假设这是通信可能中断的唯一情况,IPC 连接将持续到应用程序运行的整个时间。假设您正在将电影流式传输到一个应用程序,而该应用程序在流式传输过程中关闭了,您如何知道?为了感知到这一点,我们修改了共享数据结构,添加了写入进程 ID。
typedef struct {
UINT32 writer_process_id;
UINT32 first_element_offset;
UINT32 next_reading_pointer;
UINT32 next_writing_pointer;
BYTE flags[8];
} MMIO_FILE_HEADER, *PMMIO_FILE_HEADER;
此值使用函数 GetCurrentProcessId()
获取。现在,每个进程将始终知道另一个进程是谁。我们使用 OpenProcess(SYNCHRONIZE, FALSE, data_header->writer_process_id)
获取写入进程的句柄,并在我们的多个等待对象中使用它。
waitOn[0] = log_avail_ev;
waitOn[1] = self->m_terminate;
waitOn[2] = self->m_peer_process_handle;
while (true)
{
int wait_result = WaitForMultipleObjects(3, waitOn, FALSE, INFINITE);
if (wait_result == WAIT_OBJECT_0 + 2) // peer process terminated
{
...
}
if (wait_result == WAIT_OBJECT_0 + 1) // terminate has been triggered
{
self->m_running = false;
break;
}
if (wait_result != WAIT_OBJECT_0) // unexpected result! terminate as well - but report error!
{
self->error_handler(L"waitForData > WaitForMultipleObjects failed - waitResult = %u\n",
wait_result);
goto error_case;
break;
}
standard processing, data is available...
...
}
但是,当第一个应用程序启动时,我们没有另一个进程 ID,因此在这种情况下,我们只需要等待前两个事件。当另一个进程停止时,这种情况再次出现,在这种情况下,我们必须关闭另一个进程句柄,然后再次等待前两个事件。最后,我们必须通知应用程序另一个进程已停止。
while (true)
{
int wait_result = WaitForMultipleObjects(NULL == waitOn[2] ? 2 : 3, waitOn, FALSE, INFINITE);
if (wait_result == WAIT_OBJECT_0 + 2) // peer process terminated
{
//self->error_handler(L"waitForData > Peer process terminated\n");
waitOn[2] = NULL;
CloseHandle(self->m_peer_process_handle);
self->m_status = IPC_STATUS_PEER_DISCONNECTED;
if (NULL != self->m_status_change_handler)
self->m_status_change_handler
(IPC_STATUS_PEER_DISCONNECTED, self->m_status_change_handler_parameter);
continue;
}
...
另一方面,当一个进程连接时,我们必须读取其进程 ID,创建该进程的句柄,并开始等待它。我们感知进程已连接的方式是使用一种新型的应用程序数据。如果您现在对应用程序数据类型感到困惑,请回到上一篇文章,我在其中介绍了此 IPC 逻辑中数据传输的方式。新型应用程序数据是 APPLICATION_PEER_CONNECT
,当我们收到它时,我们必须通知应用程序另一个进程已连接。
case APPLICATION_PEER_CONNECT:
{
PIO_HEADER msg_header = (PIO_HEADER)((char*)memory + data_header->next_reading_pointer);
self->m_peer_process_handle = OpenProcess(SYNCHRONIZE, FALSE, data_header->writer_process_id);
if (NULL == self->m_peer_process_handle)
self->error_handler(L"Unexpected error trying to open process id: %u - lastError: 0x%X",
data_header->writer_process_id, GetLastError());
else {
waitOn[2] = self->m_peer_process_handle;
self->m_status = IPC_STATUS_PEER_CONNECTED;
if (NULL != self->m_status_change_handler)
self->m_status_change_handler(IPC_STATUS_PEER_CONNECTED,
self->m_status_change_handler_parameter);
}
data_header->next_reading_pointer += msg_header->size;
break;
}
现在我们几乎完成了。我们启动服务器,它会感知到客户端何时连接并进行通知。它也知道客户端何时停止,因此它可以对此情况做出反应。
客户端也会感知到服务器何时停止。当客户端启动时,它知道服务器是否正在运行,否则它将无法打开内存映射文件或事件。
Server Start|------> Client Connected --------> Client Disconnected ----------> Server Stop|
| |
| |
| \----> Notify Client Disconnection
|
\---> Server Notify Client Connection
Client Start & Server is detected -------------> Server Disconnect ---------> Client Stop|
| |
| \----> Notify Server Disconnect
|
\-> Send APPLICATION_PEER_CONNECT
如果服务器正在运行,客户端启动,然后服务器停止并重新启动,它将知道客户端仍在运行,因为当它尝试创建事件时,GetLastError()
将返回 ERROR_ALREADY_EXISTS
。服务器在上一个实例中已创建的对象将保留在那里,因为客户端已连接到它们。然后服务器发送 APPLICATION_PEER_CONNECT
包。
Server Start|--> Client Connected --> Server Stop ----> Server Start ----> Server Stop|
| |
| |
| \----> Send APPLICATION_PEER_CONNECT
|
\---> Server Notify Client Connection
最令人震惊的情况是当客户端启动而服务器根本没有运行时。它无法连接到对象,m_running
保持 false
。我们在这里唯一的选择是创建一个特殊的事件 m_client_wait_for_server
和一个特殊的线程,该线程将等待它被触发。
DWORD WINAPI cs_ipc::waitForServer(LPVOID data)
{
cs_ipc * self = (cs_ipc *)data;
HANDLE waitOn[3];
waitOn[0] = self->m_terminate;
waitOn[1] = self->m_client_wait_for_server;
int wait_res = WaitForMultipleObjects(2, waitOn, FALSE, INFINITE);
if (wait_res == WAIT_OBJECT_0) // terminate has been triggered
return 0;
else
if (wait_res == WAIT_OBJECT_0 + 1)
{
// server signaled to start
self->initialize();
// notify peer connected
if (self->m_running && (self->m_status == IPC_STATUS_PEER_CONNECTED))
{
if (NULL != self->m_status_change_handler)
self->m_status_change_handler(self->m_status, self->m_status_change_handler_parameter);
self->writePeerConnected();
}
if (!self->m_running)
return 0;
}
else
{
self->error_handler(L"waitForServer > WaitForMultipleObjects failed - waitResult = %u\n",
wait_res);
}
return 0;
}
如果服务器启动,并且在创建事件时,GetLastError()
不返回 ERROR_ALREADY_EXISTS
,它必须尝试打开 LOG_NAME_CLIENT_WAIT_EV
事件,以测试是否有客户端正在运行并等待服务器,从而启动客户端连接并开始运行。类构造函数现在根据初始化结果通知连接或等待另一个进程。
initialize();
if (!m_running && !m_is_server) {
wchar_t element_name[200];
if (NULL == (m_client_wait_for_server =
CreateEvent(NULL, true, false, ipc_encode_string
(element_name, 200, LOG_NAME_CLIENT_WAIT_EV, m_prefix.c_str())))) return;
if (NULL == m_terminate)
if (NULL == (m_terminate = CreateEvent(NULL, TRUE, FALSE, NULL))) goto failed;
if (0 == m_client_wait_for_server_thread)
if (0 == (m_client_wait_for_server_thread = CreateThread
(NULL, 0, cs_ipc::waitForServer, this, 0, NULL))) goto failed;
} else
if (m_running && !m_is_server)
{
if (m_status == IPC_STATUS_PEER_CONNECTED)
writePeerConnected();
}
else
if (m_running && m_is_server)
{
if (m_status == IPC_STATUS_PEER_CONNECTED) {
writePeerConnected();
}
else
{
wchar_t element_name[200];
HANDLE client_wait;
if (NULL == (client_wait = OpenEvent(EVENT_MODIFY_STATE, false,
ipc_encode_string(element_name, 200, LOG_NAME_CLIENT_WAIT_EV, m_prefix.c_str())))) return;
SetEvent(client_wait);
CloseHandle(client_wait);
}
}
如您所见,利用上一篇文章的代码,我们能够创建一个 IPC 逻辑来连接两个进程以发送和接收数据,并感知与另一个进程的连接状态。所有这些都仅使用三个回调(数据可用、连接状态更改和错误处理程序)和一个 send_data
方法来实现。非常简单,但同时又非常复杂。希望您喜欢它,并可以在您的项目中加以利用。