65.9K
CodeProject 正在变化。 阅读更多。
Home

使用迭代器在 C#4 中等待任务

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.97/5 (30投票s)

2012年12月7日

CPOL

14分钟阅读

viewsIcon

119810

downloadIcon

1066

在 Visual Studio 2010 中编写外观类似同步的异步方法,无需 async/await。

引言

您可能已经阅读了 C#5 中的 asyncawait 关键字,以及它们如何简化异步编程。但遗憾的是,您的雇主(或您自己)在两年前才升级到 Visual Studio 2010,现在还没有准备好再次花费金钱购买 VS 2012。您被困在 VS 2010 和不支持该功能的 C#4 中。(本文也适用于 VB.NET 2010;语法不同,但方法相同。)您不禁感叹:“如果我能在 VS 2010 中编写出外观同步执行异步的方法,我的代码该会多么清晰!”

阅读本文后,您将能够做到这一点。我们将开发一小段基础设施代码,负责繁重的工作,使我们能够以类似于 C#5 的方式编写外观同步的异步方法(SLAMs)。(注意:如果您已经在使用 C#5,或者您对使用 Microsoft 的不受支持的 Async CTP 感到满意,那么本文不适用于您。)

我们必须一开始就承认,async/await 是一种很好的语法糖,而我们没有它,所以我们的代码会比有它们时“更朴实”。但它比编写自己的 IAsyncResult 回调那样“苦涩”的味道要“可口”得多!当您最终升级到 VS 2012(或更高版本)时,将您的方法转换为利用 C#5 关键字的优势将是一件轻而易举的事;这只需要简单的语法更改,而不是劳动繁重的结构重写。

概述

async/await 关键字建立在任务异步模式(Task Asynchronous Pattern,TAP)之上。TAP 已在其他地方进行了详尽的文档记录,因此我在此不赘述。我必须以个人名义补充一点:TAP 非常酷!您可以创建许多待稍后完成的小工作单元(任务);任务可以启动其他(嵌套)任务,和/或设置将在一个或多个前置任务完成后才开始的连续任务。任务不一定会在嵌套任务完成时占用一个线程(一种沉重的资源)。您也无需担心调度线程来执行任务;这是由框架自动处理的,您只需提供少量有用的提示。然后,当您运行程序时,所有任务都会逐渐完成,如同在虚拟弹珠机(Pachinko Machine)中碰撞的钢球一样!

在 C#4/.NET 4 中,我们没有 asyncawait,但我们确实拥有 Task 类型,只是缺少一些 .NET 4.5 的附加功能,这些功能我们可以不使用或自行构建。

在 C#5 的 async 方法中,您会 await 一个 Task。这不会导致线程阻塞;相反,该方法会向其调用者返回一个 Task,调用者可以(如果自身也是 async)在该任务上 await,或者附加连续任务。 (非 async 调用者也可以 Wait() 该任务或其 Result,但这会占用线程,所以要避免这样做。)当被等待的任务成功完成时,您的 async 方法将从中断处继续执行。

您可能知道,C#5 编译器会将 async 方法重写为一个生成嵌套类,该类实现了一个状态机。C#(自 2.0 起)中还有一个功能可以做到这一点:迭代器(带 yield return)。这里的想法是使用迭代器方法在 C#4 中构建状态机,返回一系列 Tasks,这些 Tasks 是整个过程中需要等待的步骤。我们将开发一个方法,该方法接受由迭代器返回的任务的枚举,并返回一个代表整个序列完成的单个覆盖任务,并提供其最终的 Result(如果存在)。

最终目标

Stephen Covey 建议我们“以终为始”。我们在这里也将这样做。大量示例展示了如何使用 async/await 编写 SLAMs。我们将如何不使用这些关键字来编写它们?让我们从一个简单的 C#5 async 方法开始,看看如何用 C#4 来表示它。然后,我们将更普遍地讨论如何转换任何需要它的代码片段。

如果我们能在 C#5 中使用异步读写来实现 Stream.CopyToAsync(),尽管 .NET 4.5 中已经提供了它(但我们实际上可以在没有它的 .NET 4 中使用转换后的版本!下载 ReadAsync()WriteAsync() 的示例代码)。

