65.9K
CodeProject 正在变化。 阅读更多。
Home

Adapting Event and Callback Based Asynchronicity to the Task Framework

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.43/5 (7投票s)

2021 年 2 月 19 日

MIT

7分钟阅读

viewsIcon

9046

downloadIcon

103

Use TaskCompletionSource to turn an event or callback based model into a Task based one

Task completion source

引言

.NET 的 TPL/Task API 是为您的应用程序添加异步功能而无需大量麻烦的出色方式。C# 编译器对它的支持使其更加出色。但是,某些 API 仍使用其他方式,例如回调或事件来提供异步完成通知。

在本文中,我们将探讨 TaskCompletionSource<T>,它允许我们基于其他形式的异步通知来创建 Task<T> 对象。

概念化这个混乱的局面

Task<T> 是什么,不是什么

本质上,Task 是一种契约,它提供了一种标准的机制来等待工作完成、报告该工作中的错误以及发出该工作取消的信号。它仅此而已。

Tasks 不是线程。Tasks 再次,只是一个用于发出异步工作单元信号的契约。如何异步执行该工作是实现细节,如果它确实是异步的。例如,可以基于与外部硬件的通信来发出工作开始和完成的信号,这与占用 CPU 资源无关,甚至可能根本不涉及额外的线程。

了解 TaskCompletionSource<T>

TaskCompletionSource<T> 可能是创建 Task<T> 对象以返回给异步方法调用者的最常见方式。这些是轻量级对象,它们仅保留进行中异步操作的状态信息。

您只需创建一个对象,然后在稍后工作完成后,您可以调用 SetResult(T result),或者在出错的情况下调用 SetException(Exception ex)。这个对象没有什么神奇之处。使用它的唯一技巧在于您。您的工作是设法传递 TaskCompletionSource<T> 对象,以便您以后可以在工作完成后找到它。我们可以利用匿名方法提供的提升功能来完成 TaskCompletionSource<T> 对象的传递到完成处理程序。当稍后深入代码时,这一切都会变得清晰。比听起来简单得多。

一旦我们创建了 TaskCompletionSource<T>,我们就可以使用它的 Task 属性来获取一个 Task<T> 实例,我们可以将其返回给异步方法的调用者。当然,如果我们从未对 TaskCompletionSource<T> 调用 SetResult()SetException()SetCanceled(),那么等待我们的任务将永远不会返回,所以请确保在任务处理代码中的每种情况下都调用这些方法之一。

注意:对于用于不返回结果的非泛型 Task 对象,没有相应的非泛型 TaskCompletionSource 对象。您可以简单地将 bool 用作虚拟的 T 值。所有 Task<T> 对象都可以转换为 Task。如果您担心有人可能会将您的 Task 转换回 Task<bool>,您可以创建一个私有的虚拟类型作为您的 T 参数。无论如何,在不返回结果时,您只会使用 SetResult(default(T)) 来返回您的结果,而该结果永远不会被使用。

这是比描述更容易编码的事情之一。阅读代码可能也比我笨拙的描述更容易。这种技术有一些注意事项,我们将在介绍代码后进行讨论,但首先,更多背景信息。

看,一个用例!

值得庆幸的是,随着任务框架和基于任务的 TPL 模型的引入,Microsoft 已经用基于任务的异步方法包装了他们的大部分基础类库。然而,这使得我很难找到一个类来演示这项技术,而这个类还没有被包装。

最近,我构建了这里概述的技术,以将商业库中基于事件的异步模型适配到基于 TPL 的模型。然而,尽管这很有用,但我不希望您因为需要使用商业产品才能跟上。

经过一番思考,我确实想到了一个我们可以尝试的类。System.Diagnostics.Process 类允许您运行一个进程,并使用基于事件的模型来通知进程退出(无论是完成还是错误)。我们将把它适配到基于 TPL 的模型。

有时能够执行命令行进程并在您的程序中捕获输出非常有用。您目前可以使用 Process 类通过挂钩 Exited 事件或使用 WaitForExit() 方法来做到这一点,每个方法都提供自己的完成信号方式。两者中,前者更灵活,因为后者无条件地挂起当前线程直到进程完成,有效地使其成为阻塞的-同步的。

我们要做的就是提供一个单一的异步方法,该方法接受一个 ProcessStartInfo 对象并返回一个(可等待的)Task<Stream>。当进程退出时,任务将完成,届时您可以获取进程写入 stdout 的内容作为操作结果。该代码返回一个 Stream 而不是 TextReader,仅仅是为了允许返回图像等二进制内容的进程。

编写这个混乱的程序

最后,我们来看一些代码。

首先,我创建了一个该方法的同步版本和一个异步版本。每个版本使用不同的方法来发出进程已完成的信号。当然,同步版本会阻塞。异步版本是可等待的。除此之外,它们的行为(或至少应该)是相同的。

我们首先介绍同步版本,因为这样,我们就可以在创建基于任务的使用方法之前,介绍使用进程的基本步骤。

