异步流入门





5.00/5 (5投票s)
异步流如何提高应用程序效率并缩短响应时间
异步流
异步流在 C# 8 中被引入。它们在枚举过程中提供了异步等待的能力,以便数据流中的每个单独项都能变为可用。这在使用来自 WebSocket 的零散数据流时特别有利。异步流的返回类型是 IAsyncEnumerable<T>
。该可枚举类型有一个 async
枚举器,它会异步等待下一个项的可用。下面是一个 async
可枚举的示例。
public async IAsyncEnumerable<string> GetLines(int count,
[EnumeratorCancellation]CancellationToken token = default)
{
bool isContinue = true;
int itemCount = 0;
while (isContinue)
{
string jsonResponse = await GetWebpageAsync(token);
string[] items = JsonConvert.DeserializeObject<string[]>(jsonResponse);
foreach (var item in items)
{
yield return item;
itemCount++;
if (itemCount == count)
{
isContinue = false;
break;
}
}
}
}
该可枚举使用 ValueTasks
来减少内存分配并提高效率。ValueTask
是一个结构体,可以存储在堆上。它也可以被重用。在示例中,异步 await 使用一个引用 Task<string>
实例的 ValueTask
。但是,同步的 foreach
循环使用了不需要包含 Task
引用的 ValueTasks
,因为数据项已经可用。这里有一个关于 ValueTasks
的精彩帖子 链接。处理 ValueTasks
时需要注意一些事项,因为它们不是 Tasks
的直接替代品。最安全的选择是只 await 一次并提取其负载,然后就此打住。该方法的使用方式如下。
await foreach (string line in GetLines(42))
{
Console.WriteLine(line);
}
取消
可以通过直接将 CancellationToken
传递给方法来实现取消。但也可以在不引用父方法的情况下取消枚举。
IAsyncEnumerable<string> lines= GetLines (42);
await foreach(var line in lines.WithCancellation(cancellationToken))
{
..
}
为了启用此功能,必须使用 [EnumeratorCancellation]
属性来修饰 GetLines
方法。默认情况下不处理取消。该方法需要监视令牌的状态,并在需要时采取行动。一种简单的处理方法是调用 cancellationToken.ThrowIfCancellationRequested();
。
使用 Linq
IAsyncEnumerables
的 Linq 扩展方法可在 System.Linq.Async
NuGet 包中使用。所有熟悉的扩展方法都已提供。返回可枚举的查询返回 IAsyncEnumerable
,返回单个值的返回 ValueTask<T>
并带有 Async
后缀,例如 AnyAsync
和 CountAsync
。可以将异步 lambda 表达式传递给某些方法,以便它们可以被 await,这些扩展方法在后面加上 Await
,例如 WhereAwait
。所有这些方法都已添加到 System.Linq
命名空间中。
IAsyncEnumerable<string> lines= GetLines (42);
IAsyncEnumerable<string> query = lines.Where(l => l.Substring(1, 1) == "1");
var verySlowQuery = lines.WhereAwait(async l=> { await Task.Delay(500); return true; }) ;
int count =await lines.CountAsync();//CountAsync returns ValueTask<int>
消费异步流
使用异步流的主要原因之一是防止用户界面 (UI) 消息循环被阻塞。如果循环被阻塞,控件将不会更新,界面将变得无响应。因此,异步处理可枚举生成的每个项是有意义的。在接下来的示例中,项是一行文本,需要进行处理才能显示在 UI 中。这有两个部分:第一部分是处理该行,第二部分是显示结果。第一部分可以在自己的线程上运行,但显示必须在 UI 线程上更新。通过调用以下无意义的方法来模拟处理工作。
private string WorkersFunction (string line)
{
string vowels = "aeiou";
Thread.Sleep(Constants.WorkerThreadSleep);//simulate a busy thread
//remove all the vowels to show something has been done
return string.Concat(line.Where(c => !vowels.Contains(c)));
}
一个使用工作线程的示例
这里的想法是使用多个工作线程来处理这些行。通过调用 Task.Factory.StartNew
来运行 WorkersFunction
来启动工作线程。StartNew
附加了一个 ContinuationTask
,该任务将在 WorkersFunction
完成后更新 UI。由于会有多个工作线程同时活动,因此需要一个独占的任务调度程序,该调度程序一次只允许一个任务写入 UI。这是通过使用 ConcurrentExclusiveSchedulerPair
的 ExclusiveScheduler
成员来实现的。
private readonly TaskScheduler uiTaskScheduler;
public AsyncStreamTest()
{
//call the constructor from the UI thread
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(
TaskScheduler.FromCurrentSynchronizationContext());
uiTaskScheduler = taskSchedulerPair.ExclusiveScheduler;
}
另一个要求是控制活动工作线程的数量,以减轻线程池、内存和处理器的压力。这是通过使用 SemaphoreSlim
来实现的。SemaphoreSlim
记录了成功调用 Enter
的次数减去调用 Leave
的次数。如果差值等于信号量构造函数中设置的数字,则在收到 Leave
调用之前,不会接受进一步的 Enter
调用。通过调用 SemahoreSlim.Release()
来进行 Leave
调用。只有当从 SemaphoreSlim.WaitAsync()
返回的任务完成后,Enter
调用才会成功。确保在工作线程完成时始终调用 Release
是很重要的,最好的方法是在 finally
块中调用 SemaphoreSlim.Release()
。
public async Task UsingWorkerThreadsAsync(int workersCount, CancellationToken token = default)
{
var lines = GetLines(Constants.LinesRequired);
var semaphoreSlim = new SemaphoreSlim(workersCount);
List<Task> tasks = new();
try
{
await foreach (var line in lines.WithCancellation(token))
{
await semaphoreSlim.WaitAsync(token);
var task = Task.Factory.StartNew(() => WorkersFunction(line, semaphoreSlim),
token,
TaskCreationOptions.None,
TaskScheduler.Default).ContinueWith(
(t) => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(t.Result)),
token, TaskContinuationOptions.AttachedToParent, uiTaskScheduler);
tasks.Add(task);
}
}
catch (OperationCanceledException)
{ }
await Task.WhenAll(tasks);
}
这里需要的功能是父任务仅在 continuation task 完成后才完成,这通过设置 ContinuationOptions.AttachedToParent
请求标志来实现。如果不设置该标志,continuation task 将被忽略,并且将无法知道它是否完成以及以何种状态完成。使用 Task.Factory.StartNew()
而不是 Task.Run()
的原因是 Task.Run
的 DenyChildAttach
标志设置为 true,这会导致 ContinuationOptions.AttachedToParent
请求被拒绝。
下面是示例应用程序的屏幕截图,显示了此方法的输出。
测试行附加了它们的索引号,很清楚它们不是按开始顺序完成的。为了确保保持原始顺序,需要调用一个实现某种先进先出队列的方法。
使用 Channel 的示例
System.Threading.Channel.Channel
是让两个不同线程交换数据的有效方法。Stephen Toub 撰写了一篇关于该类的精彩介绍,非常值得阅读。Channel
类本质上是一个托管的先进先出队列。它旨在异步使用,因为所有 API 都是异步的,这使其成为流行的 BlockingCollection<T>
的绝佳替代品。在此示例中,有一个单一的 Channel.Writer
,它接受通过重复调用 Task.Run
创建的任务并将其写入缓冲区。还有一个单一的 Channel.Reader
实例,它从缓冲区读取 Tasks
,等待它们完成,然后更新 UI 线程。
public async Task ChannelExample(int workersCount, CancellationToken token = default)
{
var lines = GetLines(Constants.LinesRequired);
var semaphoreSlim = new SemaphoreSlim(workersCount);
var channel = Channel.CreateBounded<Task<string>>(
new BoundedChannelOptions(Constants.BufferSize){ SingleWriter = true });
var readerTask = ReadFromSingleChannelAsync(channel, token);
try
{
await foreach (var line in lines.WithCancellation(token))
{ //Cancelling the semaphore directly can be problematical
await semaphoreSlim.WaitAsync(CancellationToken.None);
var workerTask = Task.Run(() =>; WorkersFunction(line, semaphoreSlim));
await channel.Writer.WriteAsync(workerTask, token);
}
}
catch (OperationCanceledException) { }
channel.Writer.Complete();
await readerTask;
}
与前一个示例一样,该方法使用 SemaphoreSlim
来限制工作任务的数量。它创建一个 Channel
实例并设置其数据缓冲区的[_size_]。然后启动以下 reader task,但此时不对其进行 await。
private async Task ReadFromChannelAsync(Channel<Task<string>> channel,
CancellationToken token = default)
{
while (await channel.Reader.WaitToReadAsync(token))
{
var readTask = await channel.Reader.ReadAsync(token);
var result = await readTask;
RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(result));
}
}
当有数据可供读取时,channel.Reader.WaitToReadAsync
方法返回 true
,当通道被 channel writer 关闭时,该方法返回 false
。由于该方法是异步的,因此可以从 UI 线程调用它,并且由于只有一个 reader,因此它可以在不需要同步的情况下更新 UI 线程。当枚举结束时,调用 Channel.Writer.Complete
,这反过来会导致 readerTask
完成。
使用 DataFlow 块的示例。
DataFlow
块是 TPL DataFlow 库 中的类,它们链接在一起形成数据处理管道。有许多不同的类型,但在此示例中,只使用了两种块:一种 TransformBlock
,其操作方式类似于前面描述的 Channel,以及一种 ActionBlock
,它只是将 TransformBlock
的输出写入 UI。需要编写的用户代码非常少,只需设置各种选项并将两个块“连接”在一起即可。该方法已被注释,因为几乎每一行都需要解释,这可能是 DataFlow
库不像应有的那样受欢迎的原因。
public async Task UsingDataFlowAsync(int workersCount, CancellationToken token = default)
{
var lines = GetLines(Constants.LinesRequired);
//set the TransformerBlock options
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = workersCount, //number of active worker threads
SingleProducerConstrained = true, //this saves having to gate the input
BoundedCapacity = Constants.BufferCapacity,
CancellationToken = token
};
//The Transform block takes a string as its input. It passes it to the WorkersFunction
//and outputs the value returned from that function.
var transformBlock = new TransformBlock<string, string>(
(message) => WorkersFunction(message), options);
//The ActionBlock takes the output string from the TransformBlock and
//raises the ItemsUpdateEvent on the UI thread, passing the output string to the
//EventArgs of that event
var uiUpdaterBlock = new ActionBlock<string>(msg => RaiseItemsUpdatedEvent(
new ItemsUpdatedEventArgs(msg)),
new ExecutionDataflowBlockOptions {
TaskScheduler = uiTaskScheduler, CancellationToken = token });
//Setting the DataFlowLinkOption PropagateCompletion flag means that,
//if the TransformBlock receives completion request that request will be passed
//on to the ActionBlock
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
//Couple the TransformerBlock to the ActionBlock
transformBlock.LinkTo(uiUpdaterBlock, linkOptions);
await foreach (var line in lines.WithCancellation(token))
{ //Send the line to the TransformerBlock and await for it to be accepted
_ = await transformBlock.SendAsync(line);
}
//Complete the TransformBlock and await for the ActionBlock to complete
transformBlock.Complete();
await uiUpdaterBlock.Completion;
}
DataFlow
管道具有结构化且健壮的关闭或取消方式。所有输入都会被拒绝,然后数据缓冲区会按顺序 along 管道刷新。TransformBlock
将数据划分为大小等于工作线程数量的组。当一个分区完成时,它将作为一个数据批次输出。这种技术很有效,但会导致第一个项出现在 UI 之前出现延迟。
三种方法的基准测试结果
以下测试是使用卓越的 BenchmarkDotNet NuGet 包 运行的。它在没有消息循环的控制台应用程序中隔离运行测试。因此,测试方法中更新 UI 的调用是强制性的。在运行测试时,需要捕获默认的同步上下文,而不是当前的上下文。如果捕获了当前上下文,则测试时间将是无限的。显示的平均时间仅用于三个测试之间的比较。它们不表示单次调用方法的耗时。它们是通过运行大约 10 次预热迭代,然后进行另外 100 次计时迭代,并去除异常值后取计时运行的平均值来获得的。测试是以 10,000 行文本、3000 页大小和最多 35 个活动工作线程进行的。
方法 | 平均 | 标准差 | 秩 | Gen 0 | Gen 1 | 已分配 |
DataFlow | 4.510 秒 | 0.0015 秒 | 1 | 1000.0000 | 5MB | |
WorkerTasks | 4.645 秒 | 0.0326 秒 | 2 | 2000.0000 | 1000.0000 | 8MB |
通道 | 4.666 秒 | 0.0251 秒 | 2 | 1000.0000 | 6MB |
在上表的结果中,Gen 0 和 Gen 1 列中的数字与垃圾回收活动有关。它们是测试方法每 1000 次调用中的收集次数。DataFow
和 Channel
测试会产生一次轻量级的第 0 代集合。但是 WorkerTasks
测试会导致两次第 0 代集合运行,此外,它还会触发一次更密集的第 1 代集合调用。该方法输出无序,并且占用空间最大。它似乎受到需要维护和查找 10,000 个任务列表以检查它们是否仍然“热”的困扰。使用 DataFlow
块的测试表现良好,并且内存占用相对较小。但是,它在第一个数据项发送到 UI 之前有最长的延迟。Channel
方法性能和扩展性都很好。就效率和快速更新 UI 的能力而言,它可能是最佳选择。
结论
我希望这里提出的观察和示例能帮助理解 C# 语言和 .NET 框架中最近添加的许多异步数据管理模式。
历史
- 2021年12月10日:初始版本