65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Mailslots 进行进程间通信

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.83/5 (34投票s)

2004年10月11日

CPOL

21分钟阅读

viewsIcon

151262

downloadIcon

6848

如何使用 mailslots 在进程之间进行通信。

引言

有时你希望一个程序与另一个程序通信。你可能在多台机器上运行多个服务器,需要在一个中心位置远程监控一个或多个服务器。Windows 提供了一套丰富的通信方法,从套接字到命名管道,再到 DDE、DCOM 和邮槽。我曾在之前的文章[^]中探讨过命名管道。这次我将谈论邮槽。我假设你对 CreateFile()ReadFile()WriteFile() API 有一定的了解,并且对重叠 I/O 的基础知识也比较熟悉。

邮槽

实现“多写入器/单读取器”协议。一个进程通过名称创建一个邮槽,然后等待消息写入其中。其他进程如果知道其名称,就可以打开邮槽并向其写入消息。你只能有一个邮槽读取器,但可以有多个邮槽写入器。微软使用服务器/客户端术语来描述这一点。在微软的术语中,服务器创建并从邮槽读取;客户端连接到现有邮槽并向其写入。我发现这有点令人困惑——我更喜欢用邮槽读取器和邮槽写入器来思考。

邮槽展现了一个有趣且非常有用的特性。向邮槽写入一条消息,读取器会收到一条消息。在这种情况下,一条消息是一个完整的数据块,长度任意。如果写入器写入 60 字节,读取器读取 60 字节——不多也不少。如果写入器写入 327 字节,读取器读取 327 字节——你明白我的意思了。它是一个面向消息的协议,而不是面向字节的协议。它类似于命名管道上的消息模式。这并不是说你不能只读取消息的一部分;只是说使用邮槽的“自然”方式是面向消息的,这反映在读取器可用的 API 中。

邮槽可在网络上工作。你以 UNC 格式指定邮槽名称,就像在网络上的某个服务器上指定文件名一样。因此,如果你在同一台计算机上运行两个进程并使用邮槽进行通信,你可以通过指定名称 \\.\mailslot\slotname 来创建邮槽,连接到该邮槽的进程将使用相同的名称。看到其中的点了吗?它是运行进程的机器的别名,在这种情况下,意味着在本地机器上查找邮槽。要使应用程序感知网络,你需要用运行创建邮槽进程的机器名称替换该点。因此,如果我有两台机器分别名为 RobChris,并且一台名为 Chris 的机器上运行的进程创建了一个名为 cp 的邮槽,那么一台名为 Rob 的机器上运行的进程可以通过使用名称 \\chris\mailslot\cp 连接到该邮槽。

创建邮槽

通过调用 CreateMailslot() API 完成,将邮槽名称以 UNC 格式和其他一些参数传递给它。你只能在本地机器上创建邮槽,因此 UNC 路径名的服务器部分必须解析为本地机器;换句话说,它必须是本地机器的名称或一个点 '.'——使用 '.' 最简单。其他参数按顺序是:可以发送到邮槽的最大消息大小、指定邮槽读取器等待消息的时间的超时,以及指定 CreateMailslot() API 返回的句柄是否可以由子进程继承的安全描述符。

连接到邮槽

同样简单。你使用 CreateFile() API,以 UNC 格式指定邮槽名称。如果你希望实现多写入器/单读取器模型,在打开邮槽时需要注意共享模式。如果邮槽写入器在不指定 FILE_SHARE_WRITE 作为共享模式的情况下打开邮槽,它将阻止任何后续写入器向邮槽写入。API 实现中令人恼火的是,后续的 CreateFile() 调用会成功,它们似乎会返回一个有效的邮槽句柄,但使用该句柄写入的任何内容都会丢失。

一旦你获得了邮槽的句柄,你会怎么做呢?

如果你通过 CreateMailslot() API 创建了邮槽,你可以使用 ReadFile() API 从中读取。邮槽句柄以重叠 I/O 模式创建,因此你可以在其上使用重叠 I/O,尽管如果非重叠 I/O 更适合你的模型,你也可以使用它。你还可以调用 GetMailslotInfo() API 来查询有多少消息正在等待,下一条消息的长度以及超时时间。你可以调用 SetMailslotInfo() API 来更改超时。请注意,你传递给这两个 API 的句柄必须是由 CreateMailslot() API 创建的。

