65.9K
CodeProject 正在变化。 阅读更多。
Home

任务并行库:1/n

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (340投票s)

2011年2月1日

CPOL

23分钟阅读

viewsIcon

1669360

downloadIcon

10569

探究任务并行库的使用。

引言

我还记得我第一次在 .NET 中创建用户界面,它需要从数据库获取一些数据,而我获取的数据量在生产环境中远大于我简化测试设置中假设的数据量。猜猜发生了什么......我的用户界面一使用真实数据就冻结了。原因是用户界面线程(即我天真的用户界面中唯一的线程)正在用于从后端数据库获取数据。糟糕!肯定有更好的方法。

事实证明,确实有。 .NET 提供(并且仍然提供)各种各样的线程类来帮助处理这种情况,例如 ThreadBackgroundWorkerThreadPool 等。

所以我开始了解并喜欢上 System.Threading 命名空间中的一些类,它们确实对我的应用程序的响应能力产生了巨大的影响,一切都很好。

问题是,使用 System.Threading 命名空间(以下简称“经典线程”)进行一些线程操作时,有时需要编写大量代码,而且有时并不那么直观。经典线程众所周知是专家的领域;在长期处理其怪癖之后,人们会开始明白为什么……你就像“天哪,那个异常是从哪里来的,啊啊啊我正在多个线程中使用一个共享数据对象,哈!”。这主要是直觉/运气/技能的结合……而且不一定各占一半。

幸运的是,援手就在眼前。随着 .NET 4.0 的到来,城里有了一个新成员。它被称为 Task,你们中的一些人可能知道它是任务并行库 (TPL) 的一部分,TPL 是一个非常非常有用(我个人觉得高度直观)的新类集合,其目的不仅是让你的并行编程更容易阅读,而且与经典线程替代方案相比,它还提供更轻量级的对象。例如,当 .NET 中启动一个新的 Thread 时,会伴随一个完整的进程,例如创建队列、线程局部存储、管理 Thread 的生命周期等。这需要时间。好的,所以你可以使用经典的线程 ThreadPool,它确实允许你直接将工作项委托排队到 ThreadPool,这意味着你不会受到自己创建新 Thread 的开销的影响,因为 ThreadPool 会管理所有新 Thread 的创建等。

然而,即使使用经典的线程 ThreadPool,也存在问题,即一旦工作项排队到 ThreadPool 中,你就无法取消它,或者无法轻易获取返回结果。它读起来也不那么好。CodeProject 上有一篇很棒的文章解决了其中一些问题:《智能线程池》,实际上相当出色。然而,新的 TPL 基础架构解决了所有这些问题,而且在我看来,还有更多有用的功能。

TPL Task 实际上在内部使用了 ThreadPool,如果你使用默认调度程序的话,大多数时候你很可能会这样做。调度程序可以交换,我将在后续文章中展示这一点。目前,如果我们假设我们使用的是默认调度程序,那么 Task 将通过 ThreadPool 分配线程,ThreadPool 处理 Thread 的创建以执行 Task,因此 TPL 在后台为我们做了很多繁重的工作(可以这么说)。

值得一提的是,Task 仅仅是用于传递要完成的工作委托的包装器,同时还存储状态、异常和延续等。该工作可能由线程池完成,也可能不完成,正如已经指出的那样,这将取决于所使用的调度程序。

Steve Soloman,本文的读者之一,在论坛中这样说,我无法说得更好,所以将其包含在本文中。谢谢 Steve,提供了很好的额外信息。

TPL 的另一个优点是它旨在利用 CPU 的每个核心,否则这些核心可能会闲置。它显然在后台使用 Thread(s) 来完成此操作,但你根本不必参与启动新 Thread(好的,在高级场景中,例如自定义调度程序,也许需要,但稍后会详细介绍;日常使用中,你真的不必关心它)。

Task 似乎也更符合人们的思维方式。例如,想象一下这个场景:“我想调用一个 Web 服务并让它返回一个 List<int>”。使用 TPL Task,我会创建一个 Task<List<int>> 并让它在其负载委托中调用一些服务(这将使用 ThreadPool),该服务返回一个 List<int>

使用经典线程,我的代码肯定不会那么简单。当然你可以做同样的事情,但说实话,TPL 带来的纯粹的可读性简直不容忽视。再加上它实际使用了 ThreadPool 并为你处理了 Thread 的创建。再加上微软正在为 C# 5 投入大量时间。我想你会同意,了解 TPL 是大多数 .NET 程序员必备的技能。

在本文(以及后续文章)中,我希望向您展示 TPL 是多么容易使用。

文章系列路线图

