65.9K
CodeProject 正在变化。 阅读更多。
Home

Windows 上使用 IPC 进行高级数据 I/O

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.55/5 (10投票s)

2017年12月8日

MIT

5分钟阅读

viewsIcon

27749

downloadIcon

425

和我一起为 Windows 创建最先进且最易于使用的全双工 MMF 基于 IPC

将日志工具转换为全双工 IPC

在上一篇文章(Windows 中使用 IPC 进行数据日志记录)中,我们介绍了在 Windows 中进行数据日志记录的基本 IPC。本项目展示了我们在此扩展的主要工具,使其能够进行全双工 IPC,用于在 Windows 中的两个应用程序之间发送和接收数据。为了保持简单,我们将逻辑分为客户端和服务器,但这不会是限制,稍后我们将看到原因。

在本文中,我们定义了一个结构,其中包含两个事件 - 一个用于读取,通知已发生读取;另一个用于通知已发生写入 - 以及一个内存映射文件,让所有日志应用程序将日志数据发送到正在监听的服务器。在这里,我们将此逻辑称为“通道”,并基于此,我们理解我们需要两个通道:一个用于所有发往其他应用程序的数据,另一个用于其他应用程序将发送到此应用程序的所有数据。

到目前为止,我们遵循上一篇文章,我们要做的就是在这里和那里更改一些内容,然后就完成了。小菜一碟。但这并不是一个日志工具,所以我们必须克服一些限制。其中之一是,我们只能发送文本,其大小最多略小于内存映射文件的大小。在这里,我们必须能够发送任意数量的数据,没有限制。如果您连接的是数据库提供商和数据处理工具,那么限制返回记录的大小是没有意义的。为了实现这一点,我们将依赖发送方锁定发送通道的事实,这样它就可以将数据包拆分成任意数量的块,同时确保它不会与其他线程的包混合,并且会按顺序接收。因此,我们无需在通道中发送包大小,只需发送数据块,并告知哪个块是新包的开头,以及何时完成。

BOOLEAN cs_ipc::send_data(void* buffer, int size)
{
    EnterCriticalSection(&m_send_buffer_cs);
    int offset = 0, pending_data = size;
    uint8_t flags_val = IPC_FLAG_DATA_BEGIN;
    PIO_PACKAGE_DATA header = (PIO_PACKAGE_DATA)m_send_buffer;
    header->header.application = APPLICATION_DATA;
    while (offset < size)
    {
        header->header.size = (UINT16)min(pending_data +sizeof(IO_PACKAGE_DATA), m_max_data_size);
        int data_send = header->header.size - sizeof(IO_PACKAGE_DATA);
        if (offset+data_send == size) flags_val |= IPC_FLAG_DATA_END;
        header->flags = flags_val;
        memcpy(m_send_buffer+ sizeof(IO_PACKAGE_DATA), ((char*)buffer)+offset, 
               header->header.size- sizeof(IO_PACKAGE_DATA));
        internal_send_data(m_send_buffer, header->header.size);
        flags_val = 0;
        offset += data_send;
        pending_data -= data_send;
    }
    LeaveCriticalSection(&m_send_buffer_cs);
    return true;
}

应用程序连接感知

另一个重要功能是我们必须感知到另一个进程是否已关闭。我们在此假设这是通信可能中断的唯一情况,IPC 连接将持续到应用程序运行的整个时间。假设您正在将电影流式传输到一个应用程序,而该应用程序在流式传输过程中关闭了,您如何知道?为了感知到这一点,我们修改了共享数据结构,添加了写入进程 ID。

typedef struct {
    UINT32 writer_process_id;
    UINT32 first_element_offset;
    UINT32 next_reading_pointer;
    UINT32 next_writing_pointer;
    BYTE   flags[8];
} MMIO_FILE_HEADER, *PMMIO_FILE_HEADER;

