65.9K
CodeProject 正在变化。 阅读更多。
Home

并行计算和使用Dataflow块构建数据管道导论

starIconstarIconstarIconstarIconstarIcon

5.00/5 (5投票s)

2020 年 9 月 11 日

CPOL

8分钟阅读

viewsIcon

8528

downloadIcon

164

如何利用现代处理器的多核架构,通过Dataflow块。

引言

Dataflow 库由一系列线程安全的数据缓冲区组成,称为块(blocks),它们被设计成相互链接以形成数据处理管道。每个块在其自己的时间框架和线程空间中处理数据。块之间通过发送和接收数据消息进行通信。在软件建模术语中,这些块被称为 代理(Agents);这种模型具有良好的关注点分离,并且易于扩展。线程之间的交互由数据流块之间的消息交换来管理,因此无需深入研究 async 的复杂性。Dataflow 库可以通过 NuGet 包 System.Threading.Tasks.Dataflow 获得。

它们有效吗?

在深入之前,最好测试一下其中的一个块,看看它是否能通过测试。ActionBlock<T> 是一个线程安全的数据缓冲区,它使用 Action<T> 委托 来消耗数据消息。类型 T 是消息。ActionBlocks 被指定为目标块,因为它们被设计用于接收消息,而不是作为消息的来源。发布到块输入缓冲区的消息作为参数输入到 Action 委托 中。该块的强大之处在于它能够“启动”多个工作线程来并行处理消息。以下是实例化它的一种方法。

var block = new ActionBlock<int>(msg => { Thread.Sleep(msg); },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3,
            SingleProducerConstrained = true });

在这种情况下,消息是一个简单的 int,它通过让委托的线程休眠一段时间来模拟工作。ExecutionDataflowBlockOptions 被设置为允许三个工作线程并发运行,而 SingleProducerConstrained 标志告知该块只有一个生产者发布到其缓冲区,因此输入不必“受限制”。使用此设置,可以运行测试,看看 ActionBlock 并行 foreach 循环与传统的 foreach 循环相比如何。

      private readonly int[] arr = Enumerable.Repeat(20, 20).ToArray();
 
        [Benchmark(Baseline = true)]
        public bool ForEach()
        {
            foreach (var item in arr)
            {
                Thread.Sleep(item);
             }
            //keep Benchmark happy - it likes a test method to return something
            return true;
        }
        [Benchmark]
        public bool ParallelForEach()
        {
            var block = new ActionBlock<int>(msg => { Thread.Sleep(msg); },
                       new ExecutionDataflowBlockOptions { 
                       MaxDegreeOfParallelism = 3, 
                       SingleProducerConstrained = true });
            foreach (var item in arr)
            {
                block.Post(item);
            }
            block.Complete();         //close the pipeline
            block.Completion.Wait();  //wait for it to complete
            return true;
        }

结果表明,并行 foreach 循环的运行速度比标准循环快 2.9 倍。考虑到管道是在测试方法本身内部构建和关闭的,这相当令人印象深刻。

更实际的应用

Honey the Codewitch 最近发布了一篇有趣的博文,题为 使用消息传递演示线程池和任务队列,其中并发工作线程被配置为向 Windows Forms 窗口中显示的进度条报告其进度。这是通过直接实例化和操作线程实现的。对于我们这些没有 Honey 精湛线程编组技能的人来说,使用一对 ActionBlocks 就可以实现相同的功能。基本设置与并行 foreach 示例相同,唯一的区别是现在所有操作委托都将其进度发布到另一个共享的 ActionBlock,该 ActionBlock 被配置为将输出发布到 UI 线程。Message 类型被扩展以接受 WorkerCommands

  public enum WorkerCommands
    {
        Stop,
        Start,
        Report
    }

    public struct Message
    {
        public int Value;
        public int WorkerId;
        public WorkerCommands Command;
    }

进度报告块被配置为在其委托上运行用户界面

var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
 _progressReporter = new ActionBlock<Message>(msg => { reportProgress(msg); },
                     new ExecutionDataflowBlockOptions { TaskScheduler = uiTaskScheduler, 
                     CancellationToken = _token }); 

工作块的委托捕获进度报告器并向其发布 Messages

  int workersCount = Math.Max(Environment.ProcessorCount - 2, 1);
...
   _workerBlock = new ActionBlock<Message>(msg => { DoWork(msg); },
                  new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = workersCount,
                  SingleProducerConstrained = true, CancellationToken = _token });
....
private void DoWork(Message msg)
        {
            //the same threads are reused while the workerBlock's buffer holds data.
            //The threads are released when it is  empty
            int id = Thread.CurrentThread.ManagedThreadId;
            //send start command
            _progressReporter.Post(new Message() {
            Command = WorkerCommands.Start, Value = 0, WorkerId = id 
            });
            try
            {
                for (var i = 1; i <= 50; ++i)
                {
                    Thread.Sleep(msg.Value);
                    //report progress, Value is the progress %
                    _progressReporter.Post(new Message() { 
                    Command = WorkerCommands.Report, Value = i * 2, WorkerId = id }
                    );
                    _token.ThrowIfCancellationRequested();
                }
                //send the stop command
                _progressReporter.Post(new Message() { 
                 Command = WorkerCommands.Stop, Value = 0, WorkerId = id });
            }
            catch (OperationCanceledException)
            {

            }
        }

