65.9K
CodeProject 正在变化。 阅读更多。
Home

任务并行库和 async-await 功能 - 简单示例中的用法模式

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.98/5 (94投票s)

2012年12月30日

CPOL

23分钟阅读

viewsIcon

384093

downloadIcon

3501

TPL/Async 教程,重点介绍如何使用以及为什么需要它

引言

我为什么要写这篇文章

关于 TPL 和 .NET 4.5 新的 async-await 功能已经有很多教程,其中最著名的例子是 任务并行库:1/n任务并行库:2/n任务并行库:6/nC# 中的线程处理,第 5 部分:并行编程

在这里,我将展示我自己的 TPL 和 async-await 教程版本,演示 TPL 和 async-await 功能为何如此有用以及如何将其应用于一些众所周知的问题。

TPL 和 async-await 功能的目的 

TPL 和 async-await 功能简化了异步调用的处理。您的代码中可能存在异步调用的几个原因。

大多数软件产品并非独立运行——它们必须与其他系统联系。这种通信需求的常见示例是 UI 或 Web 客户端联系服务器读取或更新某些数据,或者中间件联系数据库等等。

当这种通信发生时,客户端的软件无法控制或预测服务器响应的速度。如果客户端向服务器发送同步请求,则调用线程会阻塞,直到获取并处理服务器响应。这种阻塞可能导致例如 GUI 或 Web 应用程序完全冻结一段不确定的时间——这绝对是不可取的。然而,从积极的方面来看,同步调用保留了程序结构——从开发人员的角度来看,同步调用服务器就像进行本地调用一样简单。

我们还可以通过使用与服务器的异步通信来解决通信问题。最常见的方法是在客户端注册一个回调,以便在服务器返回信息时触发,然后向服务器发送一个异步请求——一个不会阻塞当前线程的请求。在服务器响应后,回调代码将根据从服务器获得的信息更新客户端。这种方法的问题在于,代码流被分解为两个不同的部分——调用服务器之前的代码和服务器响应后运行的回调代码。换句话说——对服务器的异步调用极大地改变了代码的结构。当您一次只需要向服务器发出一个调用时,您仍然可以通过稍微降低代码质量来处理这种情况——如上所述,每个调用有两个函数,但是如果您需要向服务器发出多个调用怎么办——有些并行运行,有些顺序运行,有些只有在多个先前的异步调用完成后才能执行。在这种情况下,异步编程很容易导致开发人员的噩梦——每次异步调用都必须插入越来越多的回调,将程序分解成一堆必须逻辑连接的部分。此外,一些回调必须同步才能在开始其他处理之前检测它们的完成。

除了调用服务器之外,可能还有其他原因进行异步调用,例如,您可能希望在不同的线程上进行一些处理,以便更好地利用机器上的多个核心,或者因为这种处理已内置到您正在使用的框架中。

TPL 和 async-await 功能的目的是使异步编程几乎与同步编程一样简单,允许开发人员即使在异步执行代码时也能保留代码的逻辑结构。

文章组织 

首先,我将回顾 TPL 和 async-await 功能。然后,我将展示如何将它们应用于您可能遇到的问题——特别是对 Web 服务的调用、调度多个后台工作者和动画故事板。

使用的软件工具

我们使用 Visual Studio 2012 和 .NET 4.5 创建本文的示例。TPL 功能(不含 async-await 和一些其他 .NET 4.5 功能)都可以在 Visual Studio 2010 中在 .NET 4.0 下运行。

TPL 概述

有关 TPL 功能的完整介绍,请参阅文章开头提供的链接——这里我们只描述我们认为最有用的功能。

Tasks(任务)

TPL 的基本代码单元是 TaskTask 可以运行,可以等待其完成。任务可以有子任务。代码可以等待多个任务完成,然后才能继续。任务可以取消,并且可以监控其进度。

Task 只能从头到尾运行一次——您不能两次运行同一个任务对象。如果需要重新运行 Task,则需要创建另一个 Task 对象来运行相同的代码。

默认情况下,任务在线程池中的一个线程上运行。TaskScheduler 允许开发人员自定义运行任务的线程。一些使用 TaskCompletionSource 的任务不会生成单独的线程。

简单任务示例

显示基本任务功能的最简单示例位于 TaskSample 解决方案下。这是其代码
// task that does not return a value
Task t1 = new Task
(

    () =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("Task Completed");
    }
);

// start the task
t1.Start();

// wait for the task to finish
t1.Wait();

Console.WriteLine("End of Main");

// make sure the application console window does not 
// disappear when run under the VS debugger.
Console.ReadLine();

不言自明——我们定义任务休眠 1 秒,然后打印“Task Completed”到控制台。我们在定义任务后启动任务,打印“We are waiting for the task to finish”,然后调用 Wait 方法来阻塞主程序,直到任务完成。

抛出异常的简单任务

如果任务抛出异常,可以在主线程中通过将 Wait() 方法与 try-catch 语句(如 TaskWithException 项目中所示)围绕起来来捕获该异常。