如果你没有创建邮槽,那么你可以通过使用 CreateFile() API 连接到邮槽。在这种情况下,你可以使用 WriteFile() API 写入它。是否可以使用重叠 I/O 取决于你如何调用 CreateFile() API;它可以是同步的或异步的,具体取决于你的需求。你不能使用 CreateFile() 连接到邮槽并期望能够从中读取(相信我,我尝试过)。

邮槽的一个陷阱

MSDN 邮槽文档中说,只要邮槽上存在任何打开的句柄,邮槽就会一直存在。我发现这并不完全正确(在 Windows XP Professional SP2 上)。你可以有任意数量的邮槽打开的写入器句柄,但邮槽在读取器句柄关闭后立即消失(一旦读取器句柄关闭,对邮槽的写入就会失败)。这是有道理的。如果你只能有一个读取器,那么一旦读取器消失,邮槽就没有存在的意义了,因为任何写入邮槽的消息都将无谓地被系统缓冲;如果没有读取器,这样缓冲的消息将永远存在(请记住,你不能使用 CreateFile() 来打开邮槽的读取句柄)。

邮槽的其他注意事项

如果你仔细阅读 MSDN 文档,你会发现我在将写入器连接到现有邮槽时简化了一些规则。除了指定服务器名称(它将连接到特定服务器上的邮槽)之外,还可以指定连接到域上特定名称的所有邮槽。你通过指定域名而不是服务器名称来做到这一点,例如 \\domainname\mailslot\name。你还可以使用星号 * 作为主域的简写。乍一看,这听起来很棒——你可以在域内的多台机器上创建任意数量的读取器,并通过指定域名同时向它们写入。但这里有一个陷阱。如果你使用域指定符作为写入器连接到邮槽,则每条消息不能写入超过 424 字节。另一方面,如果你的应用程序可以在此限制内运行,那么这是一种从域内多个位置监控单个进程的非常简单的方法。

当然,有一个(或两个)类

你目前所读到的内容可能已经激起了你使用邮槽的一些兴趣;本文的其余部分将描述我编写的一组类,以简化邮槽的使用。总共有五个类。

第一个类 CMailslot 是一个抽象基类,它封装了邮槽的基本功能。

然后是两个派生类,CSyncMailslotReader 实现邮槽的服务器(读取器)端,CSyncMailslotWriter 实现邮槽的客户端(写入器)端。对于这两个类,调用相应成员函数(读取或写入)的线程会阻塞,直到操作完成。

然后是 CQueuedMailslotWriter,它将消息排队并使用单独线程上的 CSyncMailslotWriter 实例实际写入消息。这个类是异步的,将写入器与网络延迟解耦。写入器进程可以根据需要排队数千条消息,同时主线程继续生成消息;CQueuedMailslotWriter 类处理将它们逐步发送到邮槽的详细信息,只要邮槽能够接收它们。

最后,还有一个更高级别的类 CAsyncMailslotReader,它实现了一个简单的协议,允许读取器将传入消息视为事件。

没有排队的邮槽读取器类。编写一个会很容易,但在我看来,两者都写没有意义。要么两端都能跟上消息流,要么一端排队。让两端都排队消息只是将责任从写入器转移到读取器。

CMailslot

看起来是这样的

//  Base mail slot class, contains stuff common to servers and clients
class CMailslot
{
protected:
                    CMailslot();
    virtual         ~CMailslot();

    virtual bool    Connect(LPCTSTR szSlotName, 
                            LPCTSTR szServerName = _T(".")) = 0;

public:
    virtual void    Disconnect();

    bool            IsOpen() const 
                    { return m_hMailSlot != INVALID_HANDLE_VALUE; }

protected:
    HANDLE          m_hMailSlot,
                    m_hStopEvent;
    bool            m_bStop;
    LPTSTR          m_pszSlotname;
    COverlappedIO   m_overlapped;
};
        

该类是一个抽象基类,因此您不能直接实例化 CMailslotm_hMaiSlot 句柄显而易见,但其余数据成员可能不那么明显,除非您读过我之前的一些文章。简而言之,几乎所有剩余的成员变量都用于多线程应用程序的优雅关闭(有关详细信息,请参阅本文[^],有关中断互斥体调用的更多信息,请参阅本文[^])。我还使用了我的 COverlappedIO [^]来封装重叠 I/O 逻辑。