public static async Task CopyToAsync(
    this Stream input, Stream output,
    CancellationToken cancellationToken = default(CancellationToken))
{
    byte[] buffer = new byte[0x1000];   // 4 KiB
    while (true) {
        cancellationToken.ThrowIfCancellationRequested();
        int bytesRead = await input.ReadAsync(buffer, 0, buffer.Length);
        if (bytesRead == 0) break;

        cancellationToken.ThrowIfCancellationRequested();
        await output.WriteAsync(buffer, 0, bytesRead);
    }
} 

对于 C#4,我们将将其分解为两部分:一个具有相同签名和可访问性的方法,以及一个具有相同参数但返回类型不同的私有方法。私有方法是实现相同过程的迭代器,它返回一系列要等待的任务(IEnumerable<Task>)。序列中的实际任务可以是无泛型的,也可以是不同类型的泛型的,可以是任何组合。(幸运的是,泛型的 Task<T> 类型是无泛型 Task 类型的子类型。)

具有相同可访问性(此处为“public”)的方法返回与相应 async 方法相同的类型:voidTask 或泛型 Task<T>。它是一个简单的单行代码,调用私有迭代器,并使用扩展方法将其转换为 TaskTask<T>

public static /*async*/ Task CopyToAsync(
    this Stream input, Stream output,
    CancellationToken cancellationToken = default(CancellationToken))
{
    return CopyToAsyncTasks(input, output, cancellationToken).ToTask();
}
private static IEnumerable<Task> CopyToAsyncTasks(
    Stream input, Stream output,
    CancellationToken cancellationToken)
{
    byte[] buffer = new byte[0x1000];   // 4 KiB
    while (true) {
        cancellationToken.ThrowIfCancellationRequested();
        var bytesReadTask = input.ReadAsync(buffer, 0, buffer.Length);
        yield return bytesReadTask;
        if (bytesReadTask.Result == 0) break;

        cancellationToken.ThrowIfCancellationRequested();
        yield return output.WriteAsync(buffer, 0, bytesReadTask.Result);
    }
} 

异步方法名通常以“Async”结尾(除非它是事件处理程序,例如 startButton_Click)。给它的迭代器起相同的名字,加上“Tasks”(例如 startButton_ClickTasks)。如果异步方法返回 void,它仍然调用 ToTask() 但不返回 Task。如果异步方法返回 Task<X>,则它调用泛型 ToTask<X>() 扩展方法。对于这三种返回类型,async 替换方法如下所示:

public /*async*/ void DoSomethingAsync() {
    DoSomethingAsyncTasks().ToTask();
}
public /*async*/ Task DoSomethingAsync() {
    return DoSomethingAsyncTasks().ToTask();
}
public /*async*/ Task<String> DoSomethingAsync() {
    return DoSomethingAsyncTasks().ToTask<String>();
} 

配对的迭代器方法也并不复杂。其中 async 方法会 await 一个非泛型 Task,迭代器只需 yield return 它。当 async 方法会 await 一个任务结果时,迭代器会将任务保存在一个变量中,yield return 它,然后稍后使用它的 Result。两种情况都如上面的 CopyToAsyncTasks() 示例所示。

对于具有泛型结果 Task<X> 的 SLAM,迭代器必须 yield return 一个具有确切类型的最终任务。ToTask<X>() 会将最终任务强制转换为该类型以提取其 Result。通常,您的迭代器将从中间任务结果计算出值,然后只需将其包装在 Task<T> 中。 .NET 4.5 提供了用于此目的的便捷静态方法。我们在 .NET 4 中没有它,因此我们将实现它为 TaskEx.FromResult<T>(value)

您需要知道的最后一件事是如何处理从中间返回。async 方法可以从任意嵌套块返回;我们的迭代器通过在 yield return 返回值(如果存在)后结束迭代来模仿这一点。

// C#5
public async Task<String> DoSomethingAsync() {
    while (…) {
        foreach (…) {
            return "Result";
        }
    }
}