// task that does not return a value
Task t1 = new Task
(

    () =>
    {
        Thread.Sleep(1000);

        // throw an exception within a task
        throw new Exception("This is an Exception within a task");

        Console.WriteLine("Task Completed");
    }
);

// start the task
t1.Start();

Console.WriteLine("We are waiting for the task to finish");

// wait for the task to finish
try
{
    t1.Wait();
}
catch (AggregateException e)
{
    // task framework wraps the thrown exception within
    // an aggregate exception
    Console.WriteLine("Caught Exception " + e.InnerException.Message);
}

Console.WriteLine("End of Main");

// make sure the application console window does not 
// disappear when run under the VS debugger.
Console.ReadLine();

带超时的等待

函数 Wait() 有一个接受超时参数的版本。当发生超时时,该版本返回 false 的布尔值,否则返回 true。相应的示例位于 TaskWithTimeout 项目中

// task that does not return a value
Task t1 = new Task
(

    () =>
    {
        // sleep for 4 seconds
        Thread.Sleep(4000);
        Console.WriteLine("Task Completed");
    }
);

// start the task
t1.Start();

Console.WriteLine("We are waiting for the task to finish");

// wait for 2 seconds for the task to finish
bool hasNotTimedOut = t1.Wait(2000);

if (hasNotTimedOut)
{
    Console.WriteLine("The task has not timed out");
}
else
{
    Console.WriteLine("The task has timed out");
}
Console.WriteLine("End of Main");

// make sure the application console window does not 
// disappear when run under the VS debugger.
Console.ReadLine();

主程序等待任务完成两秒钟,然后继续。请注意,在这种情况下任务并未取消——它在休眠期之后继续,“任务完成”消息在最后打印——在“主程序结束”消息之后。

取消任务

项目 TaskCancellation 展示了如何取消任务

CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

CancellationToken cancellationToken = cancellationTokenSource.Token;
// task that does not return a value
Task t1 = new Task
(

    () =>
    {
        int i = 0;
        while (true) // create a task that iterates forever unless cancelled
        {
            i++;
            Thread.Sleep(1000);

            Console.WriteLine("This is task iteration " + i);

            // if cancellation requested - throw OperationCancelledException
            cancellationToken.ThrowIfCancellationRequested();
        }
    },
    cancellationToken // pass the cancellation token as an argument to the task constructor
);

// start the task
t1.Start();

// sleep for 3 seconds
Thread.Sleep(3000);

// request for the task to cancel
cancellationTokenSource.Cancel();

try
{
    t1.Wait();
}
catch (AggregateException e)
{
    if (e.InnerException is OperationCanceledException)
    {
        Console.WriteLine("The operation has been cancelled");
    }
    else
    {
        Console.WriteLine("Some unexpected exception");
    }
}

Console.WriteLine("End of Main");

// make sure the application console window does not 
// disappear when run under the VS debugger.
Console.ReadLine();  

首先,我们创建 CancellationTokenSource。它的属性 Token 包含一个 CancellationToken 对象。此对象被传递给 Task 的构造函数。

启动 Task 后,我们等待 3 秒并调用 CancellationTokenSource 上的 Cancel() 方法。在任务中,我们在每次迭代结束时调用 cancellationToken.ThrowIfCancellationRequested() 方法。如果请求了取消,此方法将抛出 OperationCanceledException,否则,它不执行任何操作。当 OperationCanceledException 传播到 Task 的主体之外时,它被包装在 AggregateException 中——这就是我们在 Main 中捕获的异常。

带子任务的任务

您可以在另一个 Task 的主体内启动一个 Task。在这种情况下,在另一个 Task 的主体内启动的 Task 称为子 Task,而启动子 TaskTask 称为父 TaskTask 构造函数有一个重载,它接受 TaskCreationOptions 参数。当此参数指定包含 TaskCreationOptions.AttachedToParent 选项时,创建的子 Task 会“附加”到其父级。这种“附加”在父任务和子任务之间创建了非常有用的依赖关系,即

  • Task 在其所有附加子 Task 完成之前不会完成。
  • 父子 Task 形成了一个类似于函数堆栈的层次结构——只是在这里,子 Task 可能在不同的线程中执行。我们称之为“附加子堆栈”。如果一个附加子任务抛出异常,该异常将沿“附加子堆栈”向上传播——在传播的每一步都被另一个 AggregateException 包裹。它可以在“附加子堆栈”中的任何级别被捕获。

在父 Task 的主体内创建但没有 AttachedToParent 标志的 Task 称为“分离”任务。父 Task 和其“分离”子 Task 之间没有依赖关系,我们对“分离”子 Task 不太感兴趣。

TaskWithChildrent 项目展示了附加子 Task 功能

// task that does not return a value
Task task = new Task
(
    () =>
    {
        Task childTask = new Task
        (
            () =>
            {
                Thread.Sleep(1000);

                Task grandChildTask = new Task
                (
                    () =>
                    {
                        Thread.Sleep(2000);

                        // throw an exception from within grand-child's body
                        throw new Exception("Grandchild exception");
                    },
                    TaskCreationOptions.AttachedToParent
                );

                grandChildTask.Start();
            },
            TaskCreationOptions.AttachedToParent
        );

        childTask.Start();
    }
);