请注意,Connect() 方法既是虚拟的又是抽象的。之所以如此,是因为(如前所述)打开邮件槽的方式取决于你是读取器还是写入器。它是虚拟的,以便可以重写,它是抽象的,以强制每个派生类实际实现该函数。如果你是从 CMailslot 派生的读取器类,你将期望使用 CreateMailSlot() API 打开邮件槽;如果你是写入器类,你将期望使用 CreateFile() API 打开邮件槽。这两种情况都没有“合理”的默认行为。Connect() 方法需要一个邮件槽名称,并接受一个可选的服务器名称,该名称默认为 '.'(点),如前所述,表示本地机器。

Disconnect() 的不同之处在于,“合理”的默认行为只要求关闭邮槽句柄。可能需要其他行为,但由于关闭句柄是“合理”的最小值,因此该类不会强制执行更多操作。

IsOpen() 返回一个 bool 值,指示邮槽是否成功打开。该函数只是测试 m_hMailSlot 成员是否不是 INVALID_HANDLE_VALUE。正如我们稍后将看到的,这不是一个足够好的测试,但这是我们在不尝试写入邮槽的情况下能做的最好的事情。

CSyncMailslotWriter

这个类实现了邮槽的写入端。它看起来像这样。

//  Mail slot writer class. Used to write to a mail slot.
//  The class creates an asynchronous mail slot handle which is 
//  used with overlapped I/O to write queued messages.
class CSyncMailslotWriter : public CMailslot
{
public:
                    CSyncMailslotWriter();
    virtual         ~CSyncMailslotWriter();

    virtual bool    Connect(LPCTSTR szSlot, LPCTSTR szServer = _T("."));

    virtual DWORD   Write(BYTE *pbData, DWORD dwDataLength);

protected:
    virtual bool    Connect();
};

该类提供了公共 Connect() 方法所需的重写,它看起来像这样。

//    Creates a connection to a mail slot.
//    Returns true on success, false on failure.
bool CSyncMailslotWriter::Connect(LPCTSTR szSlotname, LPCTSTR szServer)
{
    assert(szServer);
    assert(szSlotname);

    //    Delete any previous mail slot name
    delete m_pszSlotname;
    m_pszSlotname = new TCHAR[_MAX_PATH];
    assert(m_pszSlotname);

    //    Create our mail slot name
    _sntprintf(m_pszSlotname, _MAX_PATH, _T(\\\\%s\\mailslot\\%s),
                   szServer, szSlotname);
    m_pszSlotname[_MAX_PATH - sizeof(TCHAR)] = TCHAR(0);
    
    //    Now connect...
    return Connect();
}
        

这很简单。首先我们验证输入参数。然后我们创建邮槽名称的规范形式,并调用私有 Connect() 方法,该方法执行实际的邮槽连接。

我这样做是因为我希望我的邮件槽类能够应对网络问题,而无需客户端了解太多错误处理。无论是否联网,读取器意外消失都是完全可能的。如果发生这种情况,写入器应尝试重新创建连接,但如果失败,则不应阻塞客户端。在后一种情况下,如果类未能重新创建连接,它会丢弃消息。是的,在此模型中可能会丢失信息,但至少客户端可以继续运行。客户端可以自由选择在从 Write() 方法收到 false 作为返回值时做什么。它可以重试 Write() 或忽略错误。因此,有两个 Connect() 方法。公共方法,它接受服务器和邮件槽名称参数,以及一个执行实际连接的私有方法。客户端调用公共方法,既不知道也不关心有一个私有方法执行实际连接。私有方法看起来像这样

bool CSyncMailslotWriter::Connect()
{
    //  Close any existing mail slot
    Disconnect();

    //  Now open the mail slot for overlapped I/O
    if ((m_hMailSlot = CreateFile(m_pszSlotname, 
                            GENERIC_WRITE, 
                            FILE_SHARE_READ | FILE_SHARE_WRITE, 
                            NULL, 
                            OPEN_EXISTING, 
                            FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
                            &m_overlapped)
        ) != INVALID_HANDLE_VALUE)
    {
        m_overlapped.Attach(m_hMailSlot);
        return true;
    }

    return false;
}
        

此方法会断开与任何先前邮件槽的连接,然后使用 CreateFile() API 打开与邮件槽的连接。如果成功,它会将新的邮件槽句柄附加到 COverlappedIO 对象并返回 true,否则返回 false

Write() 方法看起来像这样

