65.9K
CodeProject 正在变化。 阅读更多。
Home

支持 WaitAll、排序和唯一性的应用线程池

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.11/5 (7投票s)

2007年4月4日

CPOL

6分钟阅读

viewsIcon

34925

downloadIcon

368

此线程池库提供了对其调度和处理工作项方式的更大程度的配置。此扩展支持 WaitAll()、优先级和唯一性。

Screenshot - ThreadWorkerQueue.gif

引言

我曾在 2004 年 6 月写过一篇文章,描述了 System.Threading.ThreadPool 的一个简单包装器。这使得池中排队项目的顺序处理成为可能。我最近才找到时间写第二篇文章,基于此;然而,研究 CodeProject.com,我发现许多新文章记录了类似的行为。但是,我已经写好了,所以我发布它,因为它可能对某些人有用。

此解决方案和库包含一个 VSTS 测试项目和一个 .NET 2.0 程序集。我的类支持以下简单功能:

  • 类似于 System.Threading.ThreadPool:工作方式非常相似,便于轻松采用此代码(命名和约定)。
  • 可控线程池:每个线程池实例可配置并发线程数。
  • 多个线程池:每个应用程序进程可以实例化多个线程池。
  • 优先级:工作队列中 WorkItem 的优先级。
  • 删除重复项:如果相同的 WorkItem 已排队,可以再次放置 [使用键]。
  • WaitAll():阻塞操作符,等待所有 WorkItem 完成处理。
  • 自定义 Windows 性能计数器:监控应用程序 ThreadedQueue 的数量和并发工作线程的数量。
  • 六个单元测试:这些测试覆盖了 95% 的代码,如果您希望更改此代码以供自己使用,则很有用。

背景

我从事大宗商品交易多年,我们许多应用程序都需要并行处理才能获得吞吐量。最初的问题是由于 ThreadPool 的限制。一个进程空间只有一个 ThreadPool,因此应用程序的不同部分会争用此资源。通常,应用程序具有“非常高”优先级功能和“高”优先级功能,这两种代码路径都希望能够使用所有可用资源。

我的第一篇文章试图创建一个可以提供此功能的抽象类;然而,与大多数事物一样,它在过去两年中得到了增强。因此,我在这里展示的是一个更丰富的类,能够提供比我的第一个库更多的控制。

代码术语

  • WorkItem:一个封装要执行的方法 WaitCallbackThreadPriorityState 对象的对象。
  • WorkQueue:一个根据优先级排序的 WorkItem 列表。
  • ThreadedQueue:拥有一个 WorkQueue,它管理 WorkQueueWorkItem 的处理。并发线程数在此处配置。

Using the Code

我创建了四个简单的示例来演示如何使用该类。我没有包含所有代码执行的输出,因为它很冗长并且会使文章膨胀。因此,我编辑了输出,只保留了适当的行来(如果需要)说明每个示例。

代码有很多注释,如果您想研究它的行为,值得查看和执行。

标准线程池行为

这是一个需要并行执行大量任务的示例。这与 System.Threading.ThreadPool 完全相同,我的库只是此示例的抽象层。

100 个工作项(一个工作项是休眠 100 毫秒)在 ThreadedQueue 上排队。在某个时间点,所有这些休眠都将发生。

值得注意的是,在 ThreadedQueue 上调用方法与在 System.Threading.ThreadPool 上调用方法相似。

public void Demo_SimpleAsync()
{
    ThreadedQueue oThreadedQueue = new ThreadedQueue(5);

    const int iNumberOfWorkItems = 100;

    for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
    {
        oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod), iIndex);
    }
}

private void CallbackMethod(object Param)
{
    Thread.Sleep(100);

    Debug.WriteLine(Param);
}

WaitAll() 行为

此示例与上面一个类似,不同之处在于,我们想知道所有休眠何时发生。WaitAll() 方法阻塞,直到所有工作项在线程池上异步处理完成。

