使用 ProcessQueue 通过多个线程处理对象队列






4.57/5 (8投票s)
ProcessQueue 管理一个线程池来处理强类型对象的队列。
引言
最近,StackOverflow 上的一位用户提出了一个关于使用多个线程处理 Queue<T>
中项目的问题。虽然这个问题的通用解决方案是使用 .NET 的 ThreadPool
机制来排队这些操作,但这类任务有时会很容易超出 ThreadPool
的可行范围;一次排入过多的操作会耗尽 ThreadPool
,而运行时间特别长的操作会垄断 ThreadPool
线程。因此,我着手开发一个线程安全(通过同步)的类,它的操作方式与内置的 Queue<T>
相同,但允许用户为每个项目指定操作,并使用其自己的用户可调线程池来调度这些操作。这就是 ProcessQueue<T>
。
与 FactoryDictionary
一样,ProcessQueue
中包含的大部分代码基本上是对底层 Queue<T>
类的同步传递,因此我在这里不详细介绍。我将重点介绍 ProcessQueue
创建的线程的管理以及它们与用户操作的关系。
线程池化
默认情况下,ProcessQueue
以一个工作线程启动。这适用于异步处理 ProcessQueue
中的项目,当完成时间不如节省资源重要时,或者当 ProcessQueue
中的项目必须按进入顺序处理时。
注意:我想指出一个重要注意事项,它可能已经出现在上一句话中:虽然 ProcessQueue
本身是一个队列(并且通过 Enqueue
和 Dequeue
函数使用它时行为符合预期),但当使用多个工作线程时,不能保证先添加到队列的项目 X 会在项目 Y 之前完成处理。如果完成顺序必须反映队列的顺序,则只能使用一个工作线程。
由于 ProcessQueue
需要能够挂起、恢复和终止其工作线程,因此 WorkerThread
类封装了协调此功能的 WaitHandle
对象的管理。WorkerThread
维护一个 ManualResetEvent
,用于指示它是否应主动检查 ProcessQueue
是否有项目,以及另一个 ManualResetEvent
,用于指示线程中止。相关代码如下所示。
private ManualResetEvent abortEvent;
private ManualResetEvent signalEvent;
private Thread thread;
public void Start()
{
thread.Start();
}
public void Abort()
{
abortEvent.Set();
thread.Join();
}
public void Pause()
{
signalEvent.Reset();
}
public void Signal()
{
signalEvent.Set();
}
ProcessQueue
类使用此处列出的函数来管理线程。
- 创建
WorkerThread
时会调用Start
。 - 当线程需要“唤醒”时(例如,添加项目到队列或增加线程数,并且线程需要立即开始处理),会调用
Signal
。 - 仅当用户手动发出信号让
ProcessQueue
暂停所有处理时(此时会对所有WorkerThread
对象调用该函数),才会调用Pause
。 - 当
ProcessQueue
被释放或调用SetThreadCount
减少工作线程数时,会调用Abort
。
线程过程的主体如下所示。
private void ThreadProc()
{
WaitHandle[] handles = new WaitHandle[] { signalEvent, abortEvent };
while (true)
{
switch (WaitHandle.WaitAny(handles))
{
case 0: // signal
{
ProcessItems();
} break;
case 1: // abort
{
return;
}
}
}
}
这是相当典型的无限循环线程主体,它等待两个 WaitHandles
中的一个被设置,然后继续执行。如果设置了信号句柄,则处理队列中的当前项目,然后重置信号句柄并等待其再次被设置。如果设置了中止句柄,则从线程主体返回,线程终止。
private void ProcessItems()
{
T item;
while (queue.TryDequeue(out item))
{
queue.ProcessItem(item);
if (!signalEvent.WaitOne(0) || abortEvent.WaitOne(0)) return;
}
signalEvent.Reset();
}
这引入了 ProcessQueue
类中的一个功能:TryDequeue
函数。如果您熟悉 .NET Framework 中的各种 TryX
函数(各种值类型的 TryParse
、TryGetValue
等),那么它的目的应该很清楚,我将在下一节中介绍它。
使用 WorkerThread
类的一个优点是,线程可以在处理完成后自行休眠,而无需 ProcessQueue
监控 Dequeue
调用来确定何时挂起线程。实际上,ProcessQueue
类内部只维护一个 List<WorkerThread>
,并在需要时激活一个线程。
进程管理
从最终用户的角度来看,Start
、Stop
和 SetThreadCount
是控制 ProcessQueue
中项目处理所必需的唯一调用。
Start
触发处理队列中所有当前项目以及之后添加的任何项目。Stop
暂停任何未处理项目的处理。调用Pause
不会挂起任何正在进行的进程,但它会阻止任何完全未处理的项目(或在调用后添加的项目)被处理。SetThreadCount
的作用顾名思义;它设置ProcessQueue
维护的用于执行并发进程的线程数。请注意,至少必须有一个线程才能进行任何工作,因此小于 1 的值将引发ArgumentOutOfRangeException
。
从 ProcessQueue
的角度来看,它所做的只是激活单个线程或暂停所有线程。它从不需要暂停单个线程。它会在以下情况激活线程:
- 向队列添加新项目(且有不活动的线程)
- 暂停(或首次启动)后恢复处理,并且有未处理的项目
- 调用
SetThreadCount
来增加线程池大小,并且有未处理的项目
public void Enqueue(T item)
{
lock (syncRoot)
{
queue.Enqueue(item);
if (isRunning)
{
RegenerateIfDisposed();
WorkerThread firstThread = threads.Where(t => !t.IsSignaled).FirstOrDefault();
if (firstThread != null) firstThread.Signal();
}
}
}
public void Start()
{
lock (syncRoot)
{
RegenerateIfDisposed();
for (int i = 0; i < Math.Min(threads.Count, queue.Count); i++)
{
threads[i].Signal();
}
isRunning = true;
}
}
public void SetThreadCount(int threadCount)
{
if (threadCount < 1)
throw new ArgumentOutOfRangeException("threadCount",
"The ProcessQueue class requires at least one worker thread.");
lock (syncRoot)
{
int pending = queue.Count;
for (int i = threads.Count; i < threadCount; i++) // add additional threads
{
WorkerThread thread = new ProcessQueue<t>.WorkerThread(this);
threads.Add(thread);
thread.Start();
if (pending> 1)
{
thread.Signal();
}
pending--;
}
int toRemove = threads.Count - threadCount;
if (toRemove > 0)
{
foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).ToList())
{
thread.Abort();
threads.Remove(thread);
toRemove--;
}
while (toRemove > 0)
{
WorkerThread thread = threads[threads.Count - 1];
thread.Abort();
threads.Remove(thread);
toRemove--;
}
}
}
}
请注意,SetThreadCount
函数将根据需要添加或删除线程,如果存在任何未处理的项目,并且正在添加线程,那么将向相应数量的线程发出信号。如果正在减少线程数,则先删除未发出信号的线程,然后删除已发出信号的线程,直到所需数量的线程保持活动状态。
如前所述,WorkerThread
类使用已添加到 ProcessQueue
类中的 TryDequeue
方法。此函数尝试从 ProcessQueue
中检索下一个项目,并将其值赋给调用者提供的 out
变量。如果可以获取项目,则函数返回 true
;如果无法获取,则返回 false
。
public bool TryDequeue(out T value)
{
lock (syncRoot)
{
if (queue.Count > 0)
{
value = queue.Dequeue();
return true;
}
else
{
value = default(T);
return false;
}
}
}
采用这种方法可以使所有锁定操作保留在 ProcessQueue
内部,但它可以防止这种情况发生:线程 A 检查 Count
属性并看到一个项目,但线程 B 在线程 A 之前将其出列,而线程 A 仍然尝试出列(导致异常)。通过这种方法,第一个调用该函数的线程将获得该项目(返回值为 true
),第二个线程将返回 false
。
使用该类
至少,使用该类要求您为 ProcessQueue
提供一个操作委托,该委托接受一个参数:即队列旨在保存的类型的实例。例如,如果我需要一个队列来维护文件名列表和一个打开文件、读取内容然后对数据执行某些操作的过程,我会这样做:
private void FileHandler(string fileName)
{
// open the file and do whatever processing is necessary
}
ProcessQueue<string> queue = new ProcessQueue<string>(FileHandler);
这将创建一个 ProcessQueue
来保存字符串对象,其中一个工作线程将通过我的 FileHandler
函数处理任何排队的项。但是,直到我调用 Start
,否则它将什么也不做。
queue.Start();
这会释放工作线程去处理队列中的任何项目,以及稍后可能添加的任何项目。
一旦开始处理,就可以通过调用队列上的 Stop
来停止(或者更准确地说,暂停)它。
queue.Stop();
这将重置所有线程上的 WaitHandle
,导致它们在当前项目完成处理后暂停执行。
可以通过调用 SetThreadCount
随时调整线程池中的线程数,并在 threadCount
参数中指定所需的最大工作线程数。虽然 SetThreadCount
会首先删除不活动的(未发出信号的)线程,但如果必须删除活动的线程,那么 SetThreadCount
将阻塞直到线程完成当前项目的处理。
一旦队列的生命周期结束(或者至少,当应用程序或服务终止时),必须在队列上调用 Dispose
以终止工作线程。Dispose
将阻塞直到每个活动线程完成当前项目的处理。
注意:在应用程序终止之前,必须在 ProcessQueue
上调用 Dispose
,否则工作线程将继续无限期地运行,即使是那些当前未处理项目的线程。
摘要
.NET ThreadPool
提供了一种方便的方式来调度小的代码“块”以异步运行,但过度使用 ThreadPool
可能导致不理想的结果。ProcessQueue
提供了一种相对简单的方法来处理大量项目,即使单个项目需要大量时间来处理。
历史
- 2009 年 12 月 3 日:初始帖子