下载内容中包含一个使用数据流块更新 ProgressBars 的 WPF 应用程序演示。

一点术语

输出消息的块称为生产者或源块。接收消息的块称为消费者或目标块。既是生产者又是消费者的块称为传播块。存在指定的块,ActionBlock 就是其中之一,但没有“源块”这个说法。该术语是相对于另一个块而言的。因此,传播块可以被引用为特定目标块的源块。更正式地说,源块实现 ISourceBlock<TOutput> 接口,目标块实现 ITargetBlock<TInput>。传播块同时实现这两个接口以及 IPropagatorBlock<in TInput, out TOutput>

管理数据流块

实例化一个块后,它将保持活动状态,持续监视其输入缓冲区并处理到达的消息。当它不再需要时,应该以结构化的方式关闭它,以免在过程中丢失数据。实现这一点的方法是调用块的 Complete 方法。该方法停止块接受消息并刷新数据缓冲区,以便处理所有存储的消息。从块的 Complete 方法返回的 Task 存储在块的 Completion 字段中,因此关闭块的正确方法是

  block.Complete();         //close the pipeline
  block.Completion.Wait();  //wait for it to complete

链接块

链接块以形成数据处理管道涉及使用源块的 LinkTo 方法将源块的输出耦合到目标块的输入。

sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

设置 DataflowLinkOptions PropagateCompletion 标志会导致源块在自己的 Complete 方法完成后调用目标块的 Complete 方法。这使得完成信号能够沿着管道传播。因此,要关闭管道,请在第一个块上调用 Complete,然后等待最后一个块完成。

封装块以方便链接

在前面的示例中,ActionBlock 的委托使用另一个 ActionBlock 通过直接发布到该块来更新 UI。这是一个设计缺陷,workerBlockprogressBlock 过于紧密地耦合,实际上,管道正在 workerBlock 内部构建。workerBlock 应该将其数据发布到一个传播块,以便该传播块可以使用其 LinkTo 方法链接到 progressBlock。由于 workerBlock 和传播块相互依赖,因此最好将这两个块封装起来形成一个单一的实体。这是通过调用 Dataflow.Encapsulate 方法来实现的,将 workerBlock 作为目标,将传播块作为源。重要的是要将一个续接 Task 附加到目标的完成 Task。续接 Task 被配置为调用源的 Complete 方法,以便从方法返回的封装实体能够正确完成。下面的工厂方法实例化封装的传播器。

    public static class MessagePropagatorFactory
    {
      public static IPropagatorBlock<Message, Message> 
      CreateMessagePropagator(ExecutionDataflowBlockOptions blockOptions,
                             CancellationToken token)
      {
          //source is the output block. A BufferBlock can be both a source and a target
          var source = new BufferBlock<Message>(new DataflowBlockOptions 
                                               { CancellationToken = token 
          });
          //target is the input block
          var target = new ActionBlock<Message>(msg =>
          {
              //worker threads are released when the data buffer is  empty
              int id = Thread.CurrentThread.ManagedThreadId;
              //send start command
              source.Post(new Message() { 
              Command = WorkerCommands.Start, Value = 0, WorkerId = id 
              });
              try
              {
                  for (var i = 1; i <= 50; ++i)
                  {
                      Thread.Sleep(msg.Value);
                      //report progress, Value is the progress %
                      source.Post(new Message() { 
                      Command = WorkerCommands.Report, Value = i * 2, WorkerId = id 
                      });
                      token.ThrowIfCancellationRequested();
                  }
                  //send the stop command
                  source.Post(new Message() {
                  Command = WorkerCommands.Stop, Value = 0, WorkerId = id 
                  });
              }
              catch (OperationCanceledException){}
          },
           blockOptions);
          // When the target completes, it sets its Completion Task to a completed state
          // So, when that happens, call the source's Complete method.
          target.Completion.ContinueWith((t) => source.Complete());
          //the returned entity implements the IPropagatorBlock interface
          return DataflowBlock.Encapsulate(target, source);
      }
    }

工厂方法使用 BufferBlock 作为源块。BufferBlock 是一个传播块,可以将多个块链接到其输出,但第一个接受消息的链接块会将其带走,而不会将其提供给其他消费者。在底层,BufferBlock 与任何链接的消费者之间存在一个基于消息的握手协议。该协议允许消费者接受、拒绝或推迟他们收到的任何消息。这就是为什么应该使用 LinkTo 而不是手动发布和 async 读取消息的原因之一。自己编写数据流块是一项艰巨的任务。通常最好使用 DataflowBlock.Encapsulate 方法或基于现有块的包装类。有了工厂方法,构建管道就变成了这样:

    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;  
    var progressOptions= new ExecutionDataflowBlockOptions
     {
      MaxDegreeOfParallelism = 3,
      SingleProducerConstrained = true,
      CancellationToken = token
     };
     
     var progressPropagator = MessagePropagatorFactory.CreateMessagePropagator
                              (progressOptions,token);
     
     var reporterOptions = new ExecutionDataflowBlockOptions { CancellationToken = token };
     
     var progressReporter = new ActionBlock<Message>((msg) => 
             Console.WriteLine($"{msg.WorkerId} {msg.Command}"),reporterOptions);
     
     var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
     progressPropagator.LinkTo(progressReporter, linkOptions);