此值使用函数 GetCurrentProcessId() 获取。现在,每个进程将始终知道另一个进程是谁。我们使用 OpenProcess(SYNCHRONIZE, FALSE, data_header->writer_process_id) 获取写入进程的句柄,并在我们的多个等待对象中使用它。

    waitOn[0] = log_avail_ev;
    waitOn[1] = self->m_terminate;
    waitOn[2] = self->m_peer_process_handle;

    while (true)
    {
        int wait_result = WaitForMultipleObjects(3, waitOn, FALSE, INFINITE);
        if (wait_result == WAIT_OBJECT_0 + 2) // peer process terminated
        {
            ...
        } 
        if (wait_result == WAIT_OBJECT_0 + 1) // terminate has been triggered
        {
            self->m_running = false;
            break;
        } 
        if (wait_result != WAIT_OBJECT_0) // unexpected result! terminate as well - but report error!
        {
            self->error_handler(L"waitForData > WaitForMultipleObjects failed - waitResult = %u\n", 
                                wait_result);
            goto error_case;
            break;
        }
        standard processing, data is available...
        ... 
    }

但是,当第一个应用程序启动时,我们没有另一个进程 ID,因此在这种情况下,我们只需要等待前两个事件。当另一个进程停止时,这种情况再次出现,在这种情况下,我们必须关闭另一个进程句柄,然后再次等待前两个事件。最后,我们必须通知应用程序另一个进程已停止。

    while (true)
    {
        int wait_result = WaitForMultipleObjects(NULL == waitOn[2] ? 2 : 3, waitOn, FALSE, INFINITE);
        if (wait_result == WAIT_OBJECT_0 + 2) // peer process terminated
        {
            //self->error_handler(L"waitForData > Peer process terminated\n");
            waitOn[2] = NULL;
            CloseHandle(self->m_peer_process_handle);
            self->m_status = IPC_STATUS_PEER_DISCONNECTED;
            if (NULL != self->m_status_change_handler)
                self->m_status_change_handler
                (IPC_STATUS_PEER_DISCONNECTED, self->m_status_change_handler_parameter);
            continue;
        }

    ...

另一方面,当一个进程连接时,我们必须读取其进程 ID,创建该进程的句柄,并开始等待它。我们感知进程已连接的方式是使用一种新型的应用程序数据。如果您现在对应用程序数据类型感到困惑,请回到上一篇文章,我在其中介绍了此 IPC 逻辑中数据传输的方式。新型应用程序数据是 APPLICATION_PEER_CONNECT,当我们收到它时,我们必须通知应用程序另一个进程已连接。

    case APPLICATION_PEER_CONNECT:
    {
        PIO_HEADER msg_header = (PIO_HEADER)((char*)memory + data_header->next_reading_pointer);
        self->m_peer_process_handle = OpenProcess(SYNCHRONIZE, FALSE, data_header->writer_process_id);
        if (NULL == self->m_peer_process_handle)
            self->error_handler(L"Unexpected error trying to open process id: %u - lastError: 0x%X", 
                                data_header->writer_process_id, GetLastError());
        else {
            waitOn[2] = self->m_peer_process_handle;
            self->m_status = IPC_STATUS_PEER_CONNECTED;
            if (NULL != self->m_status_change_handler)
                self->m_status_change_handler(IPC_STATUS_PEER_CONNECTED, 
                                              self->m_status_change_handler_parameter);
        }
        data_header->next_reading_pointer += msg_header->size;
        break;
    }

现在我们几乎完成了。我们启动服务器,它会感知到客户端何时连接并进行通知。它也知道客户端何时停止,因此它可以对此情况做出反应。

客户端也会感知到服务器何时停止。当客户端启动时,它知道服务器是否正在运行,否则它将无法打开内存映射文件或事件。

  Server Start|------> Client Connected --------> Client Disconnected ----------> Server Stop|
                               |                             | 
                               |                             | 
                               |                             \----> Notify Client Disconnection
                               | 
                               \---> Server Notify Client Connection

 Client Start & Server is detected -------------> Server Disconnect ---------> Client Stop|
                       |                              | 
                       |                              \----> Notify Server Disconnect 
                       |                                    
                       \-> Send APPLICATION_PEER_CONNECT     
                                                           

如果服务器正在运行,客户端启动,然后服务器停止并重新启动,它将知道客户端仍在运行,因为当它尝试创建事件时,GetLastError() 将返回 ERROR_ALREADY_EXISTS。服务器在上一个实例中已创建的对象将保留在那里,因为客户端已连接到它们。然后服务器发送 APPLICATION_PEER_CONNECT 包。

  Server Start|--> Client Connected --> Server Stop ----> Server Start ----> Server Stop|
                               |                           | 
                               |                           | 
                               |                           \----> Send APPLICATION_PEER_CONNECT
                               | 
                               \---> Server Notify Client Connection

最令人震惊的情况是当客户端启动而服务器根本没有运行时。它无法连接到对象,m_running 保持 false。我们在这里唯一的选择是创建一个特殊的事件 m_client_wait_for_server 和一个特殊的线程,该线程将等待它被触发。

DWORD WINAPI cs_ipc::waitForServer(LPVOID data)
{
    cs_ipc * self = (cs_ipc *)data;
    HANDLE waitOn[3];

    waitOn[0] = self->m_terminate;
    waitOn[1] = self->m_client_wait_for_server;
    int wait_res = WaitForMultipleObjects(2, waitOn, FALSE, INFINITE);
    if (wait_res == WAIT_OBJECT_0) // terminate has been triggered
        return 0;
    else
    if (wait_res == WAIT_OBJECT_0 + 1)
    {
        // server signaled to start
        self->initialize();
        // notify peer connected
        if (self->m_running && (self->m_status == IPC_STATUS_PEER_CONNECTED))
        {
            if (NULL != self->m_status_change_handler)
                self->m_status_change_handler(self->m_status, self->m_status_change_handler_parameter);
            self->writePeerConnected();
        }
        if (!self->m_running)
            return 0;
    }
    else
    {
        self->error_handler(L"waitForServer > WaitForMultipleObjects failed - waitResult = %u\n", 
                            wait_res);
    }

    return 0;
}

如果服务器启动,并且在创建事件时,GetLastError() 不返回 ERROR_ALREADY_EXISTS,它必须尝试打开 LOG_NAME_CLIENT_WAIT_EV 事件,以测试是否有客户端正在运行并等待服务器,从而启动客户端连接并开始运行。类构造函数现在根据初始化结果通知连接或等待另一个进程。

    initialize();
    if (!m_running && !m_is_server) {
        wchar_t element_name[200];
        if (NULL == (m_client_wait_for_server = 
               CreateEvent(NULL, true, false, ipc_encode_string
               (element_name, 200, LOG_NAME_CLIENT_WAIT_EV, m_prefix.c_str())))) return;
        if (NULL == m_terminate)
            if (NULL == (m_terminate = CreateEvent(NULL, TRUE, FALSE, NULL))) goto failed;
        if (0 == m_client_wait_for_server_thread)
            if (0 == (m_client_wait_for_server_thread = CreateThread
                  (NULL, 0, cs_ipc::waitForServer, this, 0, NULL))) goto failed;
    } else
    if (m_running && !m_is_server)
    {
        if (m_status == IPC_STATUS_PEER_CONNECTED) 
            writePeerConnected();
    }
    else
    if (m_running && m_is_server)
    {
        if (m_status == IPC_STATUS_PEER_CONNECTED) {
            writePeerConnected();
        }
        else
        {
            wchar_t element_name[200];
            HANDLE client_wait;
            if (NULL == (client_wait = OpenEvent(EVENT_MODIFY_STATE, false, 
            ipc_encode_string(element_name, 200, LOG_NAME_CLIENT_WAIT_EV, m_prefix.c_str())))) return;
            SetEvent(client_wait);
            CloseHandle(client_wait);
        }
    }

如您所见,利用上一篇文章的代码,我们能够创建一个 IPC 逻辑来连接两个进程以发送和接收数据,并感知与另一个进程的连接状态。所有这些都仅使用三个回调(数据可用、连接状态更改和错误处理程序)和一个 send_data 方法来实现。非常简单,但同时又非常复杂。希望您喜欢它,并可以在您的项目中加以利用。

© . All rights reserved.