65.9K
CodeProject 正在变化。 阅读更多。
Home

任务并行库:2/n

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.97/5 (105投票s)

2011年2月19日

CPOL

15分钟阅读

viewsIcon

263867

downloadIcon

2338

探究任务并行库的使用。

引言

这是我计划编写的 TPL 系列文章的第二部分。上次我介绍了任务(Tasks),并涵盖了以下内容:

  • 线程与任务的对比
  • 创建任务
  • 触发方法/属性
  • 处理异常
  • 取消任务
  • 同步上下文

这次我们将要研究如何使用 TPL 的一个概念,称为“延续”(Continuations)。它允许我们在一个任务完成后,执行另一个任务,该任务可以利用前一个任务的返回值。

文章系列路线图

这是我可能撰写的 6 篇文章中的第 2 篇,希望大家会喜欢。下面是我计划涵盖的内容的粗略大纲:

  1. 启动任务 / 触发操作 / 异常处理 / 取消 / UI 同步
  2. 延续 / 取消链式任务(本篇文章)
  3. 并行 For / 自定义分区器 / 聚合操作
  4. 并行 LINQ
  5. 管道
  6. 高级场景 / 任务的 v.Next

我知道有些人会直接阅读这篇文章并说它与 MSDN 上已有的内容相似,我部分同意这一点;然而,我仍然选择撰写这些文章有几个原因,如下:

  • 只有前面几篇文章会展示与 MSDN 相似的想法;之后,我认为我将要深入探讨的材料在 MSDN 上找不到,并且是我个人对 TPL 进行研究的结果。我将在文章中详细介绍我的研究,这样您就可以直接阅读我的研究成果……嗯,真不错。
  • 这里会有实时输出的截图,这是 MSDN 上不常有的,这有助于一些读者加强对文章内容的理解。
  • 这里可能有一些读者从未听说过任务并行库,因此也不会在 MSDN 上找到它,您知道,就是那种你必须先知道你在找什么才能找到东西的老套路。
  • 我喜欢关于线程的文章,所以喜欢写它们,所以我写了它们,将会写它们,已经写了它们,并将继续写它们。

尽管如此,如果读完这篇文章的人真的认为它与 MSDN 太相似(我仍然希望不会),也请告诉我,我会尝试调整后续文章以作弥补。

目录

总之,本文将涵盖以下内容

更多 TPL 背景知识

这一部分我本应该在第一篇文章中就进行介绍,但我没有,所以现在把它放在这里。我希望解释 TPL 的设计者们为何那样设计,以及这对我们大家有什么好处。

默认任务调度器

TPL 依赖于一个调度器来组织和运行任务。在 .NET 4 中,默认的(您可以替换它)任务调度器与 `ThreadPool` 紧密集成。因此,如果您使用默认的任务调度器,运行 `Task`s 的工作线程由 `ThreadPool` 管理,通常至少有与目标 PC 的核心数一样多的工作线程。当 `Task`s 的数量多于工作线程的数量时,一些 `Task`s 必须排队,直到 `ThreadPool` 工作线程空闲下来以服务于该 `Task`。

这与现有的 `ThreadPool.QueueUserWorkItem(..)` 所采用的概念相似。事实上,您可以将默认任务调度器视为一个改进的 `ThreadPool`,其中工作项只是 `Task`s。默认调度器在核心数增加时,比单独使用标准 `ThreadPool` 能够获得更好的性能;我们将在下面进行探讨。

标准 ThreadPool

`ThreadPool` 本质上是一个全局的先进先出 (FIFO) 队列,工作项被分配到队列中执行出队的工作。

这在核心数增加时会成为瓶颈,因为队列一次只能被一个工作线程访问。当只有少数几个大粒度的并行项需要处理时,确保对这个全局队列的单次访问的同步成本很小,但当存在大量细粒度的并行(就像使用 `Task`s 时)时,处理这个单一全局队列的同步成本开始成为瓶颈。

任务一直被设计成能够根据可用核心数进行扩展,我读到过 .NET 能够高效地运行数百万个任务。为了处理这种情况,必须从集中式队列中采取不同的方法。我将在下面更详细地讨论这种更分散的调度方法。

分散的本地队列

.NET Framework 为 `ThreadPool` 中的每个工作线程提供了自己的本地任务队列。本地队列分发负载,并大大减轻了使用单一全局队列的需要。您可以看到下面,有与工作线程一样多的本地队列,以及单一的全局队列,所有这些队列都并行运行。

