任务/延续以及线程池的消亡?





5.00/5 (16投票s)
使用 VS2010 的 Tasks 命名空间。
目录
引言
本文探讨了新的 VS2010 System.Threading.Tasks
命名空间,该命名空间是由于 Microsoft 并行扩展到 .NET Framework 3.5,2008 年 6 月社区技术预览版 的持续工作而产生的。
由于本文使用了此命名空间,因此它显然需要 VS2010,所以如果您没有,很抱歉,我想阅读就此结束。但是,如果您仍然好奇,请继续,让我们看看这个新命名空间能为我们做些什么。
当前
我们目前支持在后台线程上运行小工作项代码,实际上是使用 System.Threading.ThreadPool
,我不得不说我是它的忠实粉丝。如果您有兴趣,我实际上在我以前的一篇线程文章中详细讨论过 ThreadPool
我喜欢 ThreadPool
的原因是 ThreadPool
为我处理所有繁琐的线程管理工作,并在有时间运行时为我调度我的工作项。您目前会这样使用 ThreadPool
using System;
using System.Threading;
namespace QueueUserWorkItemWithState
{
// ReportData holds state information for a task that will be
// executed by a ThreadPool thread.
public class ReportData
{
public int reportID;
public ReportData(int reportID)
{
this.reportID = reportID;
}
}
/// <summary>
/// Queues a method for execution with a ReportData state object
/// </summary>
class Program
{
static void Main(string[] args)
{
// Queue the task with a state object
ReportData rd = new ReportData(15);
ThreadPool.QueueUserWorkItem(new WaitCallback(ThreadProc), rd);
Console.WriteLine("Main thread does some work, then sleeps.");
// If you comment out the Sleep, the main thread exits before
// the thread pool task runs. The thread pool uses background
// threads, which do not keep the application running. (This
// is a simple example of a race condition.)
Thread.Sleep(1000);
Console.WriteLine("Main thread exits.");
Console.ReadLine();
}
// This thread procedure performs the task.
static void ThreadProc(Object stateInfo)
{
//get the state object
ReportData rd = (ReportData)stateInfo;
Console.WriteLine(string.Format("report {0} is running", rd.reportID));
}
}
}
这当然比管理我们自己的线程要好得多。然而,当前的 ThreadPool
对象存在几个问题,例如
- 工作项无法返回值(但是
BeginInvoke
/EndInvoke
委托确实返回值) - 无法取消任务
- 无法等待工作项,除非您使用外部同步方法,例如
WaitHandle
等。 - 以结构化方式组合一组项目
- 处理并发抛出的异常或在其之上构建的任何其他更丰富的构造
事实上,当前 ThreadPool
的工作方式是,您只需向它抛出一些工作然后忘记它。嗯,肯定有更好的方法。
过去,我会通过使用 Ami Bar 绝对可靠且出色的 SmartThreadPool 来弥补这一点,它解决了这些问题以及许多其他问题,但今天,我有点时间,所以我想我会研究一下新的 VS2010 System.Threading.Tasks
命名空间,看看它能提供什么。
我不得不说,从我所看到的来看,它看起来会提供很多;我们继续看看,好吗?
传奇的陨落:任务
在我们开始之前,让我提一下演示应用程序。演示应用程序是一个 Visual Studio 2010 WPF 项目,因此您需要 VS2010 才能运行它。
运行时,它看起来像这样,基本思想非常简单:运行一些后台 Task
,用数据项填充 ListBox
。就是这样。
这是演示应用程序的屏幕截图,它非常漂亮,不是吗?
从上面的演示应用程序屏幕截图中可以看出,有七个按钮,它们似乎执行不同的操作。那么这七个按钮到底做了什么?嗯,这些按钮执行以下操作
- 运行一个没有状态传递给
Task
的Task
- 运行一个有状态传递给
Task
的Task
- 使用
TaskFactory
创建并运行一个返回结果的Task
- 创建并运行一个使用延续的
Task
- 启动一个可取消的
Task
- 取消一个可取消的
Task
- 一个奇怪的,它坏了吗,还是没有?
所以我认为最好的方法是逐一查看这七个领域。那么让我们开始吧。
运行一个没有状态传递给 Task 的任务
这是迄今为止最容易使用的设置,因为我们真正需要做的就是创建一个 Task
并启动它。我们很容易做到这一点,因为 Task
的构造函数接受一个 Action
(委托),它是一个指向 Task
负载的指针,该负载将在 Task
启动时运行。下面的示例展示了这一点,应该注意的是,UI 线程将简单地继续,并且不会阻塞等待 Task
,因此您可能就是这样设置后台活动的。还应该注意的是,通过使用 Task
类,无法获取返回值。但是,如果使用 Task<T>
,我们将在稍后介绍,则可能。
/// <summary>
/// This handler shows you to use a Task that doesn't use any
/// state inside the Task
///
/// This task runs Asycnhronously and doesn't block the calling Thread
/// </summary>
private void TaskNoState_Click(object sender, RoutedEventArgs e)
{
Task task = new Task((Action)DoSomething);
items.Clear();
task.Start(); //NOTE : No Wait so runs Asynch
lstItems.ItemsSource = items;
MessageBox.Show("TaskNoState DONE");
}
/// <summary>
/// Runs the Tasks action delegate, accepting the NO state from the task
/// </summary>
private void DoSomething()
{
StringBuilder build = new StringBuilder();
build.AppendLine("This simple method demostrates how to use a Task with no state");
build.AppendLine("That means you can only really do something that doesn't");
build.AppendLine("need an input value, and doesn't return anything");
MessageBox.Show(build.ToString());
}
运行一个有状态传递给 Task 的任务
这一次,我们所做的只是将一个表示 Task
状态数据的单个 Object
传递给实际的 Task
。否则,它与上面显示的示例相同;我们仍然只是使用 Action
委托。这次不同的是 Action
委托的类型是 Action<T>
,因此它可以接受一个参数。下面是一个设置 Task
以接受一些 Object
状态的示例。和以前一样,下面的示例允许 UI 线程简单地继续,并且不会阻塞等待 Task
,因此您可能就是这样设置后台活动的。
/// <summary>
/// This handler shows you to use a Task that accepts some input
/// state that can then be used inside the Task
///
/// This task runs Asycnhronously and doesn't block the calling Thread
/// </summary>
private void TaskState_Click(object sender, RoutedEventArgs e)
{
Task task = new Task((Action<Object>)CreateStuffForTaskState,
new ObjectState<Int32>
{
CurrentTask = "TaskState",
Value = 999999
});
items.Clear();
task.Start(); //NOTE : No Wait so runs Asynch
items = new ObservableCollection<string>(taskItems);
lstItems.ItemsSource = items;
MessageBox.Show("TaskState DONE");
}
/// <summary>
/// Runs the Tasks Action delegate, accepting the state that
/// was created for the task
/// </summary>
/// <param name="o">the Task state</param>
private void CreateStuffForTaskState(Object o)
{
var state = o as ObjectState<Int32>;
taskItems.Clear();
for (int i = 0; i < state.Value; i++)
{
taskItems.Add(String.Format("{0} Item {1}", state.CurrentTask, i));
}
}
在上面的示例中,状态数据由这个简单类表示,它在演示应用程序中使用 Task 状态的任何地方都使用。
/// <summary>
/// Simple Task state object
/// </summary>
/// <typeparam name="T">The type to use</typeparam>
internal class ObjectState<T>
{
public String CurrentTask { get; set; }
public T Value { get; set; }
}
使用 TaskFactory 创建并运行一个返回结果的任务
到目前为止,我们已经了解了如何在 Task
中排队要运行的委托,这都很酷,但有时,我们需要从后台活动中获得一个返回结果,例如如果后台活动是从数据库中获取接下来的 10,000 条记录。对于此操作,我们可能确实需要一个返回值。那么,System.Threading.Tasks
命名空间可以处理返回值吗?答案是肯定的,它确实可以。我们所需要做的就是使用泛型 Task<T>
类型设置 Task
,并使用一些 Func<T>
委托作为要为 Task<T>
运行的负载。
这是一个示例;另请注意,在此示例中,我使用 TaskFactory
来创建和启动一个新 Task
;我认为这是首选方法。
/// <summary>
/// This handler shows you to use a Task that doesn't use any
/// state inside the Task
///
/// This Task blocks the calling thread until the Task completes
/// </summary>
private void TaskFactWithReturnValue_Click(object sender, RoutedEventArgs e)
{
Func<ObservableCollection<String>> obtainTaskResults = TaskWithResults;
Task<ObservableCollection<String>> task =
Task.Factory.StartNew<ObservableCollection<String>>(obtainTaskResults,
TaskCreationOptions.DetachedFromParent);
items.Clear();
task.Wait(); //Blocks while waiting for Task to complete
items = task.Result;
lstItems.ItemsSource = items;
MessageBox.Show("TaskFactWithReturnValue DONE");
}
/// <summary>
/// Runs the Tasks Func delegate, which returns a list
/// of ObservableCollection String
/// </summary>
private ObservableCollection<String> TaskWithResults()
{
ObservableCollection<String> results = new ObservableCollection<string>();
for (int i = 0; i < 10; i++)
{
results.Add(String.Format("TaskFactWithReturnValue Item {0}", i));
}
return results;
}
您可以在上面的这个示例中看到,由于我们确实需要一个返回值,我正在使用 Task.Wait()
方法等待 Task
完成,该方法会阻塞 UI 线程,然后我使用 Task.Result
更新 UI。我不喜欢 UI 线程被阻塞的事实,但我看不出除了等待之外还能做什么,如果您需要结果的话。TaskWait()
当然有一个重载可以接受 TimeSpan
超时,所以我们可以通过执行类似 task.Wait(2500)
的操作来缓解这种等待,它只等待 2.5 秒。
创建并运行一个使用延续的任务
我们刚刚看到我们可以创建一个返回值的 Task
,这非常好,并且是当前 ThreadPool
不具备的功能。这些 Task
还能做什么?嗯,它们还有一两个技巧;其中一个技巧叫做延续,它本质上是当前 Task
完成后运行的另一个 Task
。通过使用 Task
和延续,您可以以特定顺序对 Task
进行排序,而无需使用外部 WaitHandle
,例如 ManualResetEvent
/AutoResetEvent
。
这可以通过使用 Task.ContinueWith<T>()
方法轻松实现,如下所示。
这是一个示例,它创建了两个 Task
,它们组合成一组最终结果值,然后在 UI 上显示
/// <summary>
/// This handler shows you to use a Task that will use 2 chained Tasks
/// the first Task accepts some State, and the 2nd Task in the chain doesn't
/// ONLY when both Tasks have completed is the work considered done
///
/// This Task blocks the calling thread until the 2 chained Task complete
/// </summary>
private void TaskContinue_Click(object sender, RoutedEventArgs e)
{
//SYNTAX EXAMPLE 1
#region SYNTAX EXAMPLE 1 (Comment to try SYNTAX EXAMPLE 2)
Func<Object, ObservableCollection<String>> obtainTaskResultsFunc =
TaskWithResultsWithState;
Task<ObservableCollection<String>> task =
Task.Factory.StartNew(obtainTaskResultsFunc, new ObjectState<Int32>
{
CurrentTask = "TaskState",
Value = 2
});
Func<Task, ObservableCollection<String>> contineResultsFunc =
ContinueResults;
Task<ObservableCollection<String>> continuationTask =
task.ContinueWith<ObservableCollection<String>>(contineResultsFunc,
TaskContinuationOptions.OnlyOnRanToCompletion);
continuationTask.Wait();
items.Clear();
items = continuationTask.Result;
#endregion
//SYNTAX EXAMPLE 2
#region SYNTAX EXAMPLE 2 (UnComment to try And Comment SYNTAX EXAMPLE 1)
//Task<ObservableCollection<String>> taskAll =
// Task.Factory.StartNew((o) =>
// {
// return TaskWithResultsWithState(o);
// }, new ObjectState<Int32>
// {
// CurrentTask = "TaskState",
// Value = 2
// }).ContinueWith<ObservableCollection<String>>((previousTask) =>
// {
// return ContinueResults(previousTask);
// },TaskContinuationOptions.OnlyOnRanToCompletion);
//taskAll.Wait();
//items.Clear();
//items = taskAll.Result;
#endregion
lstItems.ItemsSource = items;
MessageBox.Show("TaskContinue DONE");
}
运行时会在 UI 中产生如下结果
可以看出,前两个结果来自原始 Task
,其余结果来自仅在第一个 Task
完成时才启动的 Continuation Task
。
创建并取消任务
任务的另一个宣传的优点是它们可以被取消。可以请求取消任务,但由于任务实际上只是一个正在运行的委托,因此任务的有效负载委托中需要有一些协作,它检查正在运行的任务的某些方面并根据任务的当前设置采取正确的路径。
下面是一个示例,其中 Task
运行速度极慢,UI 不会被阻塞,Task
在后台运行,但用户可以取消 Task
。显然,在 Task
的有效负载委托中完成的工作必须确保它知道如何处理取消请求,并相应地采取行动。
/// <summary>
/// This handler shows you to use a Task that
/// you can run, a wait a certain amount of time
/// for, and then cancel the Task
///
/// This task runs Asycnhronously and doesn't block the calling Thread
/// </summary>
private void TaskCancelStart_Click(object sender, RoutedEventArgs e)
{
if (cancelableTask != null &&
cancelableTask.Status == TaskStatus.Running)
return;
try
{
cancelableTask = new Task((Action)DoSomethingWithCancel);
items.Clear();
cancelableTask.Start();
items = new ObservableCollection<string>(taskItems);
lstItems.ItemsSource = items;
}
catch (Exception ex)
{
MessageBox.Show(String.Format("TaskCancel Exception {0}",
ex.InnerException != null ? " : " +
ex.InnerException.Message : String.Empty));
}
}
/// <summary>
/// Cancels the Tasks (requests in to Cancel)
/// </summary>
private void TaskCancelStop_Click(object sender, RoutedEventArgs e)
{
cancelableTask.Cancel();
}
/// <summary>
/// Runs the Tasks Action delegate, but will examine if the Task has been cancelled
/// and if it has the code in this delegate will accept the Cancellation request and
/// transition the Task to the Cancelled state
/// </summary>
private void DoSomethingWithCancel()
{
taskItems.Clear();
for (int i = 0; i < Int32.MaxValue; i++)
{
//See if the current Task is cancelled,
//and if so get this delegate to acknowledge
//the cancellation
if (!cancelableTask.IsCancellationRequested)
taskItems.Add(String.Format("TaskCancel Item {0}", i));
else
{
//transition Task to Cancelled state
Task.Current.AcknowledgeCancellation();
break;
}
}
}
可以看出,有一个 Task.Cancel()
方法可用于请求 Task
取消,还有一个 IsCancellationRequested
属性可用于检查是否已请求取消。从那里,Task
的有效负载委托必须公平地通过使用 Task.AcknowledgeCancellation()
方法来确认取消。
一个奇怪的问题,并非一帆风顺
虽然我对新的 Task
功能印象深刻,但我确实做了一些实验,例如使用 Task<T>
运行这段代码,我们现在知道它应该返回一个结果。
/// <summary>
/// This seems to be pretty broken to me, I could be misunderstanding something but
/// I thought Tasks were using new CLR4.0 ThreadPool as described
/// http://www.danielmoth.com/Blog/2008/11/new-and-improved-clr-4-thread-pool.html
/// which should cause a new thread to do this work on a new worker thread, so why
/// when this code is run is the UI unresponsive
/// </summary>
private void IsThisBroken_Click(object sender, RoutedEventArgs e)
{
Func<ObservableCollection<String>> obtainTaskResults =
TaskWithResultsWhyIsItBlockingUI;
Task<ObservableCollection<String>> task =
Task.Factory.StartNew<ObservableCollection<String>>(obtainTaskResults,
TaskCreationOptions.DetachedFromParent);
items.Clear();
//THE UI is simply queing this work up, and it not waiting (its commented out below)
//So why is the UI Not reponsive at all
//task.Wait(); //Blocks while waiting for Task to complete
items = task.Result;
lstItems.ItemsSource = items;
MessageBox.Show("IsThisBroken DONE");
}
/// <summary>
/// Runs the Tasks Func delegate, which returns a list
/// of ObservableCollection String
/// </summary>
private ObservableCollection<String> TaskWithResultsWhyIsItBlockingUI()
{
ObservableCollection<String> results = new ObservableCollection<string>();
//*************************************************
// VERY LONG RUNNING TASK, CAUSES UI TO BLOCK
// SEEMS TO BE BLOCKING WAITING ON Task<T>.Result
// basically the following line above
//
// items = task.Result;
//*************************************************
for (int i = 0; i < Int32.MaxValue; i++)
{
results.Add(String.Format("TaskFactWithReturnValue Item {0}", i));
}
return results;
}
现在,当我完全按照所示运行这段代码时,UI 变得无响应,这让我有点困惑,因为我们没有使用 Task.Wait()
(上面已注释掉),它会阻塞调用 Thread
直到 Task
完成,但 UI 明显阻塞或无响应。亲自尝试一下演示。使用演示代码,将 Task<T>
保持为 Int32.MaxValue
,并在演示应用程序的 IsThisBroken_Click()
方法中的那一行设置一个断点。
items = task.Result;
所以我开始对此进行更多研究,我想知道一个使用 Task<T>.Result
属性返回值的 Task
是否会阻塞调用者,直到 Task<T>
完成并有一个返回值。
所以我做了以下事情
- 注释掉上面处理项目的两行,所以这些行现在变成了
- 然后我将原始代码放回去,只是减少了
Task<T>
的有效负载,使其使用了较短的循环值;所以,我没有使用Int32.MaxValue
,而是使用了 50,000,然后我也看到代码直接跳回到我设置在该行的断点
//task.Wait(); //Blocks while waiting for Task to complete
//items = task.Result;
//lstItems.ItemsSource = items;
MessageBox.Show("IsThisBroken DONE");
你猜怎么着,UI 立刻变得响应,尽管我显然失去了使用 Task<T>
返回值的能力。
items = task.Result;
这在某种程度上向我证明,Task<T>.Result
属性似乎强制执行阻塞操作。当你仔细考虑时,这实际上是有道理的,但这是需要注意的一点。我认为跟踪长时间运行的 Task
的一个好方法是定期使用 TaskStatus
枚举检查 Task
状态,您可以使用诸如 TaskStatus.RanToCompletion
之类的值。所有这些都表明,Task
确实提供了一个非常好的 API,我认为它们比我们目前拥有的 ThreadPool
API 更好。
关于任务的其他有用链接
- http://www.danielmoth.com/Blog/2008/11/do-not-explicitly-use-threads-for.html
- http://www.danielmoth.com/Blog/2008/12/introducing-new-task-type.html
- http://www.danielmoth.com/Blog/2008/11/new-and-improved-clr-4-thread-pool.html
Task Parallel Library 网站上与任务相关的链接
- http://blogs.msdn.com/pfxteam/archive/2009/06/03/9691796.aspx
- http://blogs.msdn.com/pfxteam/archive/2009/05/31/9674669.aspx
就这些了,希望你喜欢
我想说的关于任务的所有内容就这些了,但我相信您会同意,这些看起来相当不错,并且是对当前 System.Threading
命名空间的一个受欢迎的补充。
谢谢
一如既往,欢迎投票/评论。