// C#4;  DoSomethingAsync() is necessary but omitted here.
private IEnumerable<Task> DoSomethingAsyncTasks() {
    while (…) {
        foreach (…) {
            yield return TaskEx.FromResult("Result");
            yield break;
        }
    }
} 

现在我们知道如何在 C#4 中编写 SLAM,但在实现 FromResult<T>() 和两个 ToTask() 扩展方法之前,我们实际上无法做到。让我们开始吧。

轻松入门

我们将在 System.Threading.Tasks.TaskEx 类中实现我们的 3 个方法,从两个最直接的方法开始。FromResult<T>() 创建一个 TaskCompletionSource<T>,填充其结果,并返回其 Task

public static Task<TResult> FromResult<TResult>(TResult resultValue) {
    var completionSource = new TaskCompletionSource<TResult>();
    completionSource.SetResult(resultValue);
    return completionSource.Task;
} 

显然,这两个 ToTask() 方法本质上是相同的,唯一的区别在于返回的任务是否具有结果值。我们不想编写和维护相同的过程两次,所以我们将用一个方法来实现另一个。泛型实现将查找一个“标记类型”来知道我们实际上不关心结果值,并且它将避免对最终任务进行类型转换。然后,我们可以使用标记类型来实现非泛型版本。

private abstract class VoidResult { }

public static Task ToTask(this IEnumerable<Task> tasks) {
    return ToTask<VoidResult>(tasks);
} 

到目前为止,一切顺利。现在剩下要做的就是实现泛型 ToTask<T>()。伙计们,我们要出发了。

粗略的第一尝试

在第一次尝试实现该方法时,我们将枚举返回的任务,Wait() 直到每个任务完成,然后设置最终任务的结果(如果适用)。当然,我们不想在此过程中占用当前线程,所以我们将启动另一个任务来执行此循环。

// BAD CODE !
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks)
{
    var tcs = new TaskCompletionSource<TResult>();
    Task.Factory.StartNew(() => {
        Task last = null;
        try {
            foreach (var task in tasks) {
                last = task;
                task.Wait();
            }

            // Set the result from the last task returned, unless no result is requested.
            tcs.SetResult(
                last == null || typeof(TResult) == typeof(VoidResult)
                    ? default(TResult) : ((Task<TResult>) last).Result);

        } catch (AggregateException aggrEx) {
            // If task.Wait() threw an exception it will be wrapped in an Aggregate; unwrap it.
            if (aggrEx.InnerExceptions.Count != 1) tcs.SetException(aggrEx);
            else if (aggrEx.InnerException is OperationCanceledException) tcs.SetCanceled();
            else tcs.SetException(aggrEx.InnerException);
        } catch (OperationCanceledException cancEx) {
            tcs.SetCanceled();
        } catch (Exception ex) {
            tcs.SetException(ex);
        }
    });
    return tcs.Task;
} 

这里有一些优点,并且只要不触及用户界面,它实际上是可以工作的。

  1. 它正确地返回 TaskCompletionSourceTask 并通过 Source 设置完成状态。
  2. 它展示了我们如何使用迭代器的最后一个任务来设置任务的最终 Result,并在不需要结果时避免这种情况。
  3. 它会捕获迭代器中的异常以设置 CanceledFaulted 状态。它还会传播枚举任务的状态(此处通过 Wait(),它可能会抛出包装了取消或故障异常的 AggregateException)。

但这里存在重大问题。最严重的包括:

  1. 为了让迭代器兑现其“外观同步”的承诺,当它从 UI 线程启动时,迭代器方法应该能够访问 UI 控件。您在此处可以看到 foreach 循环(调用迭代器)在后台运行;不要从那里触碰 UI!此方法不尊重 SynchronizationContext
  2. 即使在 UI 之外,我们也存在问题。我们可能希望在 SLAM 实现中并行创建许多许多任务。但是看看循环中的 Wait()!在等待嵌套任务时,可能需要很长时间才能完成远程操作,我们正在占用一个线程。我们将耗尽线程池线程。
  3. 以这种方式解包 AggregateException 简直太业余了。我们需要捕获并传播其完成状态,而不会抛出异常。
  4. 有时 SLAM 可以立即确定其完成状态。在这种情况下,C#5 的 async 方法将同步高效地运行。我们总是在这里调度一个后台任务,所以我们失去了这种可能性。