广播消息

BroadcastBlock 将相同的数据消息发送给所有链接的目标块。好吧,它不完全是相同的消息,消费者收到的是原始消息的克隆版本。这是为了防止多个线程处理同一消息实例时出现问题。消息克隆方法作为参数传递给 BroadcastBlock 的构造函数。这类块用于将管道分成分支。使用 BroadcastBlock,可以修改示例中的管道,将消息发送到原始的 progressReporter 以及 stopCommandLogger,如下所示:

     var progressPropagator = MessagePropagatorFactory.CreateMessagePropagator(options,token);
     
     var executionOptions = new ExecutionDataflowBlockOptions { CancellationToken = token };
     var progressReporter = new ActionBlock<Message>((msg) => 
     Console.WriteLine($"{msg.WorkerId} {msg.Command}"),executionOptions);
     
     var stopCommandLogger = new ActionBlock<Message>((msg) => 
     Console.WriteLine($"Logged {msg.Command}"),executionOptions);
     
     //msg=>msg is the cloning function, no sort of conversion is need in this case
     var broadcastBlock = new BroadcastBlock<Message>
     (msg => msg, new DataflowBlockOptions { CancellationToken = token });
     
     var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
     progressPropagator.LinkTo(broadcastBlock, linkOptions);
     broadcastBlock.LinkTo(progressReporter,linkOptions);
     //pass in a Predicate so that the logger gets only messages where the Command is Stop.
     broadcastBlock.LinkTo(stopCommandLogger, linkOptions,msg=>msg.Command==WorkerCommands.Stop);

BroadcastBlock 还有另一个重要功能,它总是输出发布给它的最新消息,因此它提供了一个实时数据源。块之前的管道“瓶颈”不会导致块输出过时的消息。这对于像股票市场价格这样的时间敏感数据可能很重要。

并发读取和独占写入

在并行工作的情况下,可能会出现一些线程需要读取共享变量,而其他线程则想写入它。在这种情况下,可以允许读取者与其他读取者并发读取变量,但写入者必须独占更新变量,以免在读取者正在处理它时进行更新。为了实现这一点,Dataflow 库提供了一个 ConcurrentExclusiveSchedulerPair,其中包含两个调度器。读取者设置为使用 ConcurrentScheduler,写入者设置为使用 ExclusiveScheduler。为了使此功能正常工作,需要在并发线程之间进行负载平衡,以便将工作平均分配给它们。配置此的最佳方法是使用 BufferBlock 作为源,并将链接的 ActionBlocks 的委托返回一个 Task。忙碌的 ActionBlocks 将不会接收到消息。以下是读取者的设置,写入者的设置类似,但它们使用 ExclusiveScheduler

var readExWriteScheduler = new ConcurrentExclusiveSchedulerPair();
blockOptions = new ExecutionDataflowBlockOptions
   {
     CancellationToken = token,
     //Set the buffer capacity to 1 or else the block will swallow everything that's offered
     BoundedCapacity = 1,
     //set this as it avoids having to'gate'the input when there is only 1 active producer
     SingleProducerConstrained = true
   };
   
var actionBlock=new ActionBlock<int>(async (i) =>
   {
    //Use Task.Factory.Start.New() as need to specify the Scheduler.
    //Do not await anything inside the Action delegate passed to Start.New
    //If you do, the continuation will not run on the correct thread
    await Task.Factory.StartNew(() =>
    readerWork(i),
    token,
    TaskCreationOptions.None,
    readExWriteScheduler.ConcurrentScheduler);
   },
   blockOptions);

下载内容中有一个演示,使用了 ConcurrentExclusiveSchedulerPair,包含三个读取者和一个写入者。数据以批次发送,以说明工作线程是根据缓冲区是否有数据而雇用的——当缓冲区为空时,它们会被释放。显示屏显示了 BufferBlock 的负载平衡能力。

其他数据流块

本文介绍未涵盖许多类型的数据流块。大多数都侧重于增加数据的熵——即数据包含的信息量。单个项目进入块,然后出来的是容器负载。你甚至可以将一堆便士和一个一堆面包卷作为输入,然后得到一个元组流,每个元组包含一个便士和一个面包卷。Stephen Toub 在 这篇出色的指南中详细介绍了所有这些。

结论

Dataflow 库专注于在“内部”处理数据时使用并行编程。它通过将过程分解为一系列自主块来实现这一点,这些块在其自己的线程空间中运行并管理自己的线程。它们易于使用和简单链接,但需要注意一些事项,因为并发性的增加并不总是等于性能的提高。建议进行一些实验以找到最佳设计。毕竟,如果你的应用程序中的线程数超过了 贝叶斯挂毯,最好确保它们得到了有效部署。

历史

  • 2020 年 9 月 11 日:初始版本
© . All rights reserved.