这是可能多达 6 篇文章中的第 1 篇,我希望大家会喜欢。下面是我希望涵盖的大致内容:

  1. 启动任务 / 触发操作 / 异常处理 / 取消 / UI 同步(本文)
  2. 延续 / 取消链式任务
  3. 并行 For / 自定义分区器 / 聚合操作
  4. 并行 LINQ
  5. 管道
  6. 高级场景 / 任务的 v.Next

现在我知道有些人会简单地阅读这篇文章并说它与 MSDN 上目前可用的内容相似,我部分同意这一点。但是,我选择继续承担撰写这些文章的任务有几个原因,如下所述:

  • 只有前几篇文章会展示与 MSDN 相似的想法。之后,我觉得我将深入探讨的材料不会出现在 MSDN 上,而是我进行的一些 TPL 研究的结果,我将在文章中概述这些研究,这样你就可以从我的研究中受益,你可以直接阅读。是的,很好。
  • 这里会有实时输出的截图,这是 MSDN 不太多的,这可能有助于一些读者巩固文章文本。
  • 这里可能有一些读者甚至从未听说过任务并行库,因此不会在 MSDN 中遇到它。你知道老故事,你必须首先知道你在寻找什么。
  • 我喜欢关于线程的文章,所以喜欢写它们,所以我写了它们,将会写它们,已经写了它们,并将继续写它们。

话虽如此,如果人们在阅读了本文后,真的认为它与 MSDN 太相似(我仍然希望它不会),也请告诉我,我将尝试调整即将发布的文章以进行修改。

目录

本文将涵盖以下内容:

线程与任务

演示代码项目:ThreadsVersusTasks。

在简介中,我简要地讨论了 ThreadTask 之间的差异,其中启动一个 Thread 比启动一个 Task 的成本更高。为了说明这一点,让我们考虑以下一个小例子,它简单地创建 64 个 Thread 并等待(通过一个类线程同步原语:ManualResetEventSlim)这 64 个 Thread 完成,然后创建并启动 64 个 Task。每个 Thread/Task 将简单地向控制台写入 10 行。

完整的代码如下。别担心,我们稍后会深入探讨所有细节。现在,我只想向你展示运行此代码的结果截图。

static void Main(string[] args)
{
    Stopwatch watch = new Stopwatch();
    //64 is upper limit for WaitHandle.WaitAll() method
    int maxWaitHandleWaitAllAllowed = 64;
    ManualResetEventSlim[] mres = 
      new ManualResetEventSlim[maxWaitHandleWaitAllAllowed]; 

    for (int i = 0; i < mres.Length; i++)
    {
        mres[i] = new ManualResetEventSlim(false);
    }

            
    long threadTime = 0;
    long taskTime = 0;
    watch.Start();

    //start a new classic Thread and signal the ManualResetEvent when its done
    //so that we can snapshot time taken, and 

    for (int i = 0; i < mres.Length; i++)
    {
        int idx = i;
        Thread t = new Thread((state) =>
        {
            for (int j = 0; j < 10; j++)
            {
                Console.WriteLine(string.Format("Thread : {0}, outputing {1}",
                    state.ToString(), j.ToString()));
            }
            mres[idx].Set();
        });
        t.Start(string.Format("Thread{0}", i.ToString()));
    }

    WaitHandle.WaitAll( (from x in mres select x.WaitHandle).ToArray());

    threadTime = watch.ElapsedMilliseconds;
    watch.Reset();

    for (int i = 0; i < mres.Length; i++)
    {
        mres[i].Reset();
    }

    watch.Start();

    for (int i = 0; i < mres.Length; i++)
    {
        int idx = i;
        Task task = Task.Factory.StartNew((state) =>
            {
                for (int j = 0; j < 10; j++)
                {
                    Console.WriteLine(string.Format("Task : {0}, outputing {1}",
                        state.ToString(), j.ToString()));
                }
                mres[idx].Set();
            }, string.Format("Task{0}", i.ToString()));
    }

    WaitHandle.WaitAll((from x in mres select x.WaitHandle).ToArray());
    taskTime = watch.ElapsedMilliseconds;
    Console.WriteLine("Thread Time waited : {0}ms", threadTime);
    Console.WriteLine("Task Time waited : {0}ms", taskTime);

    for (int i = 0; i < mres.Length; i++)
    {
        mres[i].Reset();
    }
    Console.WriteLine("All done, press Enter to Quit");

    Console.ReadLine();
}

如果我们观察这段代码的输出是什么样子

可以看出,即使在这个小实验中,Thread 的创建时间也远大于 Task 完成相同工作所需的创建时间。这无疑是由于创建和管理经典 Thread 所需的工作。就像我说的,大多数经典线程开发人员会使用 ThreadPool,这将提供更好的结果,但就像我在介绍中说的,经典 ThreadPool 有其局限性,这些局限性都通过 TPL 解决了。

本文的其余部分将重点介绍如何创建/取消 Task 并处理它们的 Exception

创建任务