是时候发挥创意了!

通过连续任务循环

关键思想是立即同步获取迭代器产生的第一个任务。我们设置一个连续任务,以便当它完成时,该连续任务会检查任务的状态,(如果成功)获取下一个任务并设置另一个连续任务;依此类推,直到完成。(如果它确实完成了;迭代器没有要求它必须结束。)

// Pretty cool, but we're not there yet.
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks)
{
    var taskScheduler =
        SynchronizationContext.Current == null
            ? TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext();
    var tcs = new TaskCompletionSource<TResult>();
    var taskEnumerator = tasks.GetEnumerator();
    if (!taskEnumerator.MoveNext()) {
        tcs.SetResult(default(TResult));
        return tcs.Task;
    }

    taskEnumerator.Current.ContinueWith(
        t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t),
        taskScheduler);
    return tcs.Task;
}
private static void ToTaskDoOneStep<TResult>(
    IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler,
    TaskCompletionSource<TResult> tcs, Task completedTask)
{
    var status = completedTask.Status;
    if (status == TaskStatus.Canceled) {
        tcs.SetCanceled();

    } else if (status == TaskStatus.Faulted) {
        tcs.SetException(completedTask.Exception);

    } else if (!taskEnumerator.MoveNext()) {
        // Set the result from the last task returned, unless no result is requested.
        tcs.SetResult(
            typeof(TResult) == typeof(VoidResult)
                ? default(TResult) : ((Task<TResult>) completedTask).Result);

    } else {
        taskEnumerator.Current.ContinueWith(
            t => ToTaskDoOneStep(taskEnumerator, taskScheduler, tcs, t),
            taskScheduler);
    }
} 

这里有很多值得称赞的地方:

  1. 我们的连续任务使用了 TaskScheduler,如果存在 SynchronizationContext,它会尊重它。这使得我们的迭代器,无论是立即调用还是从连续任务调用,在从 UI 线程启动时都可以访问 UI 控件。
  2. 该过程通过连续任务运行,因此没有线程被占用等待!顺带一提,ToTaskDoOneStep() 中对自身的调用不是递归调用;它在 taskEnumerator.Current 任务完成后被调用的 lambda 表达式中。当前激活在调用 ContinueWith() 后几乎立即退出,并且与连续任务是独立的。
  3. 我们直接在每个嵌套任务的连续任务中检查其状态,而不是通过检查异常。
  4. 第一次迭代是同步进行的。

然而,这里至少有一个巨大的问题,还有一些较小的问题。

  1. 如果迭代器抛出未处理的异常,或通过抛出 OperationCanceledException 进行取消,我们不会处理它并设置主任务的状态。这是我们之前有但在此版本中丢失的功能。
  2. 要修复问题 #1,我们必须在调用 MoveNext() 的两个方法中引入相同的异常处理程序。即使现在,我们在两个方法中也设置了相同的连续任务。我们违反了“不要重复自己”的规则。
  3. 如果 Async 方法的任务期望提供 Result,但我们的迭代器在未提供任何任务的情况下退出怎么办?或者如果其最终任务类型不正确怎么办?在第一种情况下,我们默默地返回默认结果类型;在第二种情况下,我们抛出一个未处理的 InvalidCastException。由于此异常从未被观察到,任务系统将中止我们的整个进程!
  4. 最后,如果嵌套任务被取消或失败怎么办?我们设置主任务的状态,并且不再调用迭代器。它可能在一个 using 块或 try 块中,并有一个 finally 用于清理。我们应该 Dispose() 迭代器,而不是等待垃圾回收器来处理它。(我之前为此使用了一个连续任务,但它没有处理异常。我发现了一个更轻量级的替代方案,它可以。)

为了解决这些问题,我们将 MoveNext() 调用从 ToTask() 中移除,而是进行一个初始的同步调用 ToTaskDoOneStep()。然后我们就可以在一个地方添加适当的异常处理。

最终版本

