使用消息传递的线程池和任务队列演示






4.47/5 (9投票s)
如何实现复杂的多线程消息传递场景
引言
本文旨在向您展示如何使用消息传递进行同步和线程池进行高效线程分配来实现复杂的**多线程**场景。虽然我们可以使用 `ThreadPool` 和 `Task` 来完成大部分甚至全部工作,但这并不一定意味着使用消息传递。同时,我所阐述的概念和技术适用于大多数编程语言,不像任务框架和内置线程池那样。这主要关乎概念,而不是生产级别的代码。
概念化这个混乱的局面
消息传递是一种行之有效且安全的方式,用于在线程之间进行通信。基本上,它的工作原理是建立一个线程安全的队列,用于对消息进行排队。客户端调度程序和工作线程都可以访问此队列。客户端调度程序主要负责将消息排队,而工作线程则从队列中提取消息。
如果我们需要双向通信,我们可以建立第二条反向队列。我们在演示中就是这样做的,以便将工作项的进度等信息报告给客户端。
此外,为了防止系统调度程序过载或频繁创建和销毁线程,最好限制使用的线程数量并对已有的线程进行池化/回收。本项目也处理了这一点。
消息传递
首先,队列就是一种集合,它在队尾添加项,在队头移除项。移除项时,您可以检查它。基本上,它就是一个集合。我们使用的这个集合恰好可以安全地从不同线程调用,这一点至关重要。我们使用这个队列来保存待处理的消息,供工作线程处理。有一个工作消息队列供所有工作线程共享。
其次,信号量(semaphore)是一种同步工具,它包含一个计数。每次线程在信号量上完成一次 `Wait()` 调用时,计数就会减一;每次线程调用 `Release()` 时,计数就会加一。除非计数非零,否则 `Wait()` 调用将阻塞。我们用它来在队列中有零个或多个消息等待时进行信号通知。信号量的计数就是队列中消息的数量。这样,我们的工作线程就可以通过等待(使用 `Wait()`)来休眠,直到有消息到达。与消息队列一样,所有工作线程共享同一个信号量。
任何时候,只要我们需要完成某项任务,并且该任务是从另一个线程请求的,我们就需要发送一条消息。我们有工作线程和客户端。客户端的主要任务是调度给工作线程。为了将消息传递给正在运行的工作线程,客户端必须对 `WorkerMessage` 进行 `Enqueue()` 操作,并对信号量调用 `Release(1)` 以通知有等待的消息,这将增加信号量的计数。每个空闲的工作线程都在等待信号量计数变为非零。一旦变为非零,下一个等待信号量的线程就会被唤醒。它接收消息并将其从队列中移除以进行处理。
稍微复杂一点的是,工作线程也可以向客户端发送消息。这是为了让我们能够收到各种通知,例如工作消息正在处理或已完成,或者工作消息的进度。因此,客户端也必须像工作线程一样处理自己的消息。
最后,每条消息都有一个 `Id`,唯一标识它。它通常在创建时分配,类似于数据库的自动编号功能。我们稍后使用它,以便在工作线程发送回进度等信息时,可以引用相关的工作消息。这样,我们就可以通过 `Id` 将进度与特定消息关联起来。
在此实现中有两种消息形式——一种是给工作线程的消息,另一种是给客户端的消息。给工作线程的消息更为详尽,因为它(通常)包含工作线程的 ID,并且始终包含消息 ID,而客户端消息则不跟踪这些,因为它们不需要。每种消息都接受一个参数和一个命令 ID,告诉它做什么,尽管客户端消息只是使用 `KeyValuePair
应该注意的是,在某些情况下,客户端会向自己发布消息。这是为了让发布消息的方法保持线程安全。因此,例如 `DispatchWorkerMessage()` 方法是线程安全的。如果我们没有向自己发布消息,那么调度将不是线程安全的。
线程池
为了有效地利用 CPU,我们分配可配置数量的工作线程,默认值为总核心数减一,最小值为一。每个工作线程会一直循环,直到收到停止消息,这会使其保持活动状态,直到我们指示它关闭。当没有可用的工作线程来处理请求时,我们就会启动一个新的工作线程。这会排除当前正在忙碌的线程,所以如果所有线程都在忙碌,并且可以创建新线程,它就会创建一个新线程。能否创建取决于我们是否已达到线程使用配额。如果无法分配新的工作线程,我们则将消息排队,供下一个可用的工作线程拾取。为了实现这一点,我们跟踪空闲工作线程的数量和总线程数。
从技术上讲,由于 Windows 窗体应用程序的限制(它已经在其 UI 线程之外启动了自己的循环),您不能在客户端线程的同一线程上使用 UI 线程,因此我们应该减去额外的核心,从而减去一个默认池大小的工作线程,但在演示中不影响。在控制台应用程序或 Windows 服务中,您可以在主线程中(分别在 `Main()` 或 `OnStart()` 中)启动客户端循环,并在其中执行客户端逻辑。我建议尽可能这样做。有一种方法可以使用 WinForms UI 线程,但它充满限制和复杂性。如果您确实想这样做,可以参考这篇文章中的代码。您需要让工作线程使用窗口消息而不是我们上面的消息队列来与客户端运行的 UI 线程进行通信。
**我们分配工作线程数量为核心数减一的原因是,我们不想给调度器带来压力,而且我们的客户端有自己的线程,不计入工作线程。
如果您仔细观察,您会注意到,在关键时刻,即使无法创建更多工作线程,工作线程仍然可以拥有排队的消息。正如我所说,一旦有工作线程可用,它们就会被执行。您增加的最大消息数越多,您的应用程序并行性就越强,但超过默认最大值后,您将不会获得任何性能提升——或者至少不应该。
同步原语
除了消息传递,我们还使用 `Interlocked` 来安全地修改成员和静态成员,这两者都可能从不同线程访问。我们主要将其用于统计信息,例如待处理消息的数量或可用工作线程的数量。这样做的目的是确保这些属性是线程安全的。
用户界面
用户界面通过几种方式进行工作——它使用计时器从我们的客户端轮询统计信息,并且它还响应由客户端触发的进度事件。这样可以保持 UI 的最新状态。每个任务的进度都显示在全局统计信息下方。它使用进度事件将消息 `Id` 与关联的进度条联系起来。每当我们调度消息时,就会返回该 ID。每当我们收到进度事件时,我们就会检查该 ID,并使用一个将消息 `Id` 映射到 `WorkerProgressControl` 的字典,其中包含我们的进度条。请注意,`WorkerMessageProgress` 事件不是在 UI 线程上触发的,而是在客户端线程上触发的。因此,我们基本上使用了 `Control` 内置的另一个同步机制来处理该事件。
编写这个混乱的程序
工作线程
`Worker` 是两者中更简单的一个,它将介绍消息处理的概念,所以我们将从那里开始。
工作线程的任务是分派队列中传入的消息,并根据收到的命令执行不同的操作。我们通过启动一个 `while` 循环来实现这一点,在该循环中,我们在最终切换消息的 `CommandId` 来决定做什么之前,会先在客户端的 `_messagesAvailable` 类成员(一个 `SemaphoreSlim` 对象)上 `Wait()`。
这一切都发生在 `Start()` 方法中,该方法通过旋转上述循环来阻塞。每当我们创建一个新的工作线程时,我们都会从一个新的线程中调用 start,如下所示:
new Thread(() => { worker.Start(); }).Start();
同时,这是 `Start()` 方法:
// spin the main loop to keep the
// thread alive and processing
// messages.
var done = false;
while(!done)
{
// wait until a message becomes available
_messagesAvailable.Wait();
WorkerMessage smsg;
// check again just to be sure there's still
// something there
if(!done && _messages.TryDequeue(out smsg))
{
// tell the client we received a message
_client.PostMessage(new ClientMsg(CLIMSG_MESSAGE_RECEIVED,
new WorkerMessage(smsg.Id, Id, smsg.CommandId, smsg.Argument)));
// TODO: Add your own messages here
// you may want to make consts for
// them
switch(smsg.CommandId)
{
// TODO: replace below with
// actual work
case MSG_WORK: // do work
// signal start of work
_client.PostMessage(new ClientMsg(CLIMSG_PROGRESS,
new KeyValuePair<WorkerMessage, float>(smsg, 0f)));
// simulate work:
for (var i = 0;i<50;++i)
{
Thread.Sleep(100);
// report some progress
_client.PostMessage(new ClientMsg(CLIMSG_PROGRESS,
new KeyValuePair<WorkerMessage,float>(smsg,(i+1)/50f)));
}
break;
case MSG_STOP: // shut down
// signalling shut down is simple
done = true;
break;
}
// tell the client we processed a message
_client.PostMessage(new ClientMsg(CLIMSG_MESSAGE_COMPLETE,
new WorkerMessage(smsg.Id,Id,smsg.CommandId,smsg.Argument)));
}
}
`Client.PostMessage()` 将消息添加到客户端的队列中,并增加客户端的消息信号量,表示有消息可用。我们使用它来方便地从任务客户端获取事件,例如 `WorkerMessageProgress`。`Client` 和 `Worker` 都有一个异步且线程安全的方法 `PostMessage()`。它们对于消息队列操作至关重要。
有一种方法不需要客户端拥有消息队列,但每个工作线程都必须自己更新 UI。这种方法有一个优点,那就是它避免了客户端潜在的瓶颈,因为客户端突然需要花费时间来处理消息。然而,我们所采取的方法的优点是它实现了双向通信,这是我想在这里说明的,但它确实需要客户端启动一个类似上述的消息循环。我们很快就会探讨它。
消息循环的实现方式是强制客户端线程仅在收到消息时“唤醒”。这样,它的行为就像一个 `worker` 一样,在收到消息之前保持空闲状态。这可能不是您需要的。我们稍后将讨论更改此行为。
无论哪种方式,这里的想法都是用您自己的长时间运行的工作来替换 `for` 循环及其内部的所有内容,最好如上所示定期传递进度。
请注意,当我们发布 `CLIMSG_MESSAGE_COMPLETE` 时,它会导致客户端引发 `WorkerMessageComplete` 事件。另请注意,我们实际上是根据旧的 `WorkerMessage` 创建了一个新的 `WorkerMessage`,并将几个项传递给了构造函数。这是因为当您使用 `DispatchWorkerMessage()` 时,您传递的 `WorkerMessage` 将不会关联工作线程的 `Id`,因为在消息被工作线程处理之前您没有 `Id`。由于 `struct` 是只读的,我们通过重建它并添加工作线程的 `Id` 来实现。
工作线程中其他重要的部分是初始化。它需要对客户端的引用,以及工作线程消息队列的两个组件。
public Worker(Client client,SemaphoreSlim messagesAvailable,
ConcurrentQueue<WorkerMessage> messages)
{
_id = _NextWorkerId;
Interlocked.CompareExchange(ref _NextWorkerId, 0, int.MaxValue);
Interlocked.Increment(ref _NextWorkerId);
_client = client;
_messagesAvailable = messagesAvailable;
_messages = messages;
}
关于 `_NextWorkerId` 的事情是,它跟踪每个创建的消息可能的新 ID。它是一个 `static` 成员,每次创建 `Worker` 时都会以线程安全的方式递增。它会跳过零,因为我们不想要零 `Id`,仅仅是因为我们不想。
客户端
客户端的职责比工作线程多得多,因此也复杂得多。它必须调度消息、管理线程池并跟踪可用工作线程等统计信息。其中,线程池是最复杂的。让我们通过探索 `Start()` 方法来解决这个问题,该方法与工作线程的同名方法类似。它会阻塞并等待消息,直到收到 `Stop()` 消息。让我们看一下。
// spin the loop
var done = false;
while (!done)
{
// wait for an incoming message
_messagesAvailable.Wait();
ClientMsg climsg;
if (_messages.TryDequeue(out climsg))
{
// TODO: Add your own messages here
// you may want to make consts for
// them
switch (climsg.Key)
{
// a worker has received a message and has
// just started processing it. The single
// parameter (not used) is the message
case MSG_MESSAGE_RECEIVED:
// increment the pending message count
Interlocked.Increment(ref _pendingWorkerMessageCount);
// now a worker is busy so decrease our available
// worker count
Interlocked.Decrement(ref _availableWorkerCount);
break;
// a worker has completed processing a message
case MSG_MESSAGE_COMPLETE:
// decrease the pending worker count
Interlocked.Decrement(ref _pendingWorkerMessageCount);
var wrkmsg = (WorkerMessage)climsg.Value;
if (WMSG_STOP == wrkmsg.CommandId)
{
// if the worker sent sent a stop message
// decrement the worker count
Interlocked.Decrement(ref _workerCount);
}
else // otherwise increment the available workers
Interlocked.Increment(ref _availableWorkerCount);
// raise the completed event
WorkerMessageComplete?.Invoke
(this, new WorkerMessageCompleteEventArgs(wrkmsg));
break;
// a worker has progress to report
case MSG_PROGRESS:
var arg = (KeyValuePair<WorkerMessage, float>)climsg.Value;
WorkerMessageProgress?.Invoke
(this, new WorkerProgressEventArgs(arg.Key, arg.Value));
break;
case MSG_DISPATCH: // dispatch a message to a worker
// this message is invoked by DispatchWorkerMessage()
// we handle it here to keep that message thread safe
// if there are not available workers...
if (0 == _availableWorkerCount)
{
// and we haven't met our thread quota...
if (_workerCount < _maxWorkerCount)
{
// create a new worker
Interlocked.Increment(ref _workerCount);
Interlocked.Increment(ref _availableWorkerCount);
var ts = new Worker(this, _workerMessagesAvailable, _workerMessages);
// start it
new Thread(() => { ts.Start(); }).Start();
// send it the message we received from the
// DispatchWorkerMessage()
_PostWorkerMessage((WorkerMessage)climsg.Value);
}
else
{
// we have to queue to an
// already busy worker.
_PostWorkerMessage((WorkerMessage)climsg.Value);
}
}
else
{
// there's a worker available
// just post to it
_PostWorkerMessage((WorkerMessage)climsg.Value);
}
break;
// called in response to Stop()
case MSG_STOP:
// we need to make sure we send as many stop messages as there are workers
for (var i = 0; i < _workerCount; ++i)
{
// increment the pending message count
Interlocked.Increment(ref _pendingWorkerMessageCount);
// post stop to the workers
_PostWorkerMessage(new WorkerMessage(WMSG_STOP, null));
}
// TODO: Wait until all work stops
done = true;
break;
}
}
}
希望注释能够清楚地说明它的作用。`_PostWorkerMessage()` 类似于 `PostMessage()`,它将消息添加到队列中并增加关联信号量的计数。唯一的区别是 `PostMessage()` 处理客户端队列,而 `_PostWorkerMessage()` 处理工作线程队列。我们所做的很多事情都是为了统计信息的簿记。请注意,我们大量使用 `Interlocked`。这样,即使它们可以从另一个线程访问,我们也能安全地设置值。
请注意,当我们处理 `MSG_STOP` 时,我们会为每个工作线程转发一个 `WMSG_STOP` 消息。这样它们都可以优雅地退出。
还记得我说的关于更改线程行为,使其在收到消息之前不会休眠的事情吗?假设您已经在线程上运行一个密集循环,正在进行一些处理,并且该线程无法等待消息。只需删除与队列关联的信号量,并移除与其关联的 `Wait()` 和 `Release()` 调用。只需将循环设置为在没有信号量的情况下运行。如果您的线程还在等待其他事情,这效果就不那么好了,因为在等待期间,它无法处理更多消息。
用户界面
在用户界面中,我们提供了排队消息(工作项)执行一些伪“工作”、检查整体统计信息、设置池中的最大线程数以及查看每个排队工作项进度的功能。
我们使用 `WorkerProgressControl` `UserControl` 来创建一个简单的进度条,左侧有一个标签。进度条随控件的大小而变化。控件本身有一个自定义构造函数,接受一个消息 ID(UI 称之为任务 ID),然后将其显示在其左侧。应该注意的是,进度条在第一次设置进度 `Value` 之前是不可见的。在其后面有一个标签,告诉用户它已排队并正在等待。这是因为它启动时是排队等待状态,我们希望它告知用户这一点。
在 `Main` `Form` 上,我们使用计时器来轮询整体统计信息,而不是事件。实际这样做更简单,而且效果很好。我们还有一个控件来编辑我们可以拥有的工作线程数量。最后,我们在底部有一个 `Panel`,它随着窗体的缩放而缩放并自动滚动。当每个项被排队时,它就会被填充新的 `WorkerProgressControl`,这些控件会被停靠在面板中。一个将消息 ID 映射到进度控件的字典会在每个项排队时被添加到其中。
当 `_client` 引发 `WorkerMessageProgress` 事件时,我们在主线程上使用 `Control.BeginInvoke()` 来处理该事件,以便为了安全起见,我们可以与 UI 本身的线程进行交互。我们使用之前的字典来匹配传入的 `Id` 到一个控件,然后更新关联控件的 `Value`。
我们需要处理的一个奇怪情况是减少最大工作线程数。为了实现这一点,我们必须停止工作线程。例如,如果我们从五个工作线程减少到三个,我们必须在下一个机会(当两个工作线程空闲时)停止两个工作线程。这就是 `Client.DeallocateWorkers()` 所做的。
首先,在用户界面中,我们有主窗体的构造函数代码。
// designer support:
InitializeComponent();
// we allow the format string
// to be set in the designer
// this stores it for later
// because we change the text
// of the labels
_currentWorkersFmt = CurrentWorkersLabel.Text;
_waitingWorkItemsFmt = WaitingWorkItemsLabel.Text;
_pendingWorkItemsFmt = PendingWorkItemsLabel.Text;
_availableWorkersFmt = AvailableWorkersLabel.Text;
// hook the client's progress reporting
_client.WorkerMessageProgress += _client_WorkerMessageProgress;
// set the max worker box to the default max worker count
MaximumWorkersUpDown.Value = _client.MaximumWorkerCount;
// this is so we know when we've decreased the max workers
_oldMaximumWorkerCount = _client.MaximumWorkerCount;
// start the client thread so it will process messages
new Thread(() => { _client.Start(); }).Start();
// start the timer to keep the UI fresh
StatusTimer.Enabled = true;
注释应该能让事情变得清楚。请记住,我们必须在线程中启动一个消息循环来处理工作线程消息。我们必须对客户端也做类似的事情。这就是上面的 `Start()` 所做的。
任何时候我们收到进度消息,我们都必须使用 `BeginInvoke()` 来安全地访问主 UI 线程上的控件,因为我们在此事件内部的线程不同。我们所做的就是检查字典,然后更新关联的进度条。
BeginInvoke(new Action(delegate () {
WorkerProgressControl wpc;
if(_progressMap.TryGetValue(args.Id,out wpc))
{
wpc.Value = args.Progress;
}
}));
使用计时器事件更新 UI 是微不足道的——不需要同步,因为事件在主 UI 线程上触发——这里,我们使用了从设计器在窗体启动时获得的格式字符串。
CurrentWorkersLabel.Text = string.Format(_currentWorkersFmt, _client.WorkerCount);
WaitingWorkItemsLabel.Text = _
string.Format(_waitingWorkItemsFmt, _client.WaitingWorkerMessageCount);
PendingWorkItemsLabel.Text = _
string.Format(_pendingWorkItemsFmt, _client.PendingWorkerMessageCount);
AvailableWorkersLabel.Text = _
string.Format(_availableWorkersFmt, _client.AvailableWorkerCount);
最后,当我们单击 **Enqueue** 按钮时,我们必须将一个工作消息分派给工作线程,然后将一个新的 `WorkerProgressControl` 添加到我们之前的 `Panel` 中。请注意,`DispatchWorkMessage()` 返回新创建消息的 `Id`。
var id = _client.DispatchWorkerMessage(new WorkerMessage(MSG_WORK,null));
var wpc = new WorkerProgressControl(id);
// add the id to control mapping:
_progressMap.Add(id, wpc);
ProgressPanel.SuspendLayout();
ProgressPanel.Controls.Add(wpc);
wpc.Dock = DockStyle.Top;
ProgressPanel.ResumeLayout(true);
改进这一点的一种方法是任务优先级,但这会增加项目的复杂性。
历史
- 2020 年 7 月 16 日 - 首次提交