65.9K
CodeProject 正在变化。 阅读更多。
Home

任务并行库:6/n

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.95/5 (71投票s)

2011 年 5 月 10 日

CPOL

14分钟阅读

viewsIcon

175966

downloadIcon

2133

探究任务并行库的使用。

引言

这是我提出的关于 TPL 系列文章的第六部分,也是最后一部分。上次我介绍了管道,并涵盖了这部分内容。

  • BlockingCollection
  • BlockingCollection 基础知识
  • 简单管道
  • 更复杂的管道

这次我们将探讨一些您现在就可以使用的更高级的 TPL 功能,这将是我们使用当前 .NET 4.0 中可用的类进行 TPL 操作的总结。

然后,我们将继续研究通过使用新的 Async CTP,我们将能够使用 C# 5 做些什么。

文章系列路线图

这是第 6 部分,也是最后一篇文章,我希望大家会喜欢。下面是我希望涵盖的内容的大致提纲。

  1. 启动任务/触发操作/异常处理/取消/UI同步
  2. 延续 / 取消链式任务
  3. 并行 For / 自定义分区器 / 聚合操作
  4. 并行 LINQ
  5. 管道
  6. 高级场景 / 任务的未来版本(本文)

目录

前提条件

由于本文使用了一些尚未包含在 .NET Framework 中的社区技术预览 (CTP) 版本,您需要下载 Async CTP Refresh SP1(本文基于此版本)。您可以在此处下载:

完成 TPL

这一小节希望能帮助您了解 TPL 目前为止剩余的零散部分,也就是您在 .NET 4.0 中可用的那些类。

AsyncFrom

演示项目名称:AsyncFromBeginEnd/WCFService1

TPL 的一个很棒的功能是能够与旧的异步编程模型 (APM) 一起使用,方法是使用 TPL 内置的 FromAsync。它期望基于旧的、熟悉的 Begin/End 方法创建 Task,这些方法与 IAsyncResult 接口配合使用。为了演示这一点,我构建了一个相当简单的 WCF 服务(附加演示代码中的 WCFService1),然后添加了对它的引用,该服务也支持异步调用;因此,存在与 APM 配合使用时典型的 APM Begin/End IAsyncResult 方法。

WCF 服务契约的外观如下:

[ServiceContract]
public interface IService1
{
    [OperationContract]
    List<String> GetData(int numberOf);
}

当添加为具有异步方法支持的服务引用时,会有如下代理方法可用:

namespace AsyncFromBeginEnd.ServiceReference1 {
    
    [System.CodeDom.Compiler.GeneratedCodeAttribute("System.ServiceModel", "4.0.0.0")]
    [System.ServiceModel.ServiceContractAttribute(
       ConfigurationName="ServiceReference1.IService1")]
    public interface IService1 {
        
        [System.ServiceModel.OperationContractAttribute(
            Action="http://tempuri.org/IService1/GetData", 
            ReplyAction="http://tempuri.org/IService1/GetDataResponse")]
        System.Collections.Generic.List<string> GetData(int numberOf);
        
        [System.ServiceModel.OperationContractAttribute(AsyncPattern=true, 
            Action="http://tempuri.org/IService1/GetData", 
            ReplyAction="http://tempuri.org/IService1/GetDataResponse")]
        System.IAsyncResult BeginGetData(int numberOf, 
            System.AsyncCallback callback, object asyncState);
        
        System.Collections.Generic.List<string> EndGetData(System.IAsyncResult result);
    }
    
    ....
    ....
}

那么,我们如何从旧的 APM 风格代码中获得 TPL Task 呢?很简单,我们只需执行以下操作:

class Program
{
    static void Main(string[] args)
    {
        ServiceReference1.Service1Client client = 
        new ServiceReference1.Service1Client();

        Task<List<String>> task = 
            Task<List<String>>.Factory.FromAsync(
                client.BeginGetData(10, null, null),
                ar => client.EndGetData(ar));

        List<String> result = task.Result;
        Console.WriteLine("Successfully read all bytes using a Task");
        foreach (string s in result)
        {
            Console.WriteLine(s);
        }
        Console.ReadLine();
    }
}

为了证明一切正常,以下是输出:

TaskCompletionSource

演示项目名称:TaskCompletionSource1/TaskCompletionSource2

