65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 ProcessQueue 通过多个线程处理对象队列

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.57/5 (8投票s)

2009年12月3日

CPOL

7分钟阅读

viewsIcon

64660

downloadIcon

1777

ProcessQueue 管理一个线程池来处理强类型对象的队列。

引言

最近,StackOverflow 上的一位用户提出了一个关于使用多个线程处理 Queue<T> 中项目的问题。虽然这个问题的通用解决方案是使用 .NET 的 ThreadPool 机制来排队这些操作,但这类任务有时会很容易超出 ThreadPool 的可行范围;一次排入过多的操作会耗尽 ThreadPool,而运行时间特别长的操作会垄断 ThreadPool 线程。因此,我着手开发一个线程安全(通过同步)的类,它的操作方式与内置的 Queue<T> 相同,但允许用户为每个项目指定操作,并使用其自己的用户可调线程池来调度这些操作。这就是 ProcessQueue<T>

FactoryDictionary 一样,ProcessQueue 中包含的大部分代码基本上是对底层 Queue<T> 类的同步传递,因此我在这里不详细介绍。我将重点介绍 ProcessQueue 创建的线程的管理以及它们与用户操作的关系。

线程池化

默认情况下,ProcessQueue 以一个工作线程启动。这适用于异步处理 ProcessQueue 中的项目,当完成时间不如节省资源重要时,或者当 ProcessQueue 中的项目必须按进入顺序处理时。

注意:我想指出一个重要注意事项,它可能已经出现在上一句话中:虽然 ProcessQueue 本身是一个队列(并且通过 EnqueueDequeue 函数使用它时行为符合预期),但当使用多个工作线程时,不能保证先添加到队列的项目 X 会在项目 Y 之前完成处理。如果完成顺序必须反映队列的顺序,则只能使用一个工作线程。

由于 ProcessQueue 需要能够挂起、恢复和终止其工作线程,因此 WorkerThread 类封装了协调此功能的 WaitHandle 对象的管理。WorkerThread 维护一个 ManualResetEvent,用于指示它是否应主动检查 ProcessQueue 是否有项目,以及另一个 ManualResetEvent,用于指示线程中止。相关代码如下所示。

private ManualResetEvent abortEvent;
private ManualResetEvent signalEvent; 
private Thread thread;

public void Start()
{
    thread.Start();
}

public void Abort()
{
    abortEvent.Set();

    thread.Join();
}

public void Pause()
{
    signalEvent.Reset();
}

public void Signal()
{
    signalEvent.Set();
}

ProcessQueue 类使用此处列出的函数来管理线程。

  • 创建 WorkerThread 时会调用 Start
  • 当线程需要“唤醒”时(例如,添加项目到队列或增加线程数,并且线程需要立即开始处理),会调用 Signal
  • 仅当用户手动发出信号让 ProcessQueue 暂停所有处理时(此时会对所有 WorkerThread 对象调用该函数),才会调用 Pause
  • ProcessQueue 被释放或调用 SetThreadCount 减少工作线程数时,会调用 Abort

线程过程的主体如下所示。

private void ThreadProc()
{
    WaitHandle[] handles = new WaitHandle[] { signalEvent, abortEvent };

    while (true)
    {
        switch (WaitHandle.WaitAny(handles))
        {
            case 0: // signal
                {
                    ProcessItems();
                } break;
            case 1: // abort
                {
                    return;
                }
        }
    }
}

这是相当典型的无限循环线程主体,它等待两个 WaitHandles 中的一个被设置,然后继续执行。如果设置了信号句柄,则处理队列中的当前项目,然后重置信号句柄并等待其再次被设置。如果设置了中止句柄,则从线程主体返回,线程终止。

private void ProcessItems()
{
    T item;

    while (queue.TryDequeue(out item))
    {
        queue.ProcessItem(item);

        if (!signalEvent.WaitOne(0) || abortEvent.WaitOne(0)) return;
    }

    signalEvent.Reset();
}

这引入了 ProcessQueue 类中的一个功能:TryDequeue 函数。如果您熟悉 .NET Framework 中的各种 TryX 函数(各种值类型的 TryParseTryGetValue 等),那么它的目的应该很清楚,我将在下一节中介绍它。

使用 WorkerThread 类的一个优点是,线程可以在处理完成后自行休眠,而无需 ProcessQueue 监控 Dequeue 调用来确定何时挂起线程。实际上,ProcessQueue 类内部只维护一个 List<WorkerThread>,并在需要时激活一个线程。

进程管理

从最终用户的角度来看,StartStopSetThreadCount 是控制 ProcessQueue 中项目处理所必需的唯一调用。

  • Start 触发处理队列中所有当前项目以及之后添加的任何项目。
  • Stop 暂停任何未处理项目的处理。调用 Pause 不会挂起任何正在进行的进程,但它会阻止任何完全未处理的项目(或在调用后添加的项目)被处理。
  • SetThreadCount 的作用顾名思义;它设置 ProcessQueue 维护的用于执行并发进程的线程数。请注意,至少必须有一个线程才能进行任何工作,因此小于 1 的值将引发 ArgumentOutOfRangeException