//  Writes a message to the mail slot.
DWORD CSyncMailslotWriter::Write(BYTE *pbData, DWORD dwDataLength)
{
    assert(pbData);
    assert(dwDataLength);

    int nRetries = 2;

    while (nRetries--)
    {
        //  If the mail slot is closed attempt to reconnect to it
        if (!IsOpen() && m_pszSlotname != LPTSTR(NULL))
            Connect();

        DWORD dwWrittenLength = 0;

        if (IsOpen())
        {
            //  Write using overlapped I/O. We have to use overlapped 
            //  I/O if we want to be able to interrupt the write. If we
            //  use synchronous I/O there's a high chance the operation
            //  will stall inside the WriteFile call.  See 
            //  https://codeproject.org.cn/win32/overlappedio.asp
            //  for a more detailed explanation.
            if (m_overlapped.Write(pbData, dwDataLength, &dwWrittenLength,
                                   m_hStopEvent) 
                             && dwWrittenLength == dwDataLength)
                //  The I/O completed so return success (true).
                return dwWrittenLength;
            else
                //  If the write failed discard it but also force a 
                //  disconnect so that the next write will attempt a 
                //  connection.
                Disconnect();
        }
    }

    return 0;
}
        

此方法尝试两次将消息写入邮槽。如果邮槽已打开且写入成功,则返回写入的字节数。如果写入失败,它会执行 Disconnect(),然后循环再次尝试。如果 Connect() 尝试成功,它会写入消息并返回写入的字节数。如果 Connect() 失败,它会返回 0 作为写入的字节数,由调用者决定如何处理消息。

现在你可能已经注意到,我声称这个类是同步的,但它却使用了重叠 I/O。我没有疯,也没有前后矛盾。从调用者的角度来看,除非调用者需要中断 I/O,否则该类表现为同步。实际上,使用该类的线程无法中断 I/O,因为它在 Write() 调用上阻塞。尽管如此,如果需要,另一个线程可以中断 I/O,这就是同步类首先使用重叠 I/O 的原因。有关原因的解释,请参见此处[^]。

CSyncMailslotReader

这个类实现了邮槽的读取端,它看起来像这样

//  Mail slot reader class. Used  to read from a mail slot. 
class CSyncMailslotReader : public CMailslot
{
public: CSyncMailslotReader(); virtual ~CSyncMailslotReader();
    virtual bool    Connect(LPCTSTR szSlotname, 
                            LPCTSTR szServer = _T(".")); 
    BYTE            *Read(DWORD& dwBufferLength);
    DWORD            GetMessageCount(
                         LPDWORD pdwNextMessageLength = (DWORD *)  NULL);
};
        

CSyncMailslotWriter 类一样,该类也提供了 Connect() 方法的自己的重写,它看起来像这样。

//  Create a named mail slot. This must be done on the local machine
//  thus we don't use the server name parameter.
bool CSyncMailslotReader::Connect(LPCTSTR szSlotname, LPCTSTR /*szServer*/)
{
    assert(szSlotname);
    
    if (IsOpen())
    {
        TCHAR szTempSlotname[_MAX_PATH];

        //  If we get here it means the mailslot handle might be valid so
        //  let's check that the m_pszSlotname variable isn't a NULL 
        //  pointer.  If it is then we've got an inconsistency that 
        //  oughtn't to happen.
        assert(m_pszSlotname);
        _sntprintf(szTempSlotname, 
                   _MAX_PATH, _T("\\\\.\\mailslot\\%s"), szSlotname);
        
        if (_tcsicmp(m_pszSlotname, szTempSlotname) == 0)
            return true;
        else
            Disconnect();
        }

    //  Delete any previously created slot name
    delete m_pszSlotname;
    m_pszSlotname = new TCHAR[_MAX_PATH];
    assert(m_pszSlotname);

    //  Create our mail slot name
    _sntprintf(m_pszSlotname, _MAX_PATH, _T(\\\\.\\mailslot\\%s), 
               szSlotname);
    m_pszSlotname[_MAX_PATH - sizeof(TCHAR)] = TCHAR(0);
    
    if ((m_hMailSlot = CreateMailslot(m_pszSlotname, 0, 
                                      MAILSLOT_WAIT_FOREVER, NULL))
                    != INVALID_HANDLE_VALUE)
    {
        //  Attach the mailslot handle to the overlapped
        //  I/O object.
        m_overlapped.Attach(m_hMailSlot);
        return true;
    }

    return false;
}
        