演示代码项目:CreatingTasks。

TPL 为开发人员提供了许多不同的创建 Task 的方法,从新建一个 Task 到使用静态 Task.Factory.StartNew() 方法的众多重载之一。下面是一些以各种方法创建不同 Task 的示例:

  1. 使用内联 Action 创建 Task
  2. 创建调用返回字符串的实际方法的 Task
  3. 使用 Task.Factory 创建并启动返回 List<int>Task
class Program
{
    static void Main(string[] args)
    {
        // *****************************************************************
        // OPTION 1 : Create a Task using an inline action
        // *****************************************************************
        Task<List<int>> taskWithInLineAction = 
        new Task<List<int>>(() =>
            {
                List<int> ints = new List<int>();
                for (int i = 0; i < 1000; i++)
                {
                    ints.Add(i);
                }
                return ints;
                    
            });


        // **************************************************
        // OPTION 2 : Create a Task that calls an actual 
        //            method that returns a string
        // **************************************************
        Task<string> taskWithInActualMethodAndState = 
            new Task<string>(new Func<object, 
            string>(PrintTaskObjectState),
            "This is the Task state, could be any object");

        // **************************************************
        // OPTION 3 : Create and start a Task that returns 
        //            List<int> using Task.Factory
        // **************************************************
        Task<List<int>> taskWithFactoryAndState = 
             Task.Factory.StartNew<List<int>>((stateObj) =>
        {
            List<int> ints = new List<int>();
            for (int i = 0; i < (int)stateObj; i++)
            {
                ints.Add(i);
            }
            return ints;
        }, 2000);



        taskWithInLineAction.Start();
        taskWithInActualMethodAndState.Start();



        //wait for all Tasks to finish
        Task.WaitAll(new Task[] 
        { 
            taskWithInLineAction, 
            taskWithInActualMethodAndState, 
            taskWithFactoryAndState 
        });

        //print results for taskWithInLineAction
        var taskWithInLineActionResult = taskWithInLineAction.Result;
        Console.WriteLine(string.Format(
            "The task with inline Action<T> " + 
            "returned a Type of {0}, with {1} items",
            taskWithInLineActionResult.GetType(), 
            taskWithInLineActionResult.Count));
        taskWithInLineAction.Dispose();

        //print results for taskWithInActualMethodAndState
        var taskWithInActualMethodResult = taskWithInActualMethodAndState.Result;
        Console.WriteLine(string.Format(
            "The task which called a Method returned '{0}'",
        taskWithInActualMethodResult.ToString()));
        taskWithInActualMethodAndState.Dispose();

        //print results for taskWithFactoryAndState
        var taskWithFactoryAndStateResult = taskWithFactoryAndState.Result;
        Console.WriteLine(string.Format(
            "The task with Task.Factory.StartNew<List<int>> " + 
            "returned a Type of {0}, with {1} items",
            taskWithFactoryAndStateResult.GetType(), 
            taskWithFactoryAndStateResult.Count));
        taskWithFactoryAndState.Dispose();

        Console.WriteLine("All done, press Enter to Quit");
        
        Console.ReadLine();
    }

    private static string PrintTaskObjectState(object state)
    {
        Console.WriteLine(state.ToString());
        return "***WOWSERS***";
    }
}

这是运行此演示的结果

那么,启动 Task 的首选方法是什么呢?通常,Task.Factory.StartNew() 是首选方法,但在少数极端情况下,直接新建 Task 会更好。

Stephen Toubs(微软工程师,TPL 团队成员)有一篇博客文章,其中对此主题进行了有趣的讨论,我从中截取了以下文本。哦,有一点,Stephen 的博客谈到了延续,这是我们将在下一篇文章中讨论的内容,所以我希望这不会让大家太困惑。

通过 TPL,有几种方法可以创建和启动新任务。一种方法是使用任务的构造函数,然后调用 Start 方法,例如:

new Task(...).Start();

另一种方法是使用 TaskFactoryStartNew 方法,例如:

Task.Factory.StartNew(...);

