用于处理工作单元的工作线程类






4.82/5 (21投票s)
2006年9月13日
4分钟阅读

70977

735
在单个线程中处理工作单元,而不是使用线程池。
引言
CodeProject上有很多示例演示了使用线程池来管理工作或作业队列。 这些都将工作项分配给一个空闲线程,因此工作是并发完成的。 相反,我需要能够将工作项排队,这些工作项将在单个线程中处理。 我使用这个类的典型场景是在WinForm主应用程序线程上必须发生的活动进行处理,例如更新UI元素。 并发处理没有任何意义,因为使用Invoke会阻塞,直到主应用程序线程空闲为止。
架构
以下部分描述了我在实现此类时做出的一些架构决策。
Generics
类ProcessingQueue<T>
是一个泛型类,允许您将工作类型指定为值类型或引用类型,例如
processQueue = new ProcessingQueue<int>();
当然,这意味着每个ProcessingQueue<T>
实例都仅限于一种类型的工作项,这很适合我。
事件和重写
有两个事件是从可重写的方法中调用的,用于完成工作和处理工作异常。
执行操作
为了完成实际工作,我实现了事件DoWork
,该事件为队列中的每个工作项调用。 如果您不想使用事件,则可以从ProcessingQueue<T>
派生一个类,并重写OnDoWork
方法。 工作项包含在ProcessingQueueEventArgs<T>
类中。
处理异常
工作线程(您的应用程序)通常会静默捕获在工作代码(您的应用程序)中发生的异常。 为了帮助向应用程序公开异常,您可以使用WorkException
事件,也可以在您自己的派生类中重写OnWorkException
方法。 这两个方法都传递一个包装Exception
实例的ProcessingQueueExceptionEventArgs
实例。
EventWaitHandle 与信号量
我考虑使用信号量来实现此目的,因为您可以为队列中的每个工作项释放信号量,并且实现WaitOne
的线程将释放信号量中的总释放计数。 但是,信号量需要最大释放计数,因为它实际上旨在释放多个线程,而不是单个线程。 并且,如果您释放的数量超过最大释放计数,则信号量会抛出异常,这不是我想要的。 由于队列深度未知,因此我不想对信号量的最大释放计数进行硬编码。 基本上,它是错误的工具。
因此,我选择了更简单的使用自动重置的EventWaitHandle
。 这里的问题是,在处理工作项时可能会将工作排队。 虽然这会发出等待事件的信号,但它只会发出一次(没有像信号量中的释放计数),因此工作线程必须处理当前队列中的所有工作,这也意味着它需要检查是否通过将工作放入队列来发出信号,它已经处理了,因此队列现在为空。
因此,代码变得足够复杂,我想一个不错的泛型类来支持此功能将很有用,因此就有了本文。 我仍然不敢相信Code Project上还没有类似的东西。 也许太简单了!
用法
以下部分描述了用法。
构造函数
要为特定工作类型创建ProcessingQueue<T>
实例,请实例化该类
processQueue = new ProcessingQueue<int>();
事件
将工作事件连接到将对工作项执行工作的方法,并根据需要连接异常事件
processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
processQueue.WorkException +=
new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);
排队工作
排队工作很简单——调用QueueForWork
方法
processQueue.QueueForWork(1);
停止线程
要退出等待工作的线程,请调用Stop
方法
processQueue.Stop();
这是一个非阻塞调用,它也会在工作线程终止之前完成队列中任何剩余的工作。
实现
这是工作线程。 从注释和我上面的描述中应该很容易看出发生了什么。
protected void ProcessQueueWork()
{
while (!stop)
{
// Wait for some work.
waitProcess.WaitOne();
bool haveWork;
// Finish remaining work before stopping.
do
{
// Initialize to the default work value.
T work = default(T);
// Assume no work.
haveWork = false;
// Prevent enqueing from a different thread.
lock (workQueue)
{
// Do we have work? This might be 0 if stopping or if all
// work is processed.
if (workQueue.Count > 0)
{
// Get the work.
work = workQueue.Dequeue();
// Yes, we have work.
haveWork = true;
}
}
// If we have work...
if (haveWork)
{
try
{
// Try processing it.
OnDoWork(new ProcessingQueueEventArgs<T>(work));
}
catch (Exception e)
{
// Oops, inform application of a work error.
OnWorkException(new ProcessingQueueExceptionEventArgs(e));
}
}
} while (haveWork); // continue processing if there was work.
}
}
单元测试
我编写了几个非常简单的单元测试来验证该功能,但这肯定不是一个严格的测试
[TestFixture]
public class ProcessThreadTests
{
protected ProcessingQueue<int> processQueue;
protected bool exceptionRaised;
protected bool workRaised;
[TestFixtureSetUp]
public void FixtureSetup()
{
processQueue = new ProcessingQueue<int>();
processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
processQueue.WorkException +=
new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);
}
[Test]
public void QueueWorkTest()
{
processQueue.QueueForWork(1);
while (!workRaised) { }
}
[Test]
public void WorkExceptionTest()
{
processQueue.QueueForWork(2);
while (!exceptionRaised) { }
}
void OnDoWork(object sender, ProcessingQueueEventArgs<int> args)
{
switch (args.Work)
{
case 1:
workRaised = true;
break;
case 2:
throw new ApplicationException("Exception");
}
}
void OnWorkException(object sender, ProcessingQueueExceptionEventArgs args)
{
exceptionRaised = true;
}
}
结论
希望您会发现此类有用,并且我的实现没有错误!
历史
在9/13更新,因为我发现我没有清除一个标志,代码从未返回到WaitOne指令! 修复此问题也消除了其中一个标志。