// start the task
task.Start();

try
{
    // wait function will throw the exceptions from
    // the child tasks
    task.Wait();
    Console.WriteLine("The task is finished");
}
catch (AggregateException e)
{
    // we catch the exception thrown by the grand-child and display its message
    Console.WriteLine("Exception caught: " + e.InnerException.InnerException.InnerException.Message);
}
Console.WriteLine("End of Main");

// make sure the application console window does not 
// disappear when run under the VS debugger.
Console.ReadLine();

一个异常在孙子级别抛出,并在 Main 函数的顶层被捕获。如上所述,当它在“附加子堆栈”的每个级别传递时,它都被一个新的 AggregateException 包裹,所以,为了获取原始异常,我们需要多次使用 InnerException 属性:e.InnerException.InnerException.InnerException.Message

返回结果的任务

如果您希望 Task 产生可从 Task 本身检索的结果,您可以使用其泛型形式 Task<TResult>。泛型参数 TResult 指定 Task 结果的类型。上面讨论的所有内容仍然适用于此类任务。此类 Task 唯一可用的额外功能是属性

  TResult Task<TResult>.Result;

它在 Task 运行时会阻塞(就像 Wait() 方法一样),并在 Task 运行完成后返回结果。

Task.WhenAll() 和 Task.WhenAny()

在 .NET 4.5 中,Task 类上有一个重要的静态方法——Task.WhenAll()。它返回一个 Task 对象,该对象仅当所有传递给它的 Task 对象都完成时才完成。它有多个方法重载——一些版本的方法接受参数列表——另一些版本接受 Task 对象的数组。如果输入 Tasks 返回结果,则 Task.WhenAll() 方法返回的 Task 可以返回一个与单个输入 Task 对象的结果相对应的值数组。

另一个方法 Task.WhenAny() 创建一个 Task,它只等待多个 Task 中的一个完成。它使用频率较低,但在某些场景中非常有用。

还有一些方法可以阻塞当前线程的执行,直到多个 Task 完成或其中一个 Task 完成,而无需创建新的 Task。这些方法是 Task.WaitAll(...)Task.WaitAny(...)

ContinueWith 方法

Task 有一个方法 ContinueWith,允许您指定在任务完成后要执行的功能——一种任务的回调。

测试此方法的代码位于 ContinueWithTest 项目下。

// create Task t1
Task t1 = new Task
(
    () =>
    {
        Thread.Sleep(1000);

        // make the Task throw an exception
        throw new Exception("this is an exception");

        Console.WriteLine("This is T1");
    }
);

Task t2 = t1.ContinueWith
(
    (predecessorTask) =>
    {
        // exception should be checked for null within the 
        // continuation functionality. An exception that is not
        // checked might bring the application down
        if (predecessorTask.Exception != null)
        {
            Console.WriteLine("Predecessor Exception within Continuation");
            return;
        }

            Thread.Sleep(1000);

            Console.WriteLine("This is Continuation");
    },
    // attach the continuation to parent and the continuation 
    // functionality work only if the predecessor task ran to completion
    // without exceptions thrown or without cancellations.
    TaskContinuationOptions.AttachedToParent | TaskContinuationOptions.OnlyOnRanToCompletion
);

t1.Start();

try
{
    t1.Wait();
    t2.Wait();
}
catch (AggregateException e)
{
    Console.WriteLine(e.InnerException.Message);
}  

我们创建 Task t2 作为 Task t1 的延续。通过传递 TaskContinuationOptions.OnlyOnRanToCompletion 选项,我们使延续只在前置 Task 成功运行时才运行。由于在我们的例子中,前置 Task 抛出了异常,因此后继 Task 将永远不会被调用。

ContinueWhenAll 和 ContinueWhenAny

可以使用两种方法来创建延续 Task,以便在多个前置 Task 完成后启动:Task.Factory.ContinueWhenAll(...)Task.Factory.ContinueWhenAny(...)。实际上,这两种方法与上面描述的 WhenAllWhenAny 方法非常相似,只不过它们是 .NET 4.0 的一部分,而 WhenAllWhenAny 是在 .NET 4.5 中添加的。

任务进度

.NET 4.5 添加了 IProgress<T> 接口和 Progress<T> 类,以方便测量 Task 或任何其他代码的进度。两者都接受一个泛型类型参数 T 来指定测量进度的值的类型。Progress<T> 隐式实现了 IProgress<T> 接口。

IProgress<T> 有一个方法 Report(T),应该调用它来报告进度。Progress<T> 有一个事件 ProgressChanged,每次报告进度时都会触发。此事件的第二个参数是进度值。可以将事件处理程序附加到此事件以处理进度报告。

TaskProgressTest 解决方案提供了进度功能的用法示例

Progress<string> progress = new Progress<string>();