这引出了一个问题……什么时候以及为什么要使用一种方法而不是另一种方法?一般来说,我总是建议使用 Task.Factory.StartNew,除非特定情况提供了使用构造函数后跟 Start 的令人信服的理由。我推荐这样做的原因有几个。首先,它通常更高效。例如,我们在 TPL 内部非常小心地确保,当多个线程并发访问任务时,“正确”的事情会发生。任务只执行一次,这意味着我们需要确保多个线程并发调用任务的 Start 方法只会导致任务被调度一次。这需要同步,而同步是有成本的。如果你使用任务的构造函数构造一个任务,那么在调用 Start 方法时,你会付出这个同步成本,因为我们需要防止另一个线程并发调用 Start 的可能性。然而,如果你使用 TaskFactory.StartNew,我们知道任务在我们将任务引用返回给你的代码时就已经被调度了,这意味着线程不再可能竞争调用 Start,因为每次调用 Start 都会失败。因此,对于 StartNew,我们可以避免额外的同步成本,并为调度任务选择一条更快的路径。然而,在某些情况下,创建新任务然后启动它是有益的,甚至是必需的(如果没有,我们就不会提供 Start 方法)。一个例子是如果你从 Task 派生。这是一个高级情况,通常很少需要从 Task 派生,但尽管如此,如果你确实从它派生,调度你的自定义任务的唯一方法是调用 Start 方法,因为在 .NET 4 中,TaskFactory.StartNew 将始终返回具体的 TaskTask<TResult> 类型。另一个更高级的用例是处理某些竞态条件。考虑任务主体需要访问其自身引用的情况,例如如果任务想要调度自身的延续。你可能会尝试使用以下代码来完成:

Task t = null;
t = Task.Factory.StartNew(() =>
{
        ...
        t.ContinueWith(...);
});

然而,这段代码有 bug。ThreadPool 有可能在从 StartNew 返回的 Task 引用存储到 t 之前,获取并执行计划的任务。如果发生这种情况,任务的主体会将 Task t 视为 null。解决此问题的一种方法是分离任务的创建和计划,例如:

Task t = null;
t = new Task(() =>
{
        ...
        t.ContinueWith(...);
});
t.Start();

现在,我们知道在任务主体运行之前,t 将被正确初始化,因为我们只有在它被适当设置之后才计划它。简而言之,当然存在需要采用“new Task(...).Start()”方法的情况。但除非你发现自己处于这些情况之一,否则请首选 TaskFactory.StartNew

——http://blogs.msdn.com/b/pfxteam/archive/2010/06/13/10024153.aspx Stephen Toubs(Microsoft TPL 团队)博客,更新日期 2011年1月31日。

触发方法/属性

所以你现在已经创建了一些 Task 并让它们做事情,但事情会一直顺利吗?当然会。呃..不,它们不会,坦率地说,它们很少会。

所以你猜怎么着……是的,没错,我们必须学习如何处理可能在我们的 TPL Task 中出现的问题(又名 Exception)。我将在一分钟内向你展示在自己的 Task 中处理 Exception 的各种方法,但在那之前,我只想谈一谈你在阅读 System.Threading.Tasks 文档时可能或可能不明显的一件事;我个人认为它并不是那么明显。

所以,事情是这样的,在 Task 中,事情可能会在各种地方出错,从 Task 主体内发生正常的 Exception,到 CancellationTokenSource.Cancel() 请求(稍后会详细介绍)导致 OperationCancelledException 发生,这些异常将被组合成一种特定于 TPL 的新型 Exception。这种新的 Exception 称为 AggregateException,所有单独的 Exception 都捆绑在 AggregateException.InnerExceptions 属性中。AggregateException 还提供了一种处理 Task 异常的方法,但我将在一分钟内介绍。

目前,只需要知道在 Task 内部抛出的任何 Exception 都被打包到 AggregateException 中,并且**你**有责任处理它。TPL 还有一个 AggregateException 被观察的概念,也就是说,如果你的 Task 中有什么东西引发了 AggregateException,它只有在当前被观察到时才会被真正处理(通过使用我稍后展示的一种技术)。

如果你不理解这一点,请重新阅读该段落,这是一个非常重要的点。

Task 类本身有几个方法/属性会导致 AggregateException 被观察;其中一些如下所示:

  • Wait()
  • 结果

当你的代码使用这些时,你实际上是在说,是的,我对观察任何发生的 AggregateException 感兴趣。在本文的其余部分,我将这些特殊方法/属性称为触发方法。

需要注意的一点是,如果你不使用 Wait()/Result 等触发方法之一,TPL 不会升级任何 AggregateException,因为被认为没有任何东西在观察 AggregateException,因此会发生未处理的 Exception

这是使用 TPL 时的一个小陷阱,但它至关重要。

无论如何,现在我们知道了这一点,让我们来看看处理 Exception 的不同方式。

处理异常

在本节中,我将向你展示处理 Task Exception 的各种技术。

未捕获异常演示

演示代码项目:UncaughtExceptionInWinFormsDemoApp。

在我向你展示如何处理 Task Exception 之前,让我们先看看一些不处理 Exception 的代码,并熟悉一下我们会得到什么样的错误对话框。

以下是一些有问题的代码