TaskCompletionSource<T> 是一个比较特殊的类,当您可能需要生成一个 Task 时可以使用它。MSDN 的说法是:

表示 System.Threading.Tasks.Task{TResult} 的生产者端,未绑定到委托,通过 System.Threading.Tasks.TaskCompletionSource<TResult>.Task 属性提供对使用者端的访问。

您可以执行所有普通 Task 应有的操作,例如设置结果、异常、Cancelled 属性,这将模拟 Task 所做的操作。我认为了解这一切的最佳方式是看几个例子。第一个例子直接借鉴自 MSDN,如下所示:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace TaskCompletionSource1
{
    /// <summary>
    /// This example is adapted from Microsoft MSDN code freely available at
    /// http://msdn.microsoft.com/en-us/library/dd449174.aspx
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {

            TaskCompletionSource<String> tcs1 = 
        new TaskCompletionSource<String>();
            Task<String> task1 = tcs1.Task;

            // Complete tcs1 in background Task
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                tcs1.SetResult("Task1 Completed");
            });

            // Waits 1 second for the result of the task
            Stopwatch sw = Stopwatch.StartNew();
            String result = task1.Result;
            sw.Stop();

            Console.WriteLine("(ElapsedTime={0}): t1.Result={1} (expected \"Task1 Completed\") ", 
                sw.ElapsedMilliseconds, result);

            TaskCompletionSource<String> tcs2 = new TaskCompletionSource<String>();
            Task<String> task2 = tcs2.Task;

            // Raise Exception tcs2 in background Task
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                tcs2.SetException(new InvalidProgramException("Oh no...Something is wrong"));
            });

            sw = Stopwatch.StartNew();
            try
            {
                result = task2.Result;

                Console.WriteLine("t2.Result succeeded. THIS WAS NOT EXPECTED.");
            }
            catch (AggregateException e)
            {
                Console.Write("(ElapsedTime={0}): ", sw.ElapsedMilliseconds);
                Console.WriteLine("The following exceptions have been thrown " + 
                                  "by t2.Result: (THIS WAS EXPECTED)");

                for (int j = 0; j < e.InnerExceptions.Count; j++)
                {
                    Console.WriteLine("\n-------------------------------------------------\n{0}", 
                        e.InnerExceptions[j].ToString());
                }
            }

            Console.ReadLine();
        }
    }
}

运行时输出如下:

第二个示例演示了 TaskCompletionSource<T> 的一个新颖用法,我们用它来返回一个已延迟的 Task

class Program
{
    static void Main(string[] args)
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        Task<DateTimeOffset> delayTask = Delay(5000);
        Console.WriteLine(String.Format("Ellapsed Time 1 : {0}", watch.Elapsed));
        delayTask.ContinueWith((x) =>
            {
                Console.WriteLine(String.Format("Ellapsed Time 2 : {0}", watch.Elapsed));
            });
        Console.ReadLine();
    }



    public static Task<DateTimeOffset> Delay(int millisecondsTimeout)
    {
        var tcs = new TaskCompletionSource<DateTimeOffset>();
        new Timer(self =>
            {
                ((IDisposable)self).Dispose();
                tcs.TrySetResult(DateTime.Now);
            }).Change(millisecondsTimeout, - 1);
        return tcs.Task;
    }
}

运行时输出如下:

以上就是我想说的关于 TPL 的内容,以及我们目前在 .NET 4 中的功能。希望您享受这次旅程。接下来,我们将花一些时间看看 .NET 5 中可能的情况。

Async CTP

最近,Microsoft 发布了 Async CTP,它已经到了第二个 CTP 版本,并且发生了一些变化(我不会在本文中提及)。本文将重点介绍使用最新的 Async CTP(在撰写本文时为 Async CTP Refresh SP1)可以实现的当前功能。

特别值得注意的是,本节演示的一些代码将使用 CTP 的 TaskEx 类,这些类最终将成为常规 Task API 的一部分。

Async CTP 是关于什么的

那么,Async CTP 有什么特别之处呢?我们对使用 Task 以及当前的 TPL 功能已经很满意了,对吧?嗯,说实话,是的,但并非没有收获。Async CTP 以一种非常优雅的方式构建在 Task/TPL 之上。它通过引入两个新关键字并提供大量新的扩展方法来实现这一点,这些扩展方法将现有的 .NET 类转换为 Awaitable 对象(这是我们稍后将要介绍的内容)。

