65.9K
CodeProject 正在变化。 阅读更多。
Home

在 C# 中实现线程安全的邮件队列

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.76/5 (11投票s)

2020 年 7 月 14 日

MIT

5分钟阅读

viewsIcon

47936

downloadIcon

830

本文探讨了一种用于安全线程间通信的消息传递机制。

引言

多线程是困难的。别无选择。然而,有一些技术是可靠的,并且可以使处理多线程应用程序变得更容易。一种安全地在两个或多个线程之间进行通信的技术是消息队列。另一种是使用所有窗口都可用的 Windows 消息队列。我们将在这里深入探讨这两种技术。

更新:已添加窗口接收器技术

概念化这个混乱的局面

线程间的通信很棘手,因为如果没有某种同步机制,您就无法安全地访问线程之间共享的数据。有许多可供选择的选项,消息队列就是其中之一。

消息队列允许发送者发布消息,然后由另一个线程接收并响应。消息的发布和消息的读取是线程安全的。这样,您就可以通过将消息发送到队列来与其他线程进行通信。

发送者的任务是将命令消息传递给其他线程。这是一个相对简单的操作。我们将消息添加到线程安全的队列(由 ConcurrentQueue<T> 表示),并添加一个机制来在消息可用时高效地通知其他线程。后者由 SemaphoreSlim 提供。可以完全不使用信号量而运行线程,但这会增加 CPU 使用率。但是,根据您在线程中执行的工作类型,这可能适合您的需求。

接收者的任务是循环查找消息。在循环中,我们等待信号量,然后 switch/case 处理消息,执行相应的操作。

消息队列将继续存储消息,直到有线程唤醒并接收它们。因此,队列允许消息堆积。当一个线程无法足够快地响应消息时,这一点很重要。幸运的是,我们的并发队列可以轻松实现这一点。

如果您需要双向通信,您将必须创建一个额外的队列和一个额外的信号量,并在另一个方向上使用它。在这种情况下,每个线程都充当发送者和接收者。

有一个例外是,在 Windows 窗体应用程序中使用此技术的接收者实际上不能是主应用程序线程。原因在于 Windows 窗体已经在主线程上运行了一个应用程序循环,而您无法直接访问它。这就是 Application.Run() 所做的。

还有一种通过传递窗口消息与窗口化线程进行通信的方法。这样做,它的工作方式与此方法类似,但您必须子类化一个本地窗口。在接受参数方面,它确实有些受限,因为它只接受两个整数值,但您始终可以使用 Control.Invoke() 来执行类似的操作。基本上,您处理自定义窗口消息,并依赖 Windows 来维护同步的消息队列。我们可以通过 NativeWindowWndProc() 回调和必须使用 P/Invoke 的 PostMessage() 方法来访问它。

编写这个混乱的程序

无窗口接收器

就多线程应用程序而言,这个相对容易。首先,我们将介绍相关的成员变量。

// these two members constitute our message queue
ConcurrentQueue<Message> _messages = new ConcurrentQueue<Message>();
SemaphoreSlim _messagesAvailable = new SemaphoreSlim(0);

_messages 队列保存我们的待处理消息,而 _messagesAvailable 信号量用于在有一条或多条消息等待处理时发出信号。

我们还为各种消息定义了常量。

const int MSG_STOP = 0;
const int MSG_RESET = 1;
const int MSG_INCREMENT = 2;
const int MSG_DECREMENT = 3;

接下来,我们看看如何从发送线程传递消息。

// pass the increment message
_messages.Enqueue(new Message(MSG_INCREMENT, null));
// signal messages available
_messagesAvailable.Release(1);

这里,我们有两个步骤。第一步是入队消息。MessageKeyValuePair<int,object[]> 的别名)的第一个参数是消息 ID,第二个参数是消息参数的数组。我们在演示中不使用第二个参数——参数列表。第二步是对信号量调用 Release() 来发出消息可用的信号。

现在让我们看看接收者线程。

