65.9K
CodeProject 正在变化。 阅读更多。
Home

用于处理工作单元的工作线程类

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.82/5 (21投票s)

2006年9月13日

4分钟阅读

viewsIcon

70977

downloadIcon

735

在单个线程中处理工作单元,而不是使用线程池。

引言

CodeProject上有很多示例演示了使用线程池来管理工作或作业队列。 这些都将工作项分配给一个空闲线程,因此工作是并发完成的。 相反,我需要能够将工作项排队,这些工作项将在单个线程中处理。 我使用这个类的典型场景是在WinForm主应用程序线程上必须发生的活动进行处理,例如更新UI元素。 并发处理没有任何意义,因为使用Invoke会阻塞,直到主应用程序线程空闲为止。

架构

以下部分描述了我在实现此类时做出的一些架构决策。

Generics

ProcessingQueue<T>是一个泛型类,允许您将工作类型指定为值类型或引用类型,例如

processQueue = new ProcessingQueue<int>();

当然,这意味着每个ProcessingQueue<T>实例都仅限于一种类型的工作项,这很适合我。

事件和重写

有两个事件是从可重写的方法中调用的,用于完成工作和处理工作异常。

执行操作

为了完成实际工作,我实现了事件DoWork,该事件为队列中的每个工作项调用。 如果您不想使用事件,则可以从ProcessingQueue<T>派生一个类,并重写OnDoWork方法。 工作项包含在ProcessingQueueEventArgs<T>类中。

处理异常

工作线程(您的应用程序)通常会静默捕获在工作代码(您的应用程序)中发生的异常。 为了帮助向应用程序公开异常,您可以使用WorkException事件,也可以在您自己的派生类中重写OnWorkException方法。 这两个方法都传递一个包装Exception实例的ProcessingQueueExceptionEventArgs实例。

EventWaitHandle 与信号量

我考虑使用信号量来实现此目的,因为您可以为队列中的每个工作项释放信号量,并且实现WaitOne的线程将释放信号量中的总释放计数。 但是,信号量需要最大释放计数,因为它实际上旨在释放多个线程,而不是单个线程。 并且,如果您释放的数量超过最大释放计数,则信号量会抛出异常,这不是我想要的。 由于队列深度未知,因此我不想对信号量的最大释放计数进行硬编码。 基本上,它是错误的工具。

因此,我选择了更简单的使用自动重置的EventWaitHandle。 这里的问题是,在处理工作项时可能会将工作排队。 虽然这会发出等待事件的信号,但它只会发出一次(没有像信号量中的释放计数),因此工作线程必须处理当前队列中的所有工作,这也意味着它需要检查是否通过将工作放入队列来发出信号,它已经处理了,因此队列现在为空。

因此,代码变得足够复杂,我想一个不错的泛型类来支持此功能将很有用,因此就有了本文。 我仍然不敢相信Code Project上还没有类似的东西。 也许太简单了!

用法

以下部分描述了用法。

构造函数

要为特定工作类型创建ProcessingQueue<T>实例,请实例化该类

processQueue = new ProcessingQueue<int>();

事件

将工作事件连接到将对工作项执行工作的方法,并根据需要连接异常事件

processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
processQueue.WorkException += 
     new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);

排队工作

排队工作很简单——调用QueueForWork方法

processQueue.QueueForWork(1);

停止线程

要退出等待工作的线程,请调用Stop方法

processQueue.Stop();

这是一个非阻塞调用,它也会在工作线程终止之前完成队列中任何剩余的工作。

实现

这是工作线程。 从注释和我上面的描述中应该很容易看出发生了什么。

protected void ProcessQueueWork()
{
  while (!stop)
  {
    // Wait for some work.
    waitProcess.WaitOne();
    bool haveWork;

    // Finish remaining work before stopping.
    do
    {
      // Initialize to the default work value.
      T work = default(T);
      // Assume no work.
      haveWork = false;
// Prevent enqueing from a different thread. lock (workQueue) { // Do we have work? This might be 0 if stopping or if all // work is processed. if (workQueue.Count > 0) { // Get the work. work = workQueue.Dequeue(); // Yes, we have work. haveWork = true; } } // If we have work... if (haveWork) { try { // Try processing it. OnDoWork(new ProcessingQueueEventArgs<T>(work)); } catch (Exception e) { // Oops, inform application of a work error. OnWorkException(new ProcessingQueueExceptionEventArgs(e)); } } } while (haveWork); // continue processing if there was work. } }

单元测试

我编写了几个非常简单的单元测试来验证该功能,但这肯定不是一个严格的测试

[TestFixture]
public class ProcessThreadTests
{
  protected ProcessingQueue<int> processQueue;
  protected bool exceptionRaised;
  protected bool workRaised;

  [TestFixtureSetUp]
  public void FixtureSetup()
  {
    processQueue = new ProcessingQueue<int>();
    processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
    processQueue.WorkException += 
         new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);
  }

  [Test]
  public void QueueWorkTest()
  {
    processQueue.QueueForWork(1);
    while (!workRaised) { }
  }

  [Test]
  public void WorkExceptionTest()
  {
    processQueue.QueueForWork(2);
    while (!exceptionRaised) { }
  }

  void OnDoWork(object sender, ProcessingQueueEventArgs<int> args)
  {
    switch (args.Work)
    {
      case 1:
        workRaised = true;
        break;
      case 2:
        throw new ApplicationException("Exception");
    }
  }

  void OnWorkException(object sender, ProcessingQueueExceptionEventArgs args)
  {
    exceptionRaised = true;
  }
}

结论

希望您会发现此类有用,并且我的实现没有错误!

历史

在9/13更新,因为我发现我没有清除一个标志,代码从未返回到WaitOne指令! 修复此问题也消除了其中一个标志。

© . All rights reserved.