智能线程池
一个完全用 C# 实现的 .NET 线程池,具有许多功能。
请参阅底部的 历史记录 部分了解更改。
基本用法
这是一个线程池;如果您看到这里,您可能已经知道您需要什么了。如果您想了解其功能和工作原理,请继续阅读以下部分。如果您只想使用它,这里有一个快速的使用片段。有关高级用法,请参阅 示例。
// Create an instance of the Smart Thread Pool
SmartThreadPool smartThreadPool = new SmartThreadPool();
// Queue an action (Fire and forget)
smartThreadPool.QueueWorkItem(System.IO.File.Copy,
@"C:\Temp\myfile.bin", @"C:\Temp\myfile.bak");
// The action (file copy) will be done in the background by the Thread Pool
引言
Smart Thread Pool 是一个用 C# 编写的线程池。最初的实现基于 Stephan Toub 的线程池,并增加了一些额外功能,但现在已经远远超出了原版。以下是线程池功能的列表:
- 线程的数量根据池中线程的工作负载动态变化。
- 工作项可以返回值。
- 工作项可以在尚未执行时被取消。
- 工作项执行时使用调用线程的上下文(有限)。
- 使用最少数量的 Win32 事件句柄,因此应用程序的句柄数不会爆炸。
- 调用者可以等待多个或所有工作项完成。
- 工作项可以有一个 `PostExecute` 回调,该回调会在工作项完成后立即调用。
- 伴随工作项的状态对象可以自动处置。
- 工作项异常会发送回调用者。
- 工作项具有优先级。
- 工作项分组。
- 调用者可以暂停线程池和工作项组的启动。
- 线程具有优先级。
- 线程具有初始化和终止事件。
- 支持 WinCE 平台(有限)。
- 支持 `Action<T>` 和 `Func<T>`泛型方法。
支持 Silverlight。 - 支持 Mono。
性能计数器(Windows 和内部)。 - 工作项超时(被动)。
- 线程 ApartmentState
- 线程 IsBakcground
- 线程名称模板
- 支持 Windows Phone(有限)
- 线程 MaxStackSize
为什么需要线程池?
“许多应用程序创建的线程花费大量时间处于睡眠状态,等待事件发生。其他线程可能仅进入睡眠状态,然后定期唤醒以轮询更改或更新状态信息。通过系统管理的线程池,线程池使您能够更有效地利用线程。一个线程监视着提交到线程池的多个等待操作的状态。当等待操作完成时,线程池中的一个工作线程将执行相应的回调函数。”
MSDN,2004 年 4 月,ThreadPool 类 [C#]。
Smart Thread Pool 功能
当我编写应用程序时,我发现我需要一个具有以下功能的线程池:
- 线程池应实现 `QueueUserWorkItem()` 方法,以符合 .NET 线程池。
- 线程池应可实例化。(无静态方法。)因此,池中的线程仅用于一个目的。
- 池中的线程数应动态变化,并具有上限和下限。
在我将 Smart Thread Pool 在此处发布后,我发现需要更多功能,并且某些功能需要更改。因此,以下是已实现功能的更新列表:
- 线程池是可实例化的.
- 线程数量动态变化.
- 工作项返回值.
- 调用者可以等待多个工作项完成.
- 工作项可以被取消.
- 工作项执行时使用调用线程的上下文(有限).
- 使用最少数量的 Win32 事件句柄,因此应用程序的句柄数不会爆炸.
由于功能 3 和 5,线程池不再符合 .NET 线程池的要求,因此我可以添加更多功能。
有关此版本中添加的新功能,请参阅下面的附加功能部分。
2004 年 12 月新增功能
- 每个工作项都可以有一个 PostExecute 回调。这是一个方法,它会在工作项执行完成后立即调用。.
- 用户可以选择自动处置伴随工作项的状态对象。.
- 用户可以等待 Smart Thread Pool 进入空闲状态。.
- 异常处理已更改,因此如果工作项引发异常,它会在 `GetResult()` 处重新抛出,而不是触发 `UnhandledException` 事件。请注意,`PostExecute` 异常始终被忽略。.
2006 年 1 月新增功能
2008 年 5 月新增功能
- 启用了在运行时更改 `MaxThreads`/`MinThreads`/`Concurrency`。.
- 改进了取消行为(请参阅第 5 节)。.
- 为线程的初始化和终止添加了回调。.
- 添加了对 WinCE 的支持(有限)。.
- 向 `SmartThreadPool` 和 `IWorkItemResult` 添加了 `IsIdle` 标志。.
- 添加了对 `Action<T>` 和 `Func<T>`(强类型工作项)的支持(请参阅第 3 节)。.
2009 年 4 月新增功能
- 添加了对 Silverlight 的支持。.
- 添加了对 Mono 的支持。.
- 添加了内部性能计数器(适用于 Windows CE、Silverlight 和 Mono)。.
- 添加了新方法:`Join`、`Choice` 和 `Pipe`。.
2009 年 12 月新增功能
2012 年 8 月新增功能
.NET 线程池怎么样?
Windows 系统为每个进程提供一个 .NET 线程池。该 .NET 线程池每个处理器最多可包含 25 个线程(默认)。此外,.NET 线程池中的操作应快速完成,以避免影响其他使用 .NET 线程池的用户。请注意,同一个进程中的多个 AppDomain 共享同一个 .NET 线程池。如果您希望某个线程工作很长时间,那么 .NET 线程池不是您的理想选择(除非您知道自己在做什么)。请注意,.NET Framework 中的每个以“Begin…”开头的异步方法调用(例如 `BeginInvoke`、`BeginSend`、`BeginReceive` 等)都使用 .NET 线程池来运行其回调。另外请注意,.NET 线程池不支持与单线程单元(STA)的 COM 调用,因为线程池线程默认是多线程单元(MTA)。
该线程池不符合要求 1、5、6、8、9、10、12-25。
请注意,要求 3 和 4 在 .NET 线程池中通过委托实现。
Stephen Toub 的线程池怎么样?
Toub 的线程池比 .NET 线程池更好,因为他的池中的线程可以用于更长时间而不会影响异步方法调用。Toub 的线程池使用静态方法,因此您无法实例化多个线程池。但是,此限制适用于每个 AppDomain,而不是整个进程。Toub 的线程池相对于 .NET 线程池的主要缺点是,Toub 在初始化时创建了池中的所有线程,而 .NET 线程池则是在需要时动态创建线程。
该线程池不符合要求 1、2、3、4、5、6、8-25。
Smart Thread Pool 的设计和功能
如前所述,Smart Thread Pool 基于 Toub 的线程池实现。但是,由于我扩展了它的功能,代码不再与原始代码相似。
功能实现
- 线程池是可实例化的.
- 线程数量动态变化.
- 工作项返回值(此功能已增强)。
- 以下函数使用 `GetResult()` 方法,该方法会阻止调用者直到结果可用为止。
- 以下函数不推荐使用,因为它使用了忙碌等待循环。如果您知道自己在做什么,可以使用它。
- 调用者可以等待多个工作项完成.
- 工作项可以被取消(此功能已增强)。
- `Queued` – 工作项正在队列中等待执行。
- `InProgress` – 池中的一个线程正在执行工作项。
- `Completed` – 工作项执行已完成。
- `Cancelled` – 工作项已被取消。
- 工作项执行时使用调用线程的上下文(有限).
- `CurrentCulture` - 线程的区域性。
- `CurrentUICulture` - 资源管理器在运行时查找特定于区域性的资源所使用的区域性。
- `CurrentPrincipal` - 当前主体(用于基于角色的安全)。
- `CurrentContext` - 线程当前正在执行的上下文(用于远程处理)。
- 捕获当前线程上下文。
- 应用调用线程上下文。
- 执行工作项。
- 恢复旧的当前线程上下文。
- 使用 最少数量的 Win32 事件句柄,因此应用程序的句柄数不会爆炸.
- 工作项可以有一个 PostExecute 回调。这是一个方法,它会在工作项执行完成后立即调用。.
- `Never` – 不运行 `PostExecute`。
- `WhenWorkItemCanceled` - 仅当工作项被取消时运行 `PostExecute`。
- `WhenWorkItemNotCanceled` - 仅当工作项未被取消时运行 `PostExecute`。
- `Always` – 始终运行 `PostExecute`。
- 用户可以选择自动处置伴随工作项的状态对象。.
- 用户可以等待 Smart Thread Pool / Work Items Group 进入空闲状态。
此功能使用户能够等待 Smart Thread Pool 或 Work Items Group 进入空闲状态。当工作项队列为空且所有线程都已完成执行所有工作项时,它们将进入空闲状态。
这在您想运行一批工作项然后等待它们全部完成的情况下很有用。如果您只想等待所有工作项完成,则可以节省处理 `IWorkItemResult` 对象的工作。
`SmartThreadPool` 和 WorkItemsGroup 类都实现了 `IWorkItemsGroup` 接口,该接口定义了 `WaitForIdle` 方法。
要利用此功能,请使用 `IWorkItemsGroup.WaitForIdle()` 方法(`SmartThreadPool` 和 WorkItemsGroup 都实现了定义 `WaitForIdle` 方法的 `IWorkItemsGroup` 接口)。它有多个重载,提供超时参数。`WaitForIdle()` 方法不是 `static` 的,并且应该在 `SmartThreadPool` 实例上使用。
`SmartThreadPool` 始终跟踪其拥有的工作项数量。当排队一个新工作项时,计数器会递增。当线程完成一个工作项时,计数器会递减。工作项的总数包括队列中的工作项和线程当前正在处理的工作项。
`WaitForIdle()` 机制使用私有的 `ManualResetEvent`。当排队一个工作项时,`ManualResetEvent` 会被重置(更改为非信号状态)。当工作项计数变为零时(Smart Thread Pool 的初始状态),`ManualResetEvent` 会被设置(更改为信号状态)。`WaitForIdle()` 方法只是等待 `ManualResetEvent` 实现其功能。
请参阅下面的 示例。
- 异常处理已更改,因此如果工作项抛出异常,它会在 `GetResult()` 处重新抛出,而不是触发 `UnhandledException` 事件。请注意,`PostExecute` 异常始终被忽略。.
在我阅读了有关委托及其实现的文章后,我决定更改 `SmartThreadPool` 处理异常的方式。在以前的版本中,我使用了一种事件驱动的机制。实体被注册到 `SmartThreadPool.UnhandledException` 事件,当工作项抛出异常时,就会触发此事件。这就是 Toub 线程池的行为。
.NET 委托的行为不同。它不是使用事件驱动的机制,而是在 `EndInvoke()` 处重新抛出委托方法的异常。类似地,`SmartThreadPool` 的异常机制也已更改,因此异常不再由 `UnhandledException` 事件触发,而是在调用 `IWorkItemResult.GetResult()` 时重新抛出。
请注意,异常会减慢 .NET 的速度并降低性能。.NET 在根本不抛出异常时运行更快。因此,我向 `GetResult()` 的某些重载添加了一个输出参数,以便可以检索异常而不是重新抛出它。工作项无论如何都会抛出异常,因此重新抛出它将浪费时间。作为经验法则,最好使用输出参数而不是捕获重新抛出的异常。
`GetResult()` 可以被调用任意次数,并且每次都会重新抛出相同的异常。
请注意,即使工作项抛出了异常,`PostExecute` 也会根据需要进行调用。当然,`PostExecute` 实现应该处理异常(如果它调用 `GetResult()`)。
另请注意,如果 `PostExecute` 抛出异常,则其异常将被忽略。
请参阅下面的 示例。
- 工作项具有优先级。.
- 工作项执行时可以使用调用线程的 HTTP 上下文。.
- 工作项分组。.
- 调用者可以创建处于暂停状态的线程池和工作项组。.
- 线程具有优先级。.
- 运行时可更改 `MaxThreads`/`MinThreads`/`Concurrency`.
- 改进的取消行为.
- 线程初始化和终止生成事件.
- 支持 Windows CE(有限).
- SmartThreadPool 和 IWorkItemsGroup 具有 IsIdle 标志.
- 支持 Action<T> 和 Func<T>(强类型工作项).
- 支持 Silverlight.
- 支持 Mono.
- 内部性能计数器.
- Join、Choice 和 Pipe.
- `Join` - 执行多个工作项并等待它们全部完成 (Join 示例)。
- `Choice` - 执行多个工作项并在第一个完成时返回 (Choice 示例)。
- `Pipe` - 按顺序执行多个工作项并等待最后一个工作项完成 (Pipe 示例)。
- 工作项超时(被动).
- 启用了设置线程 IsBackground
- 启用了设置线程 ApartmentState
- 支持 Windows Phone(有限)
- 启用了设置线程 MaxStackSize
我需要一个可实例化的线程池的原因是我有不同的需求。有些工作项需要很长时间才能执行,而有些工作项则需要很短的时间来执行。在同一个线程池上执行相同类型的工作项可能会导致严重的性能或响应问题。
要实现此功能,我只需复制 Toub 的实现并将方法中的 `static` 关键字删除。这是比较容易的部分。
线程的数量根据池中线程的工作负载动态变化,并且线程池中的线程数具有上下限。此功能是必需的,这样我们的应用程序就不会有冗余线程。
此功能是一个真正的问题,并且是 Smart Thread Pool 的核心。您如何知道何时添加新线程以及何时删除它?
我决定每当排队一个新工作项并且池中的所有线程都忙碌时,就添加一个新线程。添加线程的公式可以概括为
_currentWorkItemsCount > WorkerThreads
其中 `WorkerThreads` 是池中当前线程数,`InUseThreads` 是池中当前正在处理工作项的线程数,`WaitingCallbacks` 是等待的工作项数量。(感谢 **jrshute** 的评论。)
`SmartThreadPool.Enqueue()` 方法看起来像这样:
private void Enqueue(WorkItem workItem)
{
// Make sure the workItem is not null
Debug.Assert(null != workItem);
// Enqueue the work item
_workItemsQueue.EnqueueWorkItem(workItem);
// If all the threads are busy then try
// to create a new one
if (_currentWorkItemsCount > _workerThreads.Count)
{
StartThreads(1);
}
}
当线程数达到上限时,将不再创建更多线程。
我决定从池中移除一个线程,当它空闲(即线程不处理任何工作项)一段时间后。每次线程在工作项队列上等待工作项时,它还会等待超时。如果等待时间超过超时时间,则线程应离开池,这意味着如果线程空闲,它就应该退出。这听起来是一个简单的解决方案,但以下情况又如何呢?假设线程的下限为 0,上限为 100。空闲超时时间为 60 秒。当前,线程池包含 60 个线程,每秒都会有一个新工作项到达,并且处理一个工作项需要一个线程一秒钟。这意味着,每分钟,都会有 60 个工作项到达并由池中的 60 个线程处理。因此,没有线程退出,因为没有线程空闲满 60 秒,尽管 1 或 2 个线程足以完成所有工作。
为了解决这种情况的问题,您必须计算每个线程工作了多长时间,并且偶尔退出那些在超时间隔内没有足够时间工作的线程。这意味着,线程池必须使用计时器(使用 .NET 线程池)或管理器线程来处理线程池。对我来说,使用一个专用线程来管理线程池似乎有点开销。
这让我想到,线程池机制应该让线程“饥饿”,以便让它们退出。那么,如何让线程“饥饿”呢?
池中的所有线程都在等待同一个工作项队列。工作项队列管理两个队列:一个用于工作项,另一个用于等待者(池中的线程)。由于平凡的工作项队列有效,第一个等待工作项的线程将首先获得它(队列),因此您无法让线程“饥饿”。
请看以下场景:
线程池包含四个线程。我们称它们为 A、B、C 和 D。每秒钟都会有一个新工作项到达,并且处理每个工作项需要不到一秒半的时间。
工作项到达时间(秒) |
工作项工作持续时间(秒) |
线程队列状态 |
将执行到达工作项的线程 |
00:00:00 |
1.5 |
A、B、C、D |
A |
00:00:01 |
1.5 |
B、C、D |
B |
00:00:02 |
1.5 |
C、D、A |
C |
00:00:03 |
1.5 |
D、A、B |
D |
在此场景中,所有四个线程都已使用,尽管两个线程可以处理所有工作项。
解决方案是实现一个堆栈式的等待者队列。在此实现中,最后一个等待工作项的等待者将首先获得它(堆栈)。这样,一个刚完成工作项的线程在等待者队列的前面等待。
使用新实现后,上述场景将如下所示:
工作项到达时间(秒) |
工作项工作持续时间(秒) |
线程队列状态 |
将执行到达工作项的线程 |
00:00:00 |
1.5 |
A、B、C、D |
A |
00:00:01 |
1.5 |
B、C、D |
B |
00:00:02 |
1.5 |
A、C、D |
A |
00:00:03 |
1.5 |
B、C、D |
B |
线程 A 和 B 处理所有工作项,因为它们在完成后会回到等待者队列的前面。线程 C 和 D 被饿死,并且如果长时间内会到达相同的工作项,那么线程 C 和 D 将不得不退出。
线程池不实现负载均衡机制,因为所有线程都在同一台机器上运行并占用相同的 CPU。请注意,如果您在池中有许多线程,那么您将更倾向于使用最少数量的线程来完成工作,因为每次线程的上下文切换都可能导致线程堆栈的页面错误。工作线程越少,线程堆栈的页面错误就越少。
工作项队列的实现导致线程“饥饿”,并且饥饿的线程退出。这解决了前面提到的场景,而无需使用任何额外的线程。
第二个功能还指出,线程池中的线程数量应有一个下限。为了实现此功能,每个因未获得任何工作项而超时的线程都会检查它是否可以退出。Smart Thread Pool 允许线程退出,仅当当前线程数高于下限时。如果池中的线程数低于或等于下限,则线程保持活动状态。
此功能在您想知道工作项结果时非常有用。
.NET 线程池通过委托支持此功能。每次创建委托时,都可以免费获得 `BeginInvoke()` 和 `EndInvoke()` 方法。`BeginInvoke()` 将方法及其参数排队到 .NET 线程池,而 `EndInvoke()` 返回方法的执行结果。委托类是 `sealed` 的,所以我无法重写 `BeginInvoke()` 和 `EndInvoke()` 方法。我采用了不同的方法来实现这一点。
首先,工作项回调委托可以返回值。
public delegate object WorkItemCallback(object state);
或者,在其增强形式中,回调可以是以下任何一种形式:
public delegate void Action();
public delegate void Action<T>(T arg);
public delegate void Action<T1, T2>(T1 arg1, T2 arg2);
public delegate void Action<T1, T2, T3>(T1 arg1,
T2 arg2, T3 arg3);
public delegate void Action<T1, T2, T3, T4>(T1 arg1,
T2 arg2, T3 arg3, T4 arg4);
public delegate TResult Func();
public delegate TResult Func<T>(T arg1);
public delegate TResult Func<T1, T2>(T1 arg1, T2 arg2);
public delegate TResult Func<T1, T2, T3>(T1 arg1,
T2 arg2, T3 arg3);
public delegate TResult Func<T1, T2, T3, T4>(T1 arg1,
T2 arg2, T3 arg3, T4 arg4);
(请注意,上述委托在 .NET 3.5 中定义。在 .NET 2.0 和 3.0 中,仅定义了 `public delegate void Action<T>(T arg)`。)其次,`SmartThreadPool.QueueWorkItem()` 方法返回一个实现 `IWorkItemResult<TResult>` 接口的对象的引用。调用者可以使用此对象来获取工作项的结果。该接口类似于 `IAsyncResult` 接口。
public interface IWorkItemResult<TResult>
{
/// Get the result of the work item.
/// If the work item didn't run yet then the caller waits
/// until timeout or until the cancelWaitHandle is signaled.
/// If the work item threw then GetResult() will rethrow it.
/// Returns the result of the work item.
/// On timeout throws WorkItemTimeoutException.
/// On cancel throws WorkItemCancelException.
TResult GetResult(
int millisecondsTimeout,
bool exitContext,
WaitHandle cancelWaitHandle);
/// Some of the GetResult() overloads
/// get Exception as an output parameter.
/// In case the work item threw
/// an exception this parameter is filled with
/// it and the GetResult() returns null.
/// These overloads are provided
/// for performance reasons. It is faster to
/// return the exceptions as an output
/// parameter than rethrowing it.
TResult GetResult(..., out Exception e);
/// Other GetResult() overloads.
...
/// Gets an indication whether
/// the asynchronous operation has completed.
bool IsCompleted { get; }
/// Returns the user-defined object
/// that was provided in the QueueWorkItem.
/// If the work item callback is Action<...>
/// or Func<...> the State value
/// depends on the WIGStartInfo.FillStateWithArgs.
object State { get; }
/// Cancel the work item execution.
/// If the work item is in the queue, it won't execute
/// If the work item is completed, it will remain completed
/// If the work item is already cancelled it will remain cancelled
/// If the work item is in progress,
/// the result of the work item is cancelled.
/// (See the work item canceling section for more information)
/// Param: abortExecution - When true send an AbortException
/// to the executing thread.</param>
/// Returns true if the work item
/// was not completed, otherwise false.
bool Cancel(bool abortExecution);
/// Get the work item's priority
WorkItemPriority WorkItemPriority { get; }
/// Returns the result, same as GetResult().
/// Note that this property blocks the caller like GetResult().
TResult Result { get; }
/// Returns the exception, if occured, otherwise returns null.
/// This function is not blocking like the Result property.
object Exception { get; }
}
如果工作项回调是 `object WorkItemCallback(object state)`,则返回 `IWorkItemResult`,并且 `GetResult()` 返回 `object`。与以前的版本相同。
如果工作项回调是上面我提到的 `Func<...>` 方法之一,则 `QueueWorkItem` 的结果是 `IWorkItemResult<TResult>`。因此,工作项的结果是强类型的。
如果工作项回调是上面我提到的 `Action<...>` 方法之一,则 `QueueWorkItem` 的结果是 `IWorkItemResult`,并且 `GetResult()` 始终返回 `null`。
如果工作项回调是 `Action<...>` 或 `Func<...>` 并且 `WIGStartInfo.FillStateWithArgs` 设置为 `true`,则 `IWorkItemResult` 的 `State` 将使用一个包含工作项参数的 `object []` 进行初始化。否则,`State` 为 `null`。
下面的“代码示例”部分中的 代码示例 显示了一些如何使用它的片段。
要获取工作项的结果,请使用 `Result` 属性或 `GetResult()` 方法。此方法有多个重载。在上面的接口中,我只写了其中一些。其他重载使用更少的参数,并提供默认值。`GetResult()` 返回工作项回调的结果。如果工作项尚未完成,则调用者将等待直到以下任一情况发生:
GetResult() 返回原因 | GetResult() 返回值 |
工作项已执行并完成。 | 工作项的结果。 |
工作项已取消。 | 抛出 `WorkItemCancelException`。 |
超时已过期。 | 抛出 `WorkItemTimeoutException`。 |
`cancelWaitHandle` 已发出信号。 | 抛出 `WorkItemTimeoutException`。 |
工作项抛出了异常。 | 抛出 `WorkItemResultException`,并将工作项的异常作为内部异常。 |
有两种方法可以等待单个工作项完成:
private void WaitForResult1(IWorkItemResult wir)
{
wir.GetResult();
}
private void WaitForResult2(IWorkItemResult wir)
{
while(!wir.IsCompleted)
{
Thread.Sleep(100);
}
}
如果您想一次运行多个工作项然后等待它们全部完成,此功能将非常有用。`SmartThreadPool` 类有两个静态方法用于此目的:`WaitAny()` 和 `WaitAll()`(它们有多个重载)。它们的签名类似于 `WaitHandle` 的等效方法,除了在 `SmartThreadPool` 的情况下,它接受 `IWaitableResult`(`IWorkItemResult` 接口继承自 `IWaitableResult`)对象的数组,而不是 `WaitHandle` 对象。
以下片段显示了如何一次等待多个工作项结果。假设 `wir1` 和 `wir2` 的类型为 `IWorkItemResult`。您可以等待两个工作项完成:
// Wait for both work items complete
SmartThreadPool.WaitAll(new IWaitableResult[] { wir1, wir2});
或者,等待任何一个工作项完成:
// Wait for at least one of the work items complete
SmartThreadPool.WaitAny(new IWaitableResult[] { wir1, wir2});
`WaitAll()` 和 `WaitAny()` 方法是重载的,因此您可以指定超时、退出上下文和 `cancelWaitHandle`(就像前面提到的 `GetResult()` 方法一样)。
注意:为了使用 `WaitAny()` 和 `WaitAll()`,您需要以 MTA 模式工作,因为在内部,我使用 `WaitHandle.WaitAny()` 和 `WaitHandle.WaitAll()`,它们需要 MTA 模式。如果您不这样做,这些方法将抛出异常来提醒您。
另请注意:Windows 支持最多 64 个句柄的 `WaitAny()`。`WaitAll()` 更灵活,我重新实现了它,使其不受 64 个句柄的限制。
请参阅下面的示例部分中的 `WaitAll` 和 `WaitAny` 代码片段。
此功能允许取消工作项。
取消工作项有几种选择。要取消单个工作项,请调用 `IWorkItemResult.Cancel()`。要取消多个工作项,请调用 `IWorkItemsGroup.Cancel()` 或 `SmartThreadPool.Cancel()`。所有取消操作的时间复杂度均为 O(1)。
不能保证工作项一定会被取消,这取决于调用取消时的工作项状态以及工作项的配合程度。(请注意,这里提到的工作项状态与 `QueueWorkItem` 中提供的状态对象参数无关。)
以下是工作项的可能状态(在 `WorkItemState` 枚举中定义):
取消行为取决于工作项的状态。
初始状态 | 下一个状态 | 注释 |
Queued |
Cancelled |
已排队的工作项变为已取消,根本不会执行。 |
InProgress |
Cancelled |
正在执行的工作项即使在完成执行后也会被取消! |
Completed |
Completed |
已完成的工作项保持已完成状态。 |
Cancelled |
Cancelled |
已取消的工作项保持已取消状态。 |
当调用已取消工作项的 `GetResult()` 方法时,将抛出 `WorkItemCancelException`。
当工作项处于 `Completed` 或 `Cancelled` 状态时,`Cancel()` 的行为很简单,因此我将不详细介绍。已排队的工作项会被标记为已取消,并在池中的线程将其出队后丢弃。
如果工作项处于 `InProgress` 状态,则行为取决于 `Cancel` 调用中的 `abortExecution` 参数。当 `abortExecution` 为 `true` 时,将对正在执行的线程调用 `Thread.Abort()`。当 `abortExecution` 为 `false` 时,工作项方法负责采样静态方法 `SmartThreadPool.IsWorkItemCanceled` 并退出。请注意,在这两种情况下,工作项都会被取消,并且在调用 `GetResult()` 时会抛出 `WorkItemCancelException`。
以下是一个协作工作项的示例:
private void DoWork()
{
// Do something here.
// Sample SmartThreadPool.IsWorkItemCanceled
if (SmartThreadPool.IsWorkItemCanceled)
{
return;
}
// Sample the SmartThreadPool.IsWorkItemCanceled in a loop
while (!SmartThreadPool.IsWorkItemCanceled)
{
// Do some work here
}
}
此功能应该很简单,但实现起来并不那么简单。为了传递线程上下文,应传递调用线程的 `CompressedStack`。由于 Microsoft 使用安全措施阻止了此选项,因此这是不可能的。可以传递线程上下文的其他部分。这些包括:
前三个属于 `System.Threading.Thread` 类(静态或实例),并且是 get/set 属性。但是,最后一个是只读属性。为了设置它,我使用了反射,这会减慢应用程序的速度。如果您需要此上下文,请删除代码中的注释。
为了简化捕获上下文然后稍后应用它的操作,我编写了一个内部使用的特殊类来执行所有这些操作。该类称为 `CallerThreadContext`,并由内部使用。当 Microsoft 解锁 `CompressedStack` 上的保护时,我将在此处添加它。
调用线程的上下文在创建工作项时捕获,位于 `EnqueueWorkItem()` 方法中。每次池中的线程执行工作项时,线程的上下文将按以下顺序更改:
第七项功能是 Kevin 在 Smart Thread Pool 的早期版本上提出的评论的结果。似乎测试应用程序消耗了大量句柄(任务管理器中的句柄计数)而未释放它们。经过几次测试,我得出结论,`ManualResetEvent` 类的 `Close()` 方法并不总是立即释放 Win32 事件句柄,而是等待垃圾回收器来处理。因此,显式运行 GC 会释放句柄。
为了缓解这个问题,我采用了新方法。首先,我想创建更少的句柄;其次,我想重用已创建的句柄。因此,我不必公开任何 `WaitHandle`,而是将它们用于内部,然后关闭它们。
为了创建更少的句柄,我仅在用户请求时创建 `ManualResetEvent` 对象(惰性创建)。例如,如果您不使用 `IWorkItemResult` 接口的 `GetResult()`,则不会创建句柄。使用 `SmartThreadPool.WaitAll()` 和 `SmartThreadPool.WaitAny()` 会创建句柄。
工作项队列创建了许多句柄,因为每次等待工作项都会创建一个新的 `ManualResetEvent`。因此,每个工作项都有一个句柄。等待者始终是相同的线程,并且一个线程不能等待多次。所以现在,线程池中的每个线程都有自己的 `ManualResetEvent` 并重用它。为了避免工作项队列和线程池实现之间的耦合,工作项队列将上下文存储在线程的 TLS(线程局部存储)中。
`PostExecute` 是一个回调方法,它在工作项执行完成后立即调用。它在执行工作项的线程的相同上下文中运行。用户可以选择调用 `PostExecute` 的情况。选项由 `CallToPostExecute` 标志枚举表示:
[Flags]
public enum CallToPostExecute
{
Never = 0x00,
WhenWorkItemCanceled = 0x01,
WhenWorkItemNotCanceled = 0x02,
Always = WhenWorkItemCanceled | WhenWorkItemNotCanceled,
}
解释
`SmartThreadPool` 的默认 `CallToPostExecute` 值为 `CallToPostExecute.Always`。这可以在构造 `SmartThreadPool` 时在 `STPStartInfo` 类参数中更改。另一种提供 `CallToPostExecute` 值的方法是在 `SmartThreadPool.QueueWorkItem` 的重载之一中。请注意,与 `WorkItem` 执行相反,如果在 `PostExecute` 期间引发了异常,则该异常将被忽略。`PostExecute` 是一个签名如下的委托:
public delegate void PostExecuteWorkItemCallback(IWorkItemResult wir);
如您所见,`PostExecute` 接收 `IWorkItemResult` 类型的参数。它可用于获取工作项的结果,或 `IWorkItemResult` 接口提供的任何其他信息。
当用户调用 `QueueWorkItem` 时,他/她可以提供一个状态对象。状态对象通常存储特定信息,例如应在 `WorkItemCallback` 委托中使用其参数。
状态对象的生存期取决于其内容和用户的应用程序。有时,在工作项完成后立即处置状态对象会很有用。特别是当它包含非托管资源时。
因此,我向 `SmartThreadPool` 添加了一个布尔值,该值指示在工作项完成时调用状态对象的 `Dispose`。该布尔值在使用 `STPStartInfo` 构造线程池时进行初始化。`Dispose` 仅在状态对象实现 `IDisposable` 接口时调用。`Dispose` 在工作项完成并且其 `PostExecute` 运行后调用(如果存在 `PostExecute`)。即使工作项被取消或线程池已关闭,状态对象也会被处置。
注意:此功能仅适用于 `WorkItemCallback` 附带的 `state` 参数,不适用于 `Action<...>` 和 `Func<...>` 中提供的参数!!!
工作项优先级允许用户在运行时对工作项进行排序。工作项按其优先级排序。高优先级优先处理。有五种优先级:
public enum WorkItemPriority
{
Lowest,
BelowNormal,
Normal,
AboveNormal,
Highest,
}
默认优先级是 `Normal`。
优先级的实现非常简单。我没有使用一个队列将工作项排序在内部,而是为每个优先级使用一个队列。每个队列都是 FIFO。当用户排队一个工作项时,该工作项会被添加到具有匹配优先级的队列中。当线程出队一个工作项时,它会查找最高优先级的非空队列。
这是对工作项进行排序的最简单解决方案。
此功能改进了第 6 项,并由 Steven T 实现。我只是用他的实现替换了我的代码。
通过此功能,Smart Thread Pool 可以与 ASP.NET 一起使用,以在调用线程和将在线程池中执行工作项的线程之间传递 HTTP 上下文。
此功能允许用户执行一组工作项,并指定最大并发级别。
例如,假设您的应用程序使用多个资源,而这些资源不是线程安全的,因此一次只有一个线程可以使用某个资源。对此有几种解决方案:创建一个使用所有资源的线程,或者创建一个每个资源一个线程。第一个解决方案没有利用并行处理的强大功能,而后者(如果资源大部分时间处于空闲状态)则成本太高(许多线程)。
Smart Thread Pool 的解决方案是为每个资源创建一个 WorkItemsGroup,并将并发级别设置为一。每次资源需要执行一些工作时,就会将一个工作项排队到其 WorkItemsGroup 中。WorkItemsGroup 的并发级别为一,因此每个资源一次只有一个工作项可以运行。线程的数量根据工作项的负载动态变化。
以下是一段代码片段,展示了其工作原理:
...
// Create a SmartThreadPool
SmartThreadPool smartThreadPool = new SmartThreadPool();
// Create a work items group that processes
// one work item at a time for resource 1
IWorkItemsGroup wigPrinter1 = smartThreadPool.CreateWorkItemsGroup(1);
// Create a work items group that processes
// one work item at a time for resource 2
IWorkItemsGroup wigPrinter2 = smartThreadPool.CreateWorkItemsGroup(1);
// Queue work items to resources
wigPrinter1.QueueWorkItem(Print, printer1, lessons);
wigPrinter1.QueueWorkItem(Print, printer1, homework);
wigPrinter2.QueueWorkItem(Print, printer2, blueprints);
...
// Print prototype
void Print(Printer printer, Document document) {...}
...
从代码片段可以看出,WorkItemsGroup 附加到 Smart Thread Pool 的实例。Work Items Group 没有自己的线程,而是使用 Smart Thread Pool 的线程。它还具有与 Smart Thread Pool 类似的接口,因此可以使用相同的方式进行使用,并在需要时进行替换。
WorkItemsGroup 具有优先级队列(与 `SmartThreadPool` 相同)。队列存储 WorkItemsGroup 的工作项。WorkItemsGroup 以最高优先级出队队列头部的第一个工作项,并以相同的优先级将其排队到 `SmartThreadPool` 中。
WorkItemsGroup 负责管理其工作项的最大并发级别。一旦工作项被排队到 WorkItemsGroup 中,它就会检查在 `SmartThreadPool` 中有多少个工作项。如果此数量小于最大并发级别,则将工作项排队到 `SmartThreadPool`。如果数量相等(不能更大),则 WorkItemsGroup 将工作项存储在其自己的优先级队列中。
如果 WorkItemsGroup 在暂停模式下创建,它将把工作项存储在队列中,直到启动为止。启动时,它会将工作项排队到 `SmartThreadPool` 中,直到达到最大并发级别。
请注意,WorkItemsGroup 仅具有最大并发级别,而没有最小或确切值。它可能具有 3 的并发级别,但没有工作项正在执行,因为它们正在 `SmartThreadPool` 队列中等待。
为了实现并发级别,WorkItemsGroup 会注册其工作项的完成事件。该事件是内部使用的,不会向用户公开。一旦注册,WorkItemsGroup 将在工作项完成时收到事件。该事件将触发 WorkItemsGroup 将更多工作项排队到 `SmartThreadPool` 中。该事件是实现并发级别的唯一方法。当我尝试使用 `PostExecute` 来实现它时,我得到了不稳定的 `WaitForIdle`。
WorkItemsGroup 的另一个优点是,它可以在一个方法中以 O(1) 的复杂度取消所有尚未执行的工作项。WorkItemsGroup 通过将一个对象附加到其每个工作项上来实现这一点,该对象指示 WorkItemsGroup 是否已被取消。当一个工作项即将被执行时,它会被要求提供当前状态(`InQueue`、`InProgress`、`Completed` 或 `Canceled`)。最终状态会考虑此对象的值来判断工作项是否被取消。
Work Items Group 还可以用作连接点。例如,您想通过将一个任务分解为子任务来完成一个任务。一旦子任务完成,就会发出一个新的任务并将其分解为子任务。这可以通过使用 WorkItemsGroup 的 `OnIdle` 事件来实现。请参阅下面的 示例。
请参阅解决方案源代码中的 WorkItemsGroupDemo 演示。
创建 Smart Thread Pool 时,默认情况下它会立即启动其线程。但是,有时您需要排队一些工作项,然后才开始执行它们。
在这些情况下,您可以创建处于暂停状态的 Smart Thread Pool 和 Work Items Group。当您需要执行工作项时,只需调用 `Start()` 方法。Work Items Group 中也存在相同的方法,目的相同。
请注意,如果您创建一个暂停状态的 Work Items Group 在暂停状态的 Smart Thread Pool 中,启动 Work Items Group 不会执行工作项,直到 Smart Thread Pool 启动为止。
`STPStartInfo` 包含一个属性,该属性定义了在 `SmartThreadPool` 中启动线程的优先级。如果您知道自己在做什么,就使用它。调整线程优先级可能会导致死锁、活锁以及数天的锁定 。
此添加允许用户控制工作项执行的并发性。它有助于使 STP 具有适应性。
要并行执行更多工作项,请增加并发级别。要限制已执行工作项的数量和/或降低 CPU 使用率,请减少并发级别。
此选项在 `SmartThreadPool` 和 WorkItemsGroup 中通过 `IWokItemsGroup` 接口可用。
public interface IWorkItemsGroup
{
...
int Concurrency { get; set; }
...
}
`Concurrency` 的值必须为正数。
虽然 `Concurrency` 在 `SmartThreadPool` 和 WorkItemsGroup 中具有相同的含义和行为,但实现方式不同。
`SmartThreadPool` 的 `Concurrency` 等同于 `MaxThreads` 属性。当 `Concurrency` 增加时,`SmartThreadPool` 可以创建更多线程来处理其工作项,最多可达 `Concurrency` 限制。在这种情况下,线程的创建是即时的。线程仍按 第 2 节中所述进行创建。
当 `Concurrency` 减少时,`SmartThreadPool` 不会创建新线程,而是让现有线程终止以减少线程池中的线程数。请注意,`Concurrency` 的降低可能需要一段时间才能生效,因为我们需要等待工作项完成。`SmartThreadPool` 不会主动中止线程,而是被动地等待它退出。
WorkItemsGroup 的 `Concurrency` 负责在 `SmartThreadPool` 中可以并行处理多少工作项,如 第 14 节中所述。当 `Concurrency` 增加时,更多的工作项被排队到 `SmartThreadPool` 中。当 `Concurrency` 减少时,WorkItemsGroup 会停止排队工作项,直到此 WorkItemsGroup 在 `SmartThreadPool` 中的工作项数量低于 WorkItemsGroup 的 `Concurrency`。
此外,`SmartThreadPool` 还允许在创建后更改 `MinThreads` 属性。当创建 `MinThreads` 时,线程池中的线程数会增加,使其至少为 `MinThreads`。
`MaxThreads` 的值必须大于或等于 `MinThreads`。如果 `MaxThreads` 设置为一个小于 `MinThreads` 的数字,那么 `MinThreads` 也将设置为 `MaxThreads` 的新值。反之亦然。
此功能允许用户在线程池中创建或终止线程时执行代码。代码将在线程的上下文中执行。此功能作为 `SmartThreadPool` 类中的新事件公开:
// A delegate to call after a thread is created,
// but before it's first use.
public delegate void ThreadInitializationHandler();
// A delegate to call when a thread is about to exit,
// after it is no longer
// belong to the pool.
public delegate void ThreadTerminationHandler();
public event ThreadInitializationHandler OnThreadInitialization
{...}
public event ThreadTerminationHandler OnThreadTermination;
{...}
`OnThreadInitialization` 事件在线程创建并添加到线程池时触发。事件由创建的线程调用。在此事件中,用户应添加代码来初始化线程使用的资源,这些资源应该每个线程初始化一次,而不是每个工作项初始化一次。
`OnThreadTermination` 事件在线程离开线程池时触发。事件由终止的线程调用。在此事件中,用户有机会清理在 `OnThreadInitialization` 事件中已初始化的资源。
SmartThreadPool 项目有一个名为 SmartThreadPoolCE 的类似项目。此版本的 SmartThreadPool 可以在 Windows CE 上运行。
它具有与 PC 版本相同的功能,但尚未完全正常工作。我仍然有一个线程调度问题,因为第 2 节中解释的线程空闲问题在 Windows CE 上不起作用。
此标志允许用户轮询 Smart Thread Pool / Work Items Group 是否空闲。
在您的代码中添加对 `SmartThreadPoolSL.dll` 的引用并使用它。
在您的代码中添加对 `SmartThreadPoolMono.dll` 的引用并使用它。
请注意,Mono 的二进制文件是在 Windows 上使用 Visual Studio 2008 编译的。
内部性能计数器应在不支持性能计数器的平台(如 Windows CE、Silverlight 和 Mono)上使用。
内部性能计数器是 STP 内部用于收集数据的变量。要启用它们,请将 `STPStartInfo.EnableLocalPerformanceCounters` 设置为 `true`。我在新的演示(Windows CE、Silverlight 和 Mono)中使用此功能。
新方法已添加到 `SmartThreadPool` 类中,并使用 WorkItemsGroup 实现。它们的目的在于简化并行任务的启动。
此功能允许用户指定工作项完成的超时时间(以毫秒为单位)。当超时过期时,如果工作项尚未完成,它将自动取消。取消方式与调用 `Cancel` 并将 `abortExecution` 参数设置为 `false` 的方式相同(这就是为什么超时是“被动”的原因)。
要采样取消,请使用 `SmartThreadPool.IsWorkItemCanceled`(这是一个静态属性),或者您可以使用 `SmartThreadPool.AbortOnWorkItemCancel`,它会检查当前工作项是否被取消,如果取消,则中止线程(`Thread.CurrentThread.Abort()`)。
请参阅下面的超时 示例。
此功能允许设置 STP 线程的 `IsBackground` 属性。默认值为 `true`。
使用方法:
STPStartInfo stpStartInfo = new STPStartInfo();
stpStartInfo.AreThreadsBackground = false;
SmartThreadPool stp = new SmartThreadPool(stpStartInfo);
此功能允许设置 STP 线程的 `ApartmentState` 属性。默认是不设置。
使用方法:
STPStartInfo stpStartInfo = new STPStartInfo();
stpStartInfo.ApartmentState = ApartmentState.MTA;
SmartThreadPool stp = new SmartThreadPool(stpStartInfo);
在您的代码中添加对 SmartThreadPoolWP.dll 的引用并使用它。
借助此功能,用户可以设置线程池中线程的 MaxStackSize。
使用方法:在 STPStartInfo 中设置 MaxStackSize 成员。
STPStartInfo stpStartInfo = new STPStartInfo();
stpStartInfo.MaxStackSize = 1024*1024;
SmartThreadPool stp = new SmartThreadPool(stpStartInfo);
何时使用?
Smart Thread Pool 适用于您的工作项不执行太多 CPU 工作,而是等待事件、IO、套接字等任务。这意味着工作项不消耗 CPU,但运行时间很长。当您不需要一直保持大量线程处于活动状态时,它也很有用。如果您的工作项执行时间很短,则使用 .NET 线程池。如果您有持续的繁重工作负载,则使用 Toub 的线程池并相应地定义最大线程数。
如何使用?
创建 Smart Thread Pool 或 Work Items Group 时,需要一些参数;当未提供值时,将使用默认值。
值名称 | 默认值 | 智能线程池 | Work Items Group | 描述 |
IdleTimeout |
60 秒 | 已使用 | 未使用 | 空闲超时 |
MaxWorkerThreads |
25 | 已使用 | 未使用 | 最大线程数 |
MinWorkerThreads |
0 | 已使用 | 未使用 | 最小线程数 |
UseCallerCallContext |
false |
已使用 | 已使用 | 使用调用线程的调用上下文 |
UseCallerHttpContext |
false |
已使用 | 已使用 | 使用调用线程的 HTTP 上下文 |
DisposeOfStateObjects |
false |
已使用 | 已使用 | 处置状态对象(如果状态实现了 `IDisposable`) |
CallToPostExecute |
CallToPostExecute.Always |
已使用 | 已使用 | 调用 `PostExecute` |
PostExecuteWorkItemCallback |
`null`(不执行任何操作) | 已使用 | 已使用 | `PostExecute` 方法 |
StartSuspended |
false |
已使用 | 已使用 | 启动暂停 |
FillStateWithArgs |
false |
已使用 | 已使用 | 填充状态参数(`Action<T>` 和 `Func<T>`) |
ThreadPriority |
ThreadPriority.Normal |
已使用 | 未使用 | 线程池中的线程优先级(Mono 不支持) |
WorkItemPriority |
WorkItemPriority.Normal |
已使用 | 已使用 | 工作项默认优先级 |
PerformanceCounterInstanceName |
null |
已使用 | 未使用 | 性能计数器实例名称 |
EnableLocalPerformanceCounters |
false |
已使用 | 未使用 | 启用本地性能计数器(适用于不支持性能计数器的平台) |
一旦在构造函数中定义,它们就无法更改。因此,请根据您的需求选择它们的值。最小线程数应与您想在正常情况下处理的工作项数量成正比。最大线程数应与您想在高峰时处理的工作项数量成正比。空闲超时应与高峰持续时间成正比。
代码示例
创建 Smart Thread Pool 实例
SmartThreadPool smartThreadPool =
new SmartThreadPool(
10*1000, // Idle timeout in milliseconds
25, // Threads upper limit
5, // Threads lower limit
true); // Use caller thread context
另一种创建实例的方式
// Create a STPStartInfo object
STPStartInfo stpStartInfo = new STPStartInfo();
// Change the defaults of the STPStartInfo object
stpStartInfo.DisposeOfStateObjects = true;
// Create the SmartThreadPool instance
SmartThreadPool smartThreadPool =
new SmartThreadPool(stpStartInfo);
使用 Smart Thread Pool
以下片段是一个简单示例。用户排队一个工作项,然后获取结果。请注意,`Result` 属性会阻塞直到结果可用或工作项被取消:
public class SimpleExample
{
public void DoWork(int [] numbers)
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
// Queue the work item
IWorkItemResult<double> wir = smartThreadPool.QueueWorkItem(
new Func<int[], double>(CalcAverage), numbers);
// Do some other work here
// Get the result of the operation
double average = wir.Result;
smartThreadPool.Shutdown();
}
// Do the real work
private double CalcAverage(int [] numbers)
{
double average = 0.0;
// Do the real work here and put
// the result in 'result'
return average;
}
}
此示例显示了如何等待特定的工作项完成。用户排队两个工作项,等待它们全部完成,然后获取结果:
public class WaitForAllExample
{
public void DoWork()
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
IWorkItemResult wir1 =
smartThreadPool.QueueWorkItem(new
WorkItemCallback(this.DoSomeWork1), null);
IWorkItemResult wir2 =
smartThreadPool.QueueWorkItem(new
WorkItemCallback(this.DoSomeWork2), null);
bool success = SmartThreadPool.WaitAll(
new IWorkItemResult [] { wir1, wir2 });
if (success)
{
int result1 = (int)wir1.Result;
int result2 = (int)wir2.Result;
}
smartThreadPool.Shutdown();
}
private object DoSomeWork1(object state)
{
return 1;
}
private object DoSomeWork2(object state)
{
return 2;
}
}
此示例显示了如何等待特定的工作项之一完成。用户排队两个工作项,等待它们中的一个完成,然后获取其结果。
public class WaitForAnyExample
{
public void DoWork()
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
IWorkItemResult wir1 =
smartThreadPool.QueueWorkItem(new
WorkItemCallback(this.DoSomeWork1), null);
IWorkItemResult wir2 =
smartThreadPool.QueueWorkItem(new
WorkItemCallback(this.DoSomeWork2), null);
IWorkItemResult [] wirs =
new IWorkItemResult [] { wir1, wir2 };
int index = SmartThreadPool.WaitAny(wirs);
if (index != WaitHandle.WaitTimeout)
{
int result = (int)wirs[index].Result;
}
smartThreadPool.Shutdown();
}
private object DoSomeWork1(object state)
{
return 1;
}
private object DoSomeWork2(object state)
{
return 1;
}
}
以下示例显示了 `WaitForIdle()` 的用法。我们只需排队所有工作项,然后等待它们全部完成。请注意,我们忽略了工作项的结果:
public class WaitForIdleExample
{
public void DoWork(object [] states)
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
foreach(object state in states)
{
smartThreadPool.QueueWorkItem(new
WorkItemCallback(this.DoSomeWork), state);
}
// Wait for the completion of all work items
smartThreadPool.WaitForIdle();
smartThreadPool.Shutdown();
}
private object DoSomeWork(object state)
{
// Do the work
return null;
}
}
以下示例显示了如何处理异常。请注意 `Result` 属性会抛出 `WorkItemResultException` 而不是实际异常:
public class CatchExceptionExample
{
public void DoWork()
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
IWorkItemResult<double> wir = smartThreadPool.QueueWorkItem(
new Func<double, double, double>(DoDiv), 10.0, 0.0);
try
{
double result = wir.Result;
}
// Catch the exception that Result threw
catch (WorkItemResultException e)
{
// Dump the inner exception which DoDiv threw
Debug.WriteLine(e.InnerException);
}
smartThreadPool.Shutdown();
}
private double DoDiv(double x, double y)
{
return x / y;
}
}
这是另一个显示如何处理异常的示例。它比前一个更好,因为它更快。.NET 在一切正常时运行得更快。当 .NET 需要处理异常时,它会变慢:
public class GetExceptionExample
{
public void DoWork()
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
IWorkItemResult<double> wir = smartThreadPool.QueueWorkItem(
new Func<double, double, double>(DoDiv), 10.0, 0.0);
Exception e = null;
double result = wir.GetResult(out e);
// e contains the expetion that DoDiv threw
if(null != e)
{
// Do something with the exception
}
smartThreadPool.Shutdown();
}
private double DoDiv(double x, double y)
{
return x / y;
}
}
下一个示例显示了如何创建 Work Items Group 并使用它:
public class WorkItemsGroupExample
{
public void DoWork(object [] states)
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
// Create a work items group that processes
// one work item at a time
IWorkItemsGroup wig =
smartThreadPool.CreateWorkItemsGroup(1);
// Queue some work items
foreach(object state in states)
{
wig.QueueWorkItem(
new WorkItemCallback(this.DoSomeWork), state);
}
// Wait for the completion of all work
// items in the work items group
wig.WaitForIdle();
smartThreadPool.Shutdown();
}
private object DoSomeWork(object state)
{
// Do the work
return null;
}
}
下一个示例显示了如何创建暂停的 Smart Thread Pool:
public class SuspendedSTPStartExample
{
public void DoWork(object [] states)
{
STPStartInfo stpStartInfo = new STPStartInfo();
stpStartInfo.StartSuspended = true;
SmartThreadPool smartThreadPool =
new SmartThreadPool(stpStartInfo);
foreach(object state in states)
{
smartThreadPool.QueueWorkItem(new
WorkItemCallback(this.DoSomeWork), state);
}
// Start working on the work items in the queue
smartThreadPool.Start();
// Wait for the completion of all work items
smartThreadPool.WaitForIdle();
smartThreadPool.Shutdown();
}
private object DoSomeWork(object state)
{
// Do the work
return null;
}
}
下一个示例显示了如何创建暂停的 Work Items Group:
public class SuspendedWIGStartExample
{
public void DoWork(object [] states)
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
WIGStartInfo wigStartInfo = new WIGStartInfo();
wigStartInfo.StartSuspended = true;
IWorkItemsGroup wig =
smartThreadPool.CreateWorkItemsGroup(1, wigStartInfo);
foreach(object state in states)
{
wig.QueueWorkItem(new
WorkItemCallback(this.DoSomeWork), state);
}
// Start working on the work items
// in the work items group queue
wig.Start();
// Wait for the completion of all work items
wig.WaitForIdle();
smartThreadPool.Shutdown();
}
private object DoSomeWork(object state)
{
// Do the work
return null;
}
}
此示例显示了如何获取 Work Items Group 的 `OnIdle` 事件:
public class OnWIGIdleEventExample
{
public void DoWork(object [] states)
{
SmartThreadPool smartThreadPool = new SmartThreadPool();
IWorkItemsGroup wig =
smartThreadPool.CreateWorkItemsGroup(1);
wig.OnIdle += new WorkItemsGroupIdleHandler(wig_OnIdle);
foreach(object state in states)
{
wig.QueueWorkItem(new
WorkItemCallback(this.DoSomeWork), state);
}
smartThreadPool.WaitForIdle();
smartThreadPool.Shutdown();
}
private object DoSomeWork(object state)
{
// Do the work
return null;
}
private void wig_OnIdle(IWorkItemsGroup workItemsGroup)
{
Debug.WriteLine("WIG is idle");
}
}
public class JoinExample
{
public void DoWork()
{
SmartThreadPool stp = new SmartThreadPool();
stp.Join(DoSomeWork1, DoSomeWork2);
smartThreadPool.Shutdown();
}
private void DoSomeWork1()
{
// ...
}
private void DoSomeWork2()
{
// ...
}
}
此示例显示了如何使用 `Choice`:(Choice 示例)
public class ChoiceExample
{
public void DoWork()
{
SmartThreadPool stp = new SmartThreadPool();
int index = stp.Choice(GetDataFromA, GetDataFromB);
if (index == 0)
{
// Got data from A
}
else if (index == 1)
{
// Got data from B
}
smartThreadPool.Shutdown();
}
private void GetDataFromA()
{
// ...
}
private void GetDataFromB()
{
// ...
}
}
此示例显示了如何使用 `Pipe`:(Pipe 示例)
public class PipeExample
{
public void DoWork()
{
SmartThreadPool stp = new SmartThreadPool();
int [] data = new int[2];
stp.Pipe(data, DoStep1, DoStep2);
smartThreadPool.Shutdown();
}
private void DoStep1(int [] data)
{
data[0] = ...
}
private void DoStep2(int [] data)
{
data[1] = ...
}
}
此示例显示了如何使用 `Timeout`:(Timeout 示例)
public class TimeoutExample
{
public void DoWork()
{
SmartThreadPool stp = new SmartThreadPool();
...
// Queue a work item that will be cancelled within 5 seconds
IWorkItemResult wir =
stp.QueueWorkItem(
new WorkItemInfo() { Timeout = 5*1000 },
DoSomething);
...
smartThreadPool.Shutdown();
}
private object DoSomething(object state)
{
...
for(...)
{
// If the work item was cancelled then abort the thread.
SmartThreadPool.AbortOnWorkItemCancel();
}
...
return result;
}
}
免责声明
本代码和信息按“原样”提供,不提供任何形式的保证,无论是明示的还是暗示的,包括但不限于适销性和/或特定用途的适用性的暗示保证。
历史
- 2004 年 8 月 7 日:初始版本。
- 2004 年 9 月 14 日:Bug 修复
- 将启动线程公式更改为“
if ((InUseThreads + WaitingCallbacks) > _workerThreads.Count)
”。 - 修复了句柄泄漏。
- 2004 年 10 月 16 日:添加了一些功能
- 工作项返回结果。
- 支持多个工作项的等待同步。
- 工作项可以被取消。
- 将调用线程的上下文传递给池中的线程。
- 最少使用 Win32 句柄。
- 小的 Bug 修复。
- 2004 年 12 月 26 日:更改
- 添加了 `PostExecute`,并提供了调用它的选项。
- 添加了 `WaitForIdle()` 方法,该方法等待直到工作项队列为空。
- 添加了处置状态对象的选项。
- 更新了 `FireUnhandledException` 以使其更健壮。
- 移除了静态构造函数。
- 添加了终结器。
- 将异常更改为可序列化。
- 修复了 `SmartThreadPool` 构造函数之一中的 Bug。
- 更改了 `SmartThreadPool.WaitAll()`,使其支持任意数量的等待者。`SmartThreadPool.WaitAny()` 仍然受 .NET Framework 的限制。
- 更改了异常处理,因此如果工作项抛出异常,它会在 `GetResult()` 处重新抛出,而不是触发 `UnhandledException` 事件。请注意,`PostExecute` 异常始终被忽略。
- 2005 年 3 月 25 日:更改
- 修复了工作项丢失的 Bug。这尤其发生在空闲超时时间较小时。
- 2005 年 7 月 3 日:更改
- 修复了 `PopWaiter()` 返回 `null` 时 `Enqueue()` 抛出异常的 Bug,该 Bug 已难以重建。
- 2005 年 8 月 16 日:更改
- 修复了在取消工作项时 `InUseThreads` 变为负数的 Bug。
- 2006 年 1 月 31 日:更改
- 添加了工作项优先级。
- 移除了对回调和 post executes 中链式委托的支持(实际上没有人使用它)。
- 添加了工作项组。
- 添加了工作项组的空闲事件。
- 更改了 `SmartThreadPool.WaitAll()` 的行为,使其在接收到空数组时返回 `true` 而不是抛出异常。
- 添加了将 Smart Thread Pool 和 Work Items Group 作为暂停状态启动的选项。
- 异常行为已更改(再次)。实际异常通过内部异常返回。
- 添加了保留调用线程 HTTP 上下文的选项(感谢 Steven T)。
- 添加了性能计数器。
- 为池中的线程添加了优先级。
- 2006 年 2 月 13 日:更改
- 修复了演示,使其在启动时不会因异常而崩溃。
- 添加了一个调用来处置性能计数器,以避免性能计数器泄漏。
- 添加了异常捕获,以防性能计数器无法创建。
- 2008 年 5 月 16 日:更改
- 更改了处置行为并移除了终结器。
- 启用了在运行时更改 `MaxThreads` 和 `MinThreads`。
- 启用了在运行时更改 `IWorkItemsGroup` 的 `Concurrency`。如果 `IWorkItemsGroup` 是 `SmartThreadPool`,则 `Concurrency` 指的是 `MaxThreads`。
- 改进了取消行为。
- 为线程创建和终止添加了事件。
- 修复了 HttpContext 上下文捕获。
- 更改了内部集合,使其使用泛型集合。
- 向 `SmartThreadPool` 和 `IWorkItemsGroup` 添加了 IsIdle 标志。
- 添加了对 WinCE 的支持。
- 添加了对 `Action<T>` 和 `Func<T>` 的支持。
- 2009 年 4 月 7 日:更改
- 添加了对 Silverlight 和 Mono 的支持。
- 向 `SmartThreadPool` 添加了 `Join`、`Choice` 和 `Pipe`。
- 添加了本地性能计数器(适用于 Mono、Silverlight 和 Windows CE)。
- 将持续时间测量从 `DateTime.Now` 更改为 `Stopwatch`。
- 队列从 `System.Collections.Queue` 更改为 `System.Collections.Generic.LinkedList<T>`。
- 2009 年 12 月 21 日:更改:
- 添加了工作项超时(被动)。
- 2012 年 8 月 27 日:更改
- 添加了为线程设置名称
- 为线程添加了 IsBackground 选项
- 为线程添加了 ApartmentState
- 为线程添加了 MaxStackSize
- 修复了 SmartThreadPool.Pipe
- 修复了同时排队大量工作项时的线程创建。
- 修复了 WorkItemsQueue.Dequeue。
将 `while(!Monitor.TryEnter(this));` 替换为 `lock(this) { ... }`