这是 ToTask<T>() 的最终实现。它:

  • 使用 TaskCompletionSource 返回一个主任务,
  • 同步/高效地执行第一次迭代,
  • 如果存在,则尊重 SynchronizationContext
  • 从不阻塞线程,
  • 处理来自迭代器的异常,
  • 直接传播嵌套任务的完成(无需 AggregateException),
  • 在适当的时候将值返回给主任务,
  • 当 SLAM 迭代器未以有效结果结束时,以有用的异常进行故障处理,并且
  • 在枚举器完成时将其释放。
public static Task<TResult> ToTask<TResult>(this IEnumerable<Task> tasks) {
    var taskScheduler =
        SynchronizationContext.Current == null
            ? TaskScheduler.Default : TaskScheduler.FromCurrentSynchronizationContext();
    var taskEnumerator = tasks.GetEnumerator();
    var completionSource = new TaskCompletionSource<TResult>();

    ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, null);
    return completionSource.Task;
}

private static void ToTaskDoOneStep<TResult>(
    IEnumerator<Task> taskEnumerator, TaskScheduler taskScheduler,
    TaskCompletionSource<TResult> completionSource, Task completedTask)
{
    try {
        // Check status of previous nested task (if any), and stop if Canceled or Faulted.
        // In these cases, we are abandoning the enumerator, so we must dispose it.
        TaskStatus status;
        if (completedTask == null) {
            // This is the first task from the iterator; skip status check.
        } else if ((status = completedTask.Status) == TaskStatus.Canceled) {
            taskEnumerator.Dispose();
            completionSource.SetCanceled();
            return;
        } else if (status == TaskStatus.Faulted) {
            taskEnumerator.Dispose();
            completionSource.SetException(completedTask.Exception.InnerExceptions);
            return;
        }
    } catch (Exception ex) {
        // Return exception from disposing the enumerator.
        completionSource.SetException(ex);
        return;
    }

    // Find the next Task in the iterator; handle cancellation and other exceptions.
    Boolean haveMore;
    try {
        // Enumerator disposes itself if it throws an exception or completes (returns false).
        haveMore = taskEnumerator.MoveNext();

    } catch (OperationCanceledException cancExc) {
        completionSource.SetCanceled();
        return;
    } catch (Exception exc) {
        completionSource.SetException(exc);
        return;
    }

    if (!haveMore) {
        // No more tasks; set the result (if any) from the last completed task (if any).
        // We know it's not Canceled or Faulted because we checked at the start of this method.
        if (typeof(TResult) == typeof(VoidResult)) {        // No result
            completionSource.SetResult(default(TResult));

        } else if (!(completedTask is Task<TResult>)) {     // Wrong result
            completionSource.SetException(new InvalidOperationException(
                "Asynchronous iterator " + taskEnumerator +
                    " requires a final result task of type " + typeof(Task<TResult>).FullName +
                    (completedTask == null ? ", but none was provided." :
                        "; the actual task type was " + completedTask.GetType().FullName)));

        } else {
            completionSource.SetResult(((Task<TResult>) completedTask).Result);
        }

    } else {
        // When the nested task completes, continue by performing this function again.
        taskEnumerator.Current.ContinueWith(
            nextTask => ToTaskDoOneStep(taskEnumerator, taskScheduler, completionSource, nextTask),
            taskScheduler);
    }
}  

在 Try/Catch 块中等待

在 C#5 的 async 方法中,您可以在 try-catchtry 块中 await 一个任务;它的状态机支持这种情况。我们正在使用的 C#2 迭代器状态机不允许在这样的 try 块中 yield return;因此,我们无法轻易地在等效的位置执行等待。处理具有多个或嵌套 try-catch 块的通用情况需要一些手动工作。 

有一种简单的方法可以处理一个常见的特殊情况,即 try-catch 包含整个方法。私有迭代器仅包含 try 主体。在主异步方法中,在 ToTask() 调用后附加一个连续任务,并在那里处理异常。它看起来会像这样:

public Task<TheResult> DoSomethingAsync(TheArgs args) {
    return DoSomethingAsyncTasks(args).ToTask<TheResult>().ContinueWith(t => {
        try {
            if (t.IsFaulted)
                throw t.Exception.Flatten().InnerException;

        } catch (Exception ex) {    // One per handled exception.
            // Handle it.
        } finally {
            // Wrap up, if you need it.
        }
        return t;
    }).Unwrap();
}

