65.9K
CodeProject 正在变化。 阅读更多。
Home

扩展线程池

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.98/5 (23投票s)

2008 年 6 月 29 日

Ms-PL

3分钟阅读

viewsIcon

84711

downloadIcon

1805

您自己的可扩展且可配置的线程池。

 

引言

市面上提供各种类型的线程池:智能线程池、BlackHen.Threading,但没有一个支持扩展。所有线程池都是硬编码的,使用起来非常困难。我试图解决这些问题

  • 线程池应该是可扩展和可配置的
  • 线程池应该尽可能简单

因此,我创建了一个微型线程池。这个线程池是用 C# 4.0 编写的。

微型线程池特性

  • 非常简单 
  • 可扩展队列
  • 可扩展任务项 
  • 最大工作线程数限制
  • 动态线程工作者
  • 任务优先级支持 
  • 可扩展日志记录

微型线程池设计

  • ITaskItem 表示一个任务
  • ITaskQueue 表示任务队列逻辑 
  • ITaskQueueController 表示生产者和消费者之间的通信逻辑(线程安全)
  • WorkThread 表示一个线程工作者
  • TinyThreadPool 控制工作线程

让我们更深入地了解每个类

  • ITaskItem 表示应该完成的某些工作。
    public interface ITaskItem
    {
        void DoWork();
    } 
  • TaskItemPriority - 可以为每个任务指定WorkThread的优先级。
  • ITaskQueue 是另一个简单的接口,用于管理任务队列。
    /// <summary>
    /// Represent the queue.
    /// </summary>
    public interface ITaskQueue
    {
        /// <summary>
        /// Count of work item.
        /// </summary>
        int Count { get; }

        /// <summary>
        /// Dequeue the work item.
        /// </summary>
        /// <returns>The work item.</returns>
        IWorkItem Dequeue();

        /// <summary>
        /// Enqueue the work item.
        /// </summary>
        /// <param name="item">The work item.</param>
        void Enqueue(IWorkItem item);
    }  
  • ITaskQueueController 提供生产者和消费者之间的通信逻辑。
    public interface ITaskQueueController : IDisposable
    {
        int ConsumersWaiting { get; }
        IWorkItem Dequeue();
        void Enqueue(IWorkItem item);
    } 

我已经实现了两个派生自ITaskQueueController的任务队列控制器

  • DefaultTaskQueueController
  • BoundedTaskQueueController

默认任务队列控制器

DefaultTaskQueueControllerITaskQueue 的线程安全包装器

    public sealed class DefaultTaskQueueController : TaskQueueController
    {
        public DefaultTaskQueueController(ITaskQueue taskQueue)
            : base(taskQueue)
        {
        }

        protected override IWorkItem DequeueCore()
        {
            lock (_locker)
            {
                while (_taskQueue.Count == 0 && !_isDisposed)
                {
                    _consumersWaiting++;
                    Monitor.Wait(_locker);
                    _consumersWaiting--;
                }
                if (_isDisposed)
                {
                    return null;
                }
                return _taskQueue.Dequeue();
            }
        }

        protected override void EnqueueCore(IWorkItem item)
        {
            lock (_locker)
            {
                _taskQueue.Enqueue(item);
                if (_consumersWaiting > 0)
                {
                    Monitor.PulseAll(_locker);
                }
            }
        }
    } 

有界任务队列控制器

BoundedTaskQueueController (线程安全) - 如果生产者任务或任务创建项目的速度快于消费者处理它们的速度,系统可能会导致内存使用无限制。BoundedTaskQueueController 允许您在队列达到某个限制之前强制生产者阻塞。

    public sealed class BoundedTaskQueueController : TaskQueueController
    {
        private readonly int _maxTasksCount;
        private int _producersWaiting;

        public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
            : base(taskQueue)
        {
            if (maxTasksCount < 1)
            {
                throw new ArgumentException("MaxTasksCount should be greater 0");
            }
            _maxTasksCount = maxTasksCount;
        }

        protected override IWorkItem DequeueCore()
        {
            IWorkItem taskItem;
            lock (_locker)
            {
                while (_taskQueue.Count == 0 && !_isDisposed)
                {
                    _consumersWaiting++;
                    Monitor.Wait(_locker);
                    _consumersWaiting--;
                }
                if (_isDisposed)
                {
                    return null;
                }
                taskItem = _taskQueue.Dequeue();
                if (_producersWaiting > 0)
                {
                    Monitor.PulseAll(_locker);
                }
            }
            return taskItem;
        }

        protected override void EnqueueCore(IWorkItem item)
        {
            lock (_locker)
            {
                while (_taskQueue.Count == (_maxTasksCount - 1) && !_isDisposed)
                {
                    _producersWaiting++;
                    Monitor.Wait(_locker);
                    _producersWaiting--;
                }
                _taskQueue.Enqueue(item);
                if (_consumersWaiting > 0)
                {
                    Monitor.PulseAll(_locker);
                }
            }
        }
    } 

微型线程池

TinyThreadPool 管理 ITaskQueueControllerTinyThreadPool 是通过 Create 方法创建的,例如