private void btnStartTask_Click(object sender, EventArgs e)
{
    // create the task
    Task<List<int>> taskWithFactoryAndState = 
    Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
            if (i > 100)
            {
                InvalidOperationException ex = 
            new InvalidOperationException("oh no its > 100");
                ex.Source = "taskWithFactoryAndState";
                throw ex;
            }
        }
        return ints;
    }, 2000);

    //wait on the task, but do not use Wait() method
    //doing it this way will cause aany unhandled Exception to remain unhandled
    while (!taskWithFactoryAndState.IsCompleted)
    {
        Thread.Sleep(500);
    }

    if (!taskWithFactoryAndState.IsFaulted)
    {
        lstResults.DataSource = taskWithFactoryAndState.Result;
    }
    else
    {
        StringBuilder sb = new StringBuilder();
        AggregateException taskEx = taskWithFactoryAndState.Exception;
        foreach (Exception ex in taskEx.InnerExceptions)
        {
            sb.AppendLine(string.Format("Caught exception '{0}'", ex.Message));
        }
        MessageBox.Show(sb.ToString());
    }

    //All done with Task now so Dispose it
    taskWithFactoryAndState.Dispose();
}

上面看到我没有使用任何触发方法/属性吗?因此任何 AggregateException 都未被观察到,所以当我们运行这段代码时,我们会看到一个错误对话框。

这很糟糕。如果不加以处理,这种事情可能会导致足够大的问题,甚至可能使你的进程彻底崩溃。

因此,处理 Task 中的任何 Exception 始终是一个好主意,所以请务必使用我谈到的触发方法,以确保所有 Exception 都被观察到。

使用 Try Catch

演示代码项目:HandlingExceptionsUsingTryCatch。

处理 AggregateException 最简单的方法之一是在 try/catch 内部使用其中一个触发方法。以下代码显示了一个示例。我认为这段代码非常直观。

// create the task
Task<List<int>> taskWithFactoryAndState = 
    Task.Factory.StartNew<List<int>>((stateObj) =>
{
    List<int> ints = new List<int>();
    for (int i = 0; i < (int)stateObj; i++)
    {
        ints.Add(i);
        if (i > 100)
        {
            InvalidOperationException ex = 
                new InvalidOperationException("oh no its > 100");
            ex.Source = "taskWithFactoryAndState";
            throw ex;
        }
    }
    return ints;
}, 2000);

try
{
    //use one of the trigger methods (ie Wait() to make sure AggregateException
    //is observed)
    taskWithFactoryAndState.Wait();
    if (!taskWithFactoryAndState.IsFaulted)
    {
        Console.WriteLine(string.Format("managed to get {0} items", 
            taskWithFactoryAndState.Result.Count));
    }

}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'", 
            ex.Message));
    }
}
finally
{
    taskWithFactoryAndState.Dispose();
}

Console.WriteLine("All done, press Enter to Quit");
Console.ReadLine();

这是我们运行此示例代码时得到的结果

使用 AggregateException.Handle()

演示代码项目:HandleUsingExHandle。

另一种方法是使用 AggregateException.Handle()。与以前一样,这依赖于你使用其中一个触发方法,你**必须**使用它来确保任何 Exception 都被观察到。以下代码显示了一个示例。同样,我希望代码非常直观。

static void Main(string[] args)
{
    // create the task
    Task<List<int>> taskWithFactoryAndState = 
    Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
            if (i > 100)
            {
                InvalidOperationException ex = 
                    new InvalidOperationException("oh no its > 100");
                ex.Source = "taskWithFactoryAndState";
                throw ex;
            }
        }
        return ints;
    }, 2000);

    try
    {
        taskWithFactoryAndState.Wait();
        if (!taskWithFactoryAndState.IsFaulted)
        {
            Console.WriteLine(string.Format("managed to get {0} items", 
                taskWithFactoryAndState.Result.Count));
        }
    }
    catch (AggregateException aggEx)
    {
        aggEx.Handle(HandleException);
    }
    finally
    {
        taskWithFactoryAndState.Dispose();
    }

    Console.WriteLine("All done, press Enter to Quit");

    Console.ReadLine();
}

private static bool HandleException(Exception ex)
{
    if (ex is InvalidOperationException)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
        return true;
    }
    else
    {
        return false;
    }
}

这是我们运行此示例代码时得到的结果

读取任务值

演示代码项目:HandlingExceptionsUsingTryCatch。

这可能是最简单的方法之一,但可能没有那么有用(只是实话实说,抱歉),因为它是一种奇怪的 try/catch 混合体,但你忽略了 catch,而是从源 Task 中读取 Exception 属性。你**必须**仍然使用其中一个触发方法来确保 AggregateException 被观察到,并且你必须做与使用 try/catch 方法时大致相同的工作量。无论如何,我只是觉得这种方法不是那么有用,我为了完整性而介绍了它。

这是此演示代码

// create the task
Task<List<int>> taskWithFactoryAndState = 
    Task.Factory.StartNew<List<int>>((stateObj) =>
{
    List<int> ints = new List<int>();
    for (int i = 0; i < (int)stateObj; i++)
    {
        ints.Add(i);
        if (i > 100)
        {
            InvalidOperationException ex = 
                new InvalidOperationException("oh no its > 100");
            ex.Source = "taskWithFactoryAndState";
            throw ex;
        }
    }
    return ints;
}, 2000);

