65.9K
CodeProject 正在变化。 阅读更多。
Home

DotNetCore 中的异步编程实践指南

starIconstarIconstarIconstarIconstarIcon

5.00/5 (7投票s)

2021 年 1 月 27 日

CPOL

9分钟阅读

viewsIcon

12964

DotNetCore 中异步编程的实际演练

引言

我在该主题上的第一篇文章概述了 DotNetCore 中的异步编程,并解释了一些关键概念。你可以在这里找到该文章。本文采用实际方法来演示其中一些关键概念,并介绍更复杂的编码模式。文章围绕 DotNetCore 控制台应用程序。

你需要一个与 DotNetCore 兼容的开发环境,通常是 Visual Studio 或 Visual Code,以及与该项目相关的存储库副本才能运行代码。

免责声明 - 代码是实验性的,并非用于生产。为了易于阅读和理解,代码设计得简洁,错误捕获和处理最少。出于同样的原因,类也保持简单。

代码仓库

代码可在 GitHub 存储库这里找到。此项目的代码位于 Async-Demo 中。忽略任何其他项目 - 它们是为另一篇异步编程文章准备的。

库类

在我们开始之前,你需要了解两个辅助类

  1. LongRunningTasks - 模拟工作
    1. RunLongProcessorTaskAsyncRunLongProcessorTask 使用素数计算来模拟处理器密集型任务。
    2. RunYieldingLongProcessorTaskAsync 是一个每计算 100 次就产生一次的版本。
    3. RunLongIOTaskAsync 使用 Task.Delay 来模拟慢速 I/O 操作。
  2. UILogger 提供了一个抽象层,用于将信息记录到 UI。你将一个委托 Action 传递给方法。UILogger 构建消息,然后调用 Action 来实际将其写入 Action 配置写入的任何位置。在我们的例子中,是 Program 中的 LogToConsole,它运行 Console.WriteLine。它也可以很容易地写入文本文件。

入门

我们的第一个挑战是从同步切换到异步。

