65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 DataFlowLite 通过管道处理数据

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.50/5 (3投票s)

2018年4月18日

CPOL

8分钟阅读

viewsIcon

10201

downloadIcon

176

一个自定义框架,用于通过管道框架并行处理数据。

引言

首先,我必须说明的是,我知道有一个名为 Dataflow Framework 的组件,它是 Task Parallel Library 的一部分。在我创建即将展示的框架时,我从中获得了一些灵感。

背景

有一天,我偶然发现了 Dataflow Framework,并决定做一个概念验证。我这样做是为了理解它是什么,如何使用它,以及它的优点和缺点,以便确定我是否可以在我当前的项目中使用它。

Dataflow Framework 的文档中这样描述:

  • "这种数据流模型通过提供进程内的消息传递来促进面向 Actor 的编程,用于粗粒度的数据流和管道任务。"
  • "当你有多个必须异步通信的操作,或者当你想在数据可用时进行处理时,这些数据流组件就很有用。"

这个管道可以被认为是生成器模式的一种增强。通过将单个步骤进行排序,来创建一个可以产生输出或执行整体操作的管道。

例如,我使用自己的框架创建的管道将:

  • 输入几个范围从 1 到 200 的数字列表
  • 将每个数字列表转换为单独的数字
  • 确定数字是偶数/奇数,将其转换为 string,并过滤到相应的输出
  • 所有偶数字符串被批处理在一起,所有奇数字符串也被批处理在一起,形成一个 string 列表。收集到 9 个字符串后,该列表将被传递下去
  • 批量处理的偶数或奇数输出被重新组合成一个单一的管道
  • 处理的最后一步是将文本列表输出为一个整体

虽然这个管道很简单,甚至有些微不足道,但它展示了你可以用这个框架做什么。两个框架都会处理数据移动的整体功能,并处理每个步骤的多线程。这让你能够专注于编写你真正关心的事情。你在每个步骤中做什么完全取决于你自己。

我应该指出,虽然我从原始框架中获得了一些灵感,但它们是完全不同的。在某些情况下,我使用了与原始框架完全相同或相似的类名,而在其他情况下,则完全不同。那么真正的问题是,为什么要在已经存在一个框架的情况下再创建一个呢?

  • 大小 - 有 2 个不同的 Nuget 包可供选择。第二个包是最近才支持的,如日期所示,但它的安装体积很大,并且替换了一些常见的 DLL。
    • Microsoft.Tpl.Dataflow (3.01M) v4.5.24 2014/10/12
    • System.Threading.Tasks.Dataflow (12.4M) v4.8.0 2017/11/08
  • 公司指令 - 有些公司和行业不允许使用第三方代码或 Nuget 包,无论它们是由谁制作的。所以你需要自己实现。
  • 有时简单就是最好。
  • 满意度 - 我喜欢拆解事物,然后重新组装它们。这有助于我理解事物是如何工作的。

使用我的框架代码

构建块

基类

  • ExecutionBlock - 这是一个 abstract 类,其他块都继承自它。
    • Exceptions - 如果生成异常,则用于存储。
    • Execution State - 枚举。
    • Source - 方法可以操作的传入数据存储。
    • Execution Task - 一个 TPL Task,可以出于各种原因进行访问。
    • 代码完成、取消和等待的杂项操作。
    • Parallel Options - 如果需要,用于对代码进行并行化的选项。

每个块都有一个传入数据集合。如果你不提供自定义的集合,我将提供一个。内部集合基于 BlockingCollection,它具有围绕生产者/消费者并发控制的几个特性,以及限制收集数据量后再阻塞的特性,并且不允许再接收更多数据。我提供了一个自定义实现 (AnotherSource),它仍然基于 BlockingCollection,但展示了使其正常工作所需的一切。读者可以根据需要理解并创建自己的版本。

区块

以下预定义的块对于大部分来说,都引用了管道中的下一步以及你提供的方法/函数。此外,还有一段代码定义了如何从源获取数据,将其传递给方法,有条件地输出到管道中的下一步,并处理作业取消或生成的任何异常。下一步将被称为 Target。

  • TransformBlock - 它可以将数据从一种类型转换为另一种类型,或者用于执行诸如数据通过某些验证例程后,将其发送到目标等操作。
    • Function - 单个输入和单个输出 - 你定义两者。
  • TransformToManyBlock - 它与 Transform 块相同,唯一的区别在于它可能输出多个对象。在我的示例中,我有一个 List<int> 的输入,并输出 (n) 个单独的整数。
    • Method - 2 个输入,无输出 - 输入是你数据类型和目标 (ITarget<T>) 的引用,即管道中的下一步。
  • FilterBlock - 可选地转换传入数据并将其发送到目标。在我的示例中,我将偶数发送到一个管道,奇数发送到另一个管道。此块允许你添加任意数量的过滤器。为了保持实现简单,我限制所有目标必须具有相同的参数类型,并有意识地决定将所有数据传递给所有过滤器。这是故意的,因为可以添加另一个过滤器来处理素数,其中一些数字可能既是奇数又是素数。如果你有很多过滤器但只需要过滤到某一个位置,这可能会成为一个性能问题。在这种情况下,你可能需要创建一个 ChainOfResponsibilityBlock 并在 Execute 方法中提供你正在寻找的逻辑。
    • 方法和目标引用的配对 - 代码和目标对每个过滤器都可以不同。
  • BatchInputBlock - 这与 TransformToManyBlock 相反。它会将单个项收集到一个列表中,然后将其传递下去。你可以决定将多少项保存在列表中。这可以用于在批量加载数据到数据库之前收集数据。
     
  • SourceJoinBlock - 如果你想将 2 个或更多管道重新合并到未来的单个管道中,你需要使用此类。它能正确处理将数据从所有源块移动到单个目标。你可以将其视为一个漏斗。这对于防止目标过早关闭其输入流(这将导致异常抛出)非常重要。
    • Source - 将发送数据的管道 (N) 个。
  • TerminatorBlock - 这是管道的末端块。这个块很特殊,因为数据不会传递给任何其他块。