新关键字是:

  • Async:这是一个必须应用于包含异步代码的方法的关键字,您希望在该方法上执行 Await 操作。
  • Await:允许我们等待 Task(或自定义 Awaiter 对象)结果的获取。

这听起来可能不多,但它能为您做的是清理您的异步代码库,使其看起来与同步版本相同。程序流程没有改变,没有麻烦的回调处理错误,没有回调处理成功,也没有 IAsyncResult。说实话,使用 Async CTP 将同步方法转换为异步方法将非常非常容易。

随着我们继续深入,我们将看到更多这些关键字,那么,我们继续吧?

我们的第一个简单异步示例

演示项目名称:SimpleAsync

让我们从一个极其简单的例子开始,好吗?这是代码。请注意突出显示的 Async/Await 关键字,并且请注意 GetString() 方法实际上返回的是 Task<String>,尽管方法体只是返回一个 String 对象。很巧妙,不是吗?

运行时输出如下:

这里到底发生了什么?我们有一个标记为新 Async 关键字的 Start() 方法,它告诉编译器该方法将异步运行,然后我们有一个 String,它通过 GetString() 方法返回,而该方法实际上返回一个 Task<String>,我们使用新的 Await 关键字对其进行等待,而实际上我们在 GetString() 方法体中返回的是一个 String。嗯?

好了,分解一下,编译器会为您做一些工作(您稍后会看到),它知道在看到 Async 关键字时应该发出什么。下一部分是 Task 已被增强为 Awaitable 对象(稍后会详细介绍),并且由于 CTP 的帮助,我们可以通过在 GetString() 方法体中简单地返回一个 String 来返回一个 Task<String>

编译器所做的是将 Await 之后的所有代码视为一个延续,该延续在 Await 关键字的工作完成后运行。

如果我们查看 Reflector 对这个简单示例生成的代码,我们可以看到它被转换成某种状态机类型的代码。

internal class Program
{
    // Methods
    private Task<string> GetString()
    {
        <GetString>d__5 d__ = new <GetString>d__5(0);
        d__.<>4__this = this;
        d__.<>t__MoveNextDelegate = new Action(d__, (IntPtr) this.MoveNext);
        d__.$builder = AsyncTaskMethodBuilder<string>.Create();
        d__.MoveNext();
        return d__.$builder.Task;
    }

    private static void Main(string[] args)
    {
        new Program().Start();
    }

    private void Start()
    {
        <Start>d__0 d__ = new <Start>d__0(0);
        d__.<>4__this = this;
        d__.<>t__MoveNextDelegate = new Action(d__, (IntPtr) this.MoveNext);
        d__.$builder = AsyncVoidMethodBuilder.Create();
        d__.MoveNext();
    }

    // Nested Types
    [CompilerGenerated]
    private sealed class <GetString>d__5
    {
        // Fields
        private bool $__disposing;
        public AsyncTaskMethodBuilder<string> $builder;
        private int <>1__state;
        public Program <>4__this;
        public Action <>t__MoveNextDelegate;
        public StringBuilder <sb>5__6;

        // Methods
        [DebuggerHidden]
        public <GetString>d__5(int <>1__state)
        {
            this.<>1__state = <>1__state;
        }

        [DebuggerHidden]
        public void Dispose()
        {
            this.$__disposing = true;
            this.MoveNext();
            this.<>1__state = -1;
        }

        public void MoveNext()
        {
            string <>t__result;
            try
            {
                if (this.<>1__state == -1)
                {
                    return;
                }
                this.<sb>5__6 = new StringBuilder();
                this.<sb>5__6.AppendLine("Hello world");
                <>t__result = this.<sb>5__6.ToString();
            }
            catch (Exception <>t__ex)
            {
                this.<>1__state = -1;
                this.$builder.SetException(<>t__ex);
                return;
            }
            this.<>1__state = -1;
            this.$builder.SetResult(<>t__result);
        }
    }