确保你使用的是正确的框架和最新的语言版本。(C# 7.1 及更高版本支持基于 Task 的 Main)。

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net5</TargetFramework>
    <LangVersion>latest</LangVersion>
    <RootNamespace>Async_Demo</RootNamespace>
  </PropertyGroup>

在 #7.1 之前,Main 只能同步运行,你需要一个“NO-NO”,使用 Wait,来防止 Main 提前退出并关闭程序。在 #7.1 之后,将 Main 声明为返回一个 Task

下面的代码展示了 async Main 模式。声明 async 取决于代码中是否有 await

// With await
static async Task Main(string[] args)
{
    // code
    // await somewhere in here
}

// No awaits
static Task Main(string[] args)
{
    // code
    // no awaits
    return Task.CompletedTask;
}

注意:

  1. 如果你使用了 async 关键字但没有 await,编译器会发出警告,但仍会编译,将该方法视为同步代码。
  2. 你不能声明一个方法为 async 并返回一个 Task。你只需返回正确的值,编译器就会完成所有繁重的工作。

那么让我们运行一些代码。我们的第一次运行

static Task Main(string[] args)
{
    var watch = new Stopwatch();
    watch.Start();
    UILogger.LogThreadType(LogToConsole, "Main");
    var millisecs = LongRunningTasks.RunLongProcessorTask(5);
    watch.Stop();
    UILogger.LogToUI(LogToConsole, $"Main ==> Completed in 
                   { watch.ElapsedMilliseconds} milliseconds", "Main");
    return Task.CompletedTask;
}

如预期的那样,Task 是同步运行的。Task 内部包含一堆同步代码。没有产生。

[11:35:32][Main Thread][Main] >  running on Application Thread
[11:35:32][Main Thread][LongRunningTasks] > ProcessorTask started
[11:35:36][Main Thread][LongRunningTasks] > ProcessorTask completed in 3399 millisecs
[11:35:36][Main Thread][Main] > Main ==> Completed in 3523 milliseconds
Press any key to close this window . . .

我们的第二次运行

static async Task Main(string[] args)
{
    var watch = new Stopwatch();
    watch.Start();
    UILogger.LogThreadType(LogToConsole, "Main");
    var millisecs = await LongRunningTasks.RunLongProcessorTaskAsync(5, LogToConsole);
    UILogger.LogToUI(LogToConsole, $"Yielded to Main", "Main");
    watch.Stop();
    UILogger.LogToUI(LogToConsole, $"Main ==> Completed in 
                   { watch.ElapsedMilliseconds} milliseconds", "Main");
}

Task 是同步运行的 - 没有产生。这是合乎逻辑的,因为没有理由产生。RunLongProcessorTaskAsync 是 Task 内的一堆同步代码 - 计算素数 - 因此它运行完成。await 是多余的,它可能是一个 Task,但它不会产生,所以直到完成之前都不会放弃线程。

[11:42:43][Main Thread][Main] >  running on Application Thread
[11:42:43][Main Thread][LongRunningTasks] > ProcessorTask started
[11:42:46][Main Thread][LongRunningTasks] > ProcessorTask completed in 3434 millisecs
[11:42:46][Main Thread][Main] > Yielded
[11:42:46][Main Thread][Main] > Main ==> Completed in 3593 milliseconds

我们的第三次运行

static async Task Main(string[] args)
{
    var watch = new Stopwatch();
    watch.Start();
    UILogger.LogThreadType(LogToConsole, "Main");
    var millisecs = LongRunningTasks.RunYieldingLongProcessorTaskAsync(5, LogToConsole);
    UILogger.LogToUI(LogToConsole, $"Yielded to Main", "Main");
    watch.Stop();
    UILogger.LogToUI(LogToConsole, $"Main ==> Completed in 
                   { watch.ElapsedMilliseconds} milliseconds", "Main");
}

在我们查看结果之前,让我们看看 RunLongProcessorTaskAsyncRunYieldingLongProcessorTaskAsync 之间的区别。我们添加了 Task.Yield() 以便每计算 100 个素数就产生一次控制。

if (isPrime)
{
    counter++;
    // only present in Yielding version
    if (counter > 100)
    {
        await Task.Yield();
        counter = 0;
    }
}

长时间运行的任务没有完成。RunYieldingLongProcessorTaskAsync 在计算完前 100 个素数后(约 173 毫秒)将控制权交还给 Main,而 Main 在产生期间运行完成。

[12:13:56][Main Thread][Main] >  running on Application Thread
[12:13:56][Main Thread][LongRunningTasks] > ProcessorTask started
[12:13:57][Main Thread][Main] > Yielded to Main
[12:13:57][Main Thread][Main] > Main ==> Completed in 173 milliseconds

如果我们更新 Mainawait 长处理器任务

    var millisecs = await LongRunningTasks.RunYieldingLongProcessorTaskAsync(5, LogToConsole);

它运行完成了。虽然它产生了,但我们在 Main 中继续之前,等待 RunYieldingLongProcessorTaskAsync Task 完成。这里还有另一个重要点需要注意。看看长任务在哪条线程上运行,并与之前的运行进行比较。它在开始于 [Main Thread] 之后跳到了一个新线程 [LongRunningTasks Thread]

[12:45:10][Main Thread:1][Main] >  running on Application Thread
[12:45:11][Main Thread:1][LongRunningTasks] > ProcessorTask started
[12:45:14][LongRunningTasks Thread:7][LongRunningTasks] > 
                                      ProcessorTask completed in 3892 millisecs
[12:45:14][LongRunningTasks Thread:7][Main] > Yielded to Main
[12:45:14][LongRunningTasks Thread:7][Main] > Main ==> Completed in 4037 milliseconds

RunYieldingLongProcessorTaskAsync 中添加一个快速的 Console.Write 来查看每次产生迭代在哪个线程上运行 - 打印 ManagedThreadId

counter++;
if (counter > 100)
{
    Console.WriteLine($"Thread ID:{Thread.CurrentThread.ManagedThreadId}");
    await Task.Yield();
    counter = 0;
}

结果如下所示。请注意规律的线程跳转。Yield 会创建一个新的 continuation Task,并将其调度为异步运行。在第一次 Task.Yield 时,应用程序线程调度器会将新的 Task 传递给应用程序池,然后应用程序池调度器会决定在哪里运行 Tasks。

引用 Microsoft 的话说,Task.Yield()“创建了一个可等待的任务,该任务在等待时会异步地回到当前上下文。”我将其翻译为它是产生控制权并创建 continuation `Task` 的语法糖,该 continuation `Task` 会在调度器准备好时重新调度以运行。进一步引用:“一个上下文,当等待时,将在等待时异步地转换回当前上下文。”换句话说,除非你告诉它,否则它不会 `await`。在 continuation 中遇到第一个 yield,处理就会继续到 `Task.Yield()` 下面的代码。我已经测试过了。

但是,以下警告仍然适用 - 再次引用官方文档

但是,上下文将决定如何优先处理此工作与可能挂起的其他工作。大多数 UI 环境中 UI 线程上的同步上下文通常会优先处理发布到该上下文的工作,而不是输入和渲染工作。因此,不要依赖 await Task.Yield 来保持 UI 的响应。
[12:38:16][Main Thread:1][Main] >  running on Application Thread
[12:38:16][Main Thread:1][LongRunningTasks] > ProcessorTask started
Thread ID:1
Thread ID:4
Thread ID:4
Thread ID:6
Thread ID:6
Thread ID:7

最后,切换到 RunLongIOTaskAsync 长时间运行的任务。

    var millisecs = await LongRunningTasks.RunLongIOTaskAsync(5, LogToConsole);

如果你不 await,结果和之前一样

[14:26:46][Main Thread:1][Main] >  running on Application Thread
[14:26:47][Main Thread:1][LongRunningTasks] > IOTask started
[14:26:47][Main Thread:1][Main] > Yielded to Main
[14:26:47][Main Thread:1][Main] > Main ==> Completed in 322 milliseconds

如果你 await,它会运行完成,同样会发生线程切换。

[14:27:16][Main Thread:1][Main] >  running on Application Thread
[14:27:16][Main Thread:1][LongRunningTasks] > IOTask started
[14:27:21][LongRunningTasks Thread:4][LongRunningTasks] > IOTask completed in 5092 millisecs
[14:27:21][LongRunningTasks Thread:4][Main] > Yielded to Main
[14:27:21][LongRunningTasks Thread:4][Main] > Main ==> Completed in 5274 milliseconds

更复杂的场景

好的,现在让我们更贴近现实,看看实际工作的代码。

JobRunner

JobRunner 是一个用于运行和控制异步作业的简单类。就我们的目的而言,它运行了一个长时间运行的任务来模拟工作,但你可以在真实世界中使用基本模式。

代码的大部分内容不言自明,但我将介绍 TaskCompletionSource

引用 MS 的话说,“表示一个未绑定到委托的 Task<TResult> 的生产者端,通过 Task 属性提供对消费者端的访问。”你将获得一个由 `TaskCompletionSource.Task` 公开的 `Task`,你可以通过 `TaskCompletionSource` 实例来控制它 - 换句话说,一个手动控制的 `Task`,与方法解耦。

代表 JobRunner 状态的 Task 作为 JobTask 属性公开。如果底层 TaskCompletionSource 未设置,它将返回一个简单的 Task.CompletedTask 对象,否则返回 JobTaskControllerTaskRun 方法使用异步事件模式 - 我们需要一段异步运行的代码,并通过 await 产生控制权。Run 控制 Task 状态,但 Task 本身独立于 RunIsRunning 确保你无法在作业运行时启动它。

class JobRunner
{
    public enum JobType { IO, Processor, YieldingProcessor } 

    public JobRunner(string name, int secs, JobType type = JobType.IO)
    {
        this.Name = name;
        this.Seconds = secs;
        this.Type = type;
    }

    public string Name { get; private set; }
    public int Seconds { get; private set; }
    public JobType Type { get; set; }
    private bool IsRunning;

    public Task JobTask => this.JobTaskController == null ? 
                          Task.CompletedTask : this.JobTaskController.Task;
    private TaskCompletionSource JobTaskController { get; set; } = new TaskCompletionSource();

    public async void Run()
    {
        if (!this.IsRunning) {
            this.IsRunning = true;
            this.JobTaskController = new TaskCompletionSource();
            switch (this.Type)
            {
                case JobType.Processor:
                    await LongRunningTasks.RunLongProcessorTaskAsync
                         (Seconds, Program.LogToConsole, Name);
                    break;
                    
                case JobType.YieldingProcessor:
                    await LongRunningTasks.RunYieldingLongProcessorTaskAsync
                         (Seconds, Program.LogToConsole, Name);
                    break;

                default:
                    await LongRunningTasks.RunLongIOTaskAsync
                         (Seconds, Program.LogToConsole, Name);
                    break;
            }

            this.JobTaskController.TrySetResult();
            this.IsRunning = false;
        }
    }
}

JobScheduler

JobScheduler 是用于实际调度作业的方法。它与 Main 分开,以演示异步编程的一些关键行为。

  1. Stopwatch 提供计时功能。
  2. 创建了四个不同的 IO 作业。
  3. 启动了四个作业。
  4. 使用 Task.WhenAll 在继续之前等待某些任务。请注意,TaskJobRunner 实例公开的 JobTask
`WhenAll` 是几个静态 `Task` 方法之一。`WhenAll` 创建一个单一的 `Task`,该 `Task` 等待提交数组中的所有 Tasks。当所有 Tasks 都完成时,其状态将变为*完成*。`WhenAny` 类似,但在任何一个 Tasks 完成时都会变为*完成*。它们可以命名为 *AwaitAll* 和 *AwaitAny*。`WaitAll` 和 `WaitAny` 是阻塞版本,与 `Wait` 类似。我不确定命名约定略有混淆的原因 - 我确定有一个。
 
static async Task JobScheduler()
{
    var watch = new Stopwatch();
    watch.Start();
    var name = "Job Scheduler";
    var quickjob = new JobRunner("Quick Job", 3);
    var veryslowjob = new JobRunner("Very Slow Job", 7);
    var slowjob = new JobRunner("Slow Job", 5);
    var veryquickjob = new JobRunner("Very Quick Job", 2);
    quickjob.Run();
    veryslowjob.Run();
    slowjob.Run();
    veryquickjob.Run();
    UILogger.LogToUI(LogToConsole, $"All Jobs Scheduled", name);
    await Task.WhenAll(new Task[] { quickjob.JobTask, veryquickjob.JobTask }); ;
    UILogger.LogToUI(LogToConsole, $"Quick Jobs completed in 
                   {watch.ElapsedMilliseconds} milliseconds", name);
    await Task.WhenAll(new Task[] { slowjob.JobTask, quickjob.JobTask, 
                                   veryquickjob.JobTask, veryslowjob.JobTask }); ;
    UILogger.LogToUI(LogToConsole, $"All Jobs completed in 
                   {watch.ElapsedMilliseconds} milliseconds", name);
    watch.Stop();
}

我们现在需要对 Main 进行一些更改

static async Task Main(string[] args)
{
    var watch = new Stopwatch();
    watch.Start();
    UILogger.LogThreadType(LogToConsole, "Main");
    var task = JobScheduler();
    UILogger.LogToUI(LogToConsole, $"Job Scheduler yielded to Main", "Main");
    await task;
    UILogger.LogToUI(LogToConsole, $"final yield to Main", "Main");
    watch.Stop();
    UILogger.LogToUI(LogToConsole, $"Main ==> Completed in 
                   { watch.ElapsedMilliseconds} milliseconds", "Main");

    //return Task.CompletedTask;
}

运行此代码时,你会看到下面的输出。有趣的地方在于

  1. 每个作业都会启动,然后在第一个 await 处产生,将控制权交还给调用者 - 在此例中是 JobSchedular
  2. JobScheduler 运行到其第一个 await 并产生回 Main
  3. 当前两个作业完成时,它们的 JobTask 被设置为完成,JobScheduler 继续执行下一个 await
  4. JobScheduler 完成的时间略长于运行最长 Job 所需的时间。
[16:58:52][Main Thread:1][Main] >  running on Application Thread
[16:58:52][Main Thread:1][LongRunningTasks] > Quick Job started
[16:58:52][Main Thread:1][LongRunningTasks] > Very Slow Job started
[16:58:52][Main Thread:1][LongRunningTasks] > Slow Job started
[16:58:52][Main Thread:1][LongRunningTasks] > Very Quick Job started
[16:58:52][Main Thread:1][Job Scheduler] > All Jobs Scheduled
[16:58:52][Main Thread:1][Main] > Job Scheduler yielded to Main
[16:58:54][LongRunningTasks Thread:4][LongRunningTasks] > 
          Very Quick Job completed in 2022 millisecs
[16:58:55][LongRunningTasks Thread:4][LongRunningTasks] > 
          Quick Job completed in 3073 millisecs
[16:58:55][LongRunningTasks Thread:4][Job Scheduler] > 
          Quick Jobs completed in 3090 milliseconds
[16:58:57][LongRunningTasks Thread:4][LongRunningTasks] > 
          Slow Job completed in 5003 millisecs
[16:58:59][LongRunningTasks Thread:6][LongRunningTasks] > 
          Very Slow Job completed in 7014 millisecs
[16:58:59][LongRunningTasks Thread:6][Job Scheduler] > 
          All Jobs completed in 7111 milliseconds
[16:58:59][LongRunningTasks Thread:6][Main] > final yield to Main
[16:58:59][LongRunningTasks Thread:6][Main] > Main ==> Completed in 7262 milliseconds

现在将作业类型更改为 Processor,如下所示

var quickjob = new JobRunner("Quick Job", 3, JobRunner.JobType.Processor);
var veryslowjob = new JobRunner("Very Slow Job", 7, JobRunner.JobType.Processor);
var slowjob = new JobRunner("Slow Job", 5, JobRunner.JobType.Processor);
var veryquickjob = new JobRunner("Very Quick Job", 2, JobRunner.JobType.Processor);

当你运行此代码时,你会发现所有内容都在 Main Thread 上顺序运行。起初,你会想为什么?我们有多个可用线程,调度器也展示了它在线程之间切换任务的能力。为什么它没有切换?

答案非常简单。一旦我们初始化 JobRunnner 对象,我们就将它们一个接一个地放入 Scheduler。由于我们运行的代码是顺序的 - 计算素数时不间断 - 在第一个作业完成之前,我们不会执行下一行代码(输入第二个作业)。

[17:59:48][Main Thread:1][Main] >  running on Application Thread
[17:59:48][Main Thread:1][LongRunningTasks] > Quick Job started
[17:59:53][Main Thread:1][LongRunningTasks] > Quick Job completed in 4355 millisecs
[17:59:53][Main Thread:1][LongRunningTasks] > Very Slow Job started
[17:59:59][Main Thread:1][LongRunningTasks] > Very Slow Job completed in 6057 millisecs
[17:59:59][Main Thread:1][LongRunningTasks] > Slow Job started
[18:00:03][Main Thread:1][LongRunningTasks] > Slow Job completed in 4209 millisecs
[18:00:03][Main Thread:1][LongRunningTasks] > Very Quick Job started
[18:00:05][Main Thread:1][LongRunningTasks] > Very Quick Job completed in 1737 millisecs
[18:00:05][Main Thread:1][Job Scheduler] > All Jobs Scheduled
[18:00:05][Main Thread:1][Job Scheduler] > Quick Jobs completed in 16441 milliseconds
[18:00:05][Main Thread:1][Job Scheduler] > All Jobs completed in 16441 milliseconds
[18:00:05][Main Thread:1][Main] > Job Scheduler yielded to Main
[18:00:05][Main Thread:1][Main] > final yield to Main
[18:00:05][Main Thread:1][Main] > Main ==> Completed in 16591 milliseconds

现在,将作业更改为运行 YieldingProcessor

var quickjob = new JobRunner("Quick Job", 3, JobRunner.JobType.YieldingProcessor);
var veryslowjob = new JobRunner("Very Slow Job", 7, JobRunner.JobType.YieldingProcessor);
var slowjob = new JobRunner("Slow Job", 5, JobRunner.JobType.YieldingProcessor);
var veryquickjob = new JobRunner("Very Quick Job", 2, JobRunner.JobType.YieldingProcessor);

结果截然不同。所需时间取决于计算机上的处理器核心数和线程数。你可以看到所有作业都快速启动,并在 11 秒内完成,其中最慢的作业耗时 9 秒。关键区别在于,处理器长时间运行的作业会定期产生。这使得调度器有机会将工作分配给其他线程。

产生处理器代码

[17:50:12][Main Thread:1][Main] >  running on Application Thread
[17:50:12][Main Thread:1][LongRunningTasks] > Quick Job started
[17:50:12][Main Thread:1][LongRunningTasks] > Very Slow Job started
[17:50:12][Main Thread:1][LongRunningTasks] > Slow Job started
[17:50:12][Main Thread:1][LongRunningTasks] > Very Quick Job started
[17:50:12][Main Thread:1][Job Scheduler] > All Jobs Scheduled
[17:50:12][Main Thread:1][Main] > Job Scheduler yielded to Main
[17:50:16][LongRunningTasks Thread:7][LongRunningTasks] > 
          Very Quick Job completed in 4131 millisecs
[17:50:18][LongRunningTasks Thread:7][LongRunningTasks] > 
          Quick Job completed in 6063 millisecs
[17:50:18][LongRunningTasks Thread:7][Job Scheduler] > 
          Quick Jobs completed in 6158 milliseconds
[17:50:21][LongRunningTasks Thread:6][LongRunningTasks] > 
          Slow Job completed in 9240 millisecs
[17:50:23][LongRunningTasks Thread:9][LongRunningTasks] > 
          Very Slow Job completed in 11313 millisecs
[17:50:23][LongRunningTasks Thread:9][Job Scheduler] > 
          All Jobs completed in 11411 milliseconds
[17:50:23][LongRunningTasks Thread:9][Main] > final yield to Main
[17:50:23][LongRunningTasks Thread:9][Main] > Main ==> Completed in 11534 milliseconds

结论和总结

希望有所帮助/提供信息?在我探索异步的旅程中,我学到的一些关键点,并且在此得到了演示:

  1. 全程异步 await。不要混用同步和异步方法。从底层开始 - 数据或过程接口 - 并沿着数据和业务/逻辑层一路编码到 UI,全程使用异步。
  2. 如果你不产生,你就无法异步运行。你必须给任务调度器机会!将一些同步例程包装在 Task 中只是纸上谈兵,而不是实际行动。
  3. “即发即弃” void 返回方法需要产生才能将控制权交还给调用者。它们在行为上与返回 Task 的方法没有区别。它们只是不返回可以 await 或监视进度的 Task。
  4. 如果你正在编写处理器密集型活动 - 建模、大型数字计算……确保使其异步并在适当的地方产生。考虑将其切换到任务池(同时考虑到下面的警告)。测试不同的场景,没有一成不变的规则。
  5. 仅在 UI 中使用 Task.Run,放在调用堆栈的最顶层。切勿在库中使用。除非有充分的理由,否则不要使用它。
  6. 使用日志记录和断点来监视 awaits,以便了解你何时命中它们。你的代码回退到外部 await 的速度是响应能力的绝佳指标。取出你的外部 await,看看你下降到底部有多快!
  7. 你可能注意到没有 ContinueWith。我很少使用它。通常,一个简单的 await 后跟 continuation 代码可以实现相同的结果。我读到过关于它处理开销更大的评论,因为它会创建一个新的任务,而 await/continuation 会重用同一个 Task。我还没有深入研究代码来核实这一点。
  8. 始终使用 AsyncAwait,不要耍花招。
  9. 如果你的库同时提供异步和同步调用,请将它们分开编码。“一次编写,到处运行”的最佳实践不适用于此。切勿从一个调用另一个,否则你将在某个时候自食其果!

历史

  • 2021 年 1 月 27 日:初始版本
© . All rights reserved.