Windows Mobile .NET 中使用消息队列的进程间通信






4.81/5 (15投票s)
演示在托管代码中使用原生消息队列的包装器。
引言
当需要在不同程序之间传递信息时,Windows Mobile 和 Windows CE 提供了各种技术和解决方案来实现。信息可以通过共享存储位置(如注册表、文件或数据库)传递。对于频繁的小消息通信,可以将消息放在应用程序的消息泵上或通过消息队列使用。消息队列与事件、信号量和互斥体属于同一类对象;它们是命名内核对象。目前 .NET Framework 不直接支持这些对象。但通过一些 P/Invoke 声明,可以轻松访问其功能。在本文中,我将展示如何与消息队列功能进行交互。
我的目标不是对消息队列进行详尽的解释,而是涵盖足够的信息,让读者掌握概念并继续学习。
必备组件
本文建立在其他一些 Windows CE 内核对象(即事件、互斥体和信号量)的概念之上。它还建立在我最近一篇关于Windows Mobile .NET 原生线程同步的文章中介绍的代码之上。在阅读本文之前,请先阅读上述文章;我在此前文章的基础上扩展了代码,您需要熟悉它才能阅读本文。
原生函数和结构
本文有许多核心的原生函数。有关这些函数的详细信息可在 MSDN 库中找到。函数列表如下:
这些函数中使用的一些结构如下所示:什么是消息队列
最简单地说,队列是一个有序列表。可以向列表中添加项目或从列表中移动项目。但是,不能从列表中任意位置添加或删除项目。项目只能从列表的开头删除,项目只能添加到列表的末尾。这些插入和删除规则通常被称为 FIFO(先进先出)或 FCFS(先到先服务)。Windows CE 设备提供两种消息队列实现。一种实现是操作系统的一部分,用于同一设备上进程之间的通信。另一种是 MSMQ 的实现,可以安装到 Windows CE 设备上,并可用于与其他机器通信。本文主要围绕前两种实现中的第一种。
消息队列可以是特定于进程的,也可以在进程之间共享。在任何一种情况下,消息队列的句柄都允许对队列进行只读或只写访问。不能使用同一句柄同时读写消息队列。如果您已经拥有消息队列的句柄,您可以创建额外的句柄,用于读写关联的队列。这对于无名队列尤其重要。
如前所述,本文中的代码建立在前一篇文章的代码之上。前一篇文章中的代码与本文中的代码之间的关系如下图类层次结构图所示。深蓝色(MessageQueue
、MessageQueueReader
、MessageQueueWriter
)的类是本文中添加的类。