对于在 trycatch 块中都使用的参数和局部变量,创建一个私有的嵌套类,并将它们作为公共字段。创建一个实例,复制参数,并将实例传递给迭代器。这是状态机构建器为我们完成的一些魔鬼工作,但在这里我们必须自己完成。 

public Task<TheResult> DoSomethingAsync(TheArgs args) {
    var locals = new DoSomethingAsyncLocals();
    locals.Fields = args;           // Copy each argument into a field of the nested class.
    return DoSomethingAsyncTasks(locals).ToTask<TheResult>().ContinueWith(t => {
		//...as above...

要处理通用情况,请将每个 try 主体与 catch 分成自己的 AsyncTasks 方法。在出现 try-catch 的地方,yield return 该方法的迭代器,将其转换为任务,附加连续任务,解开结果,如上所示。唉,这会丢失 async 方法的简洁性。如果迭代器支持从 try 进行 yield return 该有多好!幸运的是,许多(如果不是大多数)实际场景都可以实现,而无需为局部变量创建嵌套类。在 9 个月使用本文所述技术的使用过程中,我一次也没有需要这样做。(我只使用了上面所示的整个方法异常处理模式。) 

瞧! 现在您可以在 Visual Studio 2010 和 C#4(或 VB.NET 2010)中使用 SLAMs(外观同步的异步方法)了,而 asyncawait 不受支持。

关于下载示例

下载的项目包含两个基础设施文件,您可以将它们编译到您的程序集中以支持 .NET 4 中的异步编程:TaskEx.cs 包含本文开发的方法;AsyncIoEx.cs 提供了 .NET 4.5 中添加的一些方法来支持异步流和 Web 操作。(当然,将它们转换为 VB.NET 2010 使用也很简单。)

作为示例,MainWindow.xaml.cs 实现了本文所述的两个异步方法,并在事件处理程序中有效使用了连续任务。该示例源自一个Async/Await 演练项目。作为一项练习,移除 ToTask() 方法,并尝试仅使用任务连续任务或其他回调来重新实现异步方法。如果过程是线性的,并且所有等待都位于顶层,则代码虽然丑陋但编写起来并不太难。一旦所需的等待落入嵌套块,就几乎不可能保持相同的语义并保持异步(即,从不 Wait() 一个 Task),而无需使用本文所述的方法。 

关注点

在最终版本之前,我将一个 CancellationToken 传递给 ToTask() 并将其传递到 ToTaskDoOneStep() 的连续任务中。(对本文来说,这是无关紧要的噪音,所以我删除了它们。)这有两个原因。首先,在处理 OperationCanceledException 时,我会检查其 CancellationToken 以确保它与操作的令牌匹配。如果不匹配,则视为故障而非取消。虽然技术上正确,但取消令牌混淆的可能性非常小,因此不值得费力地将其传递给 ToTask() 调用并在连续任务之间传递。(如果您是 Task 专家,能在评论中给我一个可能有效发生的用例,我会重新考虑。)

第二个原因是,我可以在每次调用 MoveNext() 进入迭代器之前检查令牌是否已取消,立即取消主任务,并退出进程。这提供了取消行为,而无需您的迭代器检查令牌。我现在确信这是错误的做法,因为在某个给定的 yield return 点取消可能不适合异步进程——也许最好完全由迭代器进程控制——但我曾想尝试一下。结果不起作用。我发现有些情况下,任务被取消了,但其连续任务没有被触发。在示例代码中,我依赖一个连续任务来重新启用按钮,但它并不总是可靠地工作,所以有时按钮在进程被取消后仍然保持禁用状态。(如果任何 Task 专家能解释为什么会发生这个问题,我将不胜感激!)  

历史 

2012-12-06 

  • 初始版本 

2012-12-11 

  • 添加了“与 Async/Await 的区别”部分 

2013-08-29 

  • 将“与 Async/Await 的区别”替换为“在 Try/Catch 块中等待”部分 
  • 更新了 ToTask 实现
© . All rights reserved.