public void Demo_SimpleWaitAll()
{
    ThreadedQueue oThreadedQueue = new ThreadedQueue(5);

    const int iNumberOfWorkItems = 100;

    for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
    {
        oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod), iIndex);
    }

    oThreadedQueue.WaitAll();
}

private void CallbackMethod(object Param)
{
    Thread.Sleep(100);

    Debug.WriteLine(Param);
}

WorkItem 的优先级

此示例将 4 组 5 个项目(总共 20 个)排队到 ThreadedQueue。每 5 个项目按优先级升序放入队列,因此最后 5 个 WorkItem 具有最高优先级。该示例将 Int32 转换为 ThreadPriority 枚举。

有趣的地方在输出中。前 5 条消息立即处理(“1=Severity.Lowest”001),因为有线程在等待 WorkItem。然而,第 6 条要处理的消息启动了(4“=Highest”001)ThreadedQueue,确保待处理的 WorkItem 按优先级顺序排序。

///
///Prioritisation only applies the ordered list of items pending processing.
///In this example, there are 5 concurrent threads, therefore the 5 first low priority items
///are enqueue on the System.Threading.Threadpool prior to higher priority items being added.
///However, subsequent to the first 5 items, 
///the remainder of the queue is process in priority order.
///
public void Demo_Priority()
{
    ThreadedQueue oThreadedQueue = new ThreadedQueue(5);

    const int iNumberOfWorkItems = 5;
    //ThreadPriority.Lowest==0
    //ThreadPriority.Highest=4;

    for (int iPriority = 0; iPriority < 4; iPriority++)
    {
        for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
        {
            //                               Object State: Param
            oThreadedQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod), 
            //             Priority of workitem
                           iIndex + (iPriority * 1000), (ThreadPriority)iPriority);
        }
    }
}

private void CallbackMethod(object Param)
{
    Thread.Sleep(100);

    Debug.WriteLine(Param);
}

输出

PrioritisedWorkerQueue.Constr: Requested:5 Availible:5
1000//First 5 low priority work items a pushed straight onto threads
1001
1002
1003
1004
4000//The high priority items are processed decending order
4001
4003
4002
4004
3000
3001
3002
3004
3003
2000
2001
2003
2004
2002

队列中的唯一 WorkItem

此方法将 4 组 5 个 WorkItem 发送到队列,每组共享相同的标识符。如果队列中已存在相同的标识符,则丢弃该工作。此程序的输出很有趣。第一个项目入队并立即分派,留下一个空队列。第一次迭代的下一个项目放入队列。然后,第一批的第三个项目放入队列,但由于第二个项目仍在队列中而被拒绝。随后的批次由于处理线程已被占用,只能将一个项目放入队列。

我发现这个例子很有趣,因为结果最初没有意义,为什么第一批有两个项目,而其余批次只有一个?然而,当你考虑到 WorkItems 被分派到处理线程时,它就清楚了。当你向 ThreadedQueue 添加更多线程时,输出会变得随机;对于这个例子,它被配置为 1。

public void Demo_UniqueWorkItem()
{
    ThreadedQueue oPWorkerQueue = new ThreadedQueue(1);

    const int iNumberOfWorkItems = 5;

    for (int iUniqueKey = 0; iUniqueKey < 4; iUniqueKey++)
    {
        for (int iIndex = 0; iIndex < iNumberOfWorkItems; iIndex++)
        {
            oPWorkerQueue.QueueUserWorkItem(new WaitCallback(CallbackMethod), 
              iUniqueKey + ":" + iIndex, ThreadPriority.Normal, iUniqueKey);
        }
    }
}

private void CallbackMethod(object Param)
{
    Thread.Sleep(100);

    Debug.WriteLine(Param);
}

输出

PrioritisedWorkerQueue.Constr: Requested:1 Availible:1
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'0'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'1'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'2'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
PrioritisedArrayList: Key Already Queued: Skipped Request Key:'3'
0:0 //First item processed immediately
0:1 //Only one further item from the first batch is queued
1:0 //Only one from each subsequent batch is processed.
3:0
2:0