Task t = new Task
(
    () =>
    {
        for (int i = 0; i < 5; i++)
        {
            Thread.Sleep(1000);

            // use IProgress'es Report() function to report progress
            ((IProgress<string>)progress).Report("" + i);
        }
    }
);

progress.ProgressChanged += (sender, str) =>
    {
        // print the current progress
        // when it is reported
        Console.WriteLine("Progress is " + str);
    };

t.Start();

t.Wait();  

任务 t 迭代 5 次,并使用迭代索引号 i 作为进度指示器(此数字传递给 IProgress<string>.Report(...) 函数)。

ProgressChanged 事件处理程序将报告的进度打印到控制台。

TaskCompletionSource 和将基于事件的功能包装在任务中

许多作为 .NET 一部分或来自一些第三方实现的异步功能都是基于事件驱动异步模式 (EAP) 构建的。EAP 的原理是异步功能提供一个方法来启动异步操作,以及一个回调事件,允许附加一个回调事件处理程序,该处理程序在异步操作完成时将被调用。客户端代码首先将回调处理程序附加到异步功能,然后调用方法来启动异步操作。

此类 EAP 功能的示例包括 BackgroundWorker、.NET 4.0 中的异步服务引用等。

.NET 4.5 的大部分功能已更改为 Task,并且 BackgroundWorker 现在几乎已经过时,但您仍然可能会遇到需要使用 EAP 模式的情况。在这种情况下,最好从 EAP 功能生成 Task 对象,以便您能够安排它们并行运行或等待多个先前的 Task 完成等。在本节中,我们将展示如何实现它。

我们使用 TaskCompletionSource 类来生成不创建自己的线程而是使用其他对象的异步功能的 Task

EAP 功能位于 MyEAPTest 类下。它提供函数 StartAsync(),该函数在一个单独的线程中运行一个方法,打印传递给它的字符串。在异步方法结束时,触发 EAPTestCompleted 事件。

public class MyEAPTest
{
    // thread to run the async action on
    Thread _thread;

    // event to fire on completing the async action
    public event Action EAPTestCompleted;

    public MyEAPTest(string stringToPrint)
    {
        _thread = new Thread
        (
            (ThreadStart) (() =>
            {
                Thread.Sleep(1000); // 1 second delay 
                Console.WriteLine(stringToPrint); // print the passed string
                if (EAPTestCompleted != null)
                    EAPTestCompleted();
            })
        );
    }

    // function to start the async action
    public void StartAsync()
    {
        _thread.Start();
    }
}  

EPAToTaskUtils 类提供 ToTask() 静态扩展方法,将 MyEAPTest 对象转换为 Task 对象

public static Task<object> ToTask(this MyEAPTest epaTest)
{
    // TaskCompletionSource should always have a generic
    // type argument so since we are not using any result,
    // we just set the argument to be of type object and will 
    // pass null to indicate the task's completion
    TaskCompletionSource<object> taskCompletionSource =
        new TaskCompletionSource<object>(TaskCreationOptions.AttachedToParent);

    Action onEpaTestCompleted = null;

    onEpaTestCompleted = () =>
        {
            epaTest.EAPTestCompleted -= onEpaTestCompleted;
            
            // we are setting the result to null to indicate 
            // the completion of the async opration
            taskCompletionSource.SetResult(null);
        };

    epaTest.EAPTestCompleted += onEpaTestCompleted;

    // start the async operation when the task is created
    epaTest.StartAsync();

    return taskCompletionSource.Task;
}

请注意,上面描述的 ToTask() 方法不仅创建了 Task,而且还启动了它。事实上,不建议您在不启动 EAP 的情况下创建 Task,因为返回的 TaskCompletionSourceTask 将没有有效的 Start 方法实现,这会给该功能的用户带来困惑。然而,Task 在转换 ToTask 方法中启动这一事实并不限制该功能的使用,因为我们总是可以推迟调用 ToTask,直到我们准备好启动 Task

调用 TaskCompletionSource 上的 SetResult 方法将导致其 Task 停止阻塞。您还可以使用 TaskCompletionSource.SetCanceled() 方法从阻塞方法中抛出 OperationCancelledException,或者您可以使用 TaskCompletionSource.SetException(...) 方法,这将使 TaskCompletionSourceTask 的阻塞方法表现得好像在 Task 执行期间抛出了异常一样。特别是,如果 TaskCompletionSource 是使用 AttachedToParent 创建选项创建的,则异常将沿“附加子堆栈”传播。

这是创建和调度 Task 的代码

// create the epa tests
MyEAPTest epaTest1 = new MyEAPTest("this is EPA test1");
MyEAPTest epaTest2 = new MyEAPTest("this is EPA test2");
MyEAPTest epaTest3 = new MyEAPTest("this is EPA test3");

// create the tasks - note that 
// the task starts the moment it is created. 
Task<object> t1 = epaTest1.ToTask();
Task<object> t2 = epaTest2.ToTask();

// wait for both t1 and t2 to finish
Task.WaitAll(t1, t2);

// create and start t3 task
Task<object> t3 = epaTest3.ToTask();

