任务并行库:6/n






4.95/5 (71投票s)
探究任务并行库的使用。
引言
这是我提出的关于 TPL 系列文章的第六部分,也是最后一部分。上次我介绍了管道,并涵盖了这部分内容。
- BlockingCollection
- BlockingCollection 基础知识
- 简单管道
- 更复杂的管道
这次我们将探讨一些您现在就可以使用的更高级的 TPL 功能,这将是我们使用当前 .NET 4.0 中可用的类进行 TPL 操作的总结。
然后,我们将继续研究通过使用新的 Async CTP,我们将能够使用 C# 5 做些什么。
文章系列路线图
这是第 6 部分,也是最后一篇文章,我希望大家会喜欢。下面是我希望涵盖的内容的大致提纲。
- 启动任务/触发操作/异常处理/取消/UI同步
- 延续 / 取消链式任务
- 并行 For / 自定义分区器 / 聚合操作
- 并行 LINQ
- 管道
- 高级场景 / 任务的未来版本(本文)
目录
前提条件
由于本文使用了一些尚未包含在 .NET Framework 中的社区技术预览 (CTP) 版本,您需要下载 Async CTP Refresh SP1(本文基于此版本)。您可以在此处下载:
完成 TPL
这一小节希望能帮助您了解 TPL 目前为止剩余的零散部分,也就是您在 .NET 4.0 中可用的那些类。
AsyncFrom
演示项目名称:AsyncFromBeginEnd/WCFService1
TPL 的一个很棒的功能是能够与旧的异步编程模型 (APM) 一起使用,方法是使用 TPL 内置的 FromAsync
。它期望基于旧的、熟悉的 Begin/End 方法创建 Task,这些方法与 IAsyncResult
接口配合使用。为了演示这一点,我构建了一个相当简单的 WCF 服务(附加演示代码中的 WCFService1),然后添加了对它的引用,该服务也支持异步调用;因此,存在与 APM 配合使用时典型的 APM Begin/End IAsyncResult
方法。
WCF 服务契约的外观如下:
[ServiceContract]
public interface IService1
{
[OperationContract]
List<String> GetData(int numberOf);
}
当添加为具有异步方法支持的服务引用时,会有如下代理方法可用:
namespace AsyncFromBeginEnd.ServiceReference1 {
[System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
[System.ServiceModel.ServiceContractAttribute(
ConfigurationName="ServiceReference1.IService1")]
public interface IService1 {
[System.ServiceModel.OperationContractAttribute(
Action="http://tempuri.org/IService1/GetData",
ReplyAction="http://tempuri.org/IService1/GetDataResponse")]
System.Collections.Generic.List<string> GetData(int numberOf);
[System.ServiceModel.OperationContractAttribute(AsyncPattern=true,
Action="http://tempuri.org/IService1/GetData",
ReplyAction="http://tempuri.org/IService1/GetDataResponse")]
System.IAsyncResult BeginGetData(int numberOf,
System.AsyncCallback callback, object asyncState);
System.Collections.Generic.List<string> EndGetData(System.IAsyncResult result);
}
....
....
}
那么,我们如何从旧的 APM 风格代码中获得 TPL Task
呢?很简单,我们只需执行以下操作:
class Program
{
static void Main(string[] args)
{
ServiceReference1.Service1Client client =
new ServiceReference1.Service1Client();
Task<List<String>> task =
Task<List<String>>.Factory.FromAsync(
client.BeginGetData(10, null, null),
ar => client.EndGetData(ar));
List<String> result = task.Result;
Console.WriteLine("Successfully read all bytes using a Task");
foreach (string s in result)
{
Console.WriteLine(s);
}
Console.ReadLine();
}
}
为了证明一切正常,以下是输出:
TaskCompletionSource
演示项目名称:TaskCompletionSource1/TaskCompletionSource2
TaskCompletionSource<T>
是一个比较特殊的类,当您可能需要生成一个 Task 时可以使用它。MSDN 的说法是:
表示
System.Threading.Tasks.Task{TResult}
的生产者端,未绑定到委托,通过System.Threading.Tasks.TaskCompletionSource<TResult>.Task
属性提供对使用者端的访问。
您可以执行所有普通 Task 应有的操作,例如设置结果、异常、Cancelled
属性,这将模拟 Task 所做的操作。我认为了解这一切的最佳方式是看几个例子。第一个例子直接借鉴自 MSDN,如下所示:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace TaskCompletionSource1
{
/// <summary>
/// This example is adapted from Microsoft MSDN code freely available at
/// http://msdn.microsoft.com/en-us/library/dd449174.aspx
/// </summary>
class Program
{
static void Main(string[] args)
{
TaskCompletionSource<String> tcs1 =
new TaskCompletionSource<String>();
Task<String> task1 = tcs1.Task;
// Complete tcs1 in background Task
Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
tcs1.SetResult("Task1 Completed");
});
// Waits 1 second for the result of the task
Stopwatch sw = Stopwatch.StartNew();
String result = task1.Result;
sw.Stop();
Console.WriteLine("(ElapsedTime={0}): t1.Result={1} (expected \"Task1 Completed\") ",
sw.ElapsedMilliseconds, result);
TaskCompletionSource<String> tcs2 = new TaskCompletionSource<String>();
Task<String> task2 = tcs2.Task;
// Raise Exception tcs2 in background Task
Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
tcs2.SetException(new InvalidProgramException("Oh no...Something is wrong"));
});
sw = Stopwatch.StartNew();
try
{
result = task2.Result;
Console.WriteLine("t2.Result succeeded. THIS WAS NOT EXPECTED.");
}
catch (AggregateException e)
{
Console.Write("(ElapsedTime={0}): ", sw.ElapsedMilliseconds);
Console.WriteLine("The following exceptions have been thrown " +
"by t2.Result: (THIS WAS EXPECTED)");
for (int j = 0; j < e.InnerExceptions.Count; j++)
{
Console.WriteLine("\n-------------------------------------------------\n{0}",
e.InnerExceptions[j].ToString());
}
}
Console.ReadLine();
}
}
}
运行时输出如下:
第二个示例演示了 TaskCompletionSource<T>
的一个新颖用法,我们用它来返回一个已延迟的 Task
。
class Program
{
static void Main(string[] args)
{
Stopwatch watch = new Stopwatch();
watch.Start();
Task<DateTimeOffset> delayTask = Delay(5000);
Console.WriteLine(String.Format("Ellapsed Time 1 : {0}", watch.Elapsed));
delayTask.ContinueWith((x) =>
{
Console.WriteLine(String.Format("Ellapsed Time 2 : {0}", watch.Elapsed));
});
Console.ReadLine();
}
public static Task<DateTimeOffset> Delay(int millisecondsTimeout)
{
var tcs = new TaskCompletionSource<DateTimeOffset>();
new Timer(self =>
{
((IDisposable)self).Dispose();
tcs.TrySetResult(DateTime.Now);
}).Change(millisecondsTimeout, - 1);
return tcs.Task;
}
}
运行时输出如下:
以上就是我想说的关于 TPL 的内容,以及我们目前在 .NET 4 中的功能。希望您享受这次旅程。接下来,我们将花一些时间看看 .NET 5 中可能的情况。
Async CTP
最近,Microsoft 发布了 Async CTP,它已经到了第二个 CTP 版本,并且发生了一些变化(我不会在本文中提及)。本文将重点介绍使用最新的 Async CTP(在撰写本文时为 Async CTP Refresh SP1)可以实现的当前功能。
特别值得注意的是,本节演示的一些代码将使用 CTP 的 TaskEx
类,这些类最终将成为常规 Task API 的一部分。
Async CTP 是关于什么的
那么,Async CTP 有什么特别之处呢?我们对使用 Task
以及当前的 TPL 功能已经很满意了,对吧?嗯,说实话,是的,但并非没有收获。Async CTP 以一种非常优雅的方式构建在 Task
/TPL 之上。它通过引入两个新关键字并提供大量新的扩展方法来实现这一点,这些扩展方法将现有的 .NET 类转换为 Awaitable 对象(这是我们稍后将要介绍的内容)。
新关键字是:
Async
:这是一个必须应用于包含异步代码的方法的关键字,您希望在该方法上执行Await
操作。Await
:允许我们等待Task
(或自定义Awaiter
对象)结果的获取。
这听起来可能不多,但它能为您做的是清理您的异步代码库,使其看起来与同步版本相同。程序流程没有改变,没有麻烦的回调处理错误,没有回调处理成功,也没有 IAsyncResult
。说实话,使用 Async CTP 将同步方法转换为异步方法将非常非常容易。
随着我们继续深入,我们将看到更多这些关键字,那么,我们继续吧?
我们的第一个简单异步示例
演示项目名称:SimpleAsync
让我们从一个极其简单的例子开始,好吗?这是代码。请注意突出显示的 Async/Await
关键字,并且请注意 GetString()
方法实际上返回的是 Task<String>
,尽管方法体只是返回一个 String
对象。很巧妙,不是吗?
运行时输出如下:
这里到底发生了什么?我们有一个标记为新 Async
关键字的 Start()
方法,它告诉编译器该方法将异步运行,然后我们有一个 String
,它通过 GetString()
方法返回,而该方法实际上返回一个 Task<String>
,我们使用新的 Await
关键字对其进行等待,而实际上我们在 GetString()
方法体中返回的是一个 String
。嗯?
好了,分解一下,编译器会为您做一些工作(您稍后会看到),它知道在看到 Async
关键字时应该发出什么。下一部分是 Task
已被增强为 Awaitable 对象(稍后会详细介绍),并且由于 CTP 的帮助,我们可以通过在 GetString()
方法体中简单地返回一个 String
来返回一个 Task<String>
。
编译器所做的是将 Await
之后的所有代码视为一个延续,该延续在 Await
关键字的工作完成后运行。
如果我们查看 Reflector 对这个简单示例生成的代码,我们可以看到它被转换成某种状态机类型的代码。
internal class Program
{
// Methods
private Task<string> GetString()
{
<GetString>d__5 d__ = new <GetString>d__5(0);
d__.<>4__this = this;
d__.<>t__MoveNextDelegate = new Action(d__, (IntPtr) this.MoveNext);
d__.$builder = AsyncTaskMethodBuilder<string>.Create();
d__.MoveNext();
return d__.$builder.Task;
}
private static void Main(string[] args)
{
new Program().Start();
}
private void Start()
{
<Start>d__0 d__ = new <Start>d__0(0);
d__.<>4__this = this;
d__.<>t__MoveNextDelegate = new Action(d__, (IntPtr) this.MoveNext);
d__.$builder = AsyncVoidMethodBuilder.Create();
d__.MoveNext();
}
// Nested Types
[CompilerGenerated]
private sealed class <GetString>d__5
{
// Fields
private bool $__disposing;
public AsyncTaskMethodBuilder<string> $builder;
private int <>1__state;
public Program <>4__this;
public Action <>t__MoveNextDelegate;
public StringBuilder <sb>5__6;
// Methods
[DebuggerHidden]
public <GetString>d__5(int <>1__state)
{
this.<>1__state = <>1__state;
}
[DebuggerHidden]
public void Dispose()
{
this.$__disposing = true;
this.MoveNext();
this.<>1__state = -1;
}
public void MoveNext()
{
string <>t__result;
try
{
if (this.<>1__state == -1)
{
return;
}
this.<sb>5__6 = new StringBuilder();
this.<sb>5__6.AppendLine("Hello world");
<>t__result = this.<sb>5__6.ToString();
}
catch (Exception <>t__ex)
{
this.<>1__state = -1;
this.$builder.SetException(<>t__ex);
return;
}
this.<>1__state = -1;
this.$builder.SetResult(<>t__result);
}
}
[CompilerGenerated]
private sealed class <Start>d__0
{
// Fields
private bool $__disposing;
public AsyncVoidMethodBuilder $builder;
private int <>1__state;
public Program <>4__this;
public Action <>t__MoveNextDelegate;
private TaskAwaiter<string> <a1>t__$await3;
public string <chungles>5__1;
// Methods
[DebuggerHidden]
public <Start>d__0(int <>1__state)
{
this.<>1__state = <>1__state;
}
[DebuggerHidden]
public void Dispose()
{
this.$__disposing = true;
this.MoveNext();
this.<>1__state = -1;
}
public void MoveNext()
{
try
{
string <1>t__$await2;
bool $__doFinallyBodies = true;
if (this.<>1__state != 1)
{
if (this.<>1__state != -1)
{
Console.WriteLine("*** BEFORE CALL ***");
this.<a1>t__$await3 = this.<>4__this.GetString().GetAwaiter<string>();
if (this.<a1>t__$await3.IsCompleted)
{
goto Label_0084;
}
this.<>1__state = 1;
$__doFinallyBodies = false;
this.<a1>t__$await3.OnCompleted(this.<>t__MoveNextDelegate);
}
return;
}
this.<>1__state = 0;
Label_0084:
<1>t__$await2 = this.<a1>t__$await3.GetResult();
this.<a1>t__$await3 = new TaskAwaiter<string>();
this.<chungles>5__1 = <1>t__$await2;
Console.WriteLine("*** AFTER CALL ***");
Console.WriteLine("result = " + this.<chungles>5__1);
Console.ReadLine();
}
catch (Exception <>t__ex)
{
this.<>1__state = -1;
this.$builder.SetException(<>t__ex);
return;
}
this.<>1__state = -1;
this.$builder.SetResult();
}
}
}
我们还可以看到诸如 GetAwaiter
之类的东西,这是一个基于模式的东西,它使对象可被 await。我们稍后将看到 GetAwaiter
的作用以及如何使我们自己的对象可被 await。目前,您应该能够大致看出存在一些编译器生成的代码,以及一个延续委托(在上面的 Reflector 代码中为 <>t__MoveNextDelegate
),该委托在 Reflector 代码中被调用。
我记得在 Anders Mix 的 Async CTP 视频中看到的一件事是,Async CTP 提供了响应性;当我们 await 时,控制权会交还给调用线程。
捕获异常
演示项目名称:TryCatchAwaitTask
在 Async CTP 中捕获异常再容易不过了,就像我说的,它严格遵循同步代码运行时的控制流;根本没有代码怪味,全是简单的 try/catch 语句。这是一个小例子:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TryCatchAwaitTask
{
class Program
{
static void Main(string[] args)
{
Program p = new Program();
p.DoIt();
}
public async void DoIt()
{
//No problems with this chap
try
{
List<int> results = await
GetSomeNumbers(10,20);
Console.WriteLine("==========START OF GOOD CASE=========");
Parallel.For(0, results.Count, (x) =>
{
Console.WriteLine(x);
});
Console.WriteLine("==========END OF GOOD CASE=========");
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
//Make something go wrong
try
{
//simulate a failure by erroring at 5
List<int> results = await GetSomeNumbers(10,5);
Parallel.ForEach(results, (x) =>
{
Console.WriteLine(x);
});
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
Console.ReadLine();
}
/// <summary>
/// Throws InvalidOperationException when index > shouldFailAt value
/// </summary>
public async Task<List<int>> GetSomeNumbers(int upperLimit, int shouldFailAt)
{
List<int> ints = new List<int>();
for (int i = 0; i < upperLimit; i++)
{
if (i > shouldFailAt)
throw new InvalidOperationException(
String.Format("Oh no its > {0}",shouldFailAt));
ints.Add(i);
}
return ints;
}
}
}
以下是运行时显示的内容:
UI 响应性
演示项目名称:UIResponsiveness
Async CTP 在任何需要响应性的区域(例如 UI)都非常有用。以下代码简单地演示了用户可以启动多个可 await 的代码块,它们都在运行,但 UI 保持响应;只需打开演示并随机单击按钮即可了解我的意思。
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Threading.Tasks;
namespace UIResponsiveness
{
public partial class Form1 : Form
{
Random rand = new Random();
List<string> suffixes = new List<string>() {
"a","b","c","d","e","f","g","h","i","j","k","l",
"m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
public Form1()
{
InitializeComponent();
}
private async void button1_Click(object sender, EventArgs e)
{
RunTaskToGetText();
}
private void button2_Click(object sender, EventArgs e)
{
RunTaskToGetText();
}
private void button3_Click(object sender, EventArgs e)
{
RunTaskToGetText();
}
private async void RunTaskToGetText()
{
List<string> results = await GetSomeText(
suffixes[rand.Next(0,suffixes.Count())], 500000);
textBox1.Text += results[0];
textBox1.Text += results[1];
textBox1.Text += results[results.Count-2];
textBox1.Text += results[results.Count-1];
}
public async Task<List<string>> GetSomeText(string prefix, int upperLimit)
{
List<string> values = new List<string>();
values.Add("=============== New Task kicking off=================\r\n");
for (int i = 0; i < upperLimit; i++)
{
values.Add(String.Format("Value_{0}_{1}\r\n", prefix, i.ToString()));
}
values.Add("=============== New Task Done=================\r\n");
return values;
}
}
}
我无法真正为这个演示提供截图,但如果您尝试演示代码,您会发现它仍然响应,并且结果会在它们完成时返回,而不再需要 await。
支持进度
演示项目名称:ProgressReporting
Async CTP 还处理进度的报告,这在 TPL 中并不容易做到。那么它是如何做到的呢?嗯,Async CTP 中有一个名为 IProgress<T>
的新接口,CTP 已为您实现了它,即 Progress<T>
类。它们看起来像这样:
public interface IProgress<in T>
{
void Report(T value);
}
public class Progress<T> : IProgress<T>
{
public Progress();
public Progress(Action<T> handler);
public event ProgressEventHandler<T> ProgressChanged;
protected virtual void OnReport(T value);
}
那么,我们如何在自己的代码中使用 Progress<T>
呢?很简单。这是一个将当前进度值写入控制台的简单示例。我应该指出,计算正确的进度值取决于您自己。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ProgressReporting
{
class Program
{
static void Main(string[] args)
{
Program p = new Program();
p.DoIt();
}
public async void DoIt()
{
Progress<int> progress = new Progress<int>();
progress.ProgressChanged += (sender, e) =>
{
Console.WriteLine(String.Format("Progress has seen {0} item", e));
};
List<int> results = await GetSomeNumbers(10, progress);
Console.WriteLine("Task results are");
Parallel.For(0, results.Count, (x) =>
{
Console.WriteLine(x);
});
Console.ReadLine();
}
public async Task<List<int>> GetSomeNumbers(
int upperLimit, IProgress<int> progress)
{
List<int> ints = new List<int>();
for (int i = 0; i < upperLimit; i++)
{
ints.Add(i);
progress.Report(i + 1);
}
return ints;
}
}
}
以下是它运行的示例:
Awaiter/GetAwaiter
既然我们已经看到了一些新 Async CTP 关键字的实际应用示例,让我们深入了解一下 awaitable 到底意味着什么。
Await
关键字 **只能** 与可 await 的对象一起使用。由于 Async CTP,Task
已被扩展为可 await。那么它到底是如何做到的,我们是否可以使自己的类型可 await 呢?
是的,我们可以。事实证明,您只需要确保存在某些核心方法,这些方法可以是实例方法或扩展方法。
为了使您的对象可 await 或扩展现有对象,您必须实现的以下内容:
public void GetResult()
public void OnCompleted(Action continuation)
public bool IsCompleted { get; set; }
当使用新的 Await
关键字时生成的编译器代码将需要这些,它在内部会调用编译器重写的代码中的 GetAwaiter()
方法,所以只要您的对象拥有这些方法/属性,它就是可 await 的。
带返回值的 Awaiter
演示项目名称:AwaiterThatReturnsSomething
现在我们知道了如何创建一个自定义的 awaitable 对象,让我们尝试一下,比如添加一个等待给定数字求幂的能力(这显然除了用于演示之外没有其他用途,但我希望您能从中理解)。
所以我们开始创建一个 double
的扩展方法,如下所示,您可以看到它返回一个 DoubleAwaiter
。
public static class DoubleExtensionMethods
{
public static DoubleAwaiter GetAwaiter(this double demoDouble, int power)
{
return new DoubleAwaiter(demoDouble, power);
}
}
其中 DoubleAwaiter
如下所示,最重要的是我们设置了 IsCompleted
并调用了 continuation Action
。
public class DoubleAwaiter
{
private double theValue;
private int power;
public DoubleAwaiter(double theValue, int power)
{
this.theValue = theValue;
this.power = power;
IsCompleted = false;
}
public DoubleAwaiter GetAwaiter()
{
return this;
}
public double GetResult()
{
return theValue;
}
public void OnCompleted(Action continuation)
{
this.theValue = Math.Pow(theValue, power);
IsCompleted = true;
continuation();
}
public bool IsCompleted { get; set; }
}
我们现在可以这样使用它:
class Program
{
static void Main(string[] args)
{
ThreadPool.QueueUserWorkItem(async delegate
{
double x = 10;
double x2 = await x.GetAwaiter(3);
Console.WriteLine(string.Format("x was : {0}, x2 is {1}",x,x2));
});
Console.ReadLine();
}
}
运行时会产生以下输出(就像我说的,这是自定义 awaitable 最愚蠢的用法,它纯粹是为了展示如果您想让自己的代码可 await 需要使用的结构):
同步 Awaiter
演示项目名称:SyncContextControlAwaiter
由于使某事物可 await 的概念最终归结为实现正确的几个方法/属性,我们可以利用这一点来做一些相当疯狂的事情;例如,在处理 UI 代码时,您一定遇到过需要跨线程回调到 UI 线程的需求。
嗯,我们实际上可以使用一个自定义 awaiter 来使整个过程更加整洁;请考虑以下 Windows Forms 代码片段:
public static class ControlExtensionMethods
{
public static ControlAwaiter GetAwaiter(this Control control)
{
return new ControlAwaiter(control);
}
}
public class ControlAwaiter
{
private readonly Control control;
public ControlAwaiter(Control control)
{
if (control == null)
throw new ArgumentNullException("control");
this.control = control;
IsCompleted = false;
}
public void GetResult()
{
}
public void OnCompleted(Action continuation)
{
control.BeginInvoke(continuation);
IsCompleted = true;
}
public bool IsCompleted { get; set; }
}
我们使用自定义 awaiter 来进行回传到 Windows Forms 应用程序的 UI 线程,使用 BeginInvoke(..)
,而调用代码是这样的:
private void BtnSyncContextAwaiter_Click(object sender, EventArgs e)
{
ThreadPool.QueueUserWorkItem(async delegate
{
string text = "This should work just fine thanks to our lovely \"ControlAwaiter\" " +
"which ensures correct thread marshalling";
await textBox1;
textBox1.Text = text;
});
}
演示代码演示了使用上面显示的自定义同步上下文 awaiter。
演示代码还展示了当我们不使用上面显示的自定义同步上下文 awaiter 时代码会做什么,它使用了这段代码:
private void BtnNoAwaiter_Click(object sender, EventArgs e)
{
ThreadPool.QueueUserWorkItem(delegate
{
try
{
string text = "This should cause a problem, as we have spawned " +
"background thread using ThreadPool" +
"Which is not the correct thread to change the UI control " +
"so should cause a CrossThread Violation";
textBox1.Text = text;
}
catch (InvalidOperationException ioex)
{
MessageBox.Show(ioex.Message);
}
});
}
这是演示代码使用上面显示的自定义同步上下文 awaiter 的截图:
以下是演示代码 **不** 使用上面显示的自定义同步上下文 awaiter 的截图:
更实际的 Awaiter
演示项目名称:MoreRealisticAwaiterWithCancellation
既然您已经看到可以创建自己的 awaitable 代码,甚至可以创建一些相当新颖的用法,我认为您应该知道,大多数时候您将使用 Task 作为 await 的对象。因此,最后一个示例演示了一个更完整的示例,该示例使用 Task 并向您展示了如何对您正在 await 的一些以 Task 为中心的类的服务代码进行单元测试,该代码的构造方式使我们能够轻松地注入一个 Task 以外的服务。
此演示还展示了如何使用 CancellationToken
来取消 awaitable Task。
让我们从实际进行 await 的代码开始,如下所示:
class Program
{
static void Main(string[] args)
{
ManualResetEvent mre1 = new ManualResetEvent(true);
ManualResetEvent mre2 = new ManualResetEvent(false);
ManualResetEvent mre3 = new ManualResetEvent(false);
ManualResetEvent mre4 = new ManualResetEvent(false);
DoItForReal(false, mre1, mre2);
DoItForReal(true, mre2, mre3);
Console.ReadLine();
}
/// <summary>
/// Shows how you can await on a Task based service
/// </summary>
private static async void DoItForReal(
bool shouldCancel, ManualResetEvent mreIn, ManualResetEvent mreOut)
{
mreIn.WaitOne();
CancellationTokenSource cts = new CancellationTokenSource();
int upperLimit = 50;
int cancelAfter = (int)upperLimit / 5;
// which is what is used in WorkProvider class
int waitDelayForEachDataItem = 10;
//allows some items to be processed before cancelling
if (shouldCancel)
cts.CancelAfter(cancelAfter * waitDelayForEachDataItem);
Console.WriteLine();
Console.WriteLine("=========================================");
Console.WriteLine("Started DoItForReal()");
try
{
List<String> data = await new Worker(new WorkProvider(),
upperLimit, cts.Token).GetData();
foreach (String item in data)
{
Console.WriteLine(item);
}
//allow those waiting on this WaitHandle to continue
if (mreOut != null) { mreOut.Set(); }
}
catch (OperationCanceledException)
{
Console.WriteLine("Processing canceled.");
//allow those waiting on this WaitHandle to continue
if (mreOut != null) { mreOut.Set(); }
}
catch (AggregateException aggEx)
{
Console.WriteLine("AggEx caught");
//allow those waiting on this WaitHandle to continue
if (mreOut != null) { mreOut.Set(); }
}
finally
{
Console.WriteLine("Finished DoItForReal()");
Console.WriteLine("=========================================");
Console.WriteLine();
}
}
}
使用 ManualResetEvent
(s) 只是为了确保一个场景在另一个场景开始之前完成到控制台的打印,以使演示输出截图对读者清晰。
现在让我们检查一下上面代码 await 的 Worker
代码:
public class Worker
{
private IWorkProvider workprovider;
private int upperLimit;
private CancellationToken token;
public Worker(IWorkProvider workprovider, int upperLimit, CancellationToken token)
{
this.workprovider = workprovider;
this.upperLimit = upperLimit;
this.token = token;
}
public Task<List<String>> GetData()
{
return workprovider.GetData(upperLimit, token);
}
}
可以看出,该设计支持从单元测试或通过使用与 IOC 容器配合使用的抽象工厂来解析实际 Worker
实例来提供替代 IWorkProvider
。
演示应用程序中的 IWorkProvider
接口如下所示:
public interface IWorkProvider
{
Task<List<String>> GetData(int upperLimit, CancellationToken token);
}
正在 await 的实际代码是 GetData()
方法返回的 Task<List<String>>
。现在让我们看看提供 Task<List<String>>
的实际基于 Task 的服务代码。
public class WorkProvider : IWorkProvider
{
public Task<List<string>> GetData(int upperLimit, System.Threading.CancellationToken token)
{
//will not be TaskEx when CTP is in .NET 5.0 Framework
return TaskEx.Run(() =>
{
List<string> results = new List<string>();
for (int i = 0; i < upperLimit; i++)
{
token.ThrowIfCancellationRequested();
Thread.Sleep(10);
results.Add(string.Format("Added runtime string {0}",i.ToString()));
}
return results;
});
}
}
现在,当我们运行使用此基于 Task 的服务代码的实际代码时,我们会得到以下结果,我们运行一次直到完成,一次是我们期望它被取消,因为演示代码通过 CancellationTokenSource
发起了取消。
现在您可能会问自己,好吧,那很好,我有一些基于 Task 的服务代码,我们可以 await 它,但我该如何对那个坏小子进行单元测试呢?嗯,虽然演示应用程序不包含正式的单元测试,但它提供了所有必要的组件,例如:
- 关注点分离
- IOC 的扩展点,用于提供替代的
IWorkProvider
实现,或用于单元测试 IWorkProvider
的模拟,展示了如何模拟您的Task
(s)
我更喜欢使用 **Moq**(一个很酷的模拟框架)来执行我所做的任何测试代码,所以这里有一个关于如何编写基于单元测试的模拟服务的示例,该服务将与我们刚刚看到的基于 Task 的服务代码具有相同的行为:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Moq;
using System.Threading.Tasks;
namespace MoreRealisticAwaiterWithCancellation
{
class Program
{
static void Main(string[] args)
{
...
...
...
DoItUsingMoq_YouKnowForUnitTestsLike(mre3);
Console.ReadLine();
}
/// <summary>
/// Shows how you might mock a task based service using "Moq" and TaskCompletionSource
/// </summary>
private static async void DoItUsingMoq_YouKnowForUnitTestsLike(ManualResetEvent mreIn)
{
mreIn.WaitOne();
CancellationTokenSource cts = new CancellationTokenSource();
int upperLimit = 50;
List<String> dummyResults = new List<string>();
for (int i = 0; i < upperLimit; i++)
{
dummyResults.Add(String.Format("Dummy Result {0}", i.ToString()));
}
//Allows this test method to simulate a Task with
//a result without actually creating a Task
TaskCompletionSource<List<String>> tcs =
new TaskCompletionSource<List<String>>();
tcs.SetResult(dummyResults);
Console.WriteLine();
Console.WriteLine("=========================================");
Console.WriteLine("Started DoItUsingMoq_YouKnowForUnitTestsLike()");
try
{
Mock<IWorkProvider> mockWorkProvider = new Mock<IWorkProvider>();
mockWorkProvider
.Setup(x => x.GetData(
It.IsAny<Int32>(),
It.IsAny<CancellationToken>()))
.Returns(tcs.Task);
List<String> data = await new Worker(
mockWorkProvider.Object, upperLimit, cts.Token).GetData();
foreach (String item in data)
{
Console.WriteLine(item);
}
}
finally
{
Console.WriteLine("Finished DoItUsingMoq_YouKnowForUnitTestsLike()");
Console.WriteLine("=========================================");
Console.WriteLine();
}
}
}
}
可以看出,此示例使用 **Moq** 创建了一个模拟的 IWorkProvider
,并将其提供给 Worker
构造函数,我们将最终 await 它。那么,如果我们处于单元测试环境中,而不是实际运行基于 Task 的服务,我们该如何运行 await 的延续呢?嗯,答案就在本文的开头。我们只需使用 TaskCompletionSource
来模拟 Task.Result
,通过使用 TaskCompletionSource.SetResult(..)
方法并为将在该测试代码中使用的模拟对象提供 TaskCompletionSource.Task
。
一如既往,这里有一个运行的例子:
我在这里没有对 CancellationToken
做任何事情,但我认为这也可以实现。**Moq** 非常酷,而且我认为完全能够胜任这项工作;我只是在那时累了,抱歉。
现在就到这里
这就是我在这篇文章中想说的全部内容。我希望您喜欢它,并享受了 TPL 的旅程。我知道这是一段漫长的旅程,说实话,有时我觉得我永远也完成不了。它终于完成了。所以,如果您喜欢这篇文章,请花点时间留下评论和投票。非常感谢。