由于邮件槽读取器控制着邮件槽的生命周期,因此无需拥有两个 Connect() 方法。该方法会断开与此类的任何先前邮件槽实例的连接,并创建一个新的邮件槽。请注意,我们不使用 szServer 参数,因为如前所述,邮件槽必须在本地机器上创建。一旦我们创建了邮件槽,我们就会将其句柄附加到此类的 COverlappedIO 对象实例。

该方法开始时有一个健全性检查,以防止多次使用相同的邮槽名称调用 Connect() 的可能性。如果没有健全性检查,该方法将继续关闭现有邮槽的句柄并重新创建它。这会起作用,但持有邮槽打开句柄的写入器将在下一次写入尝试时遇到写入错误。CSyncMailslotWriter::Write() 方法可以处理这种情况,但是当测试相对简单时,为什么要浪费 CPU 周期呢?

从邮件槽读取消息比写入消息稍微复杂一些。原因在于你有两种读取方式。你可以通过 GetMessageCount() 方法循环轮询类,或者你可以实际进入 Read() 方法并等待消息。使用 GetMessageCount() 让你预先确定要读取多少数据(以及分配多少内存),但缺点是你会在轮询邮件槽以获取下一条消息时浪费大量的 CPU 周期。另一方面,直接调用 Read() 意味着你不知道要读取多少数据,因此你必须就单个 Read() 调用中准备处理多少数据做出一些任意决定。我选择了第二种方式,并任意限制为 65536 字节的数据(这个数字也恰好是微软建议的邮件槽消息大小的上限)。Read() 方法看起来像这样。

//  Read a message from the mailslot and return it in a buffer allocated
//  on the heap. The caller must delete the buffer once it's done with it.
BYTE *CSyncMailslotReader::Read(DWORD& dwBufferLength)
{
    //  We have to allocate a large buffer for incoming messages because
    //  we have no idea how much data is coming...
    BYTE  *pbData = (BYTE *) NULL,
          *pbTemp = (BYTE *) NULL;

    dwBufferLength = 0;

    if (IsOpen())
    {
        pbData = new BYTE[65536];
        assert(pbData);

        //  Now read the data
        if (m_overlapped.Read(pbData, 65536 - sizeof(TCHAR), 
                              &dwBufferLength, m_hStopEvent)
            && dwBufferLength)
        {
            //  If we read a message it's time to copy the data into a 
            //  buffer of the correct length to hold the message.  
            //  Actually we add one character to the buffer so that, if 
            //  the message is really a string, it'll be correctly 
            //  terminated and maintain string semantics.
            pbTemp = new BYTE[dwBufferLength + sizeof(TCHAR)];
            assert(pbTemp);
            memcpy(pbTemp, pbData, dwBufferLength);
            pbTemp[dwBufferLength] = TCHAR(0);
        }
    }

    delete [] pbData;
    return pbTemp;
}
        
你会注意到这里有一些小技巧。邮槽不是面向字符串的,它们是面向字节的,你可以发送任何你想要的数据。然而,我使用邮槽的大部分用途是从一个进程向另一个进程发送文本字符串,如果可能,我希望有字符串语义。在缓冲区末尾保留一个 NULL 终止符的空间可以实现这一点。因此,在此实现中,消息大小的真正限制是 65536 字节减去字符编码的 sizeof

CQueuedMailslotWriter

该类实现了一个消息队列,并使用后台线程将它们写入 CSyncMailslotWriter 的实例。该类实现了高优先级消息和普通优先级消息的概念。每种消息各有一个队列,自然,高优先级队列中的消息会首先发送。调用者在调用 Write() 方法时指定消息优先级。如果你愿意,可以将此机制扩展到任意数量的优先级级别,但在实践中我发现两个级别就足够了。当然,扩展优先级级别会带来性能损失。该类看起来像这样

class CQueuedMailslotWriter : public CSyncMailslotWriter
{
    class CQueuedData
    {
    public:
        CQueuedData(BYTE *pbData, DWORD dwDataLength);
        ~CQueuedData();

        DWORD       Length() const      { return m_dwDataLength; }
        BYTE        *Data() const       { return m_pbData; }

    private:
        BYTE        *m_pbData;
        DWORD       m_dwDataLength;
    };

    typedef deque<CQUEUEDDATA *> DATAQUEUE;
    typedef DATAQUEUE::const_iterator DQITER;

public:
                    CQueuedMailslotWriter(void);
    virtual         ~CQueuedMailslotWriter(void);