t3.Wait();  

Task t1t2 并行运行。Task t3 计划在 t1t2 都完成后才运行。

如果没有 TPL 功能,上述调度效果将难以实现。

新的 .NET 4.5 async 和 await 功能

.NET 4.5 的 asyncawait 功能进一步简化了异步编程。它们基本上允许编写和调试异步程序,几乎就像它是普通的同步程序一样。

为了运行本节的示例,您需要 Visual Studio 2012。

简单的 async-await 测试

查看 AwaitTest 程序下的 Program.cs 文件

static void Main(string[] args)
{
    RunIt();

    Console.WriteLine("End of Main");
    Console.ReadLine();
}

static async void RunIt()
{
    Task t = new Task
    (
        () =>
        {
            Thread.Sleep(3000);

            Console.WriteLine("Ended the task");
        }
    );

    t.Start();

    await t;

    Console.WriteLine("After await");
}

请注意,有一个从 Main() 调用的 RunIt() 方法。为什么我们不能在 Main() 方法本身内测试 await 关键字呢?原因是任何包含 await 关键字的方法都应该用 async 关键字声明为可能引入一些异步处理的方法,而 Main() 是一个不能用 async 关键字标记的特殊方法。

请注意,当您运行代码时,“End of Main”字符串在“Ended the task”之前打印。这意味着在“等待”期间,主线程没有被阻塞——它继续执行,如果不是 Main() 方法末尾的 Console.ReadLine() 调用,它本会完成程序。这是 awaitTask.Wait() 方法之间的主要区别。实际上,将 await t; 行更改为 t.Wait() 并从 RunIt 方法的声明中删除 async 关键字。之后,当您运行代码时,首先会打印“Ended the task”消息,然后才打印“End of Main”——这意味着 t.Wait() 正在阻塞主线程,直到 Task 完成。

执行上下文分配一个线程,用于在等待完成后执行 await 关键字之后的代码。

请注意,我们可以将 RunIt() 方法的返回类型更改为 Task,而无需从方法本身返回任何内容。编译器将为方法本身生成 Task 对象,因为该方法是用 async 关键字声明的。Main 方法可以获取任务并等待它

static void Main(string[] args)
{
    Task t = RunIt();

    t.Wait();

    Console.WriteLine("End of Main");
    Console.ReadLine();
}

static async Task RunIt()
{
    Task t = new Task
    (
        () =>
        {
            Thread.Sleep(3000);

            Console.WriteLine("Ended the task");
        }
    );

    // start the task
    t.Start();

    await t;
    Console.WriteLine("After await");
}  

由于对 RunIt() 方法返回的 Task 对象调用了 Wait() 方法,因此 Main() 方法会阻塞,直到 RunIt() 中的异步处理完成,并且“End of Main”字符串在“Ended the Task”之后打印。

带返回值的 Async-Await 测试

AwaitWithReturnsTest 项目的代码与上面考虑的非常相似,不同之处在于我们正在等待的 Task 返回一个字符串结果,相应地,我们使用 string result = await t;Task 的结果赋值给一个字符串。

static void Main(string[] args)
{
    Task<string> runItTask = RunIt();

    Console.WriteLine("End of Main");
    Console.ReadLine();
}

static async Task<string> RunIt()
{
    Task<string> t = new Task<string>
    (
        () =>
        {
            Thread.Sleep(3000);

            Console.WriteLine("Ended the task");

            return "This is task";
        }
    );

    // start the task
    t.Start();

    // get the result from the task - analogous to 
    // calling t.Result (without the thread blocked, of course)
    string result = await t;
    Console.WriteLine("After await, result = '" + result + "'");
 
   return result;
}  

请注意,我们的 RunIt() 函数返回类型为字符串的 result,而它的声明是返回 Task<string>。返回类型的更改是由于方法声明前的 async 关键字,它将函数返回的任何内容包装在 Task 中。返回的 Task<string> 可以用来阻塞 Main() 线程执行直到 Task 完成,或者在函数堆栈的更高层使用 await 关键字。

Async-Await 和异常

查看 AWaitWithException 项目。Program.cs 文件与我们的 AwaitTest 项目几乎相同,只是在 Task 中抛出了一个异常。这个异常被 await t; 行周围的 try-catch 块捕获,并打印异常消息。

static async void RunIt()
{
    Task t = new Task
    (
        () =>
        {
            Thread.Sleep(3000);
            throw new Exception("This is an exception thrown from a Task");
            Console.WriteLine("Ended the task");
        }
    );

    // start the task
    t.Start();

    try
    {
        await t;
    }
    catch (Exception e)
    {
        Console.WriteLine("Caught Exception '" + e.Message + "'");
    }

    Console.WriteLine("After await");
}  

请注意,await 关键字会解包一层 AggregateException,因此在上述情况下,我们捕获的是原始的 Exception 对象,而不是 AggregateException 对象。这是异步编程更像通常的同步编程的另一种方式。

任务和 Async-Await 功能的使用模式

在本节中,我将描述几个 TPL 和 async-await 功能非常方便的重要用例。