ProcessQueue 的角度来看,它所做的只是激活单个线程或暂停所有线程。它从不需要暂停单个线程。它会在以下情况激活线程:

  • 向队列添加新项目(且有不活动的线程)
  • 暂停(或首次启动)后恢复处理,并且有未处理的项目
  • 调用 SetThreadCount 来增加线程池大小,并且有未处理的项目
public void Enqueue(T item)
{
    lock (syncRoot)
    {
        queue.Enqueue(item);

        if (isRunning)
        {
            RegenerateIfDisposed();

            WorkerThread firstThread = threads.Where(t => !t.IsSignaled).FirstOrDefault();

            if (firstThread != null) firstThread.Signal();
        }
    }
}

public void Start()
{
    lock (syncRoot)
    {
        RegenerateIfDisposed();

        for (int i = 0; i < Math.Min(threads.Count, queue.Count); i++)
        {
            threads[i].Signal();
        }

        isRunning = true;
    }
}

public void SetThreadCount(int threadCount)
{
    if (threadCount < 1) 
        throw new ArgumentOutOfRangeException("threadCount", 
            "The ProcessQueue class requires at least one worker thread.");

    lock (syncRoot)
    {
        int pending = queue.Count;

        for (int i = threads.Count; i < threadCount; i++) // add additional threads
        {
            WorkerThread thread = new ProcessQueue<t>.WorkerThread(this);

            threads.Add(thread);

            thread.Start();

            if (pending> 1) 
            {
                thread.Signal();
            }

            pending--;
        }

        int toRemove = threads.Count - threadCount;

        if (toRemove > 0)
        {
            foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).ToList())
            {
                thread.Abort();
                threads.Remove(thread);

                toRemove--;
            }

            while (toRemove > 0)
            {
                WorkerThread thread = threads[threads.Count - 1];

                thread.Abort();

                threads.Remove(thread);

                toRemove--;
            }
        }
    }
}

请注意,SetThreadCount 函数将根据需要添加或删除线程,如果存在任何未处理的项目,并且正在添加线程,那么将向相应数量的线程发出信号。如果正在减少线程数,则先删除未发出信号的线程,然后删除已发出信号的线程,直到所需数量的线程保持活动状态。

如前所述,WorkerThread 类使用已添加到 ProcessQueue 类中的 TryDequeue 方法。此函数尝试从 ProcessQueue 中检索下一个项目,并将其值赋给调用者提供的 out 变量。如果可以获取项目,则函数返回 true;如果无法获取,则返回 false

public bool TryDequeue(out T value)
{
    lock (syncRoot)
    {
        if (queue.Count > 0)
        {
            value = queue.Dequeue();

            return true;
        }
        else
        {
            value = default(T);

            return false;
        }
    }
}

采用这种方法可以使所有锁定操作保留在 ProcessQueue 内部,但它可以防止这种情况发生:线程 A 检查 Count 属性并看到一个项目,但线程 B 在线程 A 之前将其出列,而线程 A 仍然尝试出列(导致异常)。通过这种方法,第一个调用该函数的线程将获得该项目(返回值为 true),第二个线程将返回 false

使用该类

至少,使用该类要求您为 ProcessQueue 提供一个操作委托,该委托接受一个参数:即队列旨在保存的类型的实例。例如,如果我需要一个队列来维护文件名列表和一个打开文件、读取内容然后对数据执行某些操作的过程,我会这样做:

private void FileHandler(string fileName)
{
    // open the file and do whatever processing is necessary
}

ProcessQueue<string> queue = new ProcessQueue<string>(FileHandler);

这将创建一个 ProcessQueue 来保存字符串对象,其中一个工作线程将通过我的 FileHandler 函数处理任何排队的项。但是,直到我调用 Start,否则它将什么也不做。

queue.Start();

这会释放工作线程去处理队列中的任何项目,以及稍后可能添加的任何项目。

一旦开始处理,就可以通过调用队列上的 Stop 来停止(或者更准确地说,暂停)它。

queue.Stop();

这将重置所有线程上的 WaitHandle,导致它们在当前项目完成处理后暂停执行。

可以通过调用 SetThreadCount 随时调整线程池中的线程数,并在 threadCount 参数中指定所需的最大工作线程数。虽然 SetThreadCount 会首先删除不活动的(未发出信号的)线程,但如果必须删除活动的线程,那么 SetThreadCount 将阻塞直到线程完成当前项目的处理。

一旦队列的生命周期结束(或者至少,当应用程序或服务终止时),必须在队列上调用 Dispose 以终止工作线程。Dispose 将阻塞直到每个活动线程完成当前项目的处理。

注意:在应用程序终止之前,必须在 ProcessQueue 上调用 Dispose,否则工作线程将继续无限期地运行,即使是那些当前未处理项目的线程。

摘要

.NET ThreadPool 提供了一种方便的方式来调度小的代码“块”以异步运行,但过度使用 ThreadPool 可能导致不理想的结果。ProcessQueue 提供了一种相对简单的方法来处理大量项目,即使单个项目需要大量时间来处理。

历史

  • 2009 年 12 月 3 日:初始帖子
© . All rights reserved.