var thread = new Thread(() => {
    var done = false;
    while(!done)
    {
        // wait until a message becomes available
        _messagesAvailable.Wait();
        Message msg;
        // process messages
        // we use Try here because we're multithreaded
        // so it's possible that between the Wait() call
        // and the dequeue call the queue may be cleared
        if (_messages.TryDequeue(out msg))
        {
            switch(msg.Key)
            {
                case MSG_STOP: // stop
                    done = true;
                    break;
                case MSG_RESET: // reset
                    _counter = 0;
                    break;
                case MSG_INCREMENT: // +
                    ++_counter;
                    break;
                case MSG_DECREMENT: // -
                    --_counter;
                    break;
            }
            // signal the main thread to update the counter display
            PostMessage(_uiReceiver.Handle,WM_USER,_counter,0);
            // an alternative that doesn't require all the window
            // messaging stuff is:
            // Invoke(new Action(delegate 
            //       { CounterLabel.Text = "Count: " + _counter.ToString(); }));
        }
    }
});
// now that we've created the thread
// stash it so it doesn't go out of scope
_receiver = thread;
thread.Start();

请注意,我们可以使用 Control.Invoke() 来安全地更新显示。这只是一个快速而粗糙的向下传递到主线程的方法。但是,我们正在使用通过 PostMessage() 访问的窗口化消息传递技术。我们稍后将对其进行探讨。

请注意,这里的关键是循环,等待信号量,然后尝试出队一条消息,之后通过消息 ID 进行 switch 判断该做什么。

这就是全部!现在您可以使用此技术安全地在线程之间进行通信。还有一种技术可以探索。

窗口化接收器

如果您试图在 Windows 窗体应用程序的主 UI 线程上接收消息,则此方法是必需的,原因如前所述。我们所做的是子类化一个 NativeWindow,然后使用 PostMessage() 与之通信,让 Windows 自己处理同步。

这是我们需要进行的 P/Invoke 声明和常量。

[DllImport("user32.dll")]
static extern bool PostMessage(IntPtr hWnd, uint Msg, int wParam, int lParam);
const int WM_USER = 0x0400;

WM_USER 常量是我们必须使用的自定义窗口消息,以避免与系统消息 ID 冲突。您可以使用 WM_USERWM_USER+1、WM_USER+2,一直到 WM_USER+0x7FFF。同时,PostMessage() 允许我们异步地将消息发送到窗口。

这是 NativeWindow 的子类。

// this class handles our native window receiver 
// incoming messages
private class _NativeReceiver : NativeWindow
{
    Main _main;
    public _NativeReceiver(Main main)
    {
        _main = main;
    }
    protected override void WndProc(ref System.Windows.Forms.Message m)
    {
        switch(m.Msg)
        {
            case WM_USER:
                // update the UI
                _main.CounterLabel.Text = "Count: "+ m.WParam.ToInt32();
                break;
        }
                
        base.WndProc(ref m);
    }
}

我们所做的就是等待 WM_USER 消息,然后将计数器标签设置为消息参数 WParam 的值。所有这些都发生在 UI 线程上。请注意,WParamLParam 在这里是 IntPtr,但在 PostMessage() 中它们是 int。这没关系,只需将 IntPtr 转换为 int 即可。

还有一件事需要介绍的是我们在主窗体的构造函数中执行的接收器的设置。

// we must create a native window in
// order to receive custom window messages
// the thread it operates on is always 
// the thread it was created on. Here
// it's the UI thread.
_uiReceiver = new _NativeReceiver(this);
// make sure the handle is created 
// because we need it to subclass
var p = new CreateParams();
_uiReceiver.CreateHandle(p);

最后,对 PostMessage() 的调用非常简单,但缺点是它们只接受两个整数参数。幸运的是,我们不需要更多,但如果您需要,您就需要更有创意。无论如何,这是 PostMessage() 的调用。

// signal the main thread to update the counter display
PostMessage(_uiReceiver.Handle,WM_USER,_counter,0);

您可能在我们的无窗口接收器中之前就注意到了它。您可以看到我们正在使用第一个整数参数,仅此而已。这就是将消息发送到 _NativeReceiver.WndProc()

就这样!您现在有了两种线程间通信技术。

演示应用程序

演示中有 4 个按钮:开始/停止重置递增 (+) 和递减 (-)。消息的处理要等到单击开始后才会进行。重置会重置计数器。递增 (+) 和递减 (-) 分别将计数器增加或减少一。您可以积累消息,方法是停止处理,然后使用开始启动它,此时所有待处理的消息都将被处理。其中每一个都需要进行双向线程间通信才能实现。

历史

  • 2020 年 7 月 14 日 - 初始提交
  • 2020 年 7 月 14 日 - 添加了窗口化接收器
© . All rights reserved.