创建自定义 C# 线程池






4.83/5 (13投票s)
一种初学者的创建自定义线程池的方法,以理解多线程和同步的概念。
引言
首先,高效地使用线程池并非易事——并不是你今天开始编程,明天就能精通这项艺术。即使是高手(或者用我们的话来说,他们喜欢自称为“极客”!)也可能在那里犯错,结果是大量的线程导致性能缓慢、系统响应迟钝、死锁、活锁,最后甚至让你产生拔掉电源线将系统浸入浴缸的冲动。等等……有人告诉我,在这种情况下,慢慢地从五数到一……是的;如果这有帮助,你可以重复五次——之后,事情就会得到控制——相信我——会如此。
好的——所以在这里我不会详细阐述我们如何利用线程或多线程来为我们服务;我们
会稍微谈一下,但不会深入。我这里主要要做的是——创建我自己的线程池,它将为我提供一种使用多个线程异步地做事的方式。我将能够控制池中线程的最大数量和最小数量。框架的线程池也能做到这一点,而且可能比我的池做得更好——但这并不能阻止我创建这个。我的想法是深入了解线程池的核心,它内部处理的复杂性,它带来的强大功能,以及最终它暴露出来的可能导致系统崩溃的漏洞。
根据我目前的经验,我遇到过许多开发者,尽管他们可能已经多次使用过线程池,但他们并没有正确理解这个概念。我主要针对的是这些人——一起来看看这到底是什么。
所以,这是我将要讲解的内容列表;
- 线程背景——简要描述
- 多线程及其误解
- 同步和临界区
- 对象池模式以实现有效的资源利用
- Microsoft .NET Framework 提供的线程池
- 创建自定义线程池的想法和期望
- 代码逐步讲解
- 结束语
线程背景——简要描述
嗯——这并不是一个可以轻易定义的概念,但我正试图在保持合理和简洁的同时,在概念上涵盖一切。让我们看看接下来会怎样。
计算机的核心部分是中央处理器(CPU),它包含处理器/处理器、RAM、寄存器等。当我们启动系统时——操作系统会被加载并在处理器上运行。操作系统是基于计算机架构(如 x-86 系列)编写的。操作系统负责让其他应用程序加载/运行。它暴露了使用系统中可用资源的 API——例如内存请求、处理器请求等。执行程序需要处理器时间。对于单处理器系统,实际上只有一个执行单元,因此在任何时刻只能运行一件事。在 DOS 时代,在 Windows 之前,你一次只能运行一件事。如果文档正在打印,你就只能等着它完成。最糟糕的是——如果在过程中出现问题,你就不得不重新启动系统。Windows 3.1 也不是一个完整的操作系统。它当时是一个操作系统环境——意味着它提供了一个图形用户界面,在那里像 Word、Excel 和 PowerPoint 这样的 Windows 应用程序可以一起运行。这得益于一个新概念——进程。进程是处理器上的一种映射。每个应用程序在进程下启动。这是一个巨大的成就,因为它提供了隔离。现在,内存会为每个进程单独分配,因为它们不共享,所以一个进程不可能破坏分配给另一个进程的内存。隔离对于安全原因非常必要,并且这是一项巨大的成就。
现在下一个障碍是 CPU。CPU 不像内存那样可以被这样分割。如果一个进程陷入无限循环,那么其他进程就必须永远等待。即使是操作系统也对此无能为力。在进程不释放 CPU 的情况下,其他进程实际上什么也做不了。
需要某种方法来分割 CPU——这就是一种新方法——线程的出现。
线程是 Windows 的一个概念——它是处理器的虚拟化。
进程就是一个正在运行的程序——在运行时,它在任何时刻都有一定的状态。随着它的进展,状态不断变化。状态信息非常重要——没有它,运行程序就没有意义。就像一个方法是用某些参数调用的——那么参数对于方法正确计算/运行是必不可少的。参数会被加载到寄存器或推入堆栈,以便处理器轻松读取。结果也必须写回寄存器,以便调用代码可以访问。这种协作/协调由 CPU 架构定义,并且将始终如此。在任何时刻,寄存器中加载的值以及 CPU 依赖的其他任何东西就是我们所说的状态。它更广为人知的名称是上下文。
那么虚拟化在这里能如何提供帮助呢?
操作系统运行一个调度程序,其工作是提供虚拟化。每个启动的程序(或者我们可以说进程的开始)都会被分配一个线程——该线程表现得像一个处理器。它的整个上下文被加载到物理寄存器中,然后它开始执行。然后很快它就停止了——调度程序停止了它——进程实际上并没有退出——它只是进入了暂停模式。然后调度程序会记录上下文,并将其写回分配给线程的内存中(每个线程默认分配 1 MB 用于这些目的)。它会查找其他处于暂停状态的线程,然后将线程本地内存中记录的上下文加载到寄存器中,并要求处理器重新开始。很快它又会被暂停,并以相同的轮询方式,所有进程的线程都会获得执行各自任务的机会。CPU 的速度非常快,而且这种时间切片发生得如此迅速,以至于通常几个活动的进程永远不会发现在任何时刻它们处于等待状态。
这是一个令人惊叹的发展。我们可以发送一个巨大的文档进行打印,然后要求网站下载一个文件,然后要求 Visual Studio 构建我们的巨大代码库,而在所有这些发生的同时,我可以翻阅我的相册并听一首歌,所有这些都在同一时间。到目前为止很棒。多线程及其误解
直到多个进程单线程运行时,我们才获得了好处。同步问题也不会持续存在,因为进程在不同的地址空间中工作。它提供了急需的隔离——现在如果一个进程因任何原因崩溃——影响将仅限于该进程。不需要重新启动,因为其他进程仍然会获得 CPU 时间切片。这是线程最初出现的主要原因,而且它做得非常好。这就是多任务处理。多个任务并行运行。
现在,随着用户期望的不断提高和处理器速度的不断提升,使应用程序更具响应性的负担已经转移到软件方面——单线程方法仍然浪费了很多 CPU 时间——导致整体利用率下降。如何更有效地利用它?而线程就是答案。
如果有五个单线程应用程序和一个使用多线程的应用程序,那么与提供平均切片的所有线程相比,后者将获得更多的 CPU 时间。所以,作为超级智能的开发者,我应该尽可能多地创建线程,以便我的应用程序获得更多 CPU 时间,事情也能更快完成。然后,突然间,每个人都变得聪明起来,结果是,如果你打开 Visual Studio,然后是 Word、Excel 和 Outlook——然后打开 Windows 任务管理器,你可以在下面的截图(来自我的电脑)中看到——这里有高达 1254 个线程和 86 个进程在运行。占用了 1.47 GB 的 RAM。线程的创建方式存在严重问题。尽管有所有这些,CPU 利用率仍然只有 3%。
现在想象一个情况,一个线程真的需要 CPU 时间进行一些广泛的计算。根据切片规则,由于有成千上万个线程处于活动状态,它将无法获得足够的 CPU 时间来完成。如果计算的性质是同步的,那么只有一个线程可以执行。在这种情况下,我们的应用程序创建的多个线程实际上会干扰 CPU 密集型线程,从而增加了整体执行时间。我们最初的想法是为我们的应用程序争取更多 CPU 时间,在这里却彻底失败了,所以疯狂地创建线程实际上是有害的,而且它的危害方式比你想象的要多。
创建多个线程并不能简单地提高 CPU 利用率。这主要是我们如何有效地设计我们的应用程序——这才是关键。它会引入很多其他复杂性。这并不意味着我们永远不应该这样做——我们应该这样做——但前提是我们理解我们正在做什么以及它将如何帮助或浪费现有资源。
一句话——创建多个线程并不能简单地提高应用程序的性能——反而会降低整个系统的效率,因为它会促进资源的低效利用。.
同步和临界区处理
很多时候,为一项耗时且有明确时限的任务创建一个新线程是有意义的——比如进行网络请求。由于涉及许多因素,其时间是不可预测的。在许多情况下,用户会失去耐心,因此整体体验留下的印象并不好。为了改变这种情况——我们将这些不可预测的任务交给另一个线程来处理,而当它
完成时,用户仍然可以进行 UI 的其他操作。
这个想法很好,但它带来了一个需要我们关注的问题。为了理解这个问题,让我们举一个例子,我们下载一个公司的提要并将其存储在一个临时文件中。每次这样做时,我们都会以追加模式写入现有文件,这样就不会在内存中堆积太多文件。我们有一个带有文本框的表单,我们在其中提供公司股票代码,例如 IBM、YHOO 和 TRI。然后有一个“上传”按钮。它会为该股票代码启动一个网络请求,然后提要将被下载到用户系统上的临时文件中。由于这需要时间,我们会在一个线程中进行。假设下载需要 30 秒,然后写入文件需要 20 秒。
用户为 IBM 发出下载请求,并在五秒钟内为 TRI 发出另一个请求。情况可以用以下图示表示。
并行发送两个请求是可以的。Web 服务会按预期给出结果。但现在两者都会尝试写入临时文件。如果允许此类操作,我们将在文件中写入损坏的值,因为一个写入的内容会与另一个提要混合。所以这里的文件写入需要同步进行。换句话说,当一个线程更新临时文件时——所有其他线程都应该等待它完成。临时文件是热点或临界区。它是许多潜在竞争线程高需求资源的组成部分。即使正在并行运行的线程,对临界区的访问也必须以同步方式进行。
为此,有一些构造,如 Monitor 和 lock,可以确保只有一个线程进入临界区。
在我们进一步进行同步之前,让我们转过头来看看对象池模式。对象池模式以实现有效的资源利用
从 MSDN 网站上找到的一条相关说明开始
实际工作或获取资源
如果您有一个组件,客户端会简短而频繁地使用它,其中对象使用时间的大部分时间都花在获取资源或初始化上,然后再为客户端执行特定工作,那么编写组件使用对象池很可能会为您带来巨大好处。
有些对象在创建和初始化时可能需要很长时间。如果您经常处理此类资源,那么在任务完成后让创建的对象保持活动状态是有意义的。如果再次需要新对象,那么不要创建一个新的,只需重置现有对象并将其交出。此模式旨在通过有效利用资源来提高性能。有几种做法——其中一种流行的方法可以总结如下:
- 对象在创建时成本很高,所以最好预先准备好使用它们——达到某个最小限度。
- 有一个池——一个列表,其中准备好的对象将随时可用。
- 池中的对象应该有一个上限——这样它就不会超出某个限制并引入与之相关的其他问题。设置最大限制。
- 如果一段时间内未使用的对象,则应释放池或将其降低到最低水平。
- 监控池的性能并相应地调整大小。
这些是创建对象池的基本要点——该模式很简单,有时非常有效。
Framework 提供的 .NET 线程池
由于创建线程成本很高,因此为它提供一个池是有意义的。另一方面,它将保证它不会超出某个限制。它的使用非常普遍,以至于 Microsoft 自己在System.Threading
命名空间下创建了一个线程池类,可以用于执行异步操作,而不是让开发者自己去实现。它是一个静态类,暴露了以下方法以及其他一些方法。
public static void GetMaxThreads(out int workerThreads, out int completionPortThreads);
public static void GetMinThreads(out int workerThreads, out int completionPortThreads);
public static bool QueueUserWorkItem(WaitCallback callBack);
public static bool QueueUserWorkItem(WaitCallback callBack, object state);
public static bool SetMaxThreads(int workerThreads, int completionPortThreads);
public static bool SetMinThreads(int workerThreads, int completionPortThreads);
WaitCallback
是一个委托,类似于ParameterizedThreadStart
public delegate void WaitCallback(object state);
public delegate void ParameterizedThreadStart(object obj);
在这里,我们看到了设置上限和下限的方法。我们应该将这个决定留给框架,因为它会根据系统参数进行正确设置。我们有 QueUserWorkItem
方法,它接受一个 WaitCallback
类型的任务。这是用户任务进入托管线程池的入口点。当池中有一个空闲线程时,它会被调用。
在将任务委托给线程时使用 ThreadStart 对一样。这是一个非常有用的类,框架本身也使用它来实现它提供的具有
BeginInvoke
和EndInvoke
系列方法的异步回调模式,这些方法可用于委托和一些其他类。创建自定义线程池的想法和期望
网上有很多关于线程池的信息和教程。我们现在就不深入研究了——取而代之的是,我们将创建我们自己的自定义线程池,并包含最少量的东西供我们的演示使用。我们将
逐步讲解它带来的内部复杂性。其中一些与对象池模式相关,一些与同步相关。我们将在遇到时看到。
然后,我们将进一步提供一项功能——取消任务;这里也会有一些复杂性。我们还将提供一种方法,在任务完成时获得回调通知。这并非必需,因为用户可以轻松地使用 lambda 表达式来实现这一点——但我们将这样做,只是为了更好地理解。所以,这是项目列表。
- 创建一个自定义线程池,并设置所需的变量/常量。
- 定义线程池的公共接口,以便可以使用它。
- 公开一种回调通知方式。
- 提供一种取消已请求任务的方式。
- 期望——如果没有任务执行,它不会维护超过 MIN 限制的线程。
- 它不会在池中创建超过 MAX 限制的线程。
- 我们还将实现一项功能——如果我们将一个任务标记为简单任务,那么在它花费的时间超过预期执行时间后取消该任务。
再说一点——我将使用单例实现我的池,而不是静态实现——这是我的首选方法。我没有对静态还是单例更好做太多分析;
代码逐步讲解
创建单例自定义线程池并在其中定义一些常量
public class CustomThreadPool
{
//#region configurable items - for demo let's have these as constants
private const int MAX = 8; // maximum no of threads in pool
private const int MIN = 3; // minimum no of threads in pool
private const int MIN_WAIT = 10; // milliseconds
private const int MAX_WAIT = 15000; // milliseconds - threshold for simple task
private const int CLEANUP_INTERVAL = 60000; // millisecond - to free waiting threads in pool
private const int SCHEDULING_INTERVAL = 10; // millisecond - look for task in queue in loop
//#endregion
//#region singleton instance of threadpool
private static readonly CustomThreadPool _instance = new CustomThreadPool();
private CustomThreadPool() {
InitializeThreadPool();
}
public static CustomThreadPool Instance
{
get
{
return _instance;
}
}
//#endregion
private void InitializeThreadPool() {
//TODO: write initialization code here
}
}
现在我将定义基本类型,以便与池进行通信
public delegate void UserTask();
public class ClientHandle
{
public Guid ID;
public bool IsSimpleTask = false;
}
public class TaskStatus
{
public bool Success = true;
public Exception InnerException = null;
}
UserTask
是一个 void 委托,代表要由线程池线程执行的用户任务。它类似于框架提供的 Action。使用 lambda 表达式,现在可以将任何方法调用封装在此 UserTask
中。我们将在开始使用此线程池时看到这一点。
接下来是线程池的公共接口。将这些公共方法添加到 CustomThreadPool
类中。
//#region public interface
public ClientHandle QueueUserTask(UserTask task, Action<TaskStatus> callback)
{
throw new Exception("not implemented yet.");
}
public static void CancelUserTask(ClientHandle handle)
{
//TODO: write implementation code here
}
//#endregion
添加一些私有的嵌套类供我们的线程池内部使用
//#region nested private types
enum TaskState // to represent current state of a usertask
{
notstarted,
processing,
completed,
aborted
}
class TaskHandle // Item in waiting queue
{
public ClientHandle Token; // generate this everytime an usertask is queued and return to the caller as a reference.
public UserTask task; // the item to be queued - supplied by the caller
public Action<TaskStatus> callback; // optional - in case user want's a notification of completion
}
class TaskItem // running items in the pool - TaskHandle gets a thread to execute it
{
public TaskHandle taskHandle;
public Thread handler;
public TaskState taskState = TaskState.notstarted;
public DateTime startTime = DateTime.MaxValue;
}
//#endregion
现在我们必须创建 UserTask
的队列和一个线程池。我们还添加了一些初始化代码:
//private instance members
private Queue<TaskHandle> ReadyQueue = null;
private List<TaskItem> Pool = null;
private Thread taskScheduler = null;
private void InitializeThreadPool()
{
ReadyQueue = new Queue<TaskHandle>();
Pool = new List<TaskItem>();
taskScheduler = new Thread(() =>
{
//TODO: write scheduling logic here
});
taskScheduler.Start();
}
这里要记住的重要一点是 taskScheduler
。这是一个额外的线程,它将在线程池的整个生命周期中运行。它的工作是监控队列中的 UserTask
,并尽快通过任何空闲线程将它们用于执行。它还负责强制执行最小和最大限制。定期执行清理活动。它是拥有整个池功能责任的主线程。
现在让我们用接近完整的算法来实现这个初始化。
private void InitializeThreadPool()
{
ReadyQueue = new Queue<TaskHandle>();
Pool = new List<TaskItem>();
InitPoolWithMinCapacity(); // initialize Pool with Minimum capacity - that much thread must be kept ready
DateTime LastCleanup = DateTime.Now; // monitor this time for next cleaning activity
taskScheduler = new Thread(() =>
{
do
{
while (ReadyQueue.Count > 0 && ReadyQueue.Peek().task == null)
ReadyQueue.Dequeue();
// remove cancelled item/s - cancelled item will have it's task set to null
int itemCount = ReadyQueue.Count;
for (int i = 0; i < itemCount; i++)
{
TaskHandle readyItem = ReadyQueue.Peek(); // the Top item of queue
bool Added = false;
foreach (TaskItem ti in Pool)
{
if (ti.taskState == TaskState.completed)
{
// if in the Pool task state is completed then a different
// task can be handed over to that thread
ti.taskHandle = readyItem;
ti.taskState = TaskState.notstarted;
Added = true;
ReadyQueue.Dequeue();
break;
}
}
if (!Added && Pool.Count < MAX)
{
// if all threads in pool are busy and the count is still less than the
// Max limit set then create a new thread and add that to pool
TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
ti.taskHandle = readyItem;
// add a new TaskItem in the pool
AddTaskToPool(ti);
Added = true;
ReadyQueue.Dequeue();
}
if (!Added) break; // It's already crowded so try after sometime
}
if ((DateTime.Now - LastCleanup) > TimeSpan.FromMilliseconds(CLEANUP_INTERVAL))
// It's long time - so try to cleanup Pool once.
{
CleanupPool();
LastCleanup = DateTime.Now;
}
else
{
// either of these two can work - the combination is also fine for our demo.
Thread.Yield();
Thread.Sleep(SCHEDULING_INTERVAL); // Dont run madly in a loop - wait for sometime for things to change.
// the wait should be minimal - close to zero
}
} while (true);
});
taskScheduler.Priority = ThreadPriority.AboveNormal;
taskScheduler.Start();
}
private void InitPoolWithMinCapacity()
{
for (int i = 0; i <= MIN; i++)
{
TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
ti.taskHandle = new TaskHandle() { task = () => { } };
ti.taskHandle.callback = (taskStatus) => { };
ti.taskHandle.Token = new ClientHandle() { ID = Guid.NewGuid() };
AddTaskToPool(ti);
}
}
private void AddTaskToPool(TaskItem taskItem)
{
taskItem.handler = new Thread(() =>
{
do
{
bool Enter = false;
// if aborted then allow it to exit the loop so that it can complete and free-up thread resource.
// this state means it has been removed from Pool already.
if (taskItem.taskState == TaskState.aborted) break;
if (taskItem.taskState == TaskState.notstarted)
{
taskItem.taskState = TaskState.processing;
taskItem.startTime = DateTime.Now;
Enter = true;
}
if (Enter)
{
TaskStatus taskStatus = new TaskStatus();
try
{
taskItem.taskHandle.task.Invoke(); // execute the UserTask
taskStatus.Success = true;
}
catch (Exception ex)
{
taskStatus.Success = false;
taskStatus.InnerException = ex;
}
if (taskItem.taskHandle.callback != null && taskItem.taskState != TaskState.aborted)
{
try
{
taskItem.taskState = TaskState.completed;
taskItem.startTime = DateTime.MaxValue;
taskItem.taskHandle.callback(taskStatus); // notify callback with task-status
}
catch
{
}
}
}
// give other thread a chance to execute as it's current execution completed already
Thread.Yield(); Thread.Sleep(MIN_WAIT); //TODO: need to see if Sleep is required here
} while (true); // it's a continuous loop until task gets abort request
});
taskItem.handler.Start();
Pool.Add(taskItem);
}
private void CleanupPool()
{
throw new NotImplementedException();
}
让我们也实现 QueueUserTask
public ClientHandle QueueUserTask(UserTask task, Action<taskstatus> callback)
{
TaskHandle th = new TaskHandle()
{
task = task,
Token = new ClientHandle()
{
ID = Guid.NewGuid()
},
callback = callback
};
ReadyQueue.Enqueue(th);
return th.Token;
}
此实现一切正常——但是我们遗漏了一件重要的事情;我们没有处理同步。这里有一些资源是多个资源共享的,例如 ReadyQueue
。调用者可以添加一个项目,调度程序可以同时尝试从中出队。在这种情况下,结果将是不可预测的。
所以,现在第一件事是识别被多个线程共享的资源;定义私有同步对象用于锁定这些资源。如果需要多个锁,需要避免可能导致死锁情况的构造。而且用户任务不得持有池内定义的任何锁——因为那样的话,池可能会被任何任务长时间阻塞;因此必须阻止这种情况。
我将提供完整的实现,而不是逐行解释代码,希望它能不言自明。
// Locks
private object syncLock = new object();
private object criticalLock = new object();
public ClientHandle QueueUserTask(UserTask task, Action<taskstatus> callback)
{
TaskHandle th = new TaskHandle()
{
task = task,
Token = new ClientHandle()
{
ID = Guid.NewGuid()
},
callback = callback
};
lock (syncLock) // main-lock - will be used for accessing ReadyQueue always
{
ReadyQueue.Enqueue(th);
}
return th.Token;
}
将任务添加到池中-
private void AddTaskToPool(TaskItem taskItem)
{
taskItem.handler = new Thread(() =>
{
do
{
bool Enter = false;
lock (taskItem)
// the taskState of taskItem is exposed to scheduler
// thread also so access that always with this lock
{ // Only two thread can contend for this [cancel and executing
// thread as taskItem itself is is mapped to a dedicated thread]
// if aborted then allow it to exit the loop so that it can complete and free-up thread resource.
// this state means it has been removed from Pool already.
if (taskItem.taskState == TaskState.aborted) break;
if (taskItem.taskState == TaskState.notstarted)
{
taskItem.taskState = TaskState.processing;
taskItem.startTime = DateTime.Now;
Enter = true;
}
}
if (Enter)
{
TaskStatus taskStatus = new TaskStatus();
try
{
taskItem.taskHandle.task.Invoke(); // execute the UserTask
taskStatus.Success = true;
}
catch (Exception ex)
{
taskStatus.Success = false;
taskStatus.InnerException = ex;
}
lock (taskItem) // Only two thread can contend for this [cancel and executing
// thread as taskItem itself is is mapped to a dedicated thread]
{
if (taskItem.taskHandle.callback != null && taskItem.taskState != TaskState.aborted)
{
try
{
taskItem.taskState = TaskState.completed;
taskItem.startTime = DateTime.MaxValue;
taskItem.taskHandle.callback(taskStatus); // notify callback with task-status
}
catch
{
// supress exception
}
}
}
}
// give other thread a chance to execute as it's current execution completed already
Thread.Yield(); Thread.Sleep(MIN_WAIT); //TODO: need to see if Sleep is required here
} while (true); // it's a continuous loop until task gets abort request
});
taskItem.handler.Start();
lock (criticalLock) // always use this lock for Pool
{
Pool.Add(taskItem);
}
}
初始化线程池 -
private void InitializeThreadPool()
{
ReadyQueue = new Queue<TaskHandle>();
Pool = new List<TaskItem>();
InitPoolWithMinCapacity(); // initialize Pool with Minimum capacity - that much thread must be kept ready
DateTime LastCleanup = DateTime.Now; // monitor this time for next cleaning activity
taskScheduler = new Thread(() =>
{
do
{
lock (syncLock) // obtaining lock for ReadyQueue
{
while (ReadyQueue.Count > 0 && ReadyQueue.Peek().task == null)
ReadyQueue.Dequeue(); // remove cancelled item/s - cancelled item will have it's task set to null
int itemCount = ReadyQueue.Count;
for (int i = 0; i < itemCount; i++)
{
TaskHandle readyItem = ReadyQueue.Peek(); // the Top item of queue
bool Added = false;
lock (criticalLock) // lock for the Pool
{
foreach (TaskItem ti in Pool) // while reading the pool another thread should not add/remove to that pool
{
lock (ti) // locking item
{
if (ti.taskState == TaskState.completed)
{ // if in the Pool task state is completed then a different task can be handed over to that thread
ti.taskHandle = readyItem;
ti.taskState = TaskState.notstarted;
Added = true;
ReadyQueue.Dequeue();
break;
}
}
}
if (!Added && Pool.Count < MAX)
{ // if all threads in pool are busy and the count is still less than the Max
// limit set then create a new thread and add that to pool
TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
ti.taskHandle = readyItem;
// add a new TaskItem in the pool
AddTaskToPool(ti);
Added = true;
ReadyQueue.Dequeue();
}
}
if (!Added) break; // It's already crowded so try after sometime
}
}
if ((DateTime.Now - LastCleanup) > TimeSpan.FromMilliseconds(CLEANUP_INTERVAL))
// It's long time - so try to cleanup Pool once.
{
CleanupPool();
LastCleanup = DateTime.Now;
}
else
{
Thread.Yield(); // either of these two can work - the combination is also fine for our demo.
Thread.Sleep(SCHEDULING_INTERVAL); // Dont run madly in a loop - wait for sometime for things to change.
// the wait should be minimal - close to zero
}
} while (true);
});
taskScheduler.Priority = ThreadPriority.AboveNormal;
taskScheduler.Start();
}
取消用户任务 -
public static void CancelUserTask(ClientHandle clientToken)
{
lock (Instance.syncLock)
{
var thandle = Instance.ReadyQueue.FirstOrDefault((th) => th.Token.ID == clientToken.ID);
if (thandle != null) // in case task is still in queue only
{
thandle.task = null;
thandle.callback = null;
thandle.Token = null;
}
else // in case theread is running the task - try aborting the thread to cancel the operation (rude behavior)
{
int itemCount = Instance.ReadyQueue.Count;
TaskItem taskItem = null;
lock (Instance.criticalLock)
{
taskItem = Instance.Pool.FirstOrDefault(task => task.taskHandle.Token.ID == clientToken.ID);
}
if (taskItem != null)
{
lock (taskItem) // only item need the locking
{
if (taskItem.taskState != TaskState.completed)
// double check - in case by the time this lock obtained callback already happened
{
taskItem.taskState = TaskState.aborted;
taskItem.taskHandle.callback = null; // stop callback
}
if (taskItem.taskState == TaskState.aborted) // this does not need criticalLock
{
try
{
taskItem.handler.Abort(); // **** it does not work ****
taskItem.handler.Priority = ThreadPriority.BelowNormal;
taskItem.handler.IsBackground = true;
}
catch { }
}
}
}
}
}
}
清理池
private void CleanupPool()
{
List<TaskItem> filteredTask = null;
lock (criticalLock) // aquiring lock for Pool
{
filteredTask = Pool.Where(ti => ti.taskHandle.Token.IsSimpleTask == true &&
(DateTime.Now - ti.startTime) > TimeSpan.FromMilliseconds(MAX_WAIT)).ToList();
}
foreach (var taskItem in filteredTask)
{
CancelUserTask(taskItem.taskHandle.Token);
}
lock (criticalLock)
{
filteredTask = Pool.Where(ti => ti.taskState == TaskState.aborted).ToList();
foreach (var taskItem in filteredTask) // clean all aborted thread
{
try
{
taskItem.handler.Abort(); // does not work
taskItem.handler.Priority = ThreadPriority.Lowest;
taskItem.handler.IsBackground = true;
}
catch { }
Pool.Remove(taskItem);
}
int total = Pool.Count;
if (total >= MIN) // clean waiting threads over minimum limit
{
filteredTask = Pool.Where(ti => ti.taskState == TaskState.completed).ToList();
foreach (var taskItem in filteredTask)
{
taskItem.handler.Priority = ThreadPriority.AboveNormal;
taskItem.taskState = TaskState.aborted;
Pool.Remove(taskItem);
total--;
if (total == MIN) break;
}
}
while (Pool.Count < MIN)
{
TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
ti.taskHandle = new TaskHandle() { task = () => { } };
ti.taskHandle.Token = new ClientHandle() { ID = Guid.NewGuid() };
ti.taskHandle.callback = (taskStatus) => { };
AddTaskToPool(ti);
}
}
}
注意事项
看看 CancelUserTask
的实现。如果任务仍在队列中,只需将其置空即可,因为当它被执行时,调度程序会看到 null 并简单地丢弃它。但如果它已经在运行,那么我们正在尝试中止线程。
try
{
taskItem.handler.Abort(); // **** it does not work ****
taskItem.handler.Priority = ThreadPriority.BelowNormal;
taskItem.handler.IsBackground = true;
}
catch { }
中止不像我们想象的那样工作,原因是如果该线程持有锁,那么该锁根本不会被释放,其他线程将一直等待它;这将阻碍系统。所以调用 abort 一个线程(除了它自己)并没有带来任何结果。取而代之的是,我们将线程降级并使其成为后台线程。我们只是确保这个线程不占用太多 CPU 时间。我们将其从池中移除。该线程将继续执行其任务,但不会过多地干扰其他线程。任务完成后,它将自然死亡。
此逻辑已在 AddTaskToPool
中实现。
lock (taskItem) // the taskState of taskItem is exposed to scheduler thread also so access that always with this lock
{
// Only two thread can contend for this [cancel and executing thread as taskItem itself is is mapped to a dedicated thread]
// if aborted then allow it to exit the loop so that it can complete and free-up thread resource.
// this state means it has been removed from Pool already.
if (taskItem.taskState == TaskState.aborted) break;
if (taskItem.taskState == TaskState.notstarted)
{
taskItem.taskState = TaskState.processing;
taskItem.startTime = DateTime.Now;
Enter = true;
}
}
当 TaskState
为已中止时,我们正在中断循环——这样线程会立即退出并释放它占用的资源。
记住 ClientHandle
类中的 IsSimpleTask
属性。如果我们通过 IsSimpleTask
属性为 true 来更改在队列化任务后,CleanupPool
将取消此类操作,如果它们花费的时间太长。
lock (criticalLock) // aquiring lock for Pool
{
filteredTask = Pool.Where(ti => ti.taskHandle.Token.IsSimpleTask == true &&
(DateTime.Now - ti.startTime) > TimeSpan.FromMilliseconds(MAX_WAIT)).ToList();
}
foreach (var taskItem in filteredTask)
{
CancelUserTask(taskItem.taskHandle.Token);
}
说实话,这个功能并不是很有趣,因为线程并没有被中止。
锁——它们足够好吗,还是有死锁的漏洞?
让我们分析一下——这里是所有使用 lock 的方法以及它们使用锁的顺序。
方法 | 锁 |
|
|
|
|
|
|
|
|
|
|
没有可以与其他链冲突的锁链。链会发生冲突,例如一个线程获得syncLock
,另一个线程获得criticalLock
;现在第一个线程想要criticalLock
,第二个线程想要syncLock
。在这种情况下,就会发生死锁——但幸运的是,我们这里没有这种情况。
但是现在看看 CleanupPool
中的以下代码行。它在 critical lock 下。
if (total >= MIN) // clean waiting threads over minimum limit
{
filteredTask = Pool.Where(ti => ti.taskState == TaskState.completed).ToList();
foreach (var taskItem in filteredTask)
{
taskItem.handler.Priority = ThreadPriority.AboveNormal;
taskItem.taskState = TaskState.aborted;
Pool.Remove(taskItem);
total--;
if (total == MIN) break;
}
}
当我们执行
Pool.where(ti => ti.taskState == taskState.completed).ToList()
那么我们并没有锁定 taskItem
。此时,状态可能已被另一个线程修改。但是,这在这里不是问题,因为我们正在处理已完成和已中止的状态,我们只是读取状态。我们没有修改它。这两个状态在这里都是最终状态,所以逻辑上不会有问题;但这类事情很容易被我们忽略,并可能导致潜在的死锁。
这里还有一点是关于调用 UserTask
和回调通知的。我们绝不应该在获取任何锁时这样做。因为我们不知道用户任务需要多长时间,并且长时间锁定一个锁意味着阻塞线程池。这当然是我们不想要的。
测试应用程序
随附的代码中有一个简单的 Windows 应用程序,它在按下按钮时会打开一个消息框。这只是为了测试行为,并不代表我们应该如何使用它。在这里,您只需不断按按钮,它就会不断打开新的消息框。在八个这样的框之后,它将不再打开任何更多的框——但是您仍然可以按下按钮。请求正在排队。当我们关闭现有消息框时,一个新的消息框将从 ReadyQueue 中出现。代码很简单。它还传递了一个回调,显示已执行任务的状态。这是线程池的调用方式。
CustomThreadPool MyPool;
private void Form1_Load(object sender, EventArgs e)
{
MyPool = CustomThreadPool.Instance;
}
void showMessage(string message)
{
MessageBox.Show(message);
}
int x = 0;
private void btnStart_Click(object sender, EventArgs e)
{
x++;
int arg = x;
MyPool.QueueUserTask(() =>
{
showMessage(arg.ToString());
},
(ts) =>
{
showMessage(ts.Success.ToString());
});
}
结语
随着多核处理器的普及,如今明智地编写代码以利用这种强大功能变得越来越重要。在这样做时,我们不应该盲目地忽视与此类编码相关的问题。
我在这里的努力是为了探讨线程的基础知识,并通过一些编码来理解我们需要关注的情况。这个池远非完整,但它提供了一个基本结构。我希望它能帮助那些像我一样遇到困难的开发者更好地理解它,并关注那些根本性的问题。有很多关于线程的内容是我们应该了解的,而我并没有涵盖其中的许多内容。从 Framework 4.0 开始,我们在 System.Threading 命名空间中有一个像 Task 这样的类。它是线程的抽象;它在内部利用 ThreadPool,然后就是完整的 TPL-Task Parallel Library。这应该是在此之后立即要涵盖的主题。
至此,我在此结束; 欢迎评论和建议!
谢谢。