    [CompilerGenerated]
    private sealed class <Start>d__0
    {
        // Fields
        private bool $__disposing;
        public AsyncVoidMethodBuilder $builder;
        private int <>1__state;
        public Program <>4__this;
        public Action <>t__MoveNextDelegate;
        private TaskAwaiter<string> <a1>t__$await3;
        public string <chungles>5__1;

        // Methods
        [DebuggerHidden]
        public <Start>d__0(int <>1__state)
        {
            this.<>1__state = <>1__state;
        }

        [DebuggerHidden]
        public void Dispose()
        {
            this.$__disposing = true;
            this.MoveNext();
            this.<>1__state = -1;
        }

        public void MoveNext()
        {
            try
            {
                string <1>t__$await2;
                bool $__doFinallyBodies = true;
                if (this.<>1__state != 1)
                {
                    if (this.<>1__state != -1)
                    {
                        Console.WriteLine("*** BEFORE CALL ***");
                        this.<a1>t__$await3 = this.<>4__this.GetString().GetAwaiter<string>();
                        if (this.<a1>t__$await3.IsCompleted)
                        {
                            goto Label_0084;
                        }
                        this.<>1__state = 1;
                        $__doFinallyBodies = false;
                        this.<a1>t__$await3.OnCompleted(this.<>t__MoveNextDelegate);
                    }
                    return;
                }
                this.<>1__state = 0;
            Label_0084:
                <1>t__$await2 = this.<a1>t__$await3.GetResult();
                this.<a1>t__$await3 = new TaskAwaiter<string>();
                this.<chungles>5__1 = <1>t__$await2;
                Console.WriteLine("*** AFTER CALL ***");
                Console.WriteLine("result = " + this.<chungles>5__1);
                Console.ReadLine();
            }
            catch (Exception <>t__ex)
            {
                this.<>1__state = -1;
                this.$builder.SetException(<>t__ex);
                return;
            }
            this.<>1__state = -1;
            this.$builder.SetResult();
        }
    }
}

我们还可以看到诸如 GetAwaiter 之类的东西,这是一个基于模式的东西,它使对象可被 await。我们稍后将看到 GetAwaiter 的作用以及如何使我们自己的对象可被 await。目前,您应该能够大致看出存在一些编译器生成的代码,以及一个延续委托(在上面的 Reflector 代码中为 <>t__MoveNextDelegate),该委托在 Reflector 代码中被调用。

我记得在 Anders Mix 的 Async CTP 视频中看到的一件事是,Async CTP 提供了响应性;当我们 await 时,控制权会交还给调用线程。

捕获异常

演示项目名称:TryCatchAwaitTask

