65.9K
CodeProject 正在变化。 阅读更多。
Home

异步流入门

starIconstarIconstarIconstarIconstarIcon

5.00/5 (5投票s)

2021年12月10日

CPOL

9分钟阅读

viewsIcon

8468

downloadIcon

144

异步流如何提高应用程序效率并缩短响应时间

异步流

异步流在 C# 8 中被引入。它们在枚举过程中提供了异步等待的能力,以便数据流中的每个单独项都能变为可用。这在使用来自 WebSocket 的零散数据流时特别有利。异步流的返回类型是 IAsyncEnumerable<T>。该可枚举类型有一个 async 枚举器,它会异步等待下一个项的可用。下面是一个 async 可枚举的示例。

      public async IAsyncEnumerable<string> GetLines(int count,
                      [EnumeratorCancellation]CancellationToken token = default)
        {
            bool isContinue = true;
            int itemCount = 0;
            while (isContinue)
            {
                string jsonResponse = await GetWebpageAsync(token);
                string[] items = JsonConvert.DeserializeObject<string[]>(jsonResponse);
                foreach (var item in items)
                {
                    yield return item;
                    itemCount++;
                    if (itemCount == count)
                    {
                        isContinue = false;
                        break;
                    }
                }
            }
        }

该可枚举使用 ValueTasks 来减少内存分配并提高效率。ValueTask 是一个结构体,可以存储在堆上。它也可以被重用。在示例中,异步 await 使用一个引用 Task<string> 实例的 ValueTask。但是,同步的 foreach 循环使用了不需要包含 Task 引用的 ValueTasks,因为数据项已经可用。这里有一个关于 ValueTasks 的精彩帖子 链接。处理 ValueTasks 时需要注意一些事项,因为它们不是 Tasks 的直接替代品。最安全的选择是只 await 一次并提取其负载,然后就此打住。该方法的使用方式如下。

  await foreach (string line in GetLines(42))
            {
                Console.WriteLine(line);
            }

取消

可以通过直接将 CancellationToken 传递给方法来实现取消。但也可以在不引用父方法的情况下取消枚举。

IAsyncEnumerable<string> lines= GetLines (42);
await foreach(var line in lines.WithCancellation(cancellationToken))
{
..
}

为了启用此功能,必须使用 [EnumeratorCancellation] 属性来修饰 GetLines 方法。默认情况下不处理取消。该方法需要监视令牌的状态,并在需要时采取行动。一种简单的处理方法是调用 cancellationToken.ThrowIfCancellationRequested();

使用 Linq

IAsyncEnumerables 的 Linq 扩展方法可在 System.Linq.Async NuGet 包中使用。所有熟悉的扩展方法都已提供。返回可枚举的查询返回 IAsyncEnumerable,返回单个值的返回 ValueTask<T> 并带有 Async 后缀,例如 AnyAsyncCountAsync。可以将异步 lambda 表达式传递给某些方法,以便它们可以被 await,这些扩展方法在后面加上 Await,例如 WhereAwait。所有这些方法都已添加到 System.Linq 命名空间中。

IAsyncEnumerable<string> lines= GetLines (42);

IAsyncEnumerable<string> query = lines.Where(l => l.Substring(1, 1) == "1");
var verySlowQuery = lines.WhereAwait(async l=> { await Task.Delay(500); return true; }) ;
int count =await lines.CountAsync();//CountAsync returns ValueTask<int>

消费异步流

使用异步流的主要原因之一是防止用户界面 (UI) 消息循环被阻塞。如果循环被阻塞,控件将不会更新,界面将变得无响应。因此,异步处理可枚举生成的每个项是有意义的。在接下来的示例中,项是一行文本,需要进行处理才能显示在 UI 中。这有两个部分:第一部分是处理该行,第二部分是显示结果。第一部分可以在自己的线程上运行,但显示必须在 UI 线程上更新。通过调用以下无意义的方法来模拟处理工作。

        private string WorkersFunction (string line)
        {
            string vowels = "aeiou";
            Thread.Sleep(Constants.WorkerThreadSleep);//simulate a busy thread
            //remove all the vowels to show something has been done
            return string.Concat(line.Where(c => !vowels.Contains(c)));
        }

一个使用工作线程的示例

这里的想法是使用多个工作线程来处理这些行。通过调用 Task.Factory.StartNew 来运行 WorkersFunction 来启动工作线程。StartNew 附加了一个 ContinuationTask,该任务将在 WorkersFunction 完成后更新 UI。由于会有多个工作线程同时活动,因此需要一个独占的任务调度程序,该调度程序一次只允许一个任务写入 UI。这是通过使用 ConcurrentExclusiveSchedulerPairExclusiveScheduler 成员来实现的。

        private readonly TaskScheduler uiTaskScheduler;
        public AsyncStreamTest()
        {
           //call the constructor from the UI thread
            var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.FromCurrentSynchronizationContext());
            uiTaskScheduler = taskSchedulerPair.ExclusiveScheduler;
        }