使用 .NET 4.5 进行远程服务调用

大多数金融应用程序从许多不同的不相交来源收集数据——通常有一些较新的数据库、较旧的数据库、一些数据源等。单个读取或修改金融数据的请求可能包括对多个服务的调用(无论是从 UI 或 Web 客户端还是从某些中间件发出此类请求)。其中一些服务调用可能需要并行执行才能更早地获取所有必需的回复,而另一些服务调用可能只有在其他服务调用的结果到达后才触发,因为它们的输入可能依赖于这些结果。Task 将是表示此类服务调用的理想方式,因为我们可以轻松地并行运行多个 Task 并在启动其他 Task 之前等待多个 Task 对象。

提供 Web Service 接口的服务可以作为服务引用添加到 .NET 应用程序并从中访问(服务本身可以用 Java 或 C# 或任何其他语言编写——只要它们是合法的 Web Service 即可)。在 .NET 4.5 中,可以将服务引用配置为以 Tasks 方式异步访问。

解决方案 CallingServiceViaTasksTest 包含两个项目

  • MockService - 包含一个非常简单的 WCF 服务,模拟大量返回财务数据的服务调用。
  • CallingServiceViaTasksTest - 包含获取模拟财务数据并将其打印到控制台的测试客户端。

该服务包含三个可远程使用的方法(操作)

  • GetSecurityName - 给定安全 ID,它将返回安全名称——实际上我的模拟实现将为您传递的任何内容返回相同的名称“MSFT”。
  • GetExchangesSecurityTradedOn - 给定证券 ID,它将返回该证券交易所在的交易所名称
  • GetSecurityExchangeData - 给定证券 ID 和交易所名称,它返回该证券在该交易所的价格。

这是模拟服务的非常简单的实现

[ServiceContract]
public class MockFinancialDataService
{
    [OperationContract]
    public string GetSecurityName(string securityID)
    {
        return "MSFT";
    }

    [OperationContract]
    public string[] GetExchangesSecurityTradedOn(string securityID)
    {
        return new string[] { "NASDAQ", "NYCE" };
    }

    [OperationContract]
    public double GetSecurityExchangeData(string securityID, string exchangeName)
    {
        if (exchangeName == "NASDAQ")
            return 20.00;

        else if (exchangeName == "NYCE")
            return 30.00;

        return -1.0;
    }
}  

假设您正在从头开始构建此项目,您可以通过在解决方案资源管理器中右键单击客户端项目名称下的“引用”并选择“添加服务引用”来向客户端项目添加服务引用。将“https://:20575/MockFinancialDataService.svc”设置为服务 URL,然后单击“高级”按钮,并确保选中“允许生成异步操作”复选框,并在该复选框下选择“生成基于任务的操作”选项

如果无法选择“生成基于任务的操作”,则表示您的客户端项目未使用 .NET 4.5(很可能使用 .NET 4.0),您需要更改您的框架,然后重新配置服务引用。

这个客户端代码也非常简洁和简单(归功于 TPL 和 async-await 功能)

public static async Task RunIt()
{
    MockFinancialDataServiceClient client = new MockFinancialDataServiceClient();

    // we get the security name and echanges it is traded on in parallel at
    // the first stage. 
    Task<string> securityNameTask = client.GetSecurityNameAsync("1234");
    Task<string[]> exchangesTask = client.GetExchangesSecurityTradedOnAsync("1234");

    // we wait for the first stage tasks to complete
    await Task.WhenAll(securityNameTask, exchangesTask);

    string securityName = securityNameTask.Result;
    foreach (string exchangeName in exchangesTask.Result)
    {
        double price = await client.GetSecurityExchangeDataAsync("1234", exchangeName);

        Console.WriteLine(securityName + "\t" + exchangeName + "\t" + price);
    }
}  

我们并行请求证券名称和它交易的交易所。然后等待两者都产生结果。之后,对于每个返回的交易所,我们请求价格信息,并将证券名称、交易所名称和价格打印到控制台。

现在,试想一下,如果没有 TPL 功能,如何实现上述代码。您将至少需要为每个远程调用编写两个函数(或 lambda),而不是一个简洁的小函数。最重要的是,您必须添加大量代码来实现等待前两个异步调用完成,然后才能调用 GetSecurityExchangeDataAsync 远程方法的功能。

使用旧 API 进行远程服务调用

如果您的公司尚未迁移到 .NET 4.5,您仍然可以使用 Task 包装器来封装异步服务调用。解决方案 CallingServicesViaTasksOlderAPITest 展示了如何实现。它包含 WCF MockService 项目——与上一个示例相同,以及客户端项目。

当服务引用配置为生成异步服务调用时,.NET 4.0 及更早的 .NET 版本将生成以两种不同方式调用服务的方法——基于事件的异步模式(EAP)和异步编程模型(APM)。我们上面已经介绍了 EAP——它创建一个方法来启动异步操作,以及一个注册的回调事件,当异步操作完成时触发。APM 模式有两种方法——一种用于启动异步操作,另一种用于阻塞直到操作完成。

APM 模式有两种方法——一种用于启动异步操作,另一种用于阻塞直到操作完成。对于我们的远程服务——启动操作的 APM 方法以“Begin”前缀开头,而阻塞直到操作完成的方法以“End”前缀开头,例如,服务操作 GetSecurityName 会产生两个 APM 方法:BeginGetSecurityName 用于启动操作,EndGetSecurityName 用于阻塞直到操作完成。

我们使用 APM 模式为调用异步服务创建 Task 包装器——使用 Task.Factory.FromAsync<TResult>(...) 函数可以比 EAP 模式更容易实现。这是我们的客户端代码

static void Main(string[] args)
{
    MockFinancialDataServiceClient client = new MockFinancialDataServiceClient();

    Task<string> getNameTask = Task<string>.Factory.FromAsync<string>
    (
        client.BeginGetSecurityName,
        client.EndGetSecurityName,
        "1234", // argument to the async call
        null // state (not needed)
    );


    Task<string[]> getExhcangesTask = Task<string[]>.Factory.FromAsync<string>
    (
        client.BeginGetExchangesSecurityTradedOn,
        client.EndGetExchangesSecurityTradedOn,
        "1234",
        null
    );

    Task.WaitAll(getNameTask, getExhcangesTask);

    string securityName = getNameTask.Result;

    string[] exchanges = getExhcangesTask.Result;

    foreach (string exchangeName in exchanges)
    {
        Task<double> priceTask = Task<double>.Factory.FromAsync<string, string>
        (
            client.BeginGetSecurityExchangeData,
            client.EndGetSecurityExchangeData,
            "1234",
            exchangeName,
            null
        );

        Console.WriteLine(securityName + "\t" + exchangeName + "\t" + priceTask.Result);
    }
} 

Task.Factory.FromAsync<TResult>(...) 方法在两个 APM 方法周围创建一个任务。在这些参数之后,您需要将输入参数传递给 Begin... APM 方法。FromAsync 的最后一个参数是 state 变量,我们不使用它并将其传递为 null。FromAsync 有许多重载,允许向 Begin APM 函数输入多达三个参数。但是,如果 Begin 函数有三个以上参数,您总是可以将其包装在一个接受少于三个参数的 lambda 表达式或委托中,并将此 lambda 表达式传递给 FromAsync

请注意,我们之前的服务调用示例(使用 .NET 4.5 功能)不会阻塞它所运行的线程,例如 WPF 应用程序中的 UI 线程。然而,当前使用旧 .NET 功能的示例,确实通过其 WaitWaitAll 方法阻塞了当前线程。因此,如果您不想让线程被它阻塞,您总是可以在不同的线程上启动它,例如通过让它继续在其自己的线程上运行的 Task

将 BackgroundWorker 包装在任务中

有了 Task 功能,BackgroundWorker 功能大部分都已过时——您可以使用 Task 来实现所需的一切,而不是使用 BackgroundWorker。但您仍可能出于某些原因希望在团队中使用 BackgroundWorker 功能——无论是由于您的遗留代码使用了它,还是因为您的团队成员或您的老板更喜欢它并比新的 Task 功能更了解它。

在这里,我们展示了如何将 BackgroundWorker 包装在 Task 中,以便您可以为基于 BackgroundWorker 的代码获得 Task 功能的所有灵活性。

解决方案 BackgroundWorkerTaskWrap 展示了如何将后台工作器包装在 Task 中。

任务包装代码位于 BackgroundWorkerUtils.ToTask() 扩展函数下。它是基于 TaskCompletionSource 的,因此在转换过程中会启动生成的任务。它提供了 Task 取消和进度报告的接口。

public static Task<object> ToTask
(
    this BackgroundWorker backgroundWorker, 
    CancellationTokenSource cancellationTokenSource = null,
    IProgress<object> progress = null
)
{
    TaskCompletionSource<object> taskCompletionSource =
        new TaskCompletionSource<object>(TaskCreationOptions.AttachedToParent);

    if (cancellationTokenSource != null)
    {
        // when the task is cancelled, 
        // trigger CancelAsync function on the background worker
        cancellationTokenSource.Token.Register
        (
            () =>
            {
                if (backgroundWorker.WorkerSupportsCancellation)
                    backgroundWorker.CancelAsync();
            }
        );
    }

    if (progress != null)
    {
        backgroundWorker.ProgressChanged += (sender, progressChangedArgs) =>
            {
                progress.Report(progressChangedArgs.ProgressPercentage);
            };
    }

    RunWorkerCompletedEventHandler onCompleted = null;

    onCompleted = (object sender, RunWorkerCompletedEventArgs e) =>
        {
            backgroundWorker.RunWorkerCompleted -= onCompleted;
                    
            if (e.Cancelled)
            {
                // if the background worker was cancelled,
                // set the Task as cancelled.  
                taskCompletionSource.SetCanceled();
                taskCompletionSource.SetException(new OperationCanceledException());
            }
            else if (e.Error != null)
            {
                taskCompletionSource.SetException(e.Error);
            }
            else
            {
                taskCompletionSource.SetResult(e.Result);
            }
        };

    backgroundWorker.RunWorkerCompleted += onCompleted;

    backgroundWorker.RunWorkerAsync();

    return taskCompletionSource.Task;
}

以下是 Program.cs 文件中如何使用该功能的示例

    static void Main(string[] args)
    {
        BackgroundWorker backgroundWorker1 = new BackgroundWorker();

        backgroundWorker1.DoWork += (sender, e) =>
            {
                Thread.Sleep(2000);
                Console.WriteLine("this is backgroundWorker1");
            };

        BackgroundWorker backgroundWorker2 = new BackgroundWorker();
        backgroundWorker2.DoWork += (sender, e) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("this is backgroundWorker2");
        };

        BackgroundWorker backgroundWorker3 = new BackgroundWorker();
        backgroundWorker3.DoWork += (sender, e) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("this is backgroundWorker3");
        };

        // schedule backgroundWorker1 and backgroundWorker2 to run in parallel
        Task<object> t1 = backgroundWorker1.ToTask();
        Task<object> t2 = backgroundWorker2.ToTask();

        Task.WhenAll(t1, t2).Wait();

        // schedule backgroundWorker3 to run when both backgroundWorker1 
        // and backgroundWorker2 are completed
        Task<object> t3 = backgroundWorker3.ToTask();

        t3.Wait();
    }
}