块执行示例

这是我的框架中块所包含的 Execute 方法的示例。虽然这段代码来自 TransformToManyBlock,但其他块同样简单。它们包含:

  • 一个处理传入数据的循环
  • 一种识别何时没有更多传入数据的方法
  • 一个并行语句,用于根据你的需求处理数据
  • 对你自定义代码的调用
  • 异常处理
  • 一个告诉下一个块没有更多数据传入的语句
protected override void Execute()
{
    State = ExecutionState.Running;

    while (State == ExecutionState.Running)
    {
        try
        {
            if (Source.Count == 0 && Source.IsCompleted)
            {
                State = ExecutionState.Done;
                continue;
            }

            Parallel.ForEach(Source.GetConsumingEnumerable(), ParallelOptions, item =>
            {
                if (item == null)
                    return;

                Method(item, Target);
            });
        }
        catch (OperationCanceledException)
        {
            State = ExecutionState.Cancel;
        }
        catch (Exception ex)
        {
            State = ExecutionState.Error;
            Exceptions.Add(ex.GetBaseException());
            ParallelOptions.Cancel();
        }
    }

    Target.CompleteAdding();
}

示例管道

public SimplePipeline()
{
    // Create the steps using the built-in data source 
    _blocks.Add(_step1  = new TransformToManyBlock<List<int>, int>(MethodStep1));
    _blocks.Add(_step2  = new FilterBlock<int, string>());
    _blocks.Add(_step3A = new BatchInputBlock<string>(9));
    _blocks.Add(_step3B = new BatchInputBlock<string>(9));
    _blocks.Add(_step4  = new SourceJoinBlock<List<string>>());
    _blocks.Add(_step5  = new ActionBlock<List<string>>(MethodStep6));

    // Set the number of processors per step and Cancellation Token if desired
    var cancellationSource = new CancellationTokenSource();
    _step1.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(6);
    _step2.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(3);
    _step3A.ParallelOptions.SetCancellationSource(cancellationSource);
    _step3B.ParallelOptions.SetCancellationSource(cancellationSource);
    _step4.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(1);
    _step5.ParallelOptions.SetCancellationSource(cancellationSource).SetMaxDegreeOfParallelism(1);

    // Link one step to the next
    _step1.LinkTo(_step2);
    _step2.LinkTo(_step3A, FilterMethod2A);
    _step2.LinkTo(_step3B, FilterMethod2B);

    _step3A.LinkTo(_step4);
    _step3B.LinkTo(_step4);

    _step4.LinkTo(_step5);
    _step4.AddSource(_step3A);
    _step4.AddSource(_step3B);
}

所需步骤的自定义代码

static private void MethodStep1(List<int> list, ITarget<int> target)
{
    list?.ForEach(i => target?.TryAdd(i));
}

static private void FilterMethod2A(int value, ITarget<string> target)
{
    // Not necessary - Used to help randomize length of call
    Task.Delay(50 + Random1.Next(100)).Wait();

    if (value % 2 != 0)
        return;

    // Not necessary - Used to help randomize length of call
    Task.Delay(50 + Random1.Next(100)).Wait();

    target.TryAdd($"EVEN - {value}");
}

static private void FilterMethod2B(int value, ITarget<string> target)
{
    // Not necessary - Used to help randomize length of call
    Task.Delay(50 + Random1.Next(100)).Wait();

    // Used to test throwing an exception
    if (value == 13131313)
    {
        Console.WriteLine($"Throwing Exception - {value} is unlucky\n");
        throw new System.IO.InvalidDataException($"{value} is unlucky");
    }

    if (value % 2 == 0)
        return;

    // Not necessary - Used to help randomize length of call
    Task.Delay(50 + Random2.Next(100)).Wait();

    target.TryAdd($" ODD - {value}");
}

static private void MethodStep6(List<string> items)
{
    var len = items.Count;
    if (len <= 0)
        return;

    var sb = new StringBuilder(1000);
    sb.AppendLine("==============================");

    for (var i = 0; i < len; i++)
        sb.AppendLine($"{i + 1} - {items[i]}");

    Console.WriteLine(sb.ToString());
}

闭幕词

正如我之前所说的,这是一个创建概念验证的练习,目的是了解数据如何从一个步骤流向下一个步骤。因此,我没有对任何一个框架进行性能测试。如果看看我的框架,会发现它确实没有太多开销。大部分问题会出现在自定义代码的编写方式或不恰当地设置并行度上。

我还包含了一个使用 TPL Dataflow Framework 的示例应用程序,以便你可以比较和对比它们之间的差异。我发现使用 TPL Dataflow Framework 创建管道有时会令人沮丧,因为:

  • 我无法总是让它做到我真正想要的(例如,将一个项目列表转换为单个项目)
  • 在创建方法调用时遇到困难
  • JoinBlock 似乎很奇怪,我必须同时处理两者,而不是分别用 item1item2 调用。

花点时间浏览代码,有问题可以随时提出,但请知悉,我认为这已是终点,不打算添加或增强代码,因为我认为我的目标已经达成。

代码完全开放给你,可以随意修改或增强。

尽情享用!

© . All rights reserved.