并行计算和使用Dataflow块构建数据管道导论





5.00/5 (5投票s)
如何利用现代处理器的多核架构,通过Dataflow块。
引言
Dataflow
库由一系列线程安全的数据缓冲区组成,称为块(blocks),它们被设计成相互链接以形成数据处理管道。每个块在其自己的时间框架和线程空间中处理数据。块之间通过发送和接收数据消息进行通信。在软件建模术语中,这些块被称为 代理(Agents);这种模型具有良好的关注点分离,并且易于扩展。线程之间的交互由数据流块之间的消息交换来管理,因此无需深入研究 async
的复杂性。Dataflow
库可以通过 NuGet 包 System.Threading.Tasks.Dataflow
获得。
它们有效吗?
在深入之前,最好测试一下其中的一个块,看看它是否能通过测试。ActionBlock<T>
是一个线程安全的数据缓冲区,它使用 Action<T> 委托
来消耗数据消息。类型 T
是消息。ActionBlocks
被指定为目标块,因为它们被设计用于接收消息,而不是作为消息的来源。发布到块输入缓冲区的消息作为参数输入到 Action 委托
中。该块的强大之处在于它能够“启动”多个工作线程来并行处理消息。以下是实例化它的一种方法。
var block = new ActionBlock<int>(msg => { Thread.Sleep(msg); },
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3,
SingleProducerConstrained = true });
在这种情况下,消息是一个简单的 int
,它通过让委托的线程休眠一段时间来模拟工作。ExecutionDataflowBlockOptions
被设置为允许三个工作线程并发运行,而 SingleProducerConstrained
标志告知该块只有一个生产者发布到其缓冲区,因此输入不必“受限制”。使用此设置,可以运行测试,看看 ActionBlock
并行 foreach
循环与传统的 foreach
循环相比如何。
private readonly int[] arr = Enumerable.Repeat(20, 20).ToArray();
[Benchmark(Baseline = true)]
public bool ForEach()
{
foreach (var item in arr)
{
Thread.Sleep(item);
}
//keep Benchmark happy - it likes a test method to return something
return true;
}
[Benchmark]
public bool ParallelForEach()
{
var block = new ActionBlock<int>(msg => { Thread.Sleep(msg); },
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 3,
SingleProducerConstrained = true });
foreach (var item in arr)
{
block.Post(item);
}
block.Complete(); //close the pipeline
block.Completion.Wait(); //wait for it to complete
return true;
}
结果表明,并行 foreach
循环的运行速度比标准循环快 2.9 倍。考虑到管道是在测试方法本身内部构建和关闭的,这相当令人印象深刻。
更实际的应用
Honey the Codewitch 最近发布了一篇有趣的博文,题为 使用消息传递演示线程池和任务队列,其中并发工作线程被配置为向 Windows Forms 窗口中显示的进度条报告其进度。这是通过直接实例化和操作线程实现的。对于我们这些没有 Honey 精湛线程编组技能的人来说,使用一对 ActionBlocks
就可以实现相同的功能。基本设置与并行 foreach 示例相同,唯一的区别是现在所有操作委托都将其进度发布到另一个共享的 ActionBlock
,该 ActionBlock
被配置为将输出发布到 UI 线程。Message
类型被扩展以接受 WorkerCommands
。
public enum WorkerCommands
{
Stop,
Start,
Report
}
public struct Message
{
public int Value;
public int WorkerId;
public WorkerCommands Command;
}
进度报告块被配置为在其委托上运行用户界面
var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
_progressReporter = new ActionBlock<Message>(msg => { reportProgress(msg); },
new ExecutionDataflowBlockOptions { TaskScheduler = uiTaskScheduler,
CancellationToken = _token });
工作块的委托捕获进度报告器并向其发布 Messages
。
int workersCount = Math.Max(Environment.ProcessorCount - 2, 1);
...
_workerBlock = new ActionBlock<Message>(msg => { DoWork(msg); },
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = workersCount,
SingleProducerConstrained = true, CancellationToken = _token });
....
private void DoWork(Message msg)
{
//the same threads are reused while the workerBlock's buffer holds data.
//The threads are released when it is empty
int id = Thread.CurrentThread.ManagedThreadId;
//send start command
_progressReporter.Post(new Message() {
Command = WorkerCommands.Start, Value = 0, WorkerId = id
});
try
{
for (var i = 1; i <= 50; ++i)
{
Thread.Sleep(msg.Value);
//report progress, Value is the progress %
_progressReporter.Post(new Message() {
Command = WorkerCommands.Report, Value = i * 2, WorkerId = id }
);
_token.ThrowIfCancellationRequested();
}
//send the stop command
_progressReporter.Post(new Message() {
Command = WorkerCommands.Stop, Value = 0, WorkerId = id });
}
catch (OperationCanceledException)
{
}
}
下载内容中包含一个使用数据流块更新 ProgressBars
的 WPF 应用程序演示。
一点术语
输出消息的块称为生产者或源块。接收消息的块称为消费者或目标块。既是生产者又是消费者的块称为传播块。存在指定的块,ActionBlock
就是其中之一,但没有“源块”这个说法。该术语是相对于另一个块而言的。因此,传播块可以被引用为特定目标块的源块。更正式地说,源块实现 ISourceBlock<TOutput>
接口,目标块实现 ITargetBlock<TInput>
。传播块同时实现这两个接口以及 IPropagatorBlock<in TInput, out TOutput>
。
管理数据流块
实例化一个块后,它将保持活动状态,持续监视其输入缓冲区并处理到达的消息。当它不再需要时,应该以结构化的方式关闭它,以免在过程中丢失数据。实现这一点的方法是调用块的 Complete
方法。该方法停止块接受消息并刷新数据缓冲区,以便处理所有存储的消息。从块的 Complete
方法返回的 Task
存储在块的 Completion
字段中,因此关闭块的正确方法是
block.Complete(); //close the pipeline
block.Completion.Wait(); //wait for it to complete
链接块
链接块以形成数据处理管道涉及使用源块的 LinkTo
方法将源块的输出耦合到目标块的输入。
sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });
设置 DataflowLinkOptions PropagateCompletion
标志会导致源块在自己的 Complete
方法完成后调用目标块的 Complete
方法。这使得完成信号能够沿着管道传播。因此,要关闭管道,请在第一个块上调用 Complete
,然后等待最后一个块完成。
封装块以方便链接
在前面的示例中,ActionBlock
的委托使用另一个 ActionBlock
通过直接发布到该块来更新 UI。这是一个设计缺陷,workerBlock
与 progressBlock
过于紧密地耦合,实际上,管道正在 workerBlock
内部构建。workerBlock
应该将其数据发布到一个传播块,以便该传播块可以使用其 LinkTo
方法链接到 progressBlock
。由于 workerBlock
和传播块相互依赖,因此最好将这两个块封装起来形成一个单一的实体。这是通过调用 Dataflow.Encapsulate
方法来实现的,将 workerBlock
作为目标,将传播块作为源。重要的是要将一个续接 Task
附加到目标的完成 Task
。续接 Task
被配置为调用源的 Complete
方法,以便从方法返回的封装实体能够正确完成。下面的工厂方法实例化封装的传播器。
public static class MessagePropagatorFactory
{
public static IPropagatorBlock<Message, Message>
CreateMessagePropagator(ExecutionDataflowBlockOptions blockOptions,
CancellationToken token)
{
//source is the output block. A BufferBlock can be both a source and a target
var source = new BufferBlock<Message>(new DataflowBlockOptions
{ CancellationToken = token
});
//target is the input block
var target = new ActionBlock<Message>(msg =>
{
//worker threads are released when the data buffer is empty
int id = Thread.CurrentThread.ManagedThreadId;
//send start command
source.Post(new Message() {
Command = WorkerCommands.Start, Value = 0, WorkerId = id
});
try
{
for (var i = 1; i <= 50; ++i)
{
Thread.Sleep(msg.Value);
//report progress, Value is the progress %
source.Post(new Message() {
Command = WorkerCommands.Report, Value = i * 2, WorkerId = id
});
token.ThrowIfCancellationRequested();
}
//send the stop command
source.Post(new Message() {
Command = WorkerCommands.Stop, Value = 0, WorkerId = id
});
}
catch (OperationCanceledException){}
},
blockOptions);
// When the target completes, it sets its Completion Task to a completed state
// So, when that happens, call the source's Complete method.
target.Completion.ContinueWith((t) => source.Complete());
//the returned entity implements the IPropagatorBlock interface
return DataflowBlock.Encapsulate(target, source);
}
}
工厂方法使用 BufferBlock
作为源块。BufferBlock
是一个传播块,可以将多个块链接到其输出,但第一个接受消息的链接块会将其带走,而不会将其提供给其他消费者。在底层,BufferBlock
与任何链接的消费者之间存在一个基于消息的握手协议。该协议允许消费者接受、拒绝或推迟他们收到的任何消息。这就是为什么应该使用 LinkTo
而不是手动发布和 async
读取消息的原因之一。自己编写数据流块是一项艰巨的任务。通常最好使用 DataflowBlock.Encapsulate
方法或基于现有块的包装类。有了工厂方法,构建管道就变成了这样:
CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
var progressOptions= new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3,
SingleProducerConstrained = true,
CancellationToken = token
};
var progressPropagator = MessagePropagatorFactory.CreateMessagePropagator
(progressOptions,token);
var reporterOptions = new ExecutionDataflowBlockOptions { CancellationToken = token };
var progressReporter = new ActionBlock<Message>((msg) =>
Console.WriteLine($"{msg.WorkerId} {msg.Command}"),reporterOptions);
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
progressPropagator.LinkTo(progressReporter, linkOptions);
广播消息
BroadcastBlock
将相同的数据消息发送给所有链接的目标块。好吧,它不完全是相同的消息,消费者收到的是原始消息的克隆版本。这是为了防止多个线程处理同一消息实例时出现问题。消息克隆方法作为参数传递给 BroadcastBlock
的构造函数。这类块用于将管道分成分支。使用 BroadcastBlock
,可以修改示例中的管道,将消息发送到原始的 progressReporter
以及 stopCommandLogger
,如下所示:
var progressPropagator = MessagePropagatorFactory.CreateMessagePropagator(options,token);
var executionOptions = new ExecutionDataflowBlockOptions { CancellationToken = token };
var progressReporter = new ActionBlock<Message>((msg) =>
Console.WriteLine($"{msg.WorkerId} {msg.Command}"),executionOptions);
var stopCommandLogger = new ActionBlock<Message>((msg) =>
Console.WriteLine($"Logged {msg.Command}"),executionOptions);
//msg=>msg is the cloning function, no sort of conversion is need in this case
var broadcastBlock = new BroadcastBlock<Message>
(msg => msg, new DataflowBlockOptions { CancellationToken = token });
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
progressPropagator.LinkTo(broadcastBlock, linkOptions);
broadcastBlock.LinkTo(progressReporter,linkOptions);
//pass in a Predicate so that the logger gets only messages where the Command is Stop.
broadcastBlock.LinkTo(stopCommandLogger, linkOptions,msg=>msg.Command==WorkerCommands.Stop);
BroadcastBlock
还有另一个重要功能,它总是输出发布给它的最新消息,因此它提供了一个实时数据源。块之前的管道“瓶颈”不会导致块输出过时的消息。这对于像股票市场价格这样的时间敏感数据可能很重要。
并发读取和独占写入
在并行工作的情况下,可能会出现一些线程需要读取共享变量,而其他线程则想写入它。在这种情况下,可以允许读取者与其他读取者并发读取变量,但写入者必须独占更新变量,以免在读取者正在处理它时进行更新。为了实现这一点,Dataflow
库提供了一个 ConcurrentExclusiveSchedulerPair
,其中包含两个调度器。读取者设置为使用 ConcurrentScheduler
,写入者设置为使用 ExclusiveScheduler
。为了使此功能正常工作,需要在并发线程之间进行负载平衡,以便将工作平均分配给它们。配置此的最佳方法是使用 BufferBlock
作为源,并将链接的 ActionBlocks
的委托返回一个 Task
。忙碌的 ActionBlocks
将不会接收到消息。以下是读取者的设置,写入者的设置类似,但它们使用 ExclusiveScheduler
。
var readExWriteScheduler = new ConcurrentExclusiveSchedulerPair();
blockOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = token,
//Set the buffer capacity to 1 or else the block will swallow everything that's offered
BoundedCapacity = 1,
//set this as it avoids having to'gate'the input when there is only 1 active producer
SingleProducerConstrained = true
};
var actionBlock=new ActionBlock<int>(async (i) =>
{
//Use Task.Factory.Start.New() as need to specify the Scheduler.
//Do not await anything inside the Action delegate passed to Start.New
//If you do, the continuation will not run on the correct thread
await Task.Factory.StartNew(() =>
readerWork(i),
token,
TaskCreationOptions.None,
readExWriteScheduler.ConcurrentScheduler);
},
blockOptions);
下载内容中有一个演示,使用了 ConcurrentExclusiveSchedulerPair
,包含三个读取者和一个写入者。数据以批次发送,以说明工作线程是根据缓冲区是否有数据而雇用的——当缓冲区为空时,它们会被释放。显示屏显示了 BufferBlock
的负载平衡能力。
其他数据流块
本文介绍未涵盖许多类型的数据流块。大多数都侧重于增加数据的熵——即数据包含的信息量。单个项目进入块,然后出来的是容器负载。你甚至可以将一堆便士和一个一堆面包卷作为输入,然后得到一个元组流,每个元组包含一个便士和一个面包卷。Stephen Toub 在 这篇出色的指南中详细介绍了所有这些。
结论
Dataflow
库专注于在“内部”处理数据时使用并行编程。它通过将过程分解为一系列自主块来实现这一点,这些块在其自己的线程空间中运行并管理自己的线程。它们易于使用和简单链接,但需要注意一些事项,因为并发性的增加并不总是等于性能的提高。建议进行一些实验以找到最佳设计。毕竟,如果你的应用程序中的线程数超过了 贝叶斯挂毯,最好确保它们得到了有效部署。
历史
- 2020 年 9 月 11 日:初始版本