try
{
    taskWithFactoryAndState.Wait();
    if (!taskWithFactoryAndState.IsFaulted)
    {
        Console.WriteLine(string.Format("managed to get {0} items", 
            taskWithFactoryAndState.Result.Count));
    }
}
catch (AggregateException aggEx)
{
    //do nothing
}
 
//so just read the Exception from the Task, if its in Faulted state
if (taskWithFactoryAndState.IsFaulted)
{
    AggregateException taskEx = taskWithFactoryAndState.Exception;
    foreach (Exception ex in taskEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
    }

}

//All done with Task now so Dispose it
taskWithFactoryAndState.Dispose();

Console.WriteLine("All done, press Enter to Quit");
Console.ReadLine();

这是运行此示例时的样子

使用延续

还有最后一种方法,那就是使用 Task 延续,但我将在下一篇文章中展示,所以在那之前……

取消任务

到目前为止,我们已经专注于创建和运行 Task 并处理可能发生的 Exception,很棒,很好的东西……但是如果我们想取消 Task 呢?这甚至可能吗?是的,TPL 为我们提供了一个非常简单的机制来做到这一点,那就是 CancellationToken

基本思想是我们需要从 CancellationTokenSource 获取一个 CancellationToken,并将获取到的 CancellationToken 作为 Task 创建参数之一传递,可以通过 Task 构造函数,或者通过使用 Task.Factory.StartNew(..) 方法的重载之一。

当我们要取消 Task 时,我们只需调用提供我们传递给 Task 创建的 CancellationTokenCancellationTokenSource 上的 Cancel() 方法。这基本上就是这么简单。唯一应该做的另一件事是在 Task 主体本身内部。推荐的做法是,如果发现 TaskCancellationToken 被取消,则应抛出 OperationCancelledException。抛出 OperationCancelledException **非常重要**,因为这是 Task 承认取消的方式,这将确保 Task 转换为 Cancelled 状态,这非常重要,因为用户代码或延续可能依赖于此状态。

CancellationToken 被视为 Cancelled 时,抛出一个新的 OperationCancelledException 将不会再调度该 Task 的任何工作。当然,抛出 OperationCancelledException 需要处理,你可以使用上面讨论的任何异常处理技术。

根据你的需求,抛出 OperationCancelledException 有几种选择。

选项 1:您的任务不依赖任何需要清理的资源

如果您的 Task **不**使用任何需要清理的资源,您可以简单地使用 token.ThrowIfCancellationRequested(),这将确保 Task 正确转换为 Cancelled 状态。

这就是我在演示代码中展示的内容。

选项 2:您的任务依赖需要清理的资源

如果您的 Task 使用了需要清理的资源(例如 Streams、WebClient、数据库连接等),您可以检查 CancellationTokenIsCancelledRequested 值,然后抛出新的 OperationCancelledException

这种方法的缺点是,过于频繁地检查 IsCancellationRequested 状态会付出代价,因此您应该尽量限制检查此状态的频率。我无法在这方面为您提供建议,这取决于您的需求。

尽管本文的演示代码没有显示这方面的示例,但这里有一个简单(且相当牵强)的示例,向您展示如何在 Task 内部清理创建的资源

Task<List<string>> someTask =
    Task.Factory.StartNew<List<string>>((website) =>
{
    System.Net.WebClient wc = new System.Net.WebClient();

    if (token1.IsCancellationRequested)
    {
        //cleanup your resources
        wc.Dispose();
        //and then throw new OperationCanceledException 
        //to acknowledge cancellation request
        throw new OperationCanceledException(token1);
    }
    else
    {
        //do something with a resource that should be cleaned up
        //this example is just that, an example, so this may not
        //be best 
        string webContent =
            wc.DownloadString((string)website);
        return webContent.Split(
            new string[] { " ", ","},
            Int16.MaxValue,
            StringSplitOptions.None).ToList();
    }
},"www.codeproject.com",token1);

好的,这就是想法。现在让我们看一些例子。

取消单个

演示代码项目:CancellingSingleTask。

在此示例中,我只是使用 Task.Factory.CreateNew(..) 创建了一个新的 Task,该 Task 传递了一个 CancellationToken,该令牌通过 CancellationTokenSource 立即取消。此示例还使用了我之前谈到的触发方法之一 Result,因此我们必须确保处理发生的任何 Exception。我选择了使用 try/catch 方法。

无论如何,这是代码