有趣的部分

此函数是 ThreadedQueue 类的主要逻辑。这是项目中的代码,并解释了主要部分。

1   private void SpawnWorkThread(IAsyncResult AsyncResult)
2   {
3       lock(m_Lock_WorkItem)
4       {
5           if(AsyncResult!=null)
6           {
7               oWaitCallback.EndInvoke(AsyncResult);
8
9               m_ItemsProcessing--;
10
11              #region Removes the ManualReset from the collection, 
                        indicating it is processed.
12              WorkItem oPThreadWorkItem = AsyncResult.AsyncState as WorkItem;
13
14              if (oPThreadWorkItem != null)
15              {
16                  m_WorkQueue.ManualResets.Remove(oPThreadWorkItem.MRE);
17              } 
18              #endregion
19                   
20              if (ThreadedQueueMonitor.Instance.ThreadPoolThreadCount != null)
21                  ThreadedQueueMonitor.Instance.ThreadPoolThreadCount.Decrement();
22          }
23
24          // oWorker item will be null if the queue was empty
25          if(m_WorkQueue.Count>0) 
26          {
27              m_ItemsProcessing++;
28
29              //Gets the next piece of work to perform
30              WorkItem oWorkItem = m_WorkQueue.Dequeue();
31
32              if (oWorkItem != null)
33              {
34                  //Hooks up the callback to this method.
35                  AsyncCallback oAsyncCallback = new AsyncCallback(this.SpawnWorkThread);
36
37                  //Invokes the work on the threadpool
38                  oWaitCallback.BeginInvoke(oWorkItem, oAsyncCallback, oWorkItem);
39
40                  if (ThreadedQueueMonitor.Instance.ThreadPoolThreadCount != null)
41                      hreadedQueueMonitor.Instance.ThreadPoolThreadCount.Increment();
42              }
43          }
44
45          //Debug.WriteLine(String.Format("SpawnWorkThread" + 
            // " Length:{0} InUse:{1} DateTime:{2}", 
            // m_WorkQueue.Count, m_ItemsProcessing, DateTime.Now.ToString()));
46      }
47  }

如何调用:第 1 行,35

SpawnWorkThread 函数通过回调递归调用。当一个新的 WorkerItem 被放置到 ThreadedQueue 中,从而产生一个新的线程时,AsyncResult 参数为 null。唯一不为 null 的情况是当回调调用该方法时。回调由 WorkItem 处理完成触发。

处理回调:第 5 - 22 行

此代码块确保清除对回调对象的所有引用。一旦对象完成处理,必须移除 ManualResetEvent 对象,以确保它不会泄漏内存(第 16 行)。

获取下一个 WorkItem:第 30 行

此对象是 WorkItemQueue 的一个实例,这是一个自定义类,维护一个等待处理的已排序和唯一项目的列表。排序和唯一性逻辑封装在此类中,因此 SpawnWorkThread 方法非常简单,只需调用 Dequeue() 即可获取下一个项目。

在线程池上生成 WorkItem:第 25 - 43 行

如果有一些项目要处理,则从队列中移除下一个要处理的项目(这不是事务性的)。接下来的代码行创建一个引用此函数 SpawnWorkThreadAsyncCallback 类,以便在 WorkItem 完成处理后能够确定。最后,异步调用 WaitCallBack 委托变量,该变量在内部在 System.Threading.ThreadPool 上生成一个新线程。

性能计数器:第 20、40 行

此代码与 ThreadedQueueMonitor 交互,后者处理与 Windows 性能计数器的交互,在处理 WorkItem 时增加和减少计数器。

结论

我已在许多生产应用程序中使用过此类别及其变体。如果您对我的工作有任何意见,我很乐意听取。感谢您阅读我的文章。如果您觉得它对您有用,请花时间给它评分。

历史

  • 1.0 - 2007 年 2 月 20 日 - 初稿。
  • 1.1 - 2007 年 4 月 3 日 - 上传到 CodeProject。
© . All rights reserved.