线程通信和线程间调用的简单阻塞队列






4.98/5 (20投票s)
一个泛型类,可用作线程之间的数据和任务的传送带
我想提供关于线程通信的建议,因为我已经为 CodeProject 成员提供了几个关于线程使用的答案。我将我的答案发布到网站的“问答”部分;通常我的回答都很好。唯一的问题是,许多问题导致我一遍又一遍地重复几乎相同的想法。我需要将一些代码和解释放在一个永久的地方并参考它。同时,我提供的技术可能不符合整篇文章的格式。所以,我认为“技巧/诀窍”的形式可以很好地满足我的目的。
这是最典型的情况。开发人员创建一些 UI,最常见的是使用 System.Windows.Forms
;此时 WFP 较少见。一些 UI 操作需要很长的处理时间;这就是 UI 事件的处理效果不佳的地方。在这种情况下,一些开发人员不知道该怎么做,一些人使用线程,但尝试重复创建线程,使用后台工作者,创建常规线程或来自线程池的线程。在简单的情况下,这已经足够了,但在大量处理和更复杂的逻辑中,问题是线程的总数变得不可预测。当然,重复创建线程然后放弃线程的额外成本很容易导致性能下降,更重要的是,导致清理问题(以及通常的支持问题)。
在许多这样的情况下,我建议创建固定数量的线程并永久保持它们运行,在需要时将一些任务发布到这样的线程。相对难以解释的是如何在任务之间将线程保持在等待状态(浪费零 CPU 时间)。解释为什么自旋等待是错误的也需要一些耐心。
另一个看起来不同的问题是渴望实现类似于 System.Windows.Forms.Control.Invoke
或 System.Windows.Threading.Dispatcher
的行为。关于这个类有一个流行的误解,由方法 System.Windows.Threading.Dispatcher.GetCurrectDispatcher
引起。它始终有效,但只有在目标线程以特殊方式设计时,才能在目标线程上调用 Invoke
方法。实际上,如果目标线程是在 Window.Form.Application
或 WPF 应用程序的 UI 循环中创建的特殊设计的 UI 线程,则会发生这种线程间处理。问题是 Dispatcher
类是密封的。所以,问题是如何为自定义线程提供类似的功能。
上面描述的两组问题可以使用封装在用作调用目标的线程中的某些队列来解决。首先,我创建了一个简单的阻塞队列类,以提供线程之间的数据传输和同步
using System;
using System.Threading;
using System.Collections.Generic;
public class DataQueue<ITEM> {
public DataQueue() { this.IsBlocking = true; }
public DataQueue(bool blocking) { this.IsBlocking = blocking; }
public EventWaitHandle WaitHandle { get { return FWaitHandle; } }
public void Close() {
this.FWaitHandle.Set();
} //Close
public void Submit(ITEM item) {
lock (LockObject)
Queue.Enqueue(item);
FWaitHandle.Set();
} //Submit
public ITEM GetMessage() {
if (IsBlocking)
FWaitHandle.WaitOne();
ITEM result = default(ITEM);
lock (LockObject) {
if (Queue.Count > 0)
result = Queue.Dequeue();
if (IsBlocking && (Queue.Count < 1))
FWaitHandle.Reset();
return result;
} //lock
} //GetMessage
public void Clear() {
lock (LockObject)
Clear(Queue);
} //Clear
public void Clear(Func<ITEM, bool> removalCriterion) {
lock (LockObject)
Clear(Queue, removalCriterion);
} //Clear
#region implementation
static void Clear(Queue<ITEM> queue) { queue.Clear(); }
static void Clear(Queue<ITEM> queue, Func<ITEM, bool> removalCriterion) {
if (removalCriterion == null) {
queue.Clear();
return;
} //if
Queue<ITEM> copy = new Queue<ITEM>();
while (queue.Count > 0) {
ITEM item = queue.Dequeue();
if (!removalCriterion(item))
copy.Enqueue(item);
} //loop
while (copy.Count > 0)
queue.Enqueue(copy.Dequeue());
} //Clear
bool IsBlocking;
Queue<ITEM> Queue = new Queue<ITEM>();
//SA!!! important ManualReset.
//See GetMessage for re-setting
EventWaitHandle FWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
Object LockObject = new object();
#endregion implementation
} //DataQueue
我建议几乎在所有情况下都使用阻塞操作,否则该机制的行为将类似于自旋等待,导致不必要的 CPU 滥用。
这是一个目标线程和用法的简单示例,一个控制台应用程序
using System;
using System.Threading;
class QueuedThread {
internal QueuedThread() { this.Thread = new Thread(Body); }
internal void Start() { this.Thread.Start(); }
internal void Abort() { //be extra careful!
this.Queue.Close();
this.Thread.Abort();
} //Abort
internal void PostMessage(string message) {
this.Queue.Submit(message);
} //void PostMessage
#region implementaion
void Body() {
try {
while (true) {
string message = Queue.GetMessage();
//safe way to exit, not alway possible
if (message == null)
break;
ProcessMessage(message);
} //loop
} catch (ThreadAbortException) {
//Very important in case Abort is used
//Provide last-will processing here
//even if Abort is not a designed option
} catch (Exception e) { //always needed
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
} //exception
} //Body
void ProcessMessage(string message) {
Console.WriteLine("message Processed {0}", message);
} //ProcessMessage
Thread Thread;
DataQueue<string> Queue = new DataQueue<string>();
#endregion implementaion
} //class QueuedThread
class Program {
void Run() {
QueuedThread t = new QueuedThread();
t.Start();
t.PostMessage("first");
Thread.Sleep(400);
t.PostMessage("second");
Thread.Sleep(400);
t.PostMessage("third");
Console.WriteLine();
Console.WriteLine("Finished. Press any key...");
Console.ReadKey(true);
//safest way of exiting of the thread,
//unfortunatelly, not applicable to 100% cases:
t.PostMessage(null);
//less safe way, need extra care in thread implementation,
//safe enough for this sample though:
//t.Abort();
} //Run
static void Main(string[] args) {
new Program().Run();
} //Main
} //class Program
我不想深入探讨与接受 Abort
方法相关的相对复杂的问题(请参见示例代码)。有时,这样的讨论会导致激烈的争论。最终决定应由想要使用我的建议的开发人员(以及通常,任何使用线程的开发人员)做出。无论如何,当使用数据队列时,可以使用特殊的“终止”消息(例如,在我上面的简单示例中为空字符串)轻松避免 Abort。
最后,人们也可以使用委托来实现处理任务,而不仅仅是数据元素
以上可能看起来不够灵活。线程需要知道如何处理数据项;因此 ProcessMessage
只是一个固定的方法。这是类似于 Dispatcher
的线程间调用发挥作用的时候。
那么,委托调用怎么样?嗯,没有任何东西阻止将委托添加到 ITEM
类型。这只是稍微复杂一些。首先要理解的是,也应该提供委托调用的参数(或参数)。这是一个最小的示例,同样是一个控制台应用程序
using System;
using System.Threading;
class DelegateInvocationThread {
class ActionWithParameter<PARAMETER> {
internal ActionWithParameter(Action<PARAMETER> action, PARAMETER parameterValue) {
this.Action = action;
this.Parameter = parameterValue;
} //ActionWithParameter
internal static ActionWithParameter<PARAMETER> ExitAction { get { return FExitAction; } }
internal void Invoke() {
if (Action != null)
Action(Parameter);
} //Invoke
internal bool ExitCondition { get { return Action == null || Parameter == null; } }
Action<PARAMETER> Action;
PARAMETER Parameter;
static ActionWithParameter<PARAMETER> FExitAction =
new ActionWithParameter<PARAMETER>(null, default(PARAMETER));
} //ActionWithParameter
internal DelegateInvocationThread() { this.Thread = new Thread(Body); }
internal void Start() { this.Thread.Start(); }
internal void Invoke(Action<string> action, string parameter) {
Queue.Submit(new ActionWithParameter<string>(action, parameter));
} //Invoke
internal void InvokeExit() {
Queue.Submit(ActionWithParameter<string>.ExitAction);
} //InvokeExit()
internal void Abort() { //be extra careful!
this.Queue.Close();
this.Thread.Abort();
} //Abort
#region implementaion
void Body() {
try {
while (true) {
ActionWithParameter<string> action = Queue.GetMessage();
//safe way to exit, not alway possible
if (action.ExitCondition)
break;
action.Invoke();
} //loop
} catch (ThreadAbortException) {
//Very important in case Abort is used
//Provide last-will processing here
//even if Abort is not a designed option
} catch (Exception e) { //always needed
Console.WriteLine("{0}: {1}", e.GetType().Name, e.Message);
} //exception
} //Body
void ProcessMessage(string message) {
Console.WriteLine("message Processed {0}", message);
} //ProcessMessage
Thread Thread;
DataQueue<ActionWithParameter<string>> Queue =
new DataQueue<ActionWithParameter<string>>();
#endregion implementaion
} //class DelegateInvocationThread
class Program {
void Run() {
DelegateInvocationThread t = new DelegateInvocationThread();
t.Start();
t.Invoke(
delegate(string value) {
Console.WriteLine(string.Format("Method #1, parameter: {0}", value));
}, "first");
Thread.Sleep(400);
t.Invoke(
delegate(string value) {
Console.WriteLine(string.Format("Method #2, parameter: {0}", value));
}, "second");
Thread.Sleep(400);
t.Invoke(
delegate(string value) {
Console.WriteLine(string.Format("Method #3, parameter: {0}", value));
}, "third");
Console.WriteLine();
Console.WriteLine("Finished. Press any key...");
Console.ReadKey(true);
//safest way of exiting of the thread,
//unfortunatelly, not applicable to 100% cases:
t.InvokeExit();
//less safe way, need extra care in thread implementation,
//safe enough for this sample though:
//t.Abort();
} //Run
static void Main(string[] args) {
new Program().Run();
} //Main
} //class Program
我展示了可以使用 C# v.2 或更高版本编译的匿名委托语法。使用 C# v.3 也可以使用匿名委托的 lambda 形式
t.Invoke(
new Action<string>((value) => {
Console.WriteLine(string.Format("Method #3, parameter: {0}", value));
}), "third");
这个想法可以进一步用于在同一个目标线程中组合多个队列。警告是:不能在同一个线程中使用多个 DataQueue
实例:如果使用阻塞操作,它们会相互阻塞。修改应在 DataQueue
类内部进行。同时,使用我提供的技术可以轻松实现这种泛化。
谢谢。
- — Sergey A Kryukov