另一个要求是控制活动工作线程的数量,以减轻线程池、内存和处理器的压力。这是通过使用 SemaphoreSlim 来实现的。SemaphoreSlim 记录了成功调用 Enter 的次数减去调用 Leave 的次数。如果差值等于信号量构造函数中设置的数字,则在收到 Leave 调用之前,不会接受进一步的 Enter 调用。通过调用 SemahoreSlim.Release() 来进行 Leave 调用。只有当从 SemaphoreSlim.WaitAsync() 返回的任务完成后,Enter 调用才会成功。确保在工作线程完成时始终调用 Release 是很重要的,最好的方法是在 finally 块中调用 SemaphoreSlim.Release()

public async Task UsingWorkerThreadsAsync(int workersCount, CancellationToken token = default)
{
    var lines = GetLines(Constants.LinesRequired);
    var semaphoreSlim = new SemaphoreSlim(workersCount);
    List<Task> tasks = new();
    try
    {
      await foreach (var line in lines.WithCancellation(token))
      {
       await semaphoreSlim.WaitAsync(token);
       var task = Task.Factory.StartNew(() => WorkersFunction(line, semaphoreSlim),
       token,
       TaskCreationOptions.None,
       TaskScheduler.Default).ContinueWith(
       (t) => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(t.Result)),
       token, TaskContinuationOptions.AttachedToParent, uiTaskScheduler);
       tasks.Add(task);
      }
    }
    catch (OperationCanceledException)
    { }
    await Task.WhenAll(tasks);
}

这里需要的功能是父任务仅在 continuation task 完成后才完成,这通过设置 ContinuationOptions.AttachedToParent 请求标志来实现。如果不设置该标志,continuation task 将被忽略,并且将无法知道它是否完成以及以何种状态完成。使用 Task.Factory.StartNew() 而不是 Task.Run() 的原因是 Task.RunDenyChildAttach 标志设置为 true,这会导致 ContinuationOptions.AttachedToParent 请求被拒绝。

下面是示例应用程序的屏幕截图,显示了此方法的输出。

测试行附加了它们的索引号,很清楚它们不是按开始顺序完成的。为了确保保持原始顺序,需要调用一个实现某种先进先出队列的方法。

使用 Channel 的示例

System.Threading.Channel.Channel 是让两个不同线程交换数据的有效方法。Stephen Toub 撰写了一篇关于该类的精彩介绍,非常值得阅读。Channel 类本质上是一个托管的先进先出队列。它旨在异步使用,因为所有 API 都是异步的,这使其成为流行的 BlockingCollection<T> 的绝佳替代品。在此示例中,有一个单一的 Channel.Writer,它接受通过重复调用 Task.Run 创建的任务并将其写入缓冲区。还有一个单一的 Channel.Reader 实例,它从缓冲区读取 Tasks,等待它们完成,然后更新 UI 线程。

 public async Task ChannelExample(int workersCount, CancellationToken token = default)
        {
            var lines = GetLines(Constants.LinesRequired);
            var semaphoreSlim = new SemaphoreSlim(workersCount);
            var channel = Channel.CreateBounded<Task<string>>(
            new BoundedChannelOptions(Constants.BufferSize){ SingleWriter = true });
            
            var readerTask = ReadFromSingleChannelAsync(channel, token);
            try
            {
                await foreach (var line in lines.WithCancellation(token))
                {   //Cancelling the  semaphore directly can be problematical
                    await semaphoreSlim.WaitAsync(CancellationToken.None);
                    var workerTask = Task.Run(() =>; WorkersFunction(line, semaphoreSlim));
                    await channel.Writer.WriteAsync(workerTask, token);
                }
            }
            catch (OperationCanceledException) { }
            channel.Writer.Complete();
            await readerTask;
        }

与前一个示例一样,该方法使用 SemaphoreSlim 来限制工作任务的数量。它创建一个 Channel 实例并设置其数据缓冲区的[_size_]。然后启动以下 reader task,但此时不对其进行 await。

private async Task ReadFromChannelAsync(Channel<Task<string>> channel, 
                                        CancellationToken token = default)
 {
     while (await channel.Reader.WaitToReadAsync(token))
     {
      var readTask = await channel.Reader.ReadAsync(token);
      var result = await readTask;
      RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(result));
     }
 }

当有数据可供读取时,channel.Reader.WaitToReadAsync 方法返回 true,当通道被 channel writer 关闭时,该方法返回 false。由于该方法是异步的,因此可以从 UI 线程调用它,并且由于只有一个 reader,因此它可以在不需要同步的情况下更新 UI 线程。当枚举结束时,调用 Channel.Writer.Complete,这反过来会导致 readerTask 完成。