创建和打开消息队列
消息队列通过原生函数 CreateMsgQueue
创建或打开。与前一篇文章中的同步对象一样,消息队列必须分配一个名称才能在进程之间共享。如果多个进程创建同名消息队列,则每个进程都将收到同一个消息队列的句柄。创建消息队列的调用必须传递一个 MSGQUEOPTIONS
结构,以指定消息队列中可以存在的最大项目数、队列中每个消息的最大大小以及是请求只读还是只写句柄。
如果您的消息队列仅用于在同一进程内的线程之间传递信息(在这种情况下消息队列可能没有名称),您将需要使用您创建的第一个消息队列的句柄来创建另一个句柄,该句柄使用 OpenMsgQueue 连接到同一个队列。如果没有此函数,在您创建新的消息句柄时,您将无法指定您正在创建的句柄应该连接到已经存在的上一个队列。
消息队列由操作系统创建,是系统资源。您必须确保在不再需要消息队列句柄时释放它。您可以使用 CloseMsgQueue 释放消息队列句柄。
基类
与系统事件、信号量和互斥体一样,消息队列是可等待的,可以选择有名称,并通过必须手动清除的句柄绑定到资源。前一篇文章中的 SyncBase
类已经设计用于保存系统资源的句柄。该类的布局如下所示
public abstract class SyncBase : IDisposable
{
protected IntPtr _hSyncHandle;
protected bool _firstInstance;
public bool FirstInstance
{
get { return _firstInstance; }
}
public IntPtr SyncHandle
{
get { return _hSyncHandle; }
}
public static SyncBase WaitForMultipleObjects
(int timeout, params SyncBase[] syncObjectList) {...}
public bool SetEvent() {...}
public static SyncBase WaitForMultipleObjects
(params SyncBase[] syncObjectList) {...}
public WaitObjectReturnValue Wait(int timeout) {...}
public WaitObjectReturnValue Wait() {...}
#region IDisposable Members
public virtual void Dispose()
{
if(!_hSyncHandle.Equals(IntPtr.Zero))
CoreDLL.CloseHandle(_hSyncHandle);
}
#endregion
}
与系统事件一样,当代码等待队列时,它将被阻塞,直到队列处于信号状态。对于队列的只读句柄,信号状态表示有数据已准备好从队列中读取。对于只写句柄,信号状态表示队列中有足够的空间容纳更多消息。一旦队列已满,句柄就不再处于信号状态。因此,基类中定义的 Wait
方法无需修改即可使用。
构造函数必须定义并调用 CreateMsgQueue
。调用返回的句柄将保存在成员 _hSyncHandle
中。
构造函数
虽然有几个消息队列构造函数,但它们都调用两个基本构造函数之一。原生函数 CreateMsgQueue
在构造函数中使用,它需要 MSGQUEUEOPTIONS
参数作为创建参数。我已实现了托管类 MsgQueueOptions
以反映该结构,并将该类的实例创建和初始化包装在方法 GetMessageQueueOptions
中。现在我允许调用者使用一个消息队列设置。它是 MsgQueueOptions
的 dwFlags
成员。可以通过此标志设置的选项是 MSGQUEUE_NOPRECOMMIT
和 MSGQUEUE_ALLOW_BROKEN
。MSGQUEUE_NOPRECOMMIT
可防止系统预先分配消息队列所需的内存,并根据需要分配内存。MSGQUEUE_ALLOW_BROKEN
用于允许写入器队列即使在另一端没有读取器队列的情况下也可以使用。如果未指定此选项,并且在读取器连接到队列之前尝试写入队列,则句柄会立即变得不可用(因此,我绝不会省略该标志)。如果读取器或写入器的创建导致队列的创建,则 GetLastWin32Error
将返回 SUCCESS
(数值 0)。否则它返回 ERROR_ALREADY_EXISTS
。只要 CreateMsgQueue
返回非零句柄,则调用成功。我使用 GetLastWin32Error()
来确定队列的第一个句柄是否刚刚创建。这是基本构造函数的代码
internal MessageQueue(string name, bool readAccess, int maxItems, int itemSizeInBytes)
{
MsgQueueOptions options = GetMessageQueueOptions
(readAccess, maxItems, itemSizeInBytes);
_hSyncHandle = CoreDLL.CreateMsgQueue(name, options);
int lastError = Marshal.GetLastWin32Error();
if (IntPtr.Zero.Equals(_hSyncHandle))
{
throw new ApplicationException(String.Format("Could not create or
open message queue {0}. LastWin32Error={1}", name, lastError));
}
_firstInstance = (0 == lastError);
}
另一个重要的构造函数是创建与现有队列连接的队列端点,使用另一个队列端点作为参数。如果您正在使用一个没有名称的队列,那么这是您能够创建另一个队列端点的唯一方法。原生函数 OpenMsgQueue
用于执行此操作。与 CreateMsgQueue
一样,此方法需要一个 MsgQueueOptions
。在选项结构中,OpenMsgQueue
唯一使用的参数是 dwSize
和 bReadAccess
。基消息队列类的另一个构造函数如下
internal MessageQueue(MessageQueue source, int maxCount, bool readOnly)
{
_firstInstance = false;
MsgQueueOptions options = GetMessageQueueOptions(readOnly, maxCount, 0);
IntPtr processID = (IntPtr)Process.GetCurrentProcess().Id;
_hSyncHandle = CoreDLL.OpenMsgQueue(processID, source._hSyncHandle, options);
if (_hSyncHandle.Equals(IntPtr.Zero))
{
int errorCode = Marshal.GetLastWin32Error();
QueueResult result = Win32ErrorCodeToQueueResult(errorCode);
string errorMessage = String.Format("Error occurred opening
read message queue (Win32Error={0}, QueueResult={1}", errorCode, result);
if (result == QueueResult.InvalidHandle)
{
CoreDLL.CloseMsgQueue(_hSyncHandle);
_hSyncHandle = IntPtr.Zero;
}
throw new ApplicationException(errorMessage);
}
}
这个消息队列实现是可处置的;当它不再使用时,可以通过调用 dispose 消息来清理其资源。
队列的子类化
当我为消息队列功能编写包装器时,我曾考虑过,如果开发人员尝试从只写队列中读取或向只读队列中写入,就抛出异常。但后来我觉得更合理的做法是,根本不允许开发人员执行这种无效操作。所以我将 MessageQueue
类子类化为两个类:MessageQueueReader
和 MessageQueueWriter
。每个类都只包含一组用于读写消息队列的方法,而不是两者兼有。这些类的构造函数只调用基构造函数,并将 readOnly
参数设置为 true
或 false
。
写入队列
用于写入队列的方法使用 WriteMsgQueue
。如果队列中没有空间写入消息,该方法将阻塞调用者。CreateMsgQueue
接受一个名为 dwTimeout
的参数,用于指定调用者在写入请求被视为失败之前等待多长时间。如果此值设置为 INFINITE
(数值 -1),则调用将无限期阻塞,直到有足够的可用空间执行写入。
在我实现这个包装器时,开发人员只能以字节数组的形式将信息传递到队列中。我曾考虑使用泛型来使代码更灵活,但我发现了一些这种实现被误用和滥用的方式。开发人员有责任将其数据转换为字节数组。我的实现中公开了两个写入方法。第一个接受消息字节数组和超时值。第二个只包含消息字节,并假设超时值为 INFINITE
。
从队列读取
Read
方法是 Write
方法的对应;开发人员为方法提供一个字节缓冲区,并可选择一个超时值。如果没有可读取的内容,调用将被阻塞,超时值控制方法在读取尝试被视为失败之前等待消息的时间。
读写结果
由于从队列读取或写入队列失败是正常执行流程的一部分,我决定在写入请求失败时不抛出异常。抛出异常会影响性能,所以我尽量不不必要地抛出它们。相反,这些方法返回枚举类型 QueueResult
的值。QueueResult.OK
表示读或写请求成功完成,其他值表示失败并附带原因(例如写入请求超时)。

代码示例
读/写客户端
读/写客户端示例创建一个消息队列,并允许用户从主 (UI) 线程向队列添加消息,并在单独的线程上处理队列中的消息。从用户体验的角度来看,没有什么可看的。所有有趣的内部工作都在代码中。

/// <summary>
/// Waits on messages to be placed on the queue and displays them as they arrive
/// </summary>
void ReaderThread()
{
using(_reader)
{
while (!_shuttingDown)
{
//The following call will block this thread until there is either a message
//on the queue to read or the thread is being signalled to run to prepare
//for program termination. Since the following call blocks the thread until
//it is time to do work it is not subject to the same batter killing
//affect of other similar looking code patterns
//( http://tinyurl.com/6rxoc6 ).
if (SyncBase.WaitForMultipleObjects(_readerWaitEvent, _reader) == _reader)
{
string msg;
_reader.Read(out msg); //Get the next message
AppendMessage(msg); //Display the thread to the user
}
}
}
}
/// <summary>
/// Appends processed message to top of list box.
/// </summary>
/// <param name=""message""></param>
public void AppendMessage(string message)
{
//If this is called from a secondary thread then marshal it to
//the primary thread.
if (this.InvokeRequired)
{
this.Invoke(_appendDelegate, new object[] { message });
}
else
{
this.lstReceivedMessages.Items.Insert(0, message);
}
}
写入客户端
写入客户端与之前的代码示例一起工作。它连接到与之前代码示例相同的队列,用户放入队列的任何消息都将显示在另一个程序中(如果它正在运行)。如果您单独运行写入客户端而不启动读取客户端,那么消息将累积在队列中直到它已满。如果您尝试向队列已满时写入消息,请求将阻塞 4 秒,然后返回超时结果。

电源通知队列
电源通知队列示例与我发布的一篇关于Windows Mobile 电源管理的文章相关。该程序创建一个队列读取器,并在调用原生函数 RequestPowerNotifications
时将读取器队列的句柄传递过去。操作系统随后会将消息写入队列,以通知程序电源状态的变化。通知会附加到列表视图的开头。选择列表中的项目将导致相关的电源标志显示在屏幕底部。
请求电源通知所产生的消息以结构体的形式传递,但我提供的包装器使用字节数组。我创建了一个继承自 MessageQueueReader
的新队列类型,并提供了一个返回电源队列消息的 Read
实现。重载的 Read
方法会重新构造 PowerBroadcast
结构。

public PowerBroadcast Read()
{
PowerBroadcast retVal = new PowerBroadcast();
int bytesRead;
QueueResult result;
result = Read(readBuffer, out bytesRead);
if (QueueResult.OK == result)
{
int message = readBuffer[0] | readBuffer[1] << 8 |
readBuffer[2] << 0x10 | readBuffer[3] << 0x18;
int flags = readBuffer[4] | readBuffer[5] << 8 |
readBuffer[6] << 0x10 | readBuffer[7] << 0x18;
int length = readBuffer[8] | readBuffer[9] << 8 |
readBuffer[10] << 0x10 | readBuffer[11] << 0x18;
retVal.Message = (PowerBroadCastMessageType)message;
retVal.Flags = (PowerBroadcastFlags)flags;
retVal.Length = length;
if ((length > 0)&&( (retVal.Message&PowerBroadCastMessageType.PBT_TRANSITION)
==PowerBroadCastMessageType.PBT_TRANSITION))
{
retVal.SystemPowerState = TextEncoder.GetString(readBuffer,12,length);
}
}
return retVal;
}
闭运算
我已涵盖了 Windows 消息队列的基本知识,所提供的信息对于需要使用消息队列的大多数场景来说已经足够。但请不要止步于此。继续阅读 MSDN 库中关于消息队列和其他命名对象的文章。
历史
- 2008 年 11 月 16 日 - 初次发布