其思想是,工作线程可以以先进后出 (LIFO) 的方式从其本地队列中获取任务,它可能会找到工作,或者它可能不得不(并为此承担更重的同步成本)回到单一的全局队列。

TPL 设计者还实现了另一个技巧:如果一个工作线程的本地队列为空,它可以回到全局队列获取更多任务,但 TPL 设计者做的是让它以 FIFO 顺序从相邻的本地队列窃取工作。

前 MVP、现任微软员工 Daniel Moth 在他的博客文章中有一篇精彩的文章,其中包含一些极具启发性的图示来阐述这一切:http://www.danielmoth.com/Blog/New-And-Improved-CLR-4-Thread-Pool-Engine.aspx

非常值得阅读那篇文章。

总之,很抱歉稍有跑题,我只是觉得有必要说出来。好了,现在我们来谈谈延续。

延续,那是什么?

简单来说,延续允许将 `Task`s 链接在一起。虽然这本身听起来不算什么大事,但使延续概念真正闪光的是您可以有选择性的延续;也就是说,您可以有一个延续,它只在整个 `Task`s 组完成时触发,或者一个延续,它只在许多 `Task`s 中的一个完成时触发,或者我们可以有一个延续,它只在 `Task` 失败或被取消时触发。延续为我们提供了这种自由度。通过利用 TPL 提供的这种自由度,我们可以非常精细地控制并行代码的许多方面,而不是仅仅控制一个庞大的线程代码块。

在本篇文章中,我特意将任务链设计得相当短,但您确实可以根据自己的需要使这些链变得尽可能短或尽可能长。

简单的延续

演示代码项目:SimpleContinuation

关于这个小代码片段/演示,我真的没有什么要说的,除了说它是一个延续,说实话,这可能就是我需要说的全部内容了,因为这就是创建和使用延续的全部内容。真的很简单。

// create the task
Task<List<int>> taskWithFactoryAndState =
    Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
        }
        return ints;
    }, 2000);


try
{
    //setup a continuation for task
    taskWithFactoryAndState.ContinueWith((ant) =>
    {
        List<int> result = ant.Result;
        foreach (int resultValue in result)
        {
            Console.WriteLine("Task produced {0}", resultValue);
        }
    });
}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'",
            ex.Message));
    }
}

Console.ReadLine();

这是一个小的截图,我知道不是很激动人心,但它会变得更好。

WPF 同步

演示代码项目:WPFDispatcherSynchonizationContext

这是我在上一篇文章中讨论过的内容:《UI 同步》,更具体地说,《同步,WPF 同步》。我对那段代码没有做任何更改,但我将其包含在这里。您应该阅读那篇文章来了解我们要解决的基础。本文之所以再次包含这段代码,只是为了表明它确实是使用 TPL 延续的一个非常有效的理由,即可以将线程调度回 UI 控件的所有者线程。正如我所说,除非您阅读了第一篇文章的相关部分,否则这段代码片段将毫无意义。

private void btnDoIt_Click(object sender, RoutedEventArgs e)
{
    //create CancellationTokenSource, so we can use the overload of
    //the Task.Factory that allows us to pass in a SynchronizationContext
    CancellationTokenSource tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;

    Task taskWithFactoryAndState1 = 
    Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        // This is not run on the UI thread.
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
        }
        return ints;
    }, 10000, token).ContinueWith(ant =>
    {
        //updates UI no problem as we are using correct SynchronizationContext
        lstBox.ItemsSource = ant.Result;
    }, token, TaskContinuationOptions.None,
        TaskScheduler.FromCurrentSynchronizationContext());
}

这是演示运行的截图

继续“WhenAny”

演示代码项目:ContinueWhenAny

在并行编程中,一个可能的需求是尝试在一个数据集上运行一组算法,并使用性能最佳的那个。这些算法可以是任何东西,从自定义的实验性搜索。我没有时间编写一整套自定义的实验性搜索算法,因此选择了一个稍微为人熟知的东西:“排序算法”。我使用的是一篇过去 C# 竞赛获胜者的示例:https://codeproject.org.cn/KB/recipes/SortVisualization.aspx,作者是 Kanasz Robert。

基本思路是,我只想等到第一个算法(即,希望是最快的那个)达到其目标。

我将这些算法打包到一个名为 `ContinueWhen.Common` 的小辅助 DLL 中,位于附加的 VS2010 解决方案中。但是,就获取只等待多个正在运行的任务中的一个来触发延续的概念而言,这段代码应该很容易理解,而无需查看实际的排序算法。