使用 DataFlow 块的示例。

DataFlow 块是 TPL DataFlow 库 中的类,它们链接在一起形成数据处理管道。有许多不同的类型,但在此示例中,只使用了两种块:一种 TransformBlock,其操作方式类似于前面描述的 Channel,以及一种 ActionBlock,它只是将 TransformBlock 的输出写入 UI。需要编写的用户代码非常少,只需设置各种选项并将两个块“连接”在一起即可。该方法已被注释,因为几乎每一行都需要解释,这可能是 DataFlow 库不像应有的那样受欢迎的原因。

 public async Task UsingDataFlowAsync(int workersCount, CancellationToken token = default)
{
    var lines = GetLines(Constants.LinesRequired);
    //set the TransformerBlock options
    var options = new ExecutionDataflowBlockOptions
    {
     MaxDegreeOfParallelism = workersCount, //number of active worker threads
     SingleProducerConstrained = true,      //this saves having to gate the input
     BoundedCapacity = Constants.BufferCapacity,
     CancellationToken = token
    };
    //The Transform block takes a string as its input. It passes it to the WorkersFunction 
    //and outputs the value returned from that function.
    var transformBlock = new TransformBlock<string, string>(
                        (message) => WorkersFunction(message), options);

    //The ActionBlock takes the output string from the TransformBlock and 
    //raises the ItemsUpdateEvent on the UI thread, passing the output string to the 
    //EventArgs of that event
    var uiUpdaterBlock = new ActionBlock<string>(msg => RaiseItemsUpdatedEvent(
                         new ItemsUpdatedEventArgs(msg)),
                         new ExecutionDataflowBlockOptions { 
                         TaskScheduler = uiTaskScheduler, CancellationToken = token });

    //Setting the DataFlowLinkOption PropagateCompletion flag means that, 
    //if the TransformBlock receives  completion request that request will be passed    
    //on to the ActionBlock
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    //Couple the TransformerBlock to the ActionBlock
    transformBlock.LinkTo(uiUpdaterBlock, linkOptions);

    await foreach (var line in lines.WithCancellation(token))
    {      //Send the line to the TransformerBlock and await for it to be accepted
           _ = await transformBlock.SendAsync(line);
    }
    //Complete the TransformBlock and await for the ActionBlock to complete
    transformBlock.Complete();
    await uiUpdaterBlock.Completion;
}

DataFlow 管道具有结构化且健壮的关闭或取消方式。所有输入都会被拒绝,然后数据缓冲区会按顺序 along 管道刷新。TransformBlock 将数据划分为大小等于工作线程数量的组。当一个分区完成时,它将作为一个数据批次输出。这种技术很有效,但会导致第一个项出现在 UI 之前出现延迟。

三种方法的基准测试结果

以下测试是使用卓越的 BenchmarkDotNet NuGet 包 运行的。它在没有消息循环的控制台应用程序中隔离运行测试。因此,测试方法中更新 UI 的调用是强制性的。在运行测试时,需要捕获默认的同步上下文,而不是当前的上下文。如果捕获了当前上下文,则测试时间将是无限的。显示的平均时间仅用于三个测试之间的比较。它们不表示单次调用方法的耗时。它们是通过运行大约 10 次预热迭代,然后进行另外 100 次计时迭代,并去除异常值后取计时运行的平均值来获得的。测试是以 10,000 行文本、3000 页大小和最多 35 个活动工作线程进行的。

方法 平均 标准差 Gen 0 Gen 1 已分配
DataFlow 4.510 秒 0.0015 秒 1 1000.0000   5MB
WorkerTasks 4.645 秒 0.0326 秒 2 2000.0000 1000.0000 8MB
通道 4.666 秒 0.0251 秒 2 1000.0000   6MB

在上表的结果中,Gen 0Gen 1 列中的数字与垃圾回收活动有关。它们是测试方法每 1000 次调用中的收集次数。DataFowChannel 测试会产生一次轻量级的第 0 代集合。但是 WorkerTasks 测试会导致两次第 0 代集合运行,此外,它还会触发一次更密集的第 1 代集合调用。该方法输出无序,并且占用空间最大。它似乎受到需要维护和查找 10,000 个任务列表以检查它们是否仍然“热”的困扰。使用 DataFlow 块的测试表现良好,并且内存占用相对较小。但是,它在第一个数据项发送到 UI 之前有最长的延迟。Channel 方法性能和扩展性都很好。就效率和快速更新 UI 的能力而言,它可能是最佳选择。

结论

我希望这里提出的观察和示例能帮助理解 C# 语言和 .NET 框架中最近添加的许多异步数据管理模式。

历史

  • 2021年12月10日:初始版本
© . All rights reserved.