支持 WaitAll、排序和唯一性的应用线程池
此线程池库提供了对其调度和处理工作项方式的更大程度的配置。此扩展支持 WaitAll()、优先级和唯一性。
引言
我曾在 2004 年 6 月写过一篇文章,描述了 System.Threading.ThreadPool
的一个简单包装器。这使得池中排队项目的顺序处理成为可能。我最近才找到时间写第二篇文章,基于此;然而,研究 CodeProject.com,我发现许多新文章记录了类似的行为。但是,我已经写好了,所以我发布它,因为它可能对某些人有用。
此解决方案和库包含一个 VSTS 测试项目和一个 .NET 2.0 程序集。我的类支持以下简单功能:
- 类似于
System.Threading.ThreadPool
:工作方式非常相似,便于轻松采用此代码(命名和约定)。 - 可控线程池:每个线程池实例可配置并发线程数。
- 多个线程池:每个应用程序进程可以实例化多个线程池。
- 优先级:工作队列中
WorkItem
的优先级。 - 删除重复项:如果相同的
WorkItem
已排队,可以再次放置 [使用键]。 WaitAll()
:阻塞操作符,等待所有WorkItem
完成处理。- 自定义 Windows 性能计数器:监控应用程序
ThreadedQueue
的数量和并发工作线程的数量。 - 六个单元测试:这些测试覆盖了 95% 的代码,如果您希望更改此代码以供自己使用,则很有用。
背景
我从事大宗商品交易多年,我们许多应用程序都需要并行处理才能获得吞吐量。最初的问题是由于 ThreadPool
的限制。一个进程空间只有一个 ThreadPool
,因此应用程序的不同部分会争用此资源。通常,应用程序具有“非常高”优先级功能和“高”优先级功能,这两种代码路径都希望能够使用所有可用资源。
我的第一篇文章试图创建一个可以提供此功能的抽象类;然而,与大多数事物一样,它在过去两年中得到了增强。因此,我在这里展示的是一个更丰富的类,能够提供比我的第一个库更多的控制。
代码术语
WorkItem
:一个封装要执行的方法WaitCallback
、ThreadPriority
和State
对象的对象。WorkQueue
:一个根据优先级排序的WorkItem
列表。ThreadedQueue
:拥有一个WorkQueue
,它管理WorkQueue
上WorkItem
的处理。并发线程数在此处配置。
Using the Code
我创建了四个简单的示例来演示如何使用该类。我没有包含所有代码执行的输出,因为它很冗长并且会使文章膨胀。因此,我编辑了输出,只保留了适当的行来(如果需要)说明每个示例。
代码有很多注释,如果您想研究它的行为,值得查看和执行。
标准线程池行为
这是一个需要并行执行大量任务的示例。这与 System.Threading.ThreadPool
完全相同,我的库只是此示例的抽象层。
100 个工作项(一个工作项是休眠 100 毫秒)在 ThreadedQueue
上排队。在某个时间点,所有这些休眠都将发生。
值得注意的是,在 ThreadedQueue
上调用方法与在 System.Threading.ThreadPool
上调用方法相似。
public void Demo_SimpleAsync()
{
ThreadedQueue oThreadedQueue = new ThreadedQueue(5);
const int iNumberOfWorkItems = 100;
for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
{
oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod), iIndex);
}
}
private void CallbackMethod(object Param)
{
Thread.Sleep(100);
Debug.WriteLine(Param);
}
WaitAll() 行为
此示例与上面一个类似,不同之处在于,我们想知道所有休眠何时发生。WaitAll()
方法阻塞,直到所有工作项在线程池上异步处理完成。
public void Demo_SimpleWaitAll()
{
ThreadedQueue oThreadedQueue = new ThreadedQueue(5);
const int iNumberOfWorkItems = 100;
for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
{
oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod), iIndex);
}
oThreadedQueue.WaitAll();
}
private void CallbackMethod(object Param)
{
Thread.Sleep(100);
Debug.WriteLine(Param);
}
WorkItem 的优先级
此示例将 4 组 5 个项目(总共 20 个)排队到 ThreadedQueue
。每 5 个项目按优先级升序放入队列,因此最后 5 个 WorkItem
具有最高优先级。该示例将 Int32
转换为 ThreadPriority
枚举。
有趣的地方在输出中。前 5 条消息立即处理(“1=Severity.Lowest”001),因为有线程在等待 WorkItem
。然而,第 6 条要处理的消息启动了(4“=Highest”001)ThreadedQueue
,确保待处理的 WorkItem
按优先级顺序排序。
///
///Prioritisation only applies the ordered list of items pending processing.
///In this example, there are 5 concurrent threads, therefore the 5 first low priority items
///are enqueue on the System.Threading.Threadpool prior to higher priority items being added.
///However, subsequent to the first 5 items,
///the remainder of the queue is process in priority order.
///
public void Demo_Priority()
{
ThreadedQueue oThreadedQueue = new ThreadedQueue(5);
const int iNumberOfWorkItems = 5;
//ThreadPriority.Lowest==0
//ThreadPriority.Highest=4;
for (int iPriority = 0; iPriority < 4; iPriority++)
{
for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
{
// Object State: Param
oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod),
// Priority of workitem
iIndex + (iPriority * 1000), (ThreadPriority)iPriority);
}
}
}
private void CallbackMethod(object Param)
{
Thread.Sleep(100);
Debug.WriteLine(Param);
}
输出
PrioritisedWorkerQueue.Constr: Requested:5 Availible:5
1000//First 5 low priority work items a pushed straight onto threads
1001
1002
1003
1004
4000//The high priority items are processed decending order
4001
4003
4002
4004
3000
3001
3002
3004
3003
2000
2001
2003
2004
2002
队列中的唯一 WorkItem
此方法将 4 组 5 个 WorkItem
发送到队列,每组共享相同的标识符。如果队列中已存在相同的标识符,则丢弃该工作。此程序的输出很有趣。第一个项目入队并立即分派,留下一个空队列。第一次迭代的下一个项目放入队列。然后,第一批的第三个项目放入队列,但由于第二个项目仍在队列中而被拒绝。随后的批次由于处理线程已被占用,只能将一个项目放入队列。
我发现这个例子很有趣,因为结果最初没有意义,为什么第一批有两个项目,而其余批次只有一个?然而,当你考虑到 WorkItems 被分派到处理线程时,它就清楚了。当你向 ThreadedQueue
添加更多线程时,输出会变得随机;对于这个例子,它被配置为 1。
public void Demo_UniqueWorkItem()
{
ThreadedQueue oPWorkerQueue = new ThreadedQueue(1);
const int iNumberOfWorkItems = 5;
for (int iUniqueKey = 0; iUniqueKey < 4; iUniqueKey++)
{
for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
{
oPWorkerQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod),
iUniqueKey + ":" + iIndex, ThreadPriority.Normal, iUniqueKey);
}
}
}
private void CallbackMethod(object Param)
{
Thread.Sleep(100);
Debug.WriteLine(Param);
}
输出
PrioritisedWorkerQueue.Constr: Requested:1 Availible:1
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
0:0 //First item processed immediately
0:1 //Only one further item from the first batch is queued
1:0 //Only one from each subsequent batch is processed.
3:0
2:0
有趣的部分
此函数是 ThreadedQueue
类的主要逻辑。这是项目中的代码,并解释了主要部分。
1 private void SpawnWorkThread(IAsyncResult AsyncResult)
2 {
3 lock(m_Lock_WorkItem)
4 {
5 if(AsyncResult!=null)
6 {
7 oWaitCallback.EndInvoke(AsyncResult);
8
9 m_ItemsProcessing--;
10
11 #region Removes the ManualReset from the collection,
indicating it is processed.
12 WorkItem oPThreadWorkItem = AsyncResult.AsyncState as WorkItem;
13
14 if (oPThreadWorkItem != null)
15 {
16 m_WorkQueue.ManualResets.Remove(oPThreadWorkItem.MRE);
17 }
18 #endregion
19
20 if (ThreadedQueueMonitor.Instance.ThreadPoolThreadCount != null)
21 ThreadedQueueMonitor.Instance.ThreadPoolThreadCount.Decrement();
22 }
23
24 // oWorker item will be null if the queue was empty
25 if(m_WorkQueue.Count>0)
26 {
27 m_ItemsProcessing++;
28
29 //Gets the next piece of work to perform
30 WorkItem oWorkItem = m_WorkQueue.Dequeue();
31
32 if (oWorkItem != null)
33 {
34 //Hooks up the callback to this method.
35 AsyncCallback oAsyncCallback = new AsyncCallback(this.SpawnWorkThread);
36
37 //Invokes the work on the threadpool
38 oWaitCallback.BeginInvoke(oWorkItem, oAsyncCallback, oWorkItem);
39
40 if (ThreadedQueueMonitor.Instance.ThreadPoolThreadCount != null)
41 hreadedQueueMonitor.Instance.ThreadPoolThreadCount.Increment();
42 }
43 }
44
45 //Debug.WriteLine(String.Format("SpawnWorkThread" +
// " Length:{0} InUse:{1} DateTime:{2}",
// m_WorkQueue.Count, m_ItemsProcessing, DateTime.Now.ToString()));
46 }
47 }
如何调用:第 1 行,35
SpawnWorkThread
函数通过回调递归调用。当一个新的 WorkerItem
被放置到 ThreadedQueue
中,从而产生一个新的线程时,AsyncResult
参数为 null
。唯一不为 null
的情况是当回调调用该方法时。回调由 WorkItem
处理完成触发。
处理回调:第 5 - 22 行
此代码块确保清除对回调对象的所有引用。一旦对象完成处理,必须移除 ManualResetEvent
对象,以确保它不会泄漏内存(第 16 行)。
获取下一个 WorkItem:第 30 行
此对象是 WorkItemQueue
的一个实例,这是一个自定义类,维护一个等待处理的已排序和唯一项目的列表。排序和唯一性逻辑封装在此类中,因此 SpawnWorkThread
方法非常简单,只需调用 Dequeue()
即可获取下一个项目。
在线程池上生成 WorkItem:第 25 - 43 行
如果有一些项目要处理,则从队列中移除下一个要处理的项目(这不是事务性的)。接下来的代码行创建一个引用此函数 SpawnWorkThread
的 AsyncCallback
类,以便在 WorkItem
完成处理后能够确定。最后,异步调用 WaitCallBack
委托变量,该变量在内部在 System.Threading.ThreadPool
上生成一个新线程。
性能计数器:第 20、40 行
此代码与 ThreadedQueueMonitor
交互,后者处理与 Windows 性能计数器的交互,在处理 WorkItem
时增加和减少计数器。
结论
我已在许多生产应用程序中使用过此类别及其变体。如果您对我的工作有任何意见,我很乐意听取。感谢您阅读我的文章。如果您觉得它对您有用,请花时间给它评分。
历史
- 1.0 - 2007 年 2 月 20 日 - 初稿。
- 1.1 - 2007 年 4 月 3 日 - 上传到 CodeProject。