static void Main(string[] args)
{
    //create a list of random numbers to sort
    Random rand = new Random();
    List<int> unsortedList = new List<int>();
    int numberOfItemsToSort = 5000;

    for (int i = 0; i < numberOfItemsToSort; i++)
    {
        unsortedList.Add(rand.Next(numberOfItemsToSort));
    }

    //create 3 tasks to run 3 different sorting algorithms
    Task<SortingTaskResult>[] tasks = 
    new Task<SortingTaskResult>[3];

    //Bubble Sort Task
    tasks[0] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Bubble Sort");
    }, unsortedList);


    //Selection Sort Task
    tasks[1] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Selection Sort");
    }, unsortedList);

    //Counting Sort Task
    tasks[2] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.CountingSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Counting Sort");
    }, unsortedList);

    //Wait for any of them (assuming nothing goes wrong)
    Task.Factory.ContinueWhenAny(
        tasks,
        (Task<SortingTaskResult> antecedent) =>
        {
            Console.WriteLine(antecedent.Result.ToString());
        });


    Console.ReadLine();
}

上面的代码展示了我们创建了三个 `Task`s,每个任务对应一个排序算法,然后是一个单一的延续,它等待 `Task`s 中的 **任意一个**(即第一个完成的 `Task`)完成。下面是运行上述列表时产生的效果:

可以看到,“选择排序”赢得了比赛,尽管它不是第一个启动的 `Task`;它获胜是因为它比当时恰好使用了可用 CPU 核心的其他算法更优。我写这段测试代码的笔记本只有两个 CPU 核心,所以很有可能,如果我有四个 CPU 核心,第三种算法最终可能会获胜,因为它在理论上是更好的算法。

另一个值得注意的有趣之处是,由于我们只等待组(数组)中的一个 `Task` 完成,我们 **只能** 使用我们等待的单个 `Task` 的 `Result`,如本截图所示。

继续“WhenAll”

演示代码项目:ContinueWhenAll

`ContinueWhenAll` 很有意思,我想象过很多次它会非常有用。您已经将并行工作拆分了,但必须等待所有部分完成才能进入下一步,或者再次使用实验算法的想法。我们也可以想象,我们可能非常有兴趣了解我们的自定义算法的各种执行特征,因此必须等待它们全部完成才能继续。

我再次选择使用排序算法,因为它是一个简单的概念。通过这种方式,我们的想法是我们要对一个未排序的列表运行各种排序算法,并等待所有不同的算法完成,然后我们才能继续。

这是实现此目的的代码:

static void Main(string[] args)
{
    //create a list of random numbers to sort
    Random rand = new Random();
    List<int> unsortedList = new List<int>();
    int numberOfItemsToSort = 5000;

    for (int i = 0; i < numberOfItemsToSort; i++)
    {
        unsortedList.Add(rand.Next(numberOfItemsToSort));
    }

    //create 3 tasks to run 3 different sorting algorithms
    Task<SortingTaskResult>[] tasks = 
    new Task<SortingTaskResult>[3];

    //Bubble Sort Task
    tasks[0] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Bubble Sort");
    }, unsortedList);

    //Selection Sort Task
    tasks[1] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Selection Sort");
    }, unsortedList);

    //Counting Sort Task
    tasks[2] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.CountingSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Counting Sort");
    }, unsortedList);

    //Wait for all of them (assuming nothing goes wrong)
    Task.Factory.ContinueWhenAll(
        tasks,
        (antecedents) =>
        {
            foreach (Task<SortingTaskResult> task in antecedents)
            {
                Console.WriteLine(task.Result.ToString());
            }
        });

    Console.ReadLine();
}

可以看到,这次延续是在所有三个排序 `Task`s 完成后才触发的。这是运行此片段的结果:

另一个值得注意的有趣之处是,由于我们等待的是组(数组)中的所有 `Task`s 完成,因此我们可以使用我们等待的所有 `Task`s 的 `Result`,如本截图所示。

使用延续进行异常处理

演示代码项目:UsingContinuationForExceptionHandling

在第一篇文章中谈到如何 处理 Task 异常 的不同方法时,我还提到了一种当时没有展示的技巧。现在是时候展示那种方式了。它真的非常简单:我们只需使用延续。思路是,我们有一个延续,它在先驱 `Task` 成功完成时运行,另一个在先驱 `Task` 处于 Faulted 状态时运行。

这可以通过我们在创建任务延续时提供的 `TaskContinuationOptions` 来轻松实现。以下是一些示例代码来说明我的意思:

// create the task
Task<List<int>> taskWithFactoryAndState =
    Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
            if (i > 100)
            {
                InvalidOperationException ex =
                    new InvalidOperationException("oh no its > 100");
                ex.Source = "taskWithFactoryAndState";
                throw ex;
            }
        }
        return ints;
    }, 2000);

//and setup a continuation for it only on when faulted
taskWithFactoryAndState.ContinueWith((ant) =>
    {
        AggregateException aggEx = ant.Exception;
        Console.WriteLine("OOOOPS : The Task exited with Exception(s)");

        foreach (Exception ex in aggEx.InnerExceptions)
        {
            Console.WriteLine(string.Format("Caught exception '{0}'",
                ex.Message));
        }
    }, TaskContinuationOptions.OnlyOnFaulted);

//and setup a continuation for it only on ran to completion
taskWithFactoryAndState.ContinueWith((ant) =>
{
    List<int> result = ant.Result;
    foreach (int resultValue in result)
    {
        Console.WriteLine("Task produced {0}", resultValue);
    }
}, TaskContinuationOptions.OnlyOnRanToCompletion);

Console.ReadLine();

这是运行这段代码时会发生什么的演示:

可以看到,只有一个延续运行了,那就是应该运行“OnlyOnFaulted”的那个。

将延续用作管道

演示代码项目:UsingContinuationsAsPipelines

我在本文开头提到过,您可以使用延续将任务链接起来,使其尽可能简单或复杂。我并没有发疯之类的,但我发现了一个下面展示的小例子,它比到目前为止您看到的例子稍微大一些。然而,它确实说明了您可以非常容易地继续一个延续。

static void Main(string[] args)
{
    // create the task
    Task<List<int>> taskWithFactoryAndState =
        Task.Factory.StartNew<List<int>>((stateObj) =>
        {
            List<int> ints = new List<int>();
            for (int i = 0; i < (int)stateObj; i++)
            {
                ints.Add(i);
            }
            return ints;
        }, 10);


    //and setup a continuation for it only on ran
    //to completion, where this continuation
    //returns a result too, which will be used by yet another continuation
    taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
    {
        List<int> parentResult = ant.Result;
        List<int> result = new List<int>();
        foreach (int resultValue in parentResult)
        {

            Console.WriteLine("Parent Task produced {0}, " + 
                "which will be squared by continuation", 
                resultValue);
            result.Add(resultValue * resultValue);
        }
        return result;
    }, TaskContinuationOptions.OnlyOnRanToCompletion)
    //Another continution
    .ContinueWith((ant) =>
        {
            List<int> parentResult = ant.Result;
            foreach (int resultValue in parentResult)
            {
                Console.WriteLine("Parent Continuation Task produced Square of {0}", 
                    resultValue);
            }
        }, TaskContinuationOptions.OnlyOnRanToCompletion);


    Console.ReadLine();

}

这肯定不是什么火箭科学。我所做的就是创建一个初始 `Task`,该任务创建一个并返回一个数字列表。这个由第一个 `Task` 产生的 `List<int>` 然后被传递到一个延续中,在那里打印原始 `Task`(先驱任务)的结果,并通过获取原始 `Task`(先驱任务)值的平方来创建一个新的 `List<int>`。这个延续的结果然后被馈送到另一个延续,该延续打印出平方延续 `Task`(这是此延续的先驱任务)的结果。

每个延续都假设一个理想的世界,并且只有在延续的原始 `Task` 成功完成时才会运行。

这是这个例子运行的一个小演示:

在延续的前驱任务中捕获异常

演示代码项目:CatchExceptionInAntecedent

现在我们已经看到了使用 `Task`s/continuations 的几个例子,我们也看到了我们可以使用延续在事情进展顺利时运行,我们也可以在原始 `Task` 未能完成其工作时运行 `Task`s,但有时,我们可能只是想有一个不指定的延续,它总是发生,无论原始 `Task` 是否成功完成,并由延续决定如何处理原始 `Task` 状态中的问题。

这里有一个例子说明我们如何在延续中检查原始 `Task` 的 `Exception`,在那里我们重新抛出原始 `Task` 提供的原始 `Exception`。由于在此示例中延续重新抛出 `Exception`,我们需要确保它抛出的 `Exception` 以某种方式被观察到(我上次讨论了 `Exception` 观察,当时我谈到了 `Wait()` / `Result` 等触发方法),因此我 `Wait()` 在延续上。

这是代码