在上面的示例中,我们安排了两个后台工作者并行运行,第三个后台工作者在两个第一个后台工作者完成运行后启动。

将 WPF Storyboard 包装到任务中

我一直希望能够组合多个故事板,以便其中一些可以并行运行,然后当它们完成时,其他一些动画会开始。Task 功能非常适合实现这一点。

故事板示例位于 WrappingStoryboardsInTasks 项目下。

项目的 XAML 文件包含两个蓝色矩形和三个 Storyboard 对象。前两个故事板分别旋转蓝色和红色矩形,而第三个则增加它们的宽度。

按下“Start Animation”按钮后,我们希望启动前两个 Storyboard,当它们都完成后,我们希望启动第三个。除此之外,一旦点击 StartAnimationButton 按钮,我们希望禁用它,并在最后一个动画停止后重新启用它,以防止在上次动画运行过程中启动新的故事板。

当我们使用 TPL 和 async-await 功能时,代码变得如此简单

async void StartAnimationButton_Click(object sender, RoutedEventArgs e)
{
    StartAnimationButton.IsEnabled = false;

    Storyboard storyboard1 = (Storyboard)MyGrid.Resources["Storyboard1"];
    Storyboard storyboard2 = (Storyboard)MyGrid.Resources["Storyboard2"];
    Storyboard storyboard3 = (Storyboard)MyGrid.Resources["Storyboard3"];

    Task<object> t1 = storyboard1.ToTask();
    Task<object> t2 = storyboard2.ToTask();

    // wait for storyboard1 and storyboard2 to finish
    await Task.WhenAll(t1, t2);

    // start storyboard3
    await storyboard3.ToTask();

    StartAnimationButton.IsEnabled = true;
}  

