进程内线程消息传递
一个类库,用于创建工作线程和它们之间的消息通信结构。
引言
该库实现了进程内工作线程的消息传递。所有线程都通过一个单一的访问点——消息队列或邮箱——进行通信。消息由线程串行处理。邮箱确保消息被排队,并且在线程能够处理它们之前不会丢失。这是一个异步消息系统,因此发送线程不会被阻塞。通过将线程的所有数据和代码声明在一个类中,线程的数据不存在并发问题。我认为这种方法比在代码中充斥着锁定语句共享对象,并不可避免地导致死锁,要更清晰、更容易维护。
历史
过去,我曾使用过提供消息队列的操作系统:VRTX-32 和 OS/2。不知何故,Windows 从未提供过这样的对象。
上图快速概览了其工作原理。
- 某个外部线程向线程 1 发送消息。
- 完成处理后,线程 1 将结果 1 消息发送给线程 2。
- 线程 2 然后执行一些处理,完成后,它将结果 2 消息发送给线程 3,并将确认消息发送给线程 1。
- 线程 3 可能只是更新一个数据库,作为处理结果 2 消息的结果。
库的内部结构
这个库中有几个类,如下所示:
- 消息队列
- 托管线程
- 消息类
- 线程管理器
消息队列
消息队列,或称为邮箱,允许任何线程发送消息,并允许拥有该队列的线程接收这些消息。消息队列由三个组件组成:一个 Queue
,一个 ManualResetEvent
,以及一个用于锁定对其他组件访问的对象。
消息队列有三个方法:SendMessage()
,供任何线程向拥有该队列的线程发送消息;GetMessage()
,仅供拥有该队列的线程使用,它会一直等待直到有消息到达;以及 GetMessage(int waitTime)
,它与 GetMessage()
相同,但如果在 waitTime
毫秒内没有收到消息,则会返回。这允许您的线程执行一些周期性处理。或者,您可以创建一个计时器,向您的线程发送消息以执行周期性函数。
实现
/// <summary>
/// Implements a message queue for a thread.
/// </summary>
internal class MessageQueue
{
/// <summary>
/// Queue for FIFO processing of messages
/// </summary>
private Queue<BaseMessage> msgQueue;
/// <summary>
/// Event signalling to the thread that a message has arrived
/// </summary>
private ManualResetEvent messageArrivedEvent;
/// <summary>
/// Mutual exclusion of threads to proteced resources
/// </summary>
private object mutexObject;
/// <summary>
/// Name of the managed threead that owns this message queue
/// </summary>
private readonly string threadName;
/// <summary>
/// Constructor
/// </summary>
public MessageQueue(string threadName)
{
msgQueue = new Queue<BaseMessage>();
messageArrivedEvent = new ManualResetEvent(false);
mutexObject = new object();
this.threadName = threadName;
}
/// <summary>
/// Send a message to the thread that owns this message queue
/// </summary>
public void SendMessage(BaseMessage msg)
{
lock(mutexObject)
{
msgQueue.Enqueue(msg);
messageArrivedEvent.Set();
}
}
/// <summary>
/// Get a message. If no messages wait indefinitely for one.
/// </summary>
public BaseMessage GetMessage()
{
BaseMessage msg = null;
// Only wait for a signal if no messages present
if(msgQueue.Count == 0)
messageArrivedEvent.WaitOne();
lock (mutexObject)
{
msg = msgQueue.Dequeue();
messageArrivedEvent.Reset();
}
return msg;
}
/// <summary>
/// Get a message. Only wait for the time out period
/// before returning, null return value indicates
/// no messages were received.
/// </summary>
public BaseMessage GetMessage(int waitTime)
{
BaseMessage msg = null;
// Don't wait for a signal if a message present
if (msgQueue.Count > 0 || messageArrivedEvent.WaitOne(waitTime) == true)
{
lock (mutexObject)
{
msg = msgQueue.Dequeue();
messageArrivedEvent.Reset();
}
}
return msg;
}
托管线程
此类包装了线程池中线程的功能和消息队列。它实现了 IThread
接口,允许操作线程并向其发送消息。
此类提供执行以下操作的公共方法:
- 创建线程后启动它。
- 停止线程。
- 向线程发送消息。
/// <summary>
/// Interface to a managed thread object
/// </summary>
public interface IThread
{
/// <summary>
/// The thread name
/// </summary>
string Name { get; }
/// <summary>
/// Start this thread running
/// </summary>
void StartThread();
/// <summary>
/// Stop this thread
/// </summary>
void StopThread();
/// <summary>
/// Send a message to this thread
/// </summary>
void SendMessage(BaseMessage msg);
}
在构造函数中,会提供一个消息处理函数的引用。在这里,您将实现处理特定消息的逻辑。
一旦托管线程开始运行,它就会进入一个循环,在该循环中,它在消息队列处等待下一个要处理的消息。停止线程消息会导致线程退出循环。
/// <summary>
/// Main processing loop for the thread. This one implements both the infinite
/// and timed wait for new messages.
/// </summary>
private void mainLoop(object state)
{
Thread.CurrentThread.Name = name;
bool run = true;
while (run == true)
{
// Call the appropriate GetMessage() method
BaseMessage msg = null;
if (waitTimeInMilliseconds == -1)
msg = msgQueue.GetMessage();
else
msg = msgQueue.GetMessage(waitTimeInMilliseconds);
// If a message returned, check its type
if (msg != null)
{
switch (msg.MsgType)
{
// Stop message, clear the flag so we exit the loop
// and end the thread
case MessageType.StopThread:
run = false;
break;
default:
// Call the user's supplied message handler
messageHandler(msg);
break;
}
}
else
// If we get here the caller has opted for time outs
messageHandler(msg);
}
}
消息类
所有有用的消息都继承自 BaseMessage
。该类定义了您设计中所需的特定消息类型。
/// <summary>
/// The different messages the managed threads send to each other.
/// Add your own definitions here as needed.
/// </summary>
public enum MessageType
{
StopThread, // Default one to have each thread exit its main processing loop
// Add your message definitions here
Message1,
Message2A,
Message2B
}
/// <summary>
/// Base for all specific messages
/// </summary>
public abstract partial class BaseMessage
{
/// <summary>
/// The specific message being instantiated
/// </summary>
public readonly MessageType MsgType;
/// <summary>
/// Constructor
/// </summary>
public BaseMessage(MessageType msgType)
{
this.MsgType = msgType;
}
}
对于这个示例程序,我创建了三个特定的消息类。
这是在循环测试中传递的消息。
/// <summary>
/// Example user defined message class for the loop test
/// </summary>
public class Message1 : BaseMessage
{
/// <summary>
/// Each thread in the chain will sleep this long before passing the message on
/// </summary>
public readonly int DelayInMilliSeconds;
/// <summary>
/// Constructor
/// </summary>
public Message1(int delayInMilliSeconds)
: base(MessageType.Message1)
{
DelayInMilliSeconds = delayInMilliSeconds;
}
}
这是启动负载和序列测试的消息。
/// <summary>
/// This message signals the worker thread to go into a loop sending out Message2Bs
/// </summary>
public class Message2A : BaseMessage
{
/// <summary>
/// Constructor
/// </summary>
public Message2A()
: base(MessageType.Message2A)
{
}
}
这是为负载和序列测试发送回的消息。
/// <summary>
/// This message signals the worker thread to go into a loop sending out Message2Bs
/// </summary>
public class Message2B : BaseMessage
{
/// <summary>
/// The current sequence number
/// </summary>
public readonly int SequenceNumber;
/// <summary>
/// The name of the managed thread sending this message
/// </summary>
public readonly string ThreadName;
/// <summary>
/// Constructor
/// </summary>
public Message2B(int sequenceNumber, string threadName)
: base(MessageType.Message2B)
{
SequenceNumber = sequenceNumber;
ThreadName = threadName;
}
线程管理器
此类是您应用程序代码的连接点。它实现了 IThreadManager
接口。
/// <summary>
/// Interface to the thread manager
/// </summary>
public interface IThreadManager
{
/// <summary>
/// Create a thread, it must be started running separately.
/// This method pends at the message queue until a message arrives and then
/// calls your message handler.
/// </summary>
IThread CreateThread(string threadName, Action<BaseMessage> messageHandler);
/// <summary>
/// Create a thread, it must be started running separately.
/// This method pends at the message queue until either a message arrives or the time
/// out period expires and then calls your message handler. If a time out the handler
/// is called with a null instead of a message object.
/// </summary>
IThread CreateThread(
string threadName,
Action<BaseMessage> messageHandler,
int waitTimeInMilliseconds);
/// <summary>
/// Get a list of all the threads under management
/// </summary>
List<IThread> GetThreads();
/// <summary>
/// Start all threads running
/// </summary>
void StartAllThreads();
/// <summary>
/// Stop all threads
/// </summary>
void StopAllThreads();
/// <summary>
/// Removes all threads from the manager
/// </summary>
void ClearAllThreads();
/// <summary>
/// Send a message to the thread that created the Thread Manager
/// </summary>
void SendMessage(BaseMessage msg);
/// <summary>
/// If your application code wants to receive messages,
/// it should wire in this event.
/// </summary>
event Action<BaseMessage> ReceivedMessage;
}
这允许您的应用程序代码:
- 获取
ThreadManager
实例的引用。 - 为您的系统创建任意数量的线程。
- 启动和停止所有正在运行的线程。
- 获取所有当前线程的
IThread
列表。 - 向其中任何一个线程发送消息。
- 通过连接一个事件,从线程接收消息。
使用库
我还没有找到一种干净的方法来添加消息枚举和类而不修改消息类代码,因此目前,库的源代码必须包含在您的解决方案中,并根据您的特定应用程序进行自定义更改。
我包含了一个我用来测试该库的 WinForms 应用程序,它也展示了该库的功能。我编写了两个测试。
循环测试
在这个测试中,WinForms 应用程序创建了三个线程。WinForms 线程将消息 1 发送给线程 1,线程 1 然后休眠指定的时间,再将消息传递给线程 2。线程 2 休眠指定的时间,并将消息传递给线程 3,线程 3 也休眠指定的时间,最后将消息发送回 WinForms 线程,从而完成测试。
创建和启动线程
Form1::btnStartSleepLoop_Click()
中的以下代码启动测试。
Example1
是一个类,它提供了托管线程类所需的用户定义消息处理程序。请注意,worker3
接收了 WinForms 邮箱的引用,它将在完成时将消息 1 发送到那里。然后,托管线程会使用在 worker3
中实现的对消息处理程序的引用进行创建。线程 2 和 1 也是如此。接下来,WinForms 线程将 ReceivedMessage
事件连接到从线程 3 接收完成消息。然后,所有托管线程都会启动,并将消息 1 发送给线程 1 以开始测试。
// Get a reference to the thread manager
threadMgr = ThreadManager.GetThreadManager();
// Create 3 threads for passing around the sleep message
// Create them in reverse order as thread 1 needs to know thread 2's mail box, etc.
Example1 worker3 = new Example1(
"Thread3",
threadMgr.SendMessage); // Thread 3 sends the message back to this thread
IThread thread3 = threadMgr.CreateThread(worker3.ThreadName, worker3.MessageHandler);
threads.Add(worker3.ThreadName, thread3);
Example1 worker2 = new Example1(
"Thread2",
thread3.SendMessage); // Thread 2 sends the message to thread 3
IThread thread2 = threadMgr.CreateThread(worker2.ThreadName, worker2.MessageHandler);
threads.Add(worker2.ThreadName, thread2);
Example1 worker1 = new Example1(
"Thread1",
thread2.SendMessage); // Thread 1 sends the message to thread 2
IThread thread1 = threadMgr.CreateThread(worker1.ThreadName, worker1.MessageHandler);
threads.Add(worker1.ThreadName, thread1);
// Wire in the message received event handler
threadMgr.ReceivedMessage += new Action<BaseMessage>(threadMgr_NewMessage);
// Start the threads running
threadMgr.StartAllThreads();
// Send a message to the 1st thread
Message1 msg = new Message1(Convert.ToInt32(txtDelay.Text) * 1000);
thread1.SendMessage(msg);
Example1 类
此类有一个发送消息委托,当处理完成时,它会将消息发送给该委托。MessageHandler()
的引用被传递给托管线程,托管线程每次收到消息时都会调用此方法。
/// <summary>
/// Class to handle the thread messaging loop for Example 1.
/// This class is used to encapsulate all the objects maintained by the thread.
/// </summary>
public class Example1
{
/// <summary>
/// Name of this thread
/// </summary>
public readonly string ThreadName;
/// <summary>
/// Delegate used to send the message on to the next thread
/// </summary>
private Action<BaseMessage> sendMsg;
/// <summary>
/// Ctor
/// </summary>
public Example1(string threadName, Action<BaseMessage> sendMsg)
{
ThreadName = threadName;
this.sendMsg = sendMsg;
}
/// <summary>
/// Received message handler
/// </summary>
public void MessageHandler(BaseMessage msg)
{
switch (msg.MsgType)
{
case MessageType.Message1:
{
Message1 msg1 = (Message1)msg;
// Sleep for the requested time
System.Threading.Thread.Sleep(msg1.DelayInMilliSeconds);
// And then send the message on to the next thread
sendMsg(msg1);
}
break;
}
}
当线程 3 完成时,WinForms 线程在 threadMgr_NewMessage()
中接收到消息。loopTestUpdateForm()
方法处理了在 UI 线程上进行 Invoke 所需的操作,以更新窗体上的控件。
/// <summary>
/// This form has gotten a message, i.e. from thread 3
/// </summary>
void threadMgr_NewMessage(BaseMessage msg)
{
if (msg != null)
{
switch (msg.MsgType)
{
// Thread 3 has finished processing and sent the message on to us
case MessageType.Message1:
// Clean up
threadMgr.ReceivedMessage -= threadMgr_NewMessage;
threadMgr.StopAllThreads();
threadMgr.ClearAllThreads();
threads.Clear();
loopTestUpdateForm();
break;
}
}
}
负载和序列测试
此测试旨在对系统进行压力测试,并检测丢失的消息和乱序接收的消息。消息 2A 被发送给三个线程,并请求它们发送指定数量的消息 2B。每个线程以尽可能快的速度循环发送所需数量的消息,并带有一个从 0 开始递增且连续的序列号。这使得 WinForms 线程能够检查不仅收到了预期的消息数量,而且它们是按顺序到达的。
启动测试
如前所述,创建了三个线程,不同之处在于,这次,Example2
对象实现了消息处理程序,并且它们都向 WinForms 线程发送消息。
Example2 类
/// <summary>
/// Class to handle Example 2, the load/stress and dropped message test
/// This class is used to encapsulate all the objects maintained by the thread.
/// </summary>
public class Example2
{
/// <summary>
/// Thread name
/// </summary>
public readonly string ThreadName;
/// <summary>
/// Ascending sequence number sent out in successive messages
/// </summary>
private int messageSequenceNumber;
/// <summary>
/// The number of messages to send
/// </summary>
private int numberOfMessagesToSend;
/// <summary>
/// Delegate used to send the sequence of messages
/// back to the thread running the test
/// </summary>
private Action<BaseMessage> sendMsg;
/// <summary>
/// Ctor
/// </summary>
public Example2(string threadName, int numberOfMessagesToSend,
Action<BaseMessage> sendMsg)
{
ThreadName = threadName;
messageSequenceNumber = 0;
this.numberOfMessagesToSend = numberOfMessagesToSend;
this.sendMsg = sendMsg;
}
/// <summary>
/// Received message handler
/// </summary>
public void MessageHandler(BaseMessage msg)
{
switch (msg.MsgType)
{
case MessageType.Message2A:
{
// Drop into a loop sending out all the requested messages back to back
while (messageSequenceNumber < numberOfMessagesToSend)
{
Message2B outboundMsg =
new Message2B(messageSequenceNumber, ThreadName);
sendMsg(outboundMsg);
messageSequenceNumber++;
}
}
break;
}
}
关注点
我曾为不同客户在临时基础上多次实现了这个消息传递方案,最后决定将其打包,以便在未来的项目中更具可重用性。
一个重要的注意事项是,当在线程之间传递引用时,线程仍然会对同一对象进行异步访问。良好的多线程设计中的分区将消除这个问题。
历史
- 2010/05/15 - 首个版本。