瑞士军刀线程支持类






4.89/5 (8投票s)
用这个便捷的支持类让 .NET 线程池线程“跳 hoops”
引言
线程是现代应用程序的无名英雄。它们默默地在后台辛勤工作,很少得到认可,一丝不苟地处理着日常任务,正是这些任务让应用程序如此灵活和响应迅速。如果你能看到它们,它们可能会看起来像《神偷奶爸》里那些古灵精怪的小黄人。和它们一样,一旦启动并运行,就很难控制了。当然,你可以join它们,甚至abort它们,但你能让它们坐下、翻身、发牌吗?这基本上就是本文将要展示的内容。
背景
创建线程有多少种方式?根据这篇文章,至少有十种。虽然其中一些提供了有趣且实用的功能,但你仍然可以从枯燥但可靠的QueueUserWorkItem
中获得很多好处。本文要解决的问题是,一旦你释放了这些线程,该如何处理它们。你知道CancellationTokenSource
和ManualResetEvent
(我会在下面回顾它们,以防万一),但如果你使用其中一个,就不能在没有一些额外编程的情况下使用另一个。你知道join可能是一件好事,abort通常是一件坏事,尤其是在线程池线程方面,而异常永远无法“活着”逃离线程。 .NET 线程迫切需要简化的管理。Tasks
是在 .NET 4.0 中引入(并在 4.5 中得到增强)以解决许多这些问题,所以如果你还没有了解过,不妨看看它们提供了什么。但如果你想要一个带有增强功能的简单线程池线程,请继续阅读。
愿望清单
或多或少,我都曾单独或组合使用过这些功能。
- 等待工作线程完成(join)
- 工作线程超时,并检测到已发生超时
- 取消工作线程(可选地设置取消超时时间)
- 捕获工作线程抛出的异常
- 返回状态码和描述
- 传入任意数据
- 返回任意数据
- 从工作线程获取定期状态
如果能最小化在新线程或现有 WaitCallback 方法中支持这些功能所需的显式支持就好了。当然,我们希望同时支持所有这些功能。
解决方案
本文介绍了一个名为WorkerThreadController
的 C# 类,它兼具用户 WaitCallback 方法的包装器/管理器以及用户和包装器 WaitCallback 方法的数据包的双重作用。该类和一个测试驱动程序在一个 VS2010 解决方案中,可以通过页面顶部的链接下载。让我们先看看构造函数。
/// <summary> /// Constructor. Intitialize and then start a worker thread using our /// internal wrapper method. This (WorkerThreadController) object is /// passed as data into the wrapper method. /// </summary> /// <param name="method">The WaitCallback method as far as the user is concerned</param> /// <param name="data">Optional data to be supplied to the user's method</param> public WorkerThreadController(WaitCallback method, object data = null) { DataIn = data; cancellationToken = new CancellationTokenSource(); resetEvent = new ManualResetEvent(false); userMethod = method; ThreadPool.QueueUserWorkItem(UserMethodWrapper, this); }
CancellationTokenSource
提供了一种机制,允许父线程通过调用CancellationTokenSource.Cancel()
来请求取消工作线程。工作线程必须通过轮询CancellationTokenSource.IsCancellationRequested
来主动检查并响应取消请求。CancellationTokenSource
还提供了一些更复杂的功能,超出了本文的范围。有关详细信息,请查看这个 MSDN 博客。
ManualResetEvent
被描述为一个“门”,一个线程在等待进入,而另一个线程则打开和关闭这个门。实现join操作的一种方法是使用ManualResetEvent
,其中父线程通过调用ManualResetEvent.WaitOne()
等待门打开,而工作线程在退出时通过调用ManualResetEvent.Set()
打开门。
WorkerThreadController
将为您管理CancellationTokenSource
和ManualResetEvent
的所有细节。在工作线程中运行的包装器方法主要用于拦截用户 WaitCallback 方法抛出的异常,并将它们存储起来,以便稍后在父线程中重新抛出。最后,它将设置ManualResetEvent
(即允许等待的父线程继续执行)。包装器方法如下所示:
/// <summary> /// Wrap the user's method to catch exceptions and set the resetEvent. /// This allows the parent to continue if it was waiting. /// </summary> /// <param name="data">A reference to this object</param> private void UserMethodWrapper(object data) { // // OK is default true unless explicitly set otherwise in the worker // thread. That way, the worker thread doesn't need to bother // setting it true in the happy path case. Even if an exception // is thrown, don't assume OK is false; let the userMethod set it // if it wants to. // try { userMethod(data); } catch (Exception ex) { if (TransferExceptions) TheException = ex; } // // The WaitCallback wrapper is about to exit. Let the parent know // in case it's waiting on the ManualResetEvent. // resetEvent.Set(); }
接下来的部分将演示如何使用WorkerThreadController
来实现愿望清单中的每个功能。突出显示的块包含父线程运行的代码片段,然后是用户的 WaitCallback 方法,最后是代码执行时产生的控制台输出。
希望有一天你能加入我们 (#1)
在此场景中,父线程将使用Worker1
作为 WaitCallback 方法创建一个WorkerThreadController
,然后等待工作线程完成执行。没什么花哨的,只是一个简单的WorkerThreadController
使用示例。
//----------------------------------------------------------------- // Test 1: Simple happy path. The thread works a bit, then exits. //----------------------------------------------------------------- Console.WriteLine("Starting Test 1, simple happy path"); WorkerThreadController workerThread1 = new WorkerThreadController(Worker1); workerThread1.WaitForever(); static void Worker1(object data) { Console.WriteLine(" Worker1 is starting"); Thread.Sleep(250); Console.WriteLine(" Worker1 is exiting"); }
这太慢了 (#2)
我们不能永远等待,所以这次我们将设置一些限制。设置与上面 #1 的描述相同,但我们不等待永远,而是将等待时间限制在 300ms。如果此时工作线程仍在运行,我们将取消它。我们假设工作线程表现良好(即使它已超出分配的时间),并且它会及时响应取消请求。此示例未显示,但WorkerThreadController.ThrowCancelException
为您提供了在工作线程被取消时抛出OperationCanceledException
的选项。
//----------------------------------------------------------------- // Test 2: Timeout. The worker is taking too long, so kill it. //----------------------------------------------------------------- Console.WriteLine("Starting Test 2, worker join timeout and cancel"); WorkerThreadController workerThread2 = new WorkerThreadController(Worker2); if (workerThread2.IsStillRunningAfter(300)) { Console.WriteLine(" >>> Timeout! Cancel the worker"); workerThread2.Cancel(); } static void Worker2(object data) { WorkerThreadController controller = (WorkerThreadController)data; Console.WriteLine(" Worker2 is starting"); for (int i = 0; i < 50; i++) { Console.WriteLine(" Worker2 is working..."); Thread.Sleep(100); if (controller.IsCanceled()) { Console.WriteLine(" Worker2 has been canceled"); break; } } Console.WriteLine(" Worker2 is exiting"); }
好吧,我受够了,这次是认真的 (#3)
如果工作线程表现不好怎么办?我们不想永远等待线程被取消。该怎么办?WorkerThreadController
为您提供了为取消请求设置超时的选项。如果取消超时,则意味着工作线程拒绝合作,或者它可能因为它卡在等待另一个线程而无法执行。无论哪种情况,您的选择都有限。您已经等待了尽可能长的时间。您可以终止该线程,但如您所知,这可能会导致问题。另一种选择是记录尽可能多的信息并关闭应用程序。或者,您可以忽略问题,并抱最好的希望。您必须决定在您的情况下什么才是正确的。在下面的示例中,工作线程最终会自行退出。
//----------------------------------------------------------------- // Test 3: Cancel. The worker ignores the cancel. //----------------------------------------------------------------- Console.WriteLine("Starting Test 3, worker ignores the cancel"); WorkerThreadController workerThread3 = new WorkerThreadController(Worker3); Thread.Sleep(200); if (!workerThread3.Cancel(100)) { Console.WriteLine(" >>> The cancel timed out!"); } Console.WriteLine(" End of Test 3"); Thread.Sleep(300); // the worker will quit eventually static void Worker3(object data) { WorkerThreadController controller = (WorkerThreadController)data; Console.WriteLine(" Worker3 is starting"); for (int i = 0; i < 5; i++) { Console.WriteLine(" Worker3 is working..."); Thread.Sleep(100); // // Ignoring the cancellation here... // } Console.WriteLine(" Worker3 is exiting"); }
我没预料到 (#4)
通常情况下,异常在父线程中处理更好,因为父线程可能拥有足够的信息来纠正问题并重新启动工作线程。WorkerThreadController
通过捕获并记录用户 WaitCallback 方法周围的包装器中的异常来支持此功能,然后(可选地)在父线程等待工作线程完成时在该父线程中重新抛出异常。您可以选择从 WaitCallback 方法抛出什么类型的异常;您可能更喜欢比Exception
更专业的类。
//----------------------------------------------------------------- // Test 4: Exception. Catch the worker's exception. //----------------------------------------------------------------- Console.WriteLine("Starting Test 4, exception"); try { WorkerThreadController workerThread4 = new WorkerThreadController(Worker4); workerThread4.WaitForever(); } catch (Exception ex) { Console.WriteLine(" Exception: {0}", ex.Message); } static void Worker4(object data) { WorkerThreadController controller = (WorkerThreadController)data; Console.WriteLine(" Worker4 is starting"); throw new Exception("An exception in Worker4 has occurred!"); }
故事的结局是怎样的? (#5)
有时,您只需要一个状态码,如果出了问题,可能还需要一个描述。工作线程只需要在返回之前设置OK
和Details
字段。然后父线程就可以访问它们了。当然,您可以将bool OK
替换为数字、枚举或字符串代码,如果这更适合您的需求。
//----------------------------------------------------------------- // Test 5: Simple failure with status code and details. //----------------------------------------------------------------- Console.WriteLine("Starting Test 5, status code and details"); WorkerThreadController workerThread5 = new WorkerThreadController(Worker5); workerThread5.WaitForever(); Console.WriteLine(" Status: {0}", (workerThread5.OK) ? "OK" : "FAIL"); Console.WriteLine(" Details: {0}", workerThread5.Details); static void Worker5(object data) { WorkerThreadController controller = (WorkerThreadController)data; Console.WriteLine(" Worker5 is starting"); Thread.Sleep(250); controller.OK = false; controller.Details = "A problem occurred during processing!"; Console.WriteLine(" Worker5 is exiting"); }
你展示你的,我展示我的 (#6 和 #7)
如果工作线程执行任何类型的复杂处理,它很可能需要父线程提供一些复杂的输入,并且在完成后可能需要返回复杂的输出。DataIn
和DataOut
允许您将任何任意数据传递进出 WaitCallback 方法。WaitCallback 方法实际上可以自由地使用其中一个、两个或都不使用这些字段,以任何您选择的方式。名称仅暗示了它们的使用。可以添加其他字段。
//----------------------------------------------------------------- // Test 6-7: Send in some data, get some data back out. //----------------------------------------------------------------- Console.WriteLine("Starting Test 6-7, data in and data out"); WorkerThreadController workerThread67 = new WorkerThreadController(Worker67, "PURPLE COWS"); workerThread67.WaitForever(); Console.WriteLine(" Status: {0}", (workerThread67.OK) ? "OK" : "FAIL"); Console.WriteLine(" Returned data: {0}", workerThread67.DataOut); static void Worker67(object data) { WorkerThreadController controller = (WorkerThreadController)data; Console.WriteLine(" Worker67 is starting"); Console.WriteLine(" Worker67 input data: {0}", controller.DataIn); Thread.Sleep(250); controller.DataOut = 3.14159; Console.WriteLine(" Worker67 is exiting"); }
怎么了? (#8)
当工作线程运行时,尤其是在进行长时间操作时,您可能想了解它的当前状态。WorkerThreadController
提供了一个线程安全的Status
属性,可以在工作线程中设置,并在父线程中检索。我想它也可以反向工作,如果你找到了用途。利用此功能的一种方法是使用周期性计时器检查状态,然后更新您的 GUI,例如使用进度条。
//----------------------------------------------------------------- // Test 8: Status. Check thread status. //----------------------------------------------------------------- Console.WriteLine("Starting Test 8, check status"); WorkerThreadController workerThread8 = new WorkerThreadController(Worker8); for (int i = 0; i < 3; i++) { Thread.Sleep(100); Console.WriteLine(" >>> Status: {0}", workerThread8.Status); } workerThread8.WaitForever(); static void Worker8(object data) { WorkerThreadController controller = (WorkerThreadController)data; Console.WriteLine(" Worker8 is starting"); for (int i = 0; i < 2; i++) { Console.WriteLine(" Worker8 is working..."); controller.Status = "working"; Thread.Sleep(100); } controller.Status = "done"; Console.WriteLine(" Worker8 is exiting"); }
轮到你了
就是这样。我希望您觉得本文很有趣,甚至可能有用。您可能对WorkerThreadController
当前的样子很满意,或者您可能已经在考虑一些您希望看到的增强功能。也许是一个状态回调委托而不是简单的字符串,或者可能是 WaitCallback 方法链。让我知道您做出了哪些聪明的修改。
历史
初始版本 2015 年 11 月 13 日