扩展线程池






4.98/5 (23投票s)
您自己的可扩展且可配置的线程池。
引言
市面上提供各种类型的线程池:智能线程池、BlackHen.Threading,但没有一个支持扩展。所有线程池都是硬编码的,使用起来非常困难。我试图解决这些问题
- 线程池应该是可扩展和可配置的
- 线程池应该尽可能简单
因此,我创建了一个微型线程池。这个线程池是用 C# 4.0 编写的。
微型线程池特性
- 非常简单
- 可扩展队列
- 可扩展任务项
- 最大工作线程数限制
- 动态线程工作者
- 任务优先级支持
- 可扩展日志记录
微型线程池设计
ITaskItem
表示一个任务ITaskQueue
表示任务队列逻辑ITaskQueueController
表示生产者和消费者之间的通信逻辑(线程安全)WorkThread
表示一个线程工作者TinyThreadPool
控制工作线程
让我们更深入地了解每个类
ITaskItem
表示应该完成的某些工作。
public interface ITaskItem
{
void DoWork();
}
TaskItemPriority
- 可以为每个任务指定WorkThread
的优先级。ITaskQueue
是另一个简单的接口,用于管理任务队列。
/// <summary>
/// Represent the queue.
/// </summary>
public interface ITaskQueue
{
/// <summary>
/// Count of work item.
/// </summary>
int Count { get; }
/// <summary>
/// Dequeue the work item.
/// </summary>
/// <returns>The work item.</returns>
IWorkItem Dequeue();
/// <summary>
/// Enqueue the work item.
/// </summary>
/// <param name="item">The work item.</param>
void Enqueue(IWorkItem item);
}
ITaskQueueController
提供生产者和消费者之间的通信逻辑。
public interface ITaskQueueController : IDisposable
{
int ConsumersWaiting { get; }
IWorkItem Dequeue();
void Enqueue(IWorkItem item);
}
我已经实现了两个派生自ITaskQueueController
的任务队列控制器
DefaultTaskQueueController
BoundedTaskQueueController
默认任务队列控制器
DefaultTaskQueueController
是 ITaskQueue
的线程安全包装器
public sealed class DefaultTaskQueueController : TaskQueueController
{
public DefaultTaskQueueController(ITaskQueue taskQueue)
: base(taskQueue)
{
}
protected override IWorkItem DequeueCore()
{
lock (_locker)
{
while (_taskQueue.Count == 0 && !_isDisposed)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if (_isDisposed)
{
return null;
}
return _taskQueue.Dequeue();
}
}
protected override void EnqueueCore(IWorkItem item)
{
lock (_locker)
{
_taskQueue.Enqueue(item);
if (_consumersWaiting > 0)
{
Monitor.PulseAll(_locker);
}
}
}
}
有界任务队列控制器
BoundedTaskQueueController
(线程安全) - 如果生产者任务或任务创建项目的速度快于消费者处理它们的速度,系统可能会导致内存使用无限制。BoundedTaskQueueController
允许您在队列达到某个限制之前强制生产者阻塞。
public sealed class BoundedTaskQueueController : TaskQueueController
{
private readonly int _maxTasksCount;
private int _producersWaiting;
public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
: base(taskQueue)
{
if (maxTasksCount < 1)
{
throw new ArgumentException("MaxTasksCount should be greater 0");
}
_maxTasksCount = maxTasksCount;
}
protected override IWorkItem DequeueCore()
{
IWorkItem taskItem;
lock (_locker)
{
while (_taskQueue.Count == 0 && !_isDisposed)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if (_isDisposed)
{
return null;
}
taskItem = _taskQueue.Dequeue();
if (_producersWaiting > 0)
{
Monitor.PulseAll(_locker);
}
}
return taskItem;
}
protected override void EnqueueCore(IWorkItem item)
{
lock (_locker)
{
while (_taskQueue.Count == (_maxTasksCount - 1) && !_isDisposed)
{
_producersWaiting++;
Monitor.Wait(_locker);
_producersWaiting--;
}
_taskQueue.Enqueue(item);
if (_consumersWaiting > 0)
{
Monitor.PulseAll(_locker);
}
}
}
}
微型线程池
TinyThreadPool
管理 ITaskQueueController
。TinyThreadPool
是通过 Create
方法创建的,例如
var threadPool = TinyThreadPool.Create(x =>
{
x.Name = "My ThreadPool";
x.MinThreads = 2;
x.MaxThreads = 10;
x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
});
所有 TinyThreadPool
的属性都有默认值,因此创建 TinyThreadPool
最简单的方法是调用 Default
属性。
var threadPool = TinyThreadPool.Default;
MultiThreadingCapacityType
表示线程容量
public enum MultiThreadingCapacityType
{
/// <summary>
/// Represent all processors
/// </summary>
Global,
/// <summary>
/// Represent one processor
/// </summary>
PerProcessor
}
AddTask
方法用于将任务添加到任务管道。如果尚未达到最大线程限制且 ConsamersWaiting = 0
,则会创建一个新的 WorkThread
。
您可以添加带有 TaskItemPriority
的任务。请注意,DefaultTaskQueue
不验证 TaskItemPriority
,请使用 PriorityTaskQueue
处理优先级任务
/// <summary>
/// Add new task.
/// </summary>
/// <param name="taskItem">Represent task.</param>
/// <param name="priority">Task priority.</param>
/// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
public void AddTask(ITaskItem taskItem, TaskItemPriority priority = TaskItemPriority.Normal)
{
IWorkItem workItem = WorkItem.FromTaskItem(taskItem, priority);
AddWorkItem(workItem);
}
/// <summary>
/// Add new task as <see cref="Action" />.
/// </summary>
/// <param name="action">Represent task.</param>
/// <param name="priority">Task priority.</param>
/// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
public void AddTask(Action action, TaskItemPriority priority = TaskItemPriority.Normal)
{
IWorkItem workItem = WorkItem.FromAction(action, priority);
AddWorkItem(workItem);
}
工作线程
WorkThread
类执行任务项并提供日志记录。
private void DoWork()
{
while (_isRun)
{
try
{
IWorkItem workItem = _taskQueueController.Dequeue();
if (workItem == null)
{
continue;
}
ProcessItem(workItem);
}
catch (Exception ex)
{
_log.Error(ex);
}
}
}
线程池可扩展性
如果您需要更强大的任务队列,则需要实现 ITaskQueue
,并且不用担心线程安全:您也可以创建自己的 ITaskQueueController
。
示例
我们使用自定义设置创建 TinyThreadPool
。SampleTask
派生自 ITaskItem
,请参见下面的详细信息。
internal class Program
{
private static ITinyThreadPool _threadPool;
private static void AddTasks()
{
for (int taskIndex = 0; taskIndex < 50; taskIndex++)
{
_threadPool.AddTask(new SampleTask(taskIndex));
}
}
private static void Main()
{
// create default TinyThreadPool instance or thru method TinyThreadPool.Create
// _threadPool = TinyThreadPool.Default;
_threadPool = TinyThreadPool.Create(x =>
{
x.Name = "My ThreadPool";
x.MinThreads = 2;
x.MaxThreads = 10;
x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
});
AddTasks();
Console.ReadKey();
}
private sealed class SampleTask : ITaskItem
{
private readonly int _taskIndex;
public SampleTask(int taskIndex)
{
_taskIndex = taskIndex;
}
public void DoWork()
{
Thread.Sleep(100);
Console.WriteLine("Task {0} has been finished", _taskIndex);
}
}
}
历史
- 2008 年 6 月 29 日:初始版本。
- 2008 年 7 月 2 日
- +
TransactionalMsmqTaskItem
、DefaultTaskQueue
。 - *
TaskQueueController
类已添加可处置支持。 - *
WorkThread
更新了停止逻辑。 - * 示例项目:
- CoreDefaultMsmqSample 使用
Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue
(有关更多详细信息,请参阅 *App.config* 文件)。 - CoreDefaultSample 使用
CoreDefaultSample.TaskQueue
(有关更多详细信息,请参阅 *App.config* 文件)。
- CoreDefaultMsmqSample 使用
- +
- 2008 年 7 月 10 日
- 添加了 Mike.Strobel 的建议。
- +
ActionTaskItem
。 - *
ExtendedThreadPool
添加了AddTask(Action action)
和AddTask(Action action, ThreadPriority priority)
方法。 - 添加了更多测试
- 2009 年 10 月 2 日
- +
PriorityTaskQueue
。 - +
StatisticController
。 - *
ExtendedThreadPool
添加了对StatisticController
、MultiThreadingCapacityType
的支持。 - 添加了对 Unity 1.2 的支持
- 添加了 CorePrioritySample。该项目使用
Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue
(有关更多详细信息,请参阅 *App.config* 文件)。 - 添加了更多测试
- +
- 2013 年 4 月 7 日
-
ExtendedThreadPool
v2 - 已移除 Unity
-
ExtendedThreadPool
的创建已简化
-
- 2015 年 4 月 22 日
ExtendedThreadPool
->TinyThreadPool
TinyThreadPool
基于 lambda 的新创建 APITinyThreadPool
是Nelibur
.Sword
NuGet 程序包 的一部分