try
{
    // create the task
    Task<List<int>> taskWithFactoryAndState =
        Task.Factory.StartNew<List<int>>((stateObj) =>
        {
            Console.WriteLine("In TaskWithFactoryAndState");
            List<int> ints = new List<int>();
            for (int i = 0; i < (int)stateObj; i++)
            {
                Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
                ints.Add(i);
                if (i == 5)
                    throw new InvalidOperationException(
                          "Don't like 5 its vulgar and dirty");
  
            }
            return ints;
        }, 100);


    //Setup a continuation which will not run
    taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
    {
        if (ant.Status == TaskStatus.Faulted)
            throw ant.Exception.InnerException;


        Console.WriteLine("In Continuation, no problems in Antecedent");

        List<int> parentResult = ant.Result;
        List<int> result = new List<int>();
        foreach (int resultValue in parentResult)
        {

            Console.WriteLine("Parent Task produced {0}, " + 
                "which will be squared by continuation",
                resultValue);
            result.Add(resultValue * resultValue);
        }
        return result;
    });


    //wait for the task to complete
    taskWithFactoryAndState.Wait();
}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
    }
}

Console.WriteLine("Finished");

这是它运行时的一个小演示截图。您可以看到我们捕获了原始 `Task` 的原始 `Exception`,并成功地将其重新抛出(保留了 `Exception` 信息)到 `try/catch` 捕获 `Exception` 的点。

取消延续

演示代码项目:CancellingContinuations

您可能想对延续做的最明显的事情之一就是取消它,对吧?嗯,幸运的是,您已经在 第一篇文章 中看到了所有完成此任务的技巧,还记得我们上次看过的 `CancellationTokenSource` 对象吗?

没什么复杂的。我们创建一个新的 `CancellationTokenSource`,并将从中获得的 `CancellationToken` 传递给任何我们想要影响的 TPL `Task`s/continuations,当 `CancellationToken` 被取消时。

您在 第一篇文章 中看到的相同规则仍然适用,我们必须做好并且确保期望并使用 `CancellationToken` 的 `Task`s/continuations 在请求取消时抛出 `Exception`(记住,这对于确保 `Task` 转换到正确状态至关重要)。

总之,我可能说得太多了,代码本身就能说明问题。代码本身很简单。我们有一个原始 `Task`,它创建一个 `List<int>`,然后该列表在延续中使用,其中原始 `Task` 的数字被平方/打印并返回。但是,在创建原始 `Task` 的 5 秒后,传递给原始 `Task` 和延续的 `CancellationToken` 被取消了。

CancellationTokenSource tokenSource = new CancellationTokenSource();

CancellationToken token = tokenSource.Token;

try
{
    // create the task
    Task<List<int>> taskWithFactoryAndState =
        Task.Factory.StartNew<List<int>>((stateObj) =>
        {
            Console.WriteLine("In TaskWithFactoryAndState");
            List<int> ints = new List<int>();
            for (int i = 0; i < (int)stateObj; i++)
            {
                tokenSource.Token.ThrowIfCancellationRequested();
                ints.Add(i);
                Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
                Thread.Sleep(1000); // simulate some work
            }
            return ints;
        }, 10000, tokenSource.Token);


    Thread.Sleep(5000); //wait 5 seconds then cancel the runnning Task

    tokenSource.Cancel();


    //Setup a continuation which will not run
    taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
    {
        Console.WriteLine("In Continuation");

        List<int> parentResult = ant.Result;
        List<int> result = new List<int>();
        foreach (int resultValue in parentResult)
        {

            Console.WriteLine("Parent Task produced {0}, which will " + 
                              "be squared by continuation",
                resultValue);
            result.Add(resultValue * resultValue);
        }
        return result;
    }, tokenSource.Token);


    taskWithFactoryAndState.Wait();

}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
    }
}
finally
{
    tokenSource.Dispose();
}

Console.WriteLine("Finished");
Console.ReadLine();

这是结果:

请注意,延续根本没有触发,我们只创建了 5 个项目。这是由于 `CancellationToken` 被取消。取消延续就是这么简单,您肯定会爱上 TPL 的。

本期到此为止

我知道这篇文章的内容比第一篇要少,原因在于延续出奇地容易掌握,所以要说的内容就比较少。接下来的两篇文章可能篇幅与此差不多,但之后的文章内容会更充实。

这就是我想在这篇文章中说的全部内容。希望您喜欢并想要更多。如果您喜欢这篇文章,并希望看到更多内容,能否花些时间留下评论和投票?非常感谢。

希望在下一篇文章、下下一篇文章以及再下一篇文章中见到您,总共六篇。我得赶紧忙起来了。

© . All rights reserved.