    virtual bool    Write(BYTE *pbData, DWORD dwDataLength,
                          BOOL bImportant);
    virtual bool    Connect(LPCTSTR szSlotname,
                            LPCTSTR szServername = _T("."));

private:
    static unsigned __stdcall ThreadStub(LPVOID data);
    virtual void    ThreadProc(CBaseThread *pThread);
    void            StopThread();

    HANDLE          m_hStopEvent,
                    m_hSignalEvent,
                    m_haSignal[2];
    CInterruptibleMutex m_imMutex;
    CBaseThread     *m_pThread;
    volatile bool   m_bStop;
    DATAQUEUE       m_highPriorityDataQueue,
                    m_normalPriorityDataQueue;
};
        
主要增加了一个私有类 CQueuedData 和一些线程相关的变量。CQueuedData 类只是方便地保存每次调用 Write() 方法时传递的数据。Write() 方法将传递的数据打包并添加到队列中。稍后,ThreadProc() 方法将从队列中取出数据并将其传递给基类的 Write() 方法。CQueuedMailslotWriter::Write() 看起来像这样
//  Writes a message to the mail slot. Actually it queues the message
//  for the mail slot and leaves it to the background thread to actually
//  do the write.
bool CQueuedMailslotWriter::Write(BYTE *pbData, DWORD dwDataLength,
                                  BOOL bImportant)
{
    assert(pbData);
    assert(dwDataLength);

    //  If the mail slot is closed attempt to reconnect to it
    if (!IsOpen() && m_pszSlotname != LPTSTR(NULL))
        CSyncMailslotWriter::Connect();

    if (IsOpen())
    {
        //  Grab the mutex first.  You must have the mutex before
        //  attempting to create the QueuedData object else you'll
        //  corrupt the program heap or deadlock on the heap lock.
        if (m_imMutex.AquireMutex(m_hStopEvent) == 
                                  CInterruptibleMutex::eMutexAquired)
        {
            CQueuedData *pqData = new CQueuedData(pbData, dwDataLength);

            assert(pqData);

            if (bImportant)
                //  High priority message, put it on the high priority
                //  queue
                m_highPriorityDataQueue.push_back(pqData);
            else
                //  Normal priority message, put it on the normal priority 
                //  queue
                m_normalPriorityDataQueue.push_back(pqData);

            m_imMutex.ReleaseMutex();

            //  Now signal the queue handler thread...
            SetEvent(m_hSignalEvent);
            return true;
        }
    }

    return false;
}
        

CSyncMailslotWriter::Write() 方法一样,此方法首先检查邮件槽连接是否打开。如果未打开,则尝试重新连接到邮件槽。如果它认为存在有效的邮件槽连接,它会继续将消息添加到队列中。否则,它只是丢弃消息。如果它将消息添加到队列中,它会设置一个事件,通知后台线程有新消息到达需要发送。Write() 方法和线程过程共享相同的数据队列,因此它们必须实现一种安全的方法来添加和删除队列中的条目。它们通过使用共享互斥体(实际上是我CInterruptibleMutex 类)[^ ]的实例来实现这一点。

线程看起来像这样