static Stream RunWithCapture(ProcessStartInfo startInfo)
{
    // create a process
    var proc = new Process();
    // fill it with what we need to capture
    proc.StartInfo = startInfo;
    proc.EnableRaisingEvents = false;
    startInfo.RedirectStandardOutput = true;
    startInfo.UseShellExecute = false;
    // start doesn't block
    proc.Start();
    // so we use WaitForExit() to block
    proc.WaitForExit();
    // grab our output stream
    var result =proc.StandardOutput.BaseStream;
    // close the process
    proc.Dispose();
    return result;
}

注意对 WaitForExit() 的调用,这会导致方法阻塞直到进程不再运行。

您可以像这样调用 RunWithCapture()

var psi = new ProcessStartInfo() {
    FileName = "targetapp.exe" 
};
using (var stream = RunWithCapture(psi))
{
    // use a streamreader because we want text
    var sr = new StreamReader(stream);
    // no need to dispose it because 
    // we are disposing the stream
    Console.Write(sr.ReadToEnd());
}

现在让我们来看看基于任务的异步版本。我们在这里需要挂钩 Exited 事件而不是调用 WaitForExit(),因为我们不能阻塞。

static Task<Stream> RunWithCaptureAsync(ProcessStartInfo startInfo)
{
    // we use this as a signal for our custom task
    // we can use SetResult, SetException, and even SetCancelled
    // to signal the completion of a task, with or without errors
    // do not throw exceptions in your async handlers.
    // Use SetException.
    var tcs = new TaskCompletionSource<Stream>();
    try
    {
        // create a process, and set it up for capturing
        // and raising events
        var proc = new Process();
        proc.StartInfo = startInfo;
        proc.EnableRaisingEvents = true;
        startInfo.RedirectStandardOutput = true;
        startInfo.UseShellExecute = false;
        // attach an event handler that signals completion
        // of our task - here Exited serves us well.
        // note we're using hoisting to pass the 
        // TaskCompletionSource through to the anonymous
        // method:
        proc.Exited += (object sender, EventArgs e) => {
            // if we were doing anything non-trivial here
            // we'd wrap everything in a try/catch block
            // and use tcs.SetException() in the catch block
            // instead of throwing. We don't need that here
            // because nothing should throw.
            tcs.SetResult(proc.StandardOutput.BaseStream);
            (sender as IDisposable)?.Dispose();
        };
        // finally, start the process
        proc.Start();
    }
    catch (Exception ex)
    {
        // signal an exception
        tcs.SetException(ex);
    }
    // here we return the task to the caller
    return tcs.Task;
}

我对这个做了大量注释。这里的大问题(如果有的话)是 Exited 事件处理程序,我们使用匿名函数挂钩了它。在该处理程序中,我们只需调用 tcs.SetResult() 来获取进程的输出流,然后 Dispose() 进程 - 这里由 sender 表示。我们通过提升的魔力获得了 tcs,这是 C# 在您使用匿名函数时提供的。我们基本上无法将其传递给 Exited 处理程序,除非我们使用静态成员变量,这很丑陋。传递任务完成源的首选方法是使用 state 参数,但这通常是回调才有的,而不是事件。请注意,您也可以在回调中使用这种整体技术,只需稍作修改。

如果您仔细看该方法,您会注意到事件处理程序先出现,但其中的代码在包含方法(RunWithCaptureAsync())返回很久之后才执行。请记住,Start() 不会阻塞。相反,我们通过事件而不是阻塞来获得完成信号。

使用方式与之前几乎相同

var psi = new ProcessStartInfo() {
  FileName = "targetapp.exe"
};
using (var stream = await RunWithCaptureAsync(psi))
{
    var sr = new StreamReader(stream);
    Console.Write(sr.ReadToEnd());
}

注意事项

您可能想知道为什么这些不是 Process 的扩展方法。主要原因,至少在异步方法的情况下,是因为 Exited 处理程序会使事情复杂化。首先,Process 对象可以被回收,这意味着 Exited 事件在其生命周期内可以触发多次,但我们的处理程序只期望它触发一次。如果您使用 TrySetResult()TrySetException(),它至少可以在连续调用时默默失败,但这仍然一点都不好。如果您确实需要挂钩一个可能被多次调用而不是一次的事件,或者您必须与其他方共享该对象及其事件处理程序,您需要在处理程序运行时删除您自己的处理程序委托。这比听起来要困难得多,因为您的处理程序是一个匿名函数,并且它需要访问自己的委托。解决方案是使用包含类的成员变量来保存它,但这很丑陋。不过,它允许我们调用 Exited-=_myExitedHandler 并传递有意义的东西。我在这里没有这样做。另外,如果您在不进行同步的情况下从多个线程访问您的代码,这样做也会有问题。

总之,对于被触发多次的事件,或者您必须与其他方共享对象及其事件处理程序,使用起来并不容易。

希望这项技术可以帮助您用更现代的可等待方法来包装一些丑陋的旧事件或基于回调的异步代码。

历史

  • 2021年2月19日 - 首次提交
© . All rights reserved.