static void Main(string[] args)
{
    // create the cancellation token source
    CancellationTokenSource tokenSource = new CancellationTokenSource();
    // create the cancellation token
    CancellationToken token = tokenSource.Token;

    // create the task
    Task<List<int>> taskWithFactoryAndState = 
    Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
            token.ThrowIfCancellationRequested();
            Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
        }
        return ints;
    }, 2000, token);



    // write out the cancellation detail of each task
    Console.WriteLine("Task cancelled? {0}", 
                      taskWithFactoryAndState.IsCanceled);

    // cancel the second token source
    tokenSource.Cancel();

    if (!taskWithFactoryAndState.IsCanceled && 
        !taskWithFactoryAndState.IsFaulted)
    {
        //since we want to use one of the Trigger method (ie Result), 
    //we must catch any AggregateException that occurs
        try
        {
            if (!taskWithFactoryAndState.IsFaulted)
            {
                Console.WriteLine(string.Format("managed to get {0} items", 
                    taskWithFactoryAndState.Result.Count));
            }
        }
        catch (AggregateException aggEx)
        {
            foreach (Exception ex in aggEx.InnerExceptions)
            {
                Console.WriteLine(
            string.Format("Caught exception '{0}'", ex.Message));
            }
        }
        finally
        {
            taskWithFactoryAndState.Dispose();
        }
    }
    else
    {
        Console.WriteLine("Task cancelled? {0}", 
        taskWithFactoryAndState.IsCanceled);

    }


    // wait for input before exiting
    Console.WriteLine("Main method complete. Press enter to finish.");
    Console.ReadLine();
}

这是演示运行时的情况。请看这个小型演示应用程序是如何捕获由于我们通过使用 CancellationTokenSource.Cancel() 取消 Task 而抛出的 Exception

取消 n 个任务中的一个

演示代码项目:CancellingOneOfSeveralTasks。

现在我们对如何使用 CancellationTokenSource 有了基本的了解,我们可以看一个稍微(但只是稍微)更奇特的例子,因为我不想混淆任何人,至少目前还不想。所以这个演示启动了两个几乎相同的 Task(再次使用 Task.Factory.StartNew()),我们只是取消其中一个。这是代码:

static void Main(string[] args)
{
    CancellationTokenSource tokenSource1 = new CancellationTokenSource();
    CancellationToken token1 = tokenSource1.Token;

    Task<List<int>> taskWithFactoryAndState1 = 
        Task.Factory.StartNew<List<int>>((stateObj) =>
    {
                
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
            token1.ThrowIfCancellationRequested();
            Console.WriteLine("taskWithFactoryAndState1, creating Item: {0}", i);
        }
        return ints;
    }, 2000, token1);


    CancellationTokenSource tokenSource2 = new CancellationTokenSource();
    CancellationToken token2 = tokenSource2.Token;

    Task<List<int>> taskWithFactoryAndState2 = 
        Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
            token2.ThrowIfCancellationRequested();
            Console.WriteLine("taskWithFactoryAndState2, creating Item: {0}", i);
        }
        return ints;
    }, 15, token2);


    // cancel the 1st token source
    tokenSource1.Cancel();

    //examine taskWithFactoryAndState1
    try
    {
        Console.WriteLine("taskWithFactoryAndState1 cancelled? {0}",
            taskWithFactoryAndState1.IsCanceled);

        //we did not cancel taskWithFactoryAndState1, so print it's result count
        Console.WriteLine("taskWithFactoryAndState1 results count {0}",
            taskWithFactoryAndState1.Result.Count);

        Console.WriteLine("taskWithFactoryAndState1 cancelled? {0}",
            taskWithFactoryAndState1.IsCanceled);
    }
    catch (AggregateException aggEx1)
    {
        PrintException(taskWithFactoryAndState1, aggEx1, 
                       "taskWithFactoryAndState1");
    }


    //examine taskWithFactoryAndState2
    try
    {
        Console.WriteLine("taskWithFactoryAndState2 cancelled? {0}",
            taskWithFactoryAndState2.IsCanceled);

        //we did not cancel taskWithFactoryAndState2, so print it's result count
        Console.WriteLine("taskWithFactoryAndState2 results count {0}",
            taskWithFactoryAndState2.Result.Count);

        Console.WriteLine("taskWithFactoryAndState2 cancelled? {0}",
            taskWithFactoryAndState2.IsCanceled);
    }
    catch (AggregateException aggEx2)
    {
        PrintException(taskWithFactoryAndState2, aggEx2, 
                       "taskWithFactoryAndState2");
    }

    // wait for input before exiting
    Console.WriteLine("Main method complete. Press enter to finish.");
    Console.ReadLine();

}


private static void PrintException(Task task, AggregateException agg, string taskName)
{
    foreach (Exception ex in agg.InnerExceptions)
    {
        Console.WriteLine(string.Format("{0} Caught exception '{1}'", taskName, ex.Message));
    }
    Console.WriteLine("{0} cancelled? {1}",taskName, task.IsCanceled);
}