var threadPool = TinyThreadPool.Create(x =>
{
    x.Name = "My ThreadPool";
    x.MinThreads = 2;
    x.MaxThreads = 10;
    x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
});

所有 TinyThreadPool 的属性都有默认值,因此创建 TinyThreadPool 最简单的方法是调用 Default 属性。

var threadPool = TinyThreadPool.Default;

MultiThreadingCapacityType 表示线程容量

    public enum MultiThreadingCapacityType
    {
        /// <summary>
        /// Represent all processors
        /// </summary>
        Global,
        /// <summary>
        /// Represent one processor
        /// </summary>
        PerProcessor
    }

AddTask 方法用于将任务添加到任务管道。如果尚未达到最大线程限制且 ConsamersWaiting = 0,则会创建一个新的 WorkThread。 

您可以添加带有 TaskItemPriority 的任务。请注意,DefaultTaskQueue 不验证 TaskItemPriority,请使用 PriorityTaskQueue 处理优先级任务

        /// <summary>
        /// Add new task.
        /// </summary>
        /// <param name="taskItem">Represent task.</param>
        /// <param name="priority">Task priority.</param>
        /// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
        public void AddTask(ITaskItem taskItem, TaskItemPriority priority = TaskItemPriority.Normal)
        {
            IWorkItem workItem = WorkItem.FromTaskItem(taskItem, priority);
            AddWorkItem(workItem);
        }

        /// <summary>
        /// Add new task as <see cref="Action" />.
        /// </summary>
        /// <param name="action">Represent task.</param>
        /// <param name="priority">Task priority.</param>
        /// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
        public void AddTask(Action action, TaskItemPriority priority = TaskItemPriority.Normal)
        {
            IWorkItem workItem = WorkItem.FromAction(action, priority);
            AddWorkItem(workItem);
        } 

工作线程

WorkThread 类执行任务项并提供日志记录。

        private void DoWork()
        {
            while (_isRun)
            {
                try
                {
                    IWorkItem workItem = _taskQueueController.Dequeue();
                    if (workItem == null)
                    {
                        continue;
                    }
                    ProcessItem(workItem);
                }
                catch (Exception ex)
                {
                    _log.Error(ex);
                }
            }
        } 

线程池可扩展性 

如果您需要更强大的任务队列,则需要实现 ITaskQueue,并且不用担心线程安全:您也可以创建自己的 ITaskQueueController。  

示例

我们使用自定义设置创建 TinyThreadPoolSampleTask 派生自 ITaskItem,请参见下面的详细信息。

internal class Program
{
    private static ITinyThreadPool _threadPool;

    private static void AddTasks()
    {
        for (int taskIndex = 0; taskIndex < 50; taskIndex++)
        {
            _threadPool.AddTask(new SampleTask(taskIndex));
        }
    }

    private static void Main()
    {
        // create default TinyThreadPool instance or thru method TinyThreadPool.Create
        // _threadPool = TinyThreadPool.Default;

        _threadPool = TinyThreadPool.Create(x =>
        {
            x.Name = "My ThreadPool";
            x.MinThreads = 2;
            x.MaxThreads = 10;
            x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
        });

        AddTasks();
        Console.ReadKey();
    }

    private sealed class SampleTask : ITaskItem
    {
        private readonly int _taskIndex;

        public SampleTask(int taskIndex)
        {
            _taskIndex = taskIndex;
        }

        public void DoWork()
        {
            Thread.Sleep(100);
            Console.WriteLine("Task {0} has been finished", _taskIndex);
        }
    }
}

历史

  • 2008 年 6 月 29 日:初始版本。
  • 2008 年 7 月 2 日
    • + TransactionalMsmqTaskItemDefaultTaskQueue
    • * TaskQueueController 类已添加可处置支持。
    • * WorkThread 更新了停止逻辑。
    • * 示例项目: 
      • CoreDefaultMsmqSample 使用 Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue(有关更多详细信息,请参阅 *App.config* 文件)。
      • CoreDefaultSample 使用 CoreDefaultSample.TaskQueue(有关更多详细信息,请参阅 *App.config* 文件)。
  • 2008 年 7 月 10 日
    • 添加了 Mike.Strobel 的建议。
    • + ActionTaskItem
    • * ExtendedThreadPool 添加了 AddTask(Action action)AddTask(Action action, ThreadPriority priority) 方法。
    • 添加了更多测试
  • 2009 年 10 月 2 日
    • + PriorityTaskQueue
    • + StatisticController
    • * ExtendedThreadPool 添加了对 StatisticControllerMultiThreadingCapacityType 的支持。
    • 添加了对 Unity 1.2 的支持
    • 添加了 CorePrioritySample。该项目使用 Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue(有关更多详细信息,请参阅 *App.config* 文件)。
    • 添加了更多测试 
  • 2013 年 4 月 7 日 
    •  ExtendedThreadPool  v2
    •  已移除 Unity
    •  ExtendedThreadPool 的创建已简化 
  • 2015 年 4 月 22 日
    • ExtendedThreadPool -> TinyThreadPool
    • TinyThreadPool 基于 lambda 的新创建 API 
    • TinyThreadPoolNelibur.Sword NuGet 程序包 的一部分
© . All rights reserved.