由于 TPL 和 async-await 的魔力,这段代码产生了一些回调——一个用于启动 storboard3,另一个用于启用 StartAnimationButton,但它看起来都像线性同步代码。

Storyboard 转换为 Task 的代码放置在 StoryboardToTask 静态类中,作为 ToTask(...) 扩展方法。

public static Task<object> ToTask
(
    this Storyboard storyboard, 
    CancellationTokenSource cancellationTokenSource = null
)
{
    TaskCompletionSource<object> taskCompletionSource =
        new TaskCompletionSource<object>(TaskCreationOptions.AttachedToParent);

    if (cancellationTokenSource != null)
    {
        // when the task is cancelled, 
        // Stop the storyboard
        cancellationTokenSource.Token.Register
        (
            () =>
            {
                storyboard.Stop();
            }
        );
    }

    EventHandler onCompleted = null;

    onCompleted = (object sender, EventArgs e) =>
    {
        storyboard.Completed -= onCompleted;

        taskCompletionSource.SetResult(null);
    };

    storyboard.Completed += onCompleted;

    // start the storyboard during the conversion.
    storyboard.Begin();

    return taskCompletionSource.Task;
}  

请注意,我们使用 TaskCompletionSource 来创建围绕 StoryboardsTask 包装器,因此我们应该在转换期间启动 Storyboard

Storyboard 本身由动画对象组成。最终(如果时间允许)我希望使用 Task 对象来包装单个动画,并构建一种新型的 Storyboard 类对象,其不同的动画基于前一个动画完成的时间而不是基于时间。这将使动画对象更容易构建和控制。

摘要

在本文中,我们提供了有关使用 TPL 和 async-await 功能创建几乎与同步代码一样简单的异步功能的信息。我们展示了如何将这些概念应用于发送和处理大量服务请求、调度多个 BackgroundWorkerStoryboard

© . All rights reserved.