在 Async CTP 中捕获异常再容易不过了,就像我说的,它严格遵循同步代码运行时的控制流;根本没有代码怪味,全是简单的 try/catch 语句。这是一个小例子:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TryCatchAwaitTask
{
    class Program
    {
        static void Main(string[] args)
        {
            Program p = new Program();
            p.DoIt();
        }

        public async void DoIt()
        {
            //No problems with this chap
            try
            {
                List<int> results = await 
                    GetSomeNumbers(10,20);
                Console.WriteLine("==========START OF GOOD CASE=========");
                Parallel.For(0, results.Count, (x) =>
                {
                    Console.WriteLine(x);
                });
                Console.WriteLine("==========END OF GOOD CASE=========");

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

            //Make something go wrong
            try
            {
                //simulate a failure by erroring at 5
                List<int> results = await GetSomeNumbers(10,5);
                Parallel.ForEach(results, (x) =>
                {
                    Console.WriteLine(x);
                });

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

            Console.ReadLine();
        }

        /// <summary>
        /// Throws InvalidOperationException when index > shouldFailAt value
        /// </summary>
        public async Task<List<int>> GetSomeNumbers(int upperLimit, int shouldFailAt)
        {
            List<int> ints = new List<int>();
            for (int i = 0; i < upperLimit; i++)
            {
                if (i > shouldFailAt)
                    throw new InvalidOperationException(
                        String.Format("Oh no its > {0}",shouldFailAt));

                ints.Add(i);
            }
            return ints;
        }
    }
}

以下是运行时显示的内容:

UI 响应性

演示项目名称:UIResponsiveness

Async CTP 在任何需要响应性的区域(例如 UI)都非常有用。以下代码简单地演示了用户可以启动多个可 await 的代码块,它们都在运行,但 UI 保持响应;只需打开演示并随机单击按钮即可了解我的意思。

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.Threading.Tasks;

namespace UIResponsiveness
{
    public partial class Form1 : Form
    {
        Random rand = new Random();
        List<string> suffixes = new List<string>() { 
            "a","b","c","d","e","f","g","h","i","j","k","l",
            "m","n","o","p","q","r","s","t","u","v","w","x","y","z"};

        public Form1()
        {
            InitializeComponent();


        }

        private async void button1_Click(object sender, EventArgs e)
        {
            RunTaskToGetText();
        }

        private void button2_Click(object sender, EventArgs e)
        {
            RunTaskToGetText();
        }

        private void button3_Click(object sender, EventArgs e)
        {
            RunTaskToGetText();
        }


        private async void RunTaskToGetText()
        {
            List<string> results = await GetSomeText(
                suffixes[rand.Next(0,suffixes.Count())], 500000);

            textBox1.Text += results[0];
            textBox1.Text += results[1];
            textBox1.Text += results[results.Count-2];
            textBox1.Text += results[results.Count-1];
        }

        public async Task<List<string>> GetSomeText(string prefix, int upperLimit)
        {
            List<string> values = new List<string>();
            values.Add("=============== New Task kicking off=================\r\n");
            for (int i = 0; i < upperLimit; i++)
            {
                values.Add(String.Format("Value_{0}_{1}\r\n", prefix, i.ToString()));
            }
            values.Add("=============== New Task Done=================\r\n");
            return values;
        }
    }
}

我无法真正为这个演示提供截图,但如果您尝试演示代码,您会发现它仍然响应,并且结果会在它们完成时返回,而不再需要 await。

支持进度

演示项目名称:ProgressReporting

Async CTP 还处理进度的报告,这在 TPL 中并不容易做到。那么它是如何做到的呢?嗯,Async CTP 中有一个名为 IProgress<T> 的新接口,CTP 已为您实现了它,即 Progress<T> 类。它们看起来像这样:

public interface IProgress<in T>
{
    void Report(T value);
}

public class Progress<T> : IProgress<T>
{
    public Progress();

    public Progress(Action<T> handler);

    public event ProgressEventHandler<T> ProgressChanged;

    protected virtual void OnReport(T value);
}

那么,我们如何在自己的代码中使用 Progress<T> 呢?很简单。这是一个将当前进度值写入控制台的简单示例。我应该指出,计算正确的进度值取决于您自己。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ProgressReporting
{
    class Program
    {
        static void Main(string[] args)
        {
            Program p = new Program();
            p.DoIt();
        }

        public async void DoIt()
        {
            Progress<int> progress = new Progress<int>();
            progress.ProgressChanged += (sender, e) =>
            {
                Console.WriteLine(String.Format("Progress has seen {0} item", e));
            };

            List<int> results = await GetSomeNumbers(10, progress);
            Console.WriteLine("Task results are");
            Parallel.For(0, results.Count, (x) =>
            {
                Console.WriteLine(x);
            });

            Console.ReadLine();
        }

        public async Task<List<int>> GetSomeNumbers(
        int upperLimit, IProgress<int> progress)
        {
            List<int> ints = new List<int>();
            for (int i = 0; i < upperLimit; i++)
            {
                ints.Add(i);
                progress.Report(i + 1);
            }
            return ints;
        }

    }
}

以下是它运行的示例:

Awaiter/GetAwaiter

既然我们已经看到了一些新 Async CTP 关键字的实际应用示例,让我们深入了解一下 awaitable 到底意味着什么。

Await 关键字 **只能** 与可 await 的对象一起使用。由于 Async CTP,Task 已被扩展为可 await。那么它到底是如何做到的,我们是否可以使自己的类型可 await 呢?

是的,我们可以。事实证明,您只需要确保存在某些核心方法,这些方法可以是实例方法或扩展方法。

为了使您的对象可 await 或扩展现有对象,您必须实现的以下内容:

public void GetResult()
public void OnCompleted(Action continuation)
public bool IsCompleted { get; set; }

当使用新的 Await 关键字时生成的编译器代码将需要这些,它在内部会调用编译器重写的代码中的 GetAwaiter() 方法,所以只要您的对象拥有这些方法/属性,它就是可 await 的。

带返回值的 Awaiter

演示项目名称:AwaiterThatReturnsSomething

现在我们知道了如何创建一个自定义的 awaitable 对象,让我们尝试一下,比如添加一个等待给定数字求幂的能力(这显然除了用于演示之外没有其他用途,但我希望您能从中理解)。

所以我们开始创建一个 double 的扩展方法,如下所示,您可以看到它返回一个 DoubleAwaiter

public static class DoubleExtensionMethods
{
    public static DoubleAwaiter GetAwaiter(this double demoDouble, int power)
    {
        return new DoubleAwaiter(demoDouble, power);
    }
}

其中 DoubleAwaiter 如下所示,最重要的是我们设置了 IsCompleted 并调用了 continuation Action

public class DoubleAwaiter
{
    private double theValue;
    private int power;

    public DoubleAwaiter(double theValue, int power)
    {
        this.theValue = theValue;
        this.power = power;
        IsCompleted = false;
    }

    public DoubleAwaiter GetAwaiter()
    {
        return this;
    }

    public double GetResult()
    {
        return theValue;
    }


    public void OnCompleted(Action continuation)
    {
        this.theValue = Math.Pow(theValue, power);
        IsCompleted = true;
        continuation();
    }

    public bool IsCompleted { get; set; }
}

我们现在可以这样使用它:

class Program
{
    static void Main(string[] args)
    {
        ThreadPool.QueueUserWorkItem(async delegate
        {
            double x = 10;
            double x2 = await x.GetAwaiter(3);
            Console.WriteLine(string.Format("x was : {0}, x2 is {1}",x,x2));
        });

        Console.ReadLine();
    }
}

运行时会产生以下输出(就像我说的,这是自定义 awaitable 最愚蠢的用法,它纯粹是为了展示如果您想让自己的代码可 await 需要使用的结构):

同步 Awaiter

演示项目名称:SyncContextControlAwaiter

由于使某事物可 await 的概念最终归结为实现正确的几个方法/属性,我们可以利用这一点来做一些相当疯狂的事情;例如,在处理 UI 代码时,您一定遇到过需要跨线程回调到 UI 线程的需求。

嗯,我们实际上可以使用一个自定义 awaiter 来使整个过程更加整洁;请考虑以下 Windows Forms 代码片段:

public static class ControlExtensionMethods
{
    public static ControlAwaiter GetAwaiter(this Control control)
    {
        return new ControlAwaiter(control);
    }
}

public class ControlAwaiter
{
    private readonly Control control;

    public ControlAwaiter(Control control)
    {
        if (control == null) 
          throw new ArgumentNullException("control");
        this.control = control;
        IsCompleted = false;
    }

    public void GetResult()
    {
    }

    public void OnCompleted(Action continuation)
    {
        control.BeginInvoke(continuation);
        IsCompleted = true;
    }

    public bool IsCompleted { get; set; }
}

我们使用自定义 awaiter 来进行回传到 Windows Forms 应用程序的 UI 线程,使用 BeginInvoke(..),而调用代码是这样的:

private void BtnSyncContextAwaiter_Click(object sender, EventArgs e)
{
    ThreadPool.QueueUserWorkItem(async delegate
    {
        string text = "This should work just fine thanks to our lovely \"ControlAwaiter\" " +
                        "which ensures correct thread marshalling";
        await textBox1;
        textBox1.Text = text;
    });

}

演示代码演示了使用上面显示的自定义同步上下文 awaiter。

演示代码还展示了当我们不使用上面显示的自定义同步上下文 awaiter 时代码会做什么,它使用了这段代码:

private void BtnNoAwaiter_Click(object sender, EventArgs e)
{
    ThreadPool.QueueUserWorkItem(delegate
    {
        try
        {
            string text = "This should cause a problem, as we have spawned " +
                            "background thread using ThreadPool" + 
                            "Which is not the correct thread to change the UI control " +
                            "so should cause a CrossThread Violation";
            textBox1.Text = text;
        }
        catch (InvalidOperationException ioex)
        {
            MessageBox.Show(ioex.Message);
        }
    });
}

这是演示代码使用上面显示的自定义同步上下文 awaiter 的截图:

以下是演示代码 **不** 使用上面显示的自定义同步上下文 awaiter 的截图:

更实际的 Awaiter

演示项目名称:MoreRealisticAwaiterWithCancellation

既然您已经看到可以创建自己的 awaitable 代码,甚至可以创建一些相当新颖的用法,我认为您应该知道,大多数时候您将使用 Task 作为 await 的对象。因此,最后一个示例演示了一个更完整的示例,该示例使用 Task 并向您展示了如何对您正在 await 的一些以 Task 为中心的类的服务代码进行单元测试,该代码的构造方式使我们能够轻松地注入一个 Task 以外的服务。

此演示还展示了如何使用 CancellationToken 来取消 awaitable Task。

让我们从实际进行 await 的代码开始,如下所示:

class Program
{
    static void Main(string[] args)
    {
        ManualResetEvent mre1 = new ManualResetEvent(true);
        ManualResetEvent mre2 = new ManualResetEvent(false);
        ManualResetEvent mre3 = new ManualResetEvent(false);
        ManualResetEvent mre4 = new ManualResetEvent(false);

        DoItForReal(false, mre1, mre2);
        DoItForReal(true, mre2, mre3);

        Console.ReadLine();
    }

    /// <summary>
    /// Shows how you can await on a Task based service
    /// </summary>
    private static async void DoItForReal(
        bool shouldCancel, ManualResetEvent mreIn, ManualResetEvent mreOut)
    {

        mreIn.WaitOne();

        CancellationTokenSource cts = new CancellationTokenSource();

        int upperLimit = 50;
        int cancelAfter = (int)upperLimit / 5;
        // which is what is used in WorkProvider class
        int waitDelayForEachDataItem = 10; 

        //allows some items to be processed before cancelling
        if (shouldCancel)
            cts.CancelAfter(cancelAfter * waitDelayForEachDataItem); 

        Console.WriteLine();
        Console.WriteLine("=========================================");
        Console.WriteLine("Started DoItForReal()");


        try
        {
            List<String> data = await new Worker(new WorkProvider(), 
                upperLimit, cts.Token).GetData();

            foreach (String item in data)
            {
                Console.WriteLine(item);
            }
            //allow those waiting on this WaitHandle to continue
            if (mreOut != null) { mreOut.Set(); } 
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Processing canceled.");
            //allow those waiting on this WaitHandle to continue
            if (mreOut != null) { mreOut.Set(); }
        }
        catch (AggregateException aggEx)
        {
            Console.WriteLine("AggEx caught");
            //allow those waiting on this WaitHandle to continue
            if (mreOut != null) { mreOut.Set(); }
        }
        finally
        {
            Console.WriteLine("Finished DoItForReal()");
            Console.WriteLine("=========================================");
            Console.WriteLine();
        }
    }

}

使用 ManualResetEvent(s) 只是为了确保一个场景在另一个场景开始之前完成到控制台的打印,以使演示输出截图对读者清晰。

现在让我们检查一下上面代码 await 的 Worker 代码:

public class Worker
{
    private IWorkProvider workprovider;
    private int upperLimit;
    private CancellationToken token;

    public Worker(IWorkProvider workprovider, int upperLimit, CancellationToken token)
    {
        this.workprovider = workprovider;
        this.upperLimit = upperLimit;
        this.token = token;
    }

    public Task<List<String>> GetData()
    {
        return workprovider.GetData(upperLimit, token);
    }
}

可以看出,该设计支持从单元测试或通过使用与 IOC 容器配合使用的抽象工厂来解析实际 Worker 实例来提供替代 IWorkProvider

演示应用程序中的 IWorkProvider 接口如下所示:

public interface IWorkProvider
{
    Task<List<String>> GetData(int upperLimit, CancellationToken token);
}

正在 await 的实际代码是 GetData() 方法返回的 Task<List<String>>。现在让我们看看提供 Task<List<String>> 的实际基于 Task 的服务代码。

public class WorkProvider : IWorkProvider
{
    public Task<List<string>> GetData(int upperLimit, System.Threading.CancellationToken token)
    {
    //will not be TaskEx when CTP is in .NET 5.0 Framework
        return TaskEx.Run(() =>
        {
            List<string> results = new List<string>();

            for (int i = 0; i < upperLimit; i++)
            {
                token.ThrowIfCancellationRequested();
                Thread.Sleep(10);  
                results.Add(string.Format("Added runtime string {0}",i.ToString()));
            }
            return results;
        });
    }
}

现在,当我们运行使用此基于 Task 的服务代码的实际代码时,我们会得到以下结果,我们运行一次直到完成,一次是我们期望它被取消,因为演示代码通过 CancellationTokenSource 发起了取消。

现在您可能会问自己,好吧,那很好,我有一些基于 Task 的服务代码,我们可以 await 它,但我该如何对那个坏小子进行单元测试呢?嗯,虽然演示应用程序不包含正式的单元测试,但它提供了所有必要的组件,例如:

  1. 关注点分离
  2. IOC 的扩展点,用于提供替代的 IWorkProvider 实现,或用于单元测试
  3. IWorkProvider 的模拟,展示了如何模拟您的 Task(s)

我更喜欢使用 **Moq**(一个很酷的模拟框架)来执行我所做的任何测试代码,所以这里有一个关于如何编写基于单元测试的模拟服务的示例,该服务将与我们刚刚看到的基于 Task 的服务代码具有相同的行为:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Moq;
using System.Threading.Tasks;

namespace MoreRealisticAwaiterWithCancellation
{
    class Program
    {
        static void Main(string[] args)
        {
        
        ...
        ...
        ...

            DoItUsingMoq_YouKnowForUnitTestsLike(mre3);
            Console.ReadLine();
        }

        /// <summary>
        /// Shows how you might mock a task based service using "Moq" and TaskCompletionSource
        /// </summary>
        private static async void DoItUsingMoq_YouKnowForUnitTestsLike(ManualResetEvent mreIn)
        {

            mreIn.WaitOne();
            CancellationTokenSource cts = new CancellationTokenSource();

            int upperLimit = 50;
            
            List<String> dummyResults = new List<string>();
            for (int i = 0; i < upperLimit; i++)
            {
                dummyResults.Add(String.Format("Dummy Result {0}", i.ToString()));     
            }

            //Allows this test method to simulate a Task with
            //a result without actually creating a Task
            TaskCompletionSource<List<String>> tcs = 
                new TaskCompletionSource<List<String>>();
            tcs.SetResult(dummyResults);


            Console.WriteLine();
            Console.WriteLine("=========================================");
            Console.WriteLine("Started DoItUsingMoq_YouKnowForUnitTestsLike()");

            try
            {
                Mock<IWorkProvider> mockWorkProvider = new Mock<IWorkProvider>();
                mockWorkProvider
                    .Setup(x => x.GetData(
                        It.IsAny<Int32>(), 
                        It.IsAny<CancellationToken>()))
                    .Returns(tcs.Task);

                List<String> data = await new Worker(
                    mockWorkProvider.Object, upperLimit, cts.Token).GetData();

                foreach (String item in data)
                {
                    Console.WriteLine(item);
                }
            }
            finally
            {
                Console.WriteLine("Finished DoItUsingMoq_YouKnowForUnitTestsLike()");
                Console.WriteLine("=========================================");
                Console.WriteLine();
            }
        }
    }
}

可以看出,此示例使用 **Moq** 创建了一个模拟的 IWorkProvider,并将其提供给 Worker 构造函数,我们将最终 await 它。那么,如果我们处于单元测试环境中,而不是实际运行基于 Task 的服务,我们该如何运行 await 的延续呢?嗯,答案就在本文的开头。我们只需使用 TaskCompletionSource 来模拟 Task.Result,通过使用 TaskCompletionSource.SetResult(..) 方法并为将在该测试代码中使用的模拟对象提供 TaskCompletionSource.Task

一如既往,这里有一个运行的例子:

我在这里没有对 CancellationToken 做任何事情,但我认为这也可以实现。**Moq** 非常酷,而且我认为完全能够胜任这项工作;我只是在那时累了,抱歉。

现在就到这里

这就是我在这篇文章中想说的全部内容。我希望您喜欢它,并享受了 TPL 的旅程。我知道这是一段漫长的旅程,说实话,有时我觉得我永远也完成不了。它终于完成了。所以,如果您喜欢这篇文章,请花点时间留下评论和投票。非常感谢。

© . All rights reserved.