void CQueuedMailslotWriter::ThreadProc(CBaseThread *pThread)
{
    CQueuedData *pqData;
    DQITER      pdqIterator;
    bool        bQueuePriority;

    while (!pThread->Stop())
    {
        switch (WaitForMultipleObjects(2, m_haSignal, FALSE, INFINITE))
        {
        case WAIT_OBJECT_0:
            //  Told to stop, so stop
            break;

        case WAIT_OBJECT_0 + 1:
            //  Grab the mutex before we fall into the loop
            if (m_imMutex.AquireMutex(m_hStopEvent) !=
                                      CInterruptibleMutex::eMutexAquired)
                //  Signalled to stop, so stop...
                break;

            //  New message added to the queue, send it
            while ((m_highPriorityDataQueue.size() || 
                    m_normalPriorityDataQueue.size()) 
                   && !pThread->Stop())
            {
                //  Keep looping until either the queue is empty or 
                //  we've been signalled to stop.
                if (m_highPriorityDataQueue.size())
                {
                    pdqIterator = m_highPriorityDataQueue.begin();
                    bQueuePriority = false;
                }
                else
                {
                    pdqIterator = m_normalPriorityDataQueue.begin();
                    bQueuePriority = true;
                }

                pqData = *pdqIterator;

                //  Done for now, release the mutex to give other threads
                //  a chance of queuing data.
                m_imMutex.ReleaseMutex();
                
                if (CSyncMailslotWriter::Write(pqData->Data(),
                        pqData->Length()) == pqData->Length())
                {
                    //  Now aquire the mutex again so we can remove the
                    //  message from the queue
                    if (m_imMutex.AquireMutex(m_hStopEvent) == 
                                  CInterruptibleMutex::eMutexAquired)
                    {
                        //  The bQueuePriority flag tells us which queue we 
                        //  pulled the message from. We can't use the queue
                        //  size here because the queues might have 
                        //  changed in the time between now and when we
                        //  pulled the message to be sent.
                        if (bQueuePriority == false)
                            m_highPriorityDataQueue.pop_front();
                        else
                            m_normalPriorityDataQueue.pop_front();

                        delete pqData;
                        continue;
                    }
                }
                else
                    //  Failed to write the message, leave it on the queue
                    //  and break out of the writer loop
                    break;
            }

            //  Finished the loop, so release the mutex.
            m_imMutex.ReleaseMutex();
            break;
        }
    }

    //  Make sure we cancel any pending I/O before exiting the thread.
    if (IsOpen())
        CancelIo(m_hMailSlot);
}
        
它会一直等待,直到收到新消息的信号。当有消息到达时,它会首先检查高优先级队列,然后是普通优先级队列。无论哪种方式,它都会获取一条要发送的消息,通过调用 CSyncMailslotWriter::Write() 来完成。此过程中最值得注意的是锁定。当收到消息等待发送的信号时,它通过获取互斥锁来锁定消息队列。它从一个或另一个队列中取出消息,并在尝试发送消息之前释放互斥锁。一旦发送了消息,它会再次获取互斥锁,以便安全地从队列中移除消息。由于需要确保在检查队列大小之前获取互斥锁,并且队列大小是 while 循环的控制变量,因此需要比预期更多的互斥锁获取调用,从而使循环变得复杂。自然地,由于对 CSyncMailslotWriter::Write() 的调用可能会阻塞,因此我们在进行该调用之前释放互斥锁。一旦调用返回,我们需要再次获取互斥锁,然后继续到 while 循环的顶部。

CAsyncMailslotReader

这个类代表了邮槽读取器更高层次的使用,它在一个独立的线程上运行,并通过一个虚拟函数 OnMessage() 将传入的消息传递给客户端。

该类看起来像这样

class CAsyncMailslotReader : public CSyncMailslotReader
{
public:
                    CAsyncMailslotReader();
    virtual         ~CAsyncMailslotReader();

    virtual bool    OnMessage(BYTE *pbMessage, DWORD dwMessageLength) = 0;
    virtual bool    Connect(LPCTSTR szSlotName);

protected:
    static unsigned int __stdcall ThreadStub(LPVOID data);
    unsigned int    ThreadProc(LPVOID data);

    CBaseThread     *m_pThread;
};
        

该类重写了虚拟 Connect() 方法,以便在对象创建邮件槽时创建监控线程。Connect() 看起来像这样

bool CAsyncMailslotReader::Connect(LPCTSTR szSlotName)
{
    assert(szSlotName);
    bool bStatus = CSyncMailslotReader::Connect(szSlotName);

    if (bStatus)
    {
        //  If we succeeded in creating the mailslot we start up a thread to
        //  monitor it.
        m_pThread = new CBaseThread(m_hStopEvent, &m_bStop, ThreadStub,
                                    false, this);
        assert(m_pThread);
    }

    return bStatus;
}
        

这很简单。线程过程看起来像这样

unsigned int CAsyncMailslotReader::ThreadProc(LPVOID data)
{
    CBaseThread *pThread = (CBaseThread *) data;
    BYTE        *pbMessage;
    DWORD       dwMessageLength = 0;

    assert(pThread);

    while (!pThread->Stop())
    {
        //  Get and dispatch messages
        pbMessage = Read(dwMessageLength);

        if (dwMessageLength)
            OnMessage(pbMessage, dwMessageLength);
    }

    return 0;
}
        