这是运行这个小型演示的结果

可以看出,对象状态为 taskWithFactoryAndState1Task 甚至没有开始运行就被立即取消了,而另一个未取消的 Task(对象状态为 taskWithFactoryAndState2)则运行完成了。

取消链式任务

这将在第二篇文章中介绍,届时我们将讨论延续。

SynchronizationContext

现在,我不知道你们中的许多人,但我来自 WinForms 背景,并且对以下错误对话框非常熟悉:

在 Windows Forms(以及 WPF/Silverlight)中,发生这种情况的原因是 UI 控件具有线程亲和性,也就是说,它们只能由拥有/创建它们的线程(通常是主线程)修改。为了解决这个问题,我们不得不让代码(好的,WPF 使用 Dispatcher.CheckAccess() 有稍微不同的语法,但它做着同样的工作)充斥着检查,以查看控件是否是由不同的线程创建的,通过使用 xxx.InvokeRequired,然后如果需要,在正确的线程上调用委托。下面显示了这种代码的典型外观示例(P.S.:这是 WinForms 代码):

这不仅很痛苦,而且成本很高。还有其他方法可以使用 WinForms 和 WPF 的内置功能来做到这一点,它们被称为 SynchronizationContext。它们已经存在很长时间了,提供了在正确线程上 Post/Send 委托的方法,这样开发人员就不需要用大量的调用检查来污染他们的代码库;只需一直使用 SynchronizationContext 来完成工作。

问题是,这些 SynchronizationContext 并没有对代码库进行多少清理;至少我是这么认为的,其他人可能不同意。

幸运的是,TPL 很好地解决了这个问题。使用 TPL 调度器,我们能够摆脱任何调用检查。以下两部分将向您展示如何将 TPL Task 的结果调度到正确的线程。它通过使用 SynchronizationContext 来实现,正如我所说,这不是一个新事物,但 TPL 为您做了这件事,所以您不必担心手动在 SynchronizationContext 上发出 Post/Send 委托,TPL 会处理它,我觉得这很好。

为了演示 TPL 如何与 SynchronizationContext 一起使用,我必须在小例子中引入延续。我将在下一篇文章中解释这一点,但我认为这一切都非常直观。

我应该说,一位读者 Neo Blanque 指出我的例子在这个主题上有点令人困惑,所以谢谢 Neo 指出这一点,你做得对。

WinForms 同步

演示代码项目:WinformsSynchonizationContext。

所以这是代码中最相关的部分。请注意 TaskScheduler.FromCurrentSynchronizationContext() 的使用。这是确保 Task 结果调度到正确(通常是主)线程的“魔力”部分。

在下一篇文章中,我们将探讨延续,所以请在那之前放松一下。

在此示例中,我们简单地创建了一个 List<int>,并将其设置为 ListBoxDataSource。您可以看到根本没有任何访问检查代码,TPL 为我们完成了这项工作。

private void btnDoIt_Click(object sender, EventArgs e)
{
    Task taskWithFactoryAndState1 = 
         Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        // This is not run on the UI thread.
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
        }
        return ints;
    }, 10000).ContinueWith(ant =>
    {
        //updates UI no problem as we are using correct SynchronizationContext
        lstBox.DataSource = ant.Result;
    }, TaskScheduler.FromCurrentSynchronizationContext());
}

这是演示运行时的截图,只是为了向大家证明它确实有效。这里没有烟雾弹和镜子,哦不……这里没有……用《性感野兽》(一部非常棒的电影,不惜一切代价去看)中唐·洛根的话说,“不不不不不……不,这次不行。”

WPF 同步

演示代码项目:WPFDispatcherSynchonizationContext。

WPF 中的情况大同小异,只是在底层,TPL Scheduler 将使用 WPF 特定的 DispatcherSynchronizationContext。这是 WPF 代码示例中最相关的部分。注意:此代码与之前的 WinForms 示例几乎相同,唯一的区别在于我们设置列表框项目的方式。

private void btnDoIt_Click(object sender, RoutedEventArgs e)
{
    Task taskWithFactoryAndState1 = 
         Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        // This is not run on the UI thread.
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
        }
        return ints;
    }, 10000).ContinueWith(ant =>
    {
        //updates UI no problem as we are using correct SynchronizationContext
        lstBox.ItemsSource = ant.Result;
    }, TaskScheduler.FromCurrentSynchronizationContext());
}

这是演示运行时的截图

暂时就这些

这就是我在这篇文章中想说的。希望您喜欢它并想了解更多。如果您喜欢这篇文章并想了解更多,您能否花点时间留下评论和投票?非常感谢。

希望下一次、再下一次、再下一次(总共六次)能再见到你,我最好开始忙碌起来。

© . All rights reserved.