这也很简单。线程只是在调用 CSyncMailslotReader::Read() 中等待下一条消息。当消息到达时,线程调用虚拟 OnMessage() 方法,你可以重写该方法来执行应用程序所需的任何处理。请注意,CAsyncMailslotReader::OnMessage() 方法是一个纯虚方法;你不能直接实例化 CAsyncMailslotReader 的实例,而必须从它派生你自己的类。

请注意,对 OnMessage() 方法的调用发生在与您的主应用程序线程不同的线程上;如果您在 MFC 应用程序中使用该类并从 OnMessage() 方法操作 CWnd,记住这一点尤其重要。

使用代码

你需要构建库。这是一个静态库,而不是 DLL;我们不要涉及那个。源代码下载包含构建库所需的所有文件。根据你的项目使用哪些非抽象类,将相应的头文件包含到你的项目中;它们将包含 mailslots.h,该文件反过来插入必要的语句以拉取库。你需要做的就是确保库位于你的库路径中的某个位置。

演示项目

有三个演示项目。第一个实际上并没有使用邮件槽库;它所做的只是枚举并显示系统上存在哪些邮件槽。我在最初探索邮件槽时编写了它,我发现它在尝试确定哪些邮件槽可用时很有用。你会发现它有用吗? *耸肩*

第二个演示项目是一个邮件槽监听器。您指定一个邮件槽名称并点击创建按钮。一旦完成,只要程序继续运行,系统上就会存在一个具有该名称的邮件槽。发送到该邮件槽的任何消息都将由程序显示。当然,由于这是一个邮件槽监听器,一旦您关闭程序,邮件槽就会消失。

第三个演示项目是一个邮槽写入器。与邮槽监听器演示项目一样,您指定一个邮槽名称并点击创建按钮。如果存在该名称的邮槽,写入器将连接到它。然后,您可以在消息编辑控件中键入消息,点击发送按钮,并期望看到邮槽监听器显示消息。

已知错误

我只知道一个bug。它发生在一种我认为在实际应用程序中不太可能出现的情况下,所以我不担心它。如果你运行邮件槽监听器演示项目和邮件槽写入器项目,在监听器中创建一个邮件槽,连接到监听器,发送一条或多条消息,然后更改监听器中的名称,在写入器中匹配更改并重复发送消息,你会发现每隔一条消息都是垃圾。这是一个很容易重现的bug,我花了几个小时试图找到它。(有人会在我发布这篇文章的5分钟内找到它)。然而,在类的实际使用中,这种情况不太可能发生,因为我非常怀疑邮件槽监听器会更改它们监听的邮件槽的名称,更不用说通知写入器它们即将更改邮件槽的名称了。

邮槽与命名管道

根据我的经验,对于进程间通信,邮槽比命名管道更容易使用。然而,这并不是一个简单的设计决策。如果简单,我们可能就不会同时拥有这两种机制了。所以,以下是它们的优缺点。

如文章开头所述,邮件槽实现了一个多写入器/单读取器模型。邮件槽读取器端的一个句柄可以监控许多写入器。命名管道要求每个连接有一个管道。这意味着多写入器/单读取器模型的实现需要每个写入器一个命名管道和每个写入器一个读取句柄。这限制了命名管道实现可以监控的写入器数量,每个监控线程最多 64 个写入器。

此外,您可以在任何 32 位 Windows 实现上创建邮件槽,包括 Windows 95。如果相信微软的营销,那无关紧要,但我的经验是,任何时候您可以将 Win9X 系列操作系统包含在您的目标受众中,都会更好。Win9X 系统可以连接到命名管道,但不能创建命名管道——要创建命名管道,您需要 Windows NT 及其后代。

另一方面,邮件槽使用 UDP 数据报进行传输。这意味着消息不保证会被送达。这也意味着消息不保证按发送顺序接收。如果你的应用程序要求消息被接收且按顺序接收,那么邮件槽不一定是正确的选择。如果你在同一台机器上使用邮件槽作为进程间通信机制,你可能没问题——在本地机器上消息丢失或乱序的几率极低(但如果你的本地机器是多处理器,请小心:我的就是)。如果你通过网络发送消息,并且绝对要求每条消息都按正确的顺序到达,你可能需要寻找其他 IPC 机制。

最后,命名管道是一种双向通信通道。管道的两端都可以从同一个管道中读写。邮槽无法做到这一点!

历史

2004年10月10日 - 初始版本。

2004年10月16日 - 增加了一些关于命名管道与邮件槽的评论,以及为什么选择其中之一。(感谢 JT Anderson)。

© . All rights reserved.