65.9K
CodeProject 正在变化。 阅读更多。
Home

生产、适配、消费……并发数据处理

starIconstarIconstarIconstarIconstarIcon

5.00/5 (26投票s)

2018年7月4日

MIT

32分钟阅读

viewsIcon

26362

一种独立于语言/通用的提案,用于扩展原始的生产者-消费者模型以执行并发数据处理。

引言

我们还有很长的路要走,因此,为了快速铺垫,我们考虑以下我们经常遇到的伪代码:

   List<T> data = CreateData(...) 
   //where T is some known datatype, 
   //     CreateData is some function which returns a collection of instances of T
   
   ProcessList(data)
   //where ProcessList performs the required processing on the generated data

根据上面的例子,我们将“CreateData”声明为我们的数据生产者(或简称为Producer),将“ProcessList”声明为我们的数据消费者(或简称为Consumer)。然后我们考虑对我们目标的如下描述:

我们的目标

给定两段代码,一段生成数据(调用生产者),另一段执行数据处理(调用消费者),并假设每个数据实例都可以独立处理;我们有兴趣设计一个通用的机制,该机制利用数据并行性,实现并发数据处理,同时隐藏相关的线程同步的复杂性,并提供简化的API

现在目标已经公布,我们需要决定方法。为此,当我们查阅文献时,我们从生产者-消费者这一长期存在的概念中汲取灵感(您可以在这个维基百科文章中找到其通用案例的信息)。我们希望采用这个概念,将工作负载(来自生产者)分配给消费者,同时让所有实体(所有生产者/消费者)并发运行。在我们的讨论中,我们将逐步介绍一种可能的实现方法,即让我们的producer(s)独立运行,同时与我们的consumer(s)和谐相处。我们将看到如何创建这种协调(以下简称Pipeline),同时满足以下设计要求:

  • 缓冲区大小
    • 固定大小
    • 无界
  • 链特性
    • 无损(生产的每个项目都会被消耗)与有损(当缓冲区满时丢弃生产的项目)
    • 不间断(一旦启动,管道将一直运行直到最后一个项目被消耗)与可中断(在生命周期内的任何时候都可以销毁管道)
    • 一致(生产者/消费者共享完全相同的datatype)与不一致(生产的项目需要某种类型的数据转换才能匹配可消耗项目的datatype
    • 附加(所有生产者在编译时已知并绑定到链)与分离(生产者,可能是临时的,在运行时出现,可能并行):我们将分别讨论这两种情况的实现。

尽管原始的生产者-消费者问题只考虑了一致的情况(以上特征中的第三(3rd)点),但由于GoF著名的适配器设计模式(请参阅GoF设计模式),我们希望扩展这个想法,同时拓展设计模式的原始哲学,以创建一个面向数据的适配器(以下简称适配器,除非另有说明),以便在给定的不一致的生产者/消费者之间创建这样的管道(在某个条件下,我们识别一个适配器)。这样做的兴趣点在于保持关注点分离,从而实现一个简化的管道,并具有分离的数据转换逻辑。

为什么是".Pipe"?要理解这一点,请暂时想象数据流,并从一个简化的视角看待数据从生产者产生并被消费者吸收。想象一下,有两个人在(P和C)和P将他得到的东西交给C。有了这个,我想到了UNIX。当我们在UNIX终端中要创建这样的动作链(在命令之间传递数据)时;它正好允许我们这样做,这要归功于著名的"|"(又名管道)语法(在此处查看一些示例),并且,所以我才想到了这个名字

为什么?

当我们谈论为什么需要这样的实现时,我们需要考虑几个因素:

  • 除了复杂性顺序(大O表示法)之外,延迟也是生产质量代码中的一个重要因素。
  • 并行计算已成为行业规范,在许多情况下可以帮助降低延迟。
  • 随着技术的进步,新的框架/库/包提供了更好的并发编程工具,例如更好的线程池管理、比线程更轻量级的替代品(例如任务、纤程、协程),以减少由于线程上下文切换造成的浪费周期。

现在,谈到我们的管道,让我们考虑一个任务(也许是琐碎的)。

 

假设我们需要解析一个文件,为了简单起见,假设它是一个CSV文件,其中包含相当多的记录(即行)。进一步假设我们需要将这些记录存储到某个数据库;而无需对数据进行任何额外计算。

在这里,我们观察到两个(2)独立且不相关的子任务:读取文件(生产者)和将数据保存到数据库(消费者)。现在,考虑两种经典的(非并发)实现方法:

  1. 读取整个文件 -> 创建对象列表 -> 将列表推送到数据库(我们称之为Approach1
    • 这种方法看起来不错,但忽略了保存列表的内存需求。
    • 列表越大,延迟越高,因为消费者等待接收列表的时间越长。
    • 总延迟将是:文件处理时间 + 数据库事务时间 => 假设生产者和消费者之间的数据传输时间可忽略不计。
  2. 读取单行 -> 将对象推送到数据库 -> 重复直到文件结束(我们称之为Approach2
    • 改进了内存使用,但执行多次数据库事务,这可能导致延迟飙升 => 通常,批量插入更便宜。
    • 在任何给定时间点,要么生产者在工作,要么消费者在工作。
    • 总延迟将是:n x (单条记录处理时间 + 数据库事务时间) => 其中n是文件中的总记录数,并假设生产者和消费者之间的数据传输时间可忽略不计。

最重要的是,这两种方法都忽略了文件和数据库操作都是I/O操作这一事实,即使只有一个核心处理器,也可以通过线程交错实现并发,这要归功于非阻塞I/O 还可以设计另一种平衡的方法,即不推单条记录到数据库,而是将固定大小(块)的列表推送到数据库。然而,正如我们接下来(下面)描述我们的管道方法时,可以看出,它的性能吸引力较低。

假设我们已经有了.Pipe实现。我们可以设计一个生产者方法(读取文件)和一个消费者方法(执行数据库事务),我们可以简单地这样写代码:producer.Pipe(consumers)(我们称之为Approach3)。

  • 生产者在读取文件的同时创建多个列表(预定义大小)=> 多个较小大小的列表(即块)。块的大小可以调整以获得最佳批量插入,例如。
  • 消费者接收每个列表(块)并将其推送到数据库 => 我们可以跨多个消费者,因为每次推送都是独立的。
  • 我们.Pipe的粘合代码将促进块(列表)从生产者到消费者的通道传输 => 假设此数据传输时间可忽略不计且缓冲区无界。
  • 总延迟将是:文件处理时间 + k x 块数据库事务时间 => 其中k = 1/c x (总块数 - 文件操作期间推送的块数),而c = 总消费者数(假设由于并行推送导致的数据库性能下降可忽略不计)。

通过这种方法,我们得出以下重要观察结果:

  1. 作为生产者和消费者之间并发的好处:我们能够在生产者尚未完成工作(此处为读取文件)的同时消耗数据(此处为推送到数据库)。
  2. 作为消费者之间并发的好处:我们能够减少端到端延迟(此处为1/c的因子,其中c是消费者的数量)。
  3. 因此,从理论上讲,我们可以向我们的管道添加总共n个(其中n ~ 总记录数 / 块大小)消费者,以获得最小延迟 ~ 文件处理时间 + 块数据库事务时间

总的来说,随着记录总数的增加,我们会注意到(最左边最低,最右边最高):(越低越好)

  • 内存(Approach2)< 内存(Approach3)< 内存(Approach1
  • 延迟(Approach3)< 延迟(Approach1)< 延迟(Approach2

因此,也许可以安全地说,我们的并发管道方法是一种平衡的方法。因此,这就是原因。

但是,在我们讨论实现之前,我们需要考虑/做出以下限制/假设:

  • 存在多个消费者是为了实现并发优势。此假设很重要,因为我们的设计与广播参见此处)不同。在我们的方法中,每个生产的项目将被所有可用消费者中的一个且仅一个消费者消耗(接受/处理/处理)。
  • 尽管生产者可以采取不同的方法来创建项目(例如,一个生产者从数据库获取记录,另一个从文件获取,另一个接收Web请求等);但是,它们必须生成相同datatype的项目才能成为管道的一部分。此假设非常重要,因为管道设计必须对不同的生产者通道保持开放,只要生产的项目类型相同。
  • 一旦管道构建完成,就不能向其中添加或删除消费者。
  • 实现必须保持通用,即不应对生产者/消费者的行为做出任何假设。
  • 在固定大小和无界缓冲区的情况下,管道理论上应该支持无限数量的生产者和消费者。
  • 在可中断模式下,管道将在中断后被销毁,从而销毁所有未处理的数据。
  • 在附加模式下,一旦管道构建完成,就不能向其中添加或删除生产者。
  • 在分离模式下,管道在生命周期内不应对生产者的生命周期或数量做出任何假设。它必须能够接受其生命周期内任何生产者(临时或长期运行)的(预定义类型)项目。
  • 在不一致模式下,给定一个适配器,必须能够构建管道。

关于实现

生产者-消费者这一想法实际上是与语言无关的,可以用多种编程语言开发。但是,为了实现我们的目标,我们选择使用C# .NET Framework 4.6.1来实现,同时利用几个TPL功能(尤其是async-await)以及语言固有的扩展方法功能。如果您有兴趣消费此实现,根据您的语言选择,您可能会实现不同的用法形式。

在我们的讨论中,我们提供了大量的注释以及C# .NET代码片段,并添加了一些有趣的图片(展示实体之间的对话)。即使您对.NET语法感到不适,也不用担心,您仍然可以通过阅读本文来理解要点,并能用您选择的语言来实现它。

没有恐龙!

在我上大学的时候,我总是问自己,每次拿起操作系统书时,为什么会有恐龙?(不幸的是,我找不到原来的封面,但这张图片目前应该可以了)我安慰自己说,这本书不像斯皮尔伯格的侏罗纪公园那样可怕。我仍然有时好奇,这是象征操作系统像恐龙一样巨大/迷人/令人敬畏,还是只是为了吓倒大二学生。尽管如此,现在既不是时候,也不是我们讨论的主题,所以,无论情况如何,在本次讨论中,没有恐龙,我们将尽力保持简单。

创建接口(契约)

首先,让我们看一下下面简单的图示,以理解我们的一些设计选择,更重要的是,我们实际要构建什么。

simple-shared-buffer

因此,基于上图,我们想要:

  1. 标准化生产者获取缓冲区和添加项目的方式,隔离进行,即,不知道其他生产者或消费者的存在。
  2. 标准化消费者从缓冲区检索已填充的项目并执行所需处理的方式,隔离进行,即,不知道其他消费者或生产者的存在。
  3. 拥有一个能够处理这些并发操作的缓冲区。

为了设计我们的解决方案,我们希望关注buffer,因为它是我们解决方案的核心;它的实现将受到生产者端和消费者端的需求的影响,而且我们不应该忘记我们需要将所有功能扩散到我们的设计中。因此,为了使讨论复杂化,我们将所有内容都包含在一个单一的、基于筒仓的建议解决方案中,我们将讨论进一步细分为几个小部分,如下所示:

1. 我们的生产者和缓冲区

由于我们的解决方案是生产者无关的,即我们不知道生产者将如何具体地生产一个项目(即实际的生产者实现)。在这种情况下,我们只能定义一个通用的签名,因此我们的生产者可以简单地定义为以下delegate

//NOTE: Some explanation is provided as comments

public delegate Task ProduceAsync<TP>
(IProducerBuffer<TP> buffer, CancellationToken token);

//accepts buffer and cancellation token as inputs and returns a Task
//   where TP is the datatype of item produced by producer
//   and IProducerBuffer is an interface to our Buffer implementation
//we add CancellationToken as an input parameter in order to support 
//       interruptible pipeline feature
//In this way, by simply supplying CancellationToken.None to the pipeline 
//       we can create uninterruptible pipeline.

基于上述委托签名,我们可以为我们的生产者创建以下接口:

//IDisposable to avail Dispose method to perform resource clean-up
public interface IProducer<TP> : IDisposable
{
    //to perform some pre-processing initialization
    Task InitAsync();

    //actual data generating method
    Task ProduceAsync(IProducerBuffer<TP> buffer, CancellationToken token);
}

现在,实际的生产者实现可以简单地继承IProducer<TP>接口。尽管我们已经设计了如何为我们的生产者提供缓冲区访问,但我们还不知道如何填充缓冲区。因此,我们在缓冲区方面的第一个要求,即提供一个用于填充的方法。让我们看一下:

//NOTE: Some explanation is provided as comments

public interface IProducerBuffer<T>
{
    //adds an item to the buffer
    //it blocks, if buffer is full, until the item can be added
    void Add(T item, CancellationToken token);

    //adds an item to the buffer with given millisecond timeout
    //if the item was added with in timeout period returns true else false
    bool TryAdd(T item, int millisecTimeout, CancellationToken token)

    //we add this second method to support our lossy pipeline feature
    //     millisecTimeout=0 means immediately add or discard
    //based on boolean outcome, the actual producer implementation can 
    //  decide the fate of produced yet discarded item

    //we also add CancellationToken to support cancellation based on
    //transient method token (we will see an example when we talk about
    //                        detached pipeline)
}

因此,到目前为止,我们有了生产者及其缓冲区接口,以及添加生产项目到缓冲区的方法。现在,让我们看看消费者端的要求。

2. 我们的消费者和缓冲区

与生产者类似,我们的解决方案也是消费者无关的(即不知道实际的消费者实现),因此,以类似的方式,我们可以定义以下消费者接口:

//IDisposable to avail Dispose method to perform resource clean-up
public interface IConsumer<TC> : IDisposable
{
    //to perform some pre-processing initialization
    Task InitAsync();

    //actual data consuming method
    Task ConsumeAsync(TC item, CancellationToken token);
}

在决定消费者接口时,尤其是ConsumeAsync的签名,我们本可以选择将缓冲区作为方法参数传递,就像我们对生产者所做的那样。然而,这样做,我们发现这种设计:

  1. 使消费者实现负担了样板代码。
  2. 需要精细的实现来循环遍历缓冲的项目。
  3. 为我们的不一致管道功能(稍后讨论)增加了进一步的复杂性。

因此,最终,我们决定将这种复杂性隐藏在API中,并获得一个可调用的消费者。这样,具体的消费者实现将专注于业务逻辑。

虽然在消费者端没有明显的必要性,但我们可以从上面的第(2)点推断出我们需要循环遍历项目以排空缓冲区。因此,我们需要:

  1. 一个弹出项目的方法。
  2. 一个布尔指示器来验证所有项目都已处理。

所以,我们创建ConsumerBuffer接口:

public interface IConsumerBuffer<T>
{
    //true when all items are drained (producers are done producing too!)
    bool Finished { get; }

    //to retrieve an item
    //returns true when item was available within given millisecond timeout
    bool TryGet(int millisecTimeout, CancellationToken token, out T data);
    //we add millisecond timeout to support a special case of 
    //our discordant pipeline feature anyway we can always 
    //pass millisecTimeout=Timeout.Infinite, i.e., wait infinitely
}

3. 保留两者

直到这一点,我们一直在努力满足所有要求,以下项目列表快速涵盖了这些要点:

  • 缓冲区大小:我们将通过Ctor参数控制。
  • 无损:通过TryAdd方法的millisecondTimeout参数控制。(注意Add方法类似于TryAdd(item, Timeout.Infinite)
  • 可中断性:通过CancellationToken控制。
  • 可附加性:由最终用户控制(我们将单独查看用例)。

现在,唯一剩下的就是一致性。事实上,我们上面定义的接口方式,我们故意将TP设为生产者类型参数,将TC设为消费者类型参数。虽然在泛型中,这种不同的符号(类型占位符)几乎无关紧要,但目的是强加一个想法,即在构建一致管道时,我们将为TPTC注入相同<data-type>,而在构建不一致管道时使用不同的<data-types>。此外,为了快速理解这种冲突,我们提供以下图示:

dicordant-shared-buffer

现在,我们看到:

  • 生产者仅当生产者的datatype <TP>与缓冲区的datatype <T>相同时才能添加项目。
  • 消费者仅当消费者的datatype <TC>与缓冲区的datatype <T>相同时才能消耗项目。
  • 只有在一种特殊情况下,我们称之为一致管道,当所有三个(3)datatype都相同时,即<TP> = <TC> = <T>,我们当前的管道才能工作。

因此,上述设计在不一致管道的情况下将不起作用。考虑到这一点,我们暂时保留*Buffer interface的两个片段。即使从“抽象”的角度来看,向只关心填充缓冲区的生产者公开TryGet方法也是明智的。

4. 插入适配器

为了满足我们的最后一个要求,我们需要回顾我们上面显示的图2;因为具有不同datatype将导致冲突。但是,在我们讨论如何使用适配器克服这一限制之前,让我们根据下图可视化适配器在逻辑上必须做什么:

ppc-adapter

因此,如果我们把提供的适配器看作一个黑盒子,我们期望通过传递一个type <TI>的对象,它会输出一个type <TO>的对象。根据我们的要求,因此,如果我们传递类型为<TP>的生产项目,并将它们转换为消费者的类型<TC>;我们的管道应该可以工作。

重要提示:为了保持通用性,对于一致情况,当TC = TP = T(因此TI = TO)时,我们准备了一个默认的同一性适配器,它什么都不做,即它将返回与我们作为输入提供的相同项目,而不做任何更改。以下C# .NET代码片段大致代表了这个想法:

public static TI IdentityAdapter<TI>(TI input)
{
    return input;
}

为了插入这样的适配器,我们有以下选择:

  1. 在生产者和缓冲区之间注入适配器:我们将缓冲区设计为<TC>类型(如图4所示)

    after-producer

  2. 在两个缓冲区之间注入适配器:我们引入第二个缓冲区,并在它们之间注入适配器,第一个缓冲区类型为<TP>,第二个缓冲区类型为<TC>(如图5所示)

    two-buffers

  3. 在缓冲区和消费者之间注入适配器:我们将缓冲区设计为<TP>类型(如图6所示)

    after-consumer

在提供的实现选择中,我们选择第三(3rd)个选项,即在缓冲区和消费者之间注入适配器,因为:

  • 通过在生产者和缓冲区之间注入适配器,我们复杂化了生产者实现。
    • 要求生产者进行适配器调用。
    • 增加了实现精细的边缘情况对象转换(我们将看到一个此类转换的示例)的出错风险。
    • ProduceAsync方法签名将负担第三(3rd)个参数(适配器实例)。
  • 通过在两个缓冲区之间注入适配器,我们复杂化了我们的管道实现。
    • 我们需要维护两个(2)缓冲区。
    • 我们需要同步两个(2)缓冲区循环(缓冲区排空)。

通过在缓冲区和消费者之间注入适配器,我们只需要维护一个缓冲区(因此,一个排空循环),并且在正确的时间(在将数据馈送给消费者之前)进行透明的适配器调用。通过这样做,我们隐藏了所有这些复杂的实现细节在我们的.Pipe调用背后,并提供了生产者、消费者和适配器之间完全的关注点分离,以便这三部分代码可以独立演进。

5. 恶性循环的不可知性

到目前为止,我们保持了设计在生产者和消费者方面都是不可知的,然而,为了避免在讨论中引入复杂性,我们对适配器采取了一种朴素的方法。如上所示,我们向适配器提供了一个给定类型的对象实例,并返回一个定义明确的类型的对象实例。但是,随着我们接近完成我们的接口设计,我们希望摆脱这种关于适配器的给予-获取的假设。事实上,我们希望最终确定设计也应该是适配器不可知的!这是我们确保为最终用户提供完全自由以从此类管道中获得期望的最终结果,而无需黑客/修补业务逻辑的唯一方法。最终用户随后可以专注于实际逻辑和相关数据模型,而无需担心生产者、消费者和适配器之间的普通技术连接。

为了实现这种适配器不可知的 ontwerp,我们提出以下接口:

//NOTE: Some explanation is provided as comments 

public interface IDataAdapter<TP, TC>
{
    //accept the buffer and cancellation token and out consumable object
    //   returns true until buffer is NOT empty! else false.
    bool TryGet(IConsumerBuffer<TP> buffer, CancellationToken token, out TC consumable);

    //we notice that we provide buffer containing produced object instances
    //   with IConsumerBuffer interface, thus exposing TryGet method!
    //Actual adapter implementation can then recover produced item (or several items)
    //    to construct an item of type TC
}

现在,我们已经定义了管道的三个关键部分,给定任何任务并假设我们的管道可以实现,我们可以通过将这些子组件考虑进去来实现一个最优解决方案,如下所示:

thinker

以下列表总结了上述想法:

  • 首先,我们致力于最优的生产项目策略。
  • 其次,我们确定一个最优的消费这些生产项目的策略。
  • 如果需要适配器,我们单独编写适配器,否则我们使用IDENTITY适配器。
  • 我们将所有三个部分插入管道。

在疯狂写代码之前!

直到这一点,我们试图使用许多图画来传达我们的想法,但不幸的是,现在我们不得不引入代码,因此,下面您将看到一些长代码片段。但请不要担心,我们将添加一些有趣的图画以伪代码的方式来说明同样的想法;尽管如此,您必须记住下面的金字塔形思维导图,它与我们的具体实现密切相关:

pyramidical-mind-map

实现接口(契约完成)

正如我们所知,我们的解决方案是生产者/消费者/适配器不可知的,因此,它们的具体实现不是我们的关注点;一旦我们暴露了接口,最终用户就可以继承它们在管道中使用。然而,实现一些默认的适配器以涵盖一些普通的使用场景会很好。因此,在本节中,我们将提出以下实现:

  1. 适配器
    • 同一性适配器
    • 可等待列表适配器
  2. 缓冲区
  3. 附加管道
  4. 分离管道

1.a 同一性适配器

identity-adapter-mind-map

为了简单起见,我们选择首先实现同一性适配器,如果我们还记得上面的内容,它应该只按原样返回生产的项目。我们通过以下方式实现这一点:

//NOTE: Some explanation is provided as comments

//generic adapter satisfying TP = TC = T, buffer type:<T>
public class IdentityAdapter<T> : IDataAdapter<T, T>
{
    public bool TryGet(IConsumerBuffer<T> buffer, CancellationToken token, 
                       out T consumable)
    {
        //we just transfer the call on the buffer and return boolean
        //   status and also the object as it.

        return buffer.TryGet(Timeout.Infinite, token, out consumable);

        //NOTE: we pass INFINITE timeout on buffer, thus:
        //if all buffered items are processed AND producers are done...
        //   buffer will return false. Thus satisfying adapter boolean status.
        //else buffer will return True and out an instance of the produced object
        //   this again fulfils adapter behaviour.
    }
}

1.b 可等待列表适配器

实现前的一句话

有时,我们遇到一种情况,即消费单个项目会导致次优解决方案;批量处理在技术上更具成本效益。以下是一些例子:

  • 数据库批量插入更便宜。
  • 批处理。
  • 对象数组流…等等。

为了处理此类用例,我们决定实现可等待列表适配器,以便最终用户得到解脱并可以直接使用它。其思想是在每次调用适配器的TryGet时恢复List<TC>,如下面的图7所示。

注意:从现在开始,我们将互换使用“块”和“列表”这两个词,即,除非另有说明,否则列表将表示整个数据的一个子集(一部分)。

awaitable-list-adapter

一旦我们想到列表,以下与TryGet方法相关的设计选项就会浮现在脑海中:

  • 我们是否应该总是返回相同大小的列表?
  • 我们是否应该返回可变大小的列表,带有大小限制?
  • 我们是否应该返回列表没有大小限制?

对于第一种(1st)选择,鉴于可能可能生成相同大小的列表(考虑如果我们总共有103个项目,我们将列表大小固定为10,那么最后一个列表将只包含3个项目而不是10个);然而,我们选择实现它,基于以下信念:消费者逻辑对块的大小不敏感(并且它应该如此!),而消费块(而不是单个实例)的整个想法是为了减少相关的技术延迟。

第二种(2nd)选择是第一种选择的通用情况,所以我们将实现它,但有一些假设。当我们描述实现细节时,我们将重点介绍这些假设。

我们选择退出第三种(3rd)选择,因为它再次质疑跨多个消费者是否有用。让我们重新思考一下,如果我们能够提供无界列表给消费者,那么也许我们可以不顾消费者是否有能力处理这样一个列表,而将可用项目提供给单个消费者;那么为什么还要同时跨多个消费者呢?因此,我们观察到我们的设计正在偏离(根据我们预先设定的目标)。

注意:也许由于我们目光短浅,我们放弃了实现第三种(3rd)选择。然而,不要忘记我们的管道是适配器不可知的,因此,最终用户始终可以构建自己的适配器版本并插入它。

有什么大想法;不就是简单的列表适配器吗?

简短的回答是:不!它不是!

如果您一直关注我们,您可能已经认为这个适配器就是创建列表,那么为什么我们称它为“可等待”列表适配器呢?难道它不像跨循环生成列表那样简单吗?如果您有类似的疑虑,那么我们向您保证,它不止于此;最简单的原因是,我们想要迭代的项目可能并非即时可等待。事实上,为了进一步阐述,让我们考虑以下列出的论点:

  • 假设,适配器的TryGet方法被调用时,缓冲区是空的,生产者正在忙于创建对象实例,因此,很快缓冲区里就会有一些项目,但目前,我们需要等待(休眠)。
  • 实际的问题是:
    • 我们的线程应该休眠多久?
    • 万一等待后缓冲区仍然是空的,即生产者尚未完成填充缓冲区?我们应该再次休眠吗?多久?
    • 假设即使我们提出了一个非常聪明的等待算法,当我们在决定休眠后,生产者填充了缓冲区怎么办?(记住所有东西都在并发运行,我们无法控制这些事件的时机!)
    • 我们是否也应该设计线程唤醒机制?
  • 即使我们决定不等待而退出TryGet调用,我们也无法逃脱这个困境。并且,所有上述问题都回落到调用方级别(即,首先调用TryGet的代码)。
  • 另一个想到的问题是,如果用户不想在消耗块之前等待太久,即如果用户希望在未来项目累积之前消耗可用项目怎么办?(也许他的目标是时间敏感的,例如,将日志写入文件,将行推送到数据库,处理批处理项目等)。

有一点是肯定的,如果我们想减少延迟(作为我们目标的一部分),我们需要在我们的线程休眠时,在项目到达缓冲区时有某种通知。类似建议也可以在原始生产者-消费者问题中找到。现在,当然,我们不想在适配器内部构建这样的机制,否则它将失败(想象一下,每次最终用户/我们编写适配器时,都需要编写一个单独的通知机制)。尽管如此,如果我们查阅生产者-消费者的文献,我们已经知道生产者能够在(在向缓冲区添加项目时)提供这样的信号。因此,考虑到这两种观点,目前,我们假设缓冲区能够进行此类通知。

基于上述讨论,我们对缓冲区的行为有了以下见解(我们将在缓冲区TryGet实现中使用它):

  • 如果缓冲区为空,则在给定的超时期间内,如果一个元素被填充,它将尽快(不等待整个睡眠时间)从中唤醒,并out出该元素(布尔返回值true)。
  • 如果缓冲区中有元素,则无论超时值如何,它都应立即out出元素(布尔返回值true)。
  • 缓冲区必须能够捕获production_finished信号,然后,一旦所有缓冲的项目都被消耗,每个后续的TryGet调用都将导致布尔返回值falseoutnull/默认类型)。

目前,我们可以安全地假设,如果我们向buffer.TryGet方法传递INFINITE超时,那么只要项目被添加,缓冲区就会返回一个项目。这解决了一个问题,但我们仍然需要处理固定大小列表和可变大小列表的准备。

约束/假设

在实现可等待列表适配器时,我们牢记以下重要点:

  1. 我们总是可以在缓冲区上以无限超时等待。如果它有元素,它应该立即返回一个,否则它应该尽快返回一个。
  2. 没有最终用户有兴趣消费空列表,即没有项目的列表。因此,我们只需要在列表至少包含一个(1)元素时提供列表。
  3. 最终用户决定列表的大小,因为他了解系统能力和他的需求。
  4. 当最终用户运行管道以获得固定大小的列表时(如图图8所示):
    • 人们意识到,为了填充列表,我们可能需要更长时间的等待,因为一些/每个buffer.TryGet最终都会等待一个项目。因此,人们对准备此列表所需的时间漠不关心
    • 人们更关心获得完整大小的列表,因为它根据他们的管道策略有利。
    • 人们意识到最后一个块可能是部分块(如上所述)。但他的消费者可以处理它(1 <= last_chunk_length <= length_of_full_size_list)。
    • 因此,我们可以说他有无限的超时时间,但偏爱列表的大小。

      fixedsizelist
  5. 当最终用户运行管道以获得固定持续时间的列表(或可变大小)时(如图图9所示):
    • 人们宁愿在给定时间限制内消耗一些东西,而不是等待更长时间才能完全填充列表。因此,他受到时间的限制。
    • 人们意识到每个块的尺寸可能不同,他的消费者可以处理它(1 <= chunk_size <= max_length)。
    • 人们意识到,如果buffer.TryGet最终等待第一个(1st)项目,他可能需要等待更长时间才能获得列表的第一个(1st)项目。
    • 因此,我们可以说他偏爱超时时长(一旦收到第一个项目)和列表的最大大小。

      fixeddurationlist

注意:我们在文章中(下面)单独讨论了这些适配器的一些可能用例。

实现

list-adapter-mind-map

根据我们的约束/假设,我们有两个参数需要处理:

  1. 列表大小和
  2. 超时周期

我们已经知道,如果timeout=Infinite,那么我们输出的是固定大小的列表,否则就是可变大小的列表。让我们看看代码:

//NOTE: Some explanation is provided as comments

//generic adapter satisfying TP = T and TC = List<T>, buffer type:<T>
public class AwaitableListAdapter<T> : IDataAdapter<T, List<T>>
{
	private read only int _millisecTimeout;
	private readonly int _maxListSize;

    //we ask user for timeout value and list size
    //if millisecTimeout = Timeout.Infinite
    //   we will construct fixed size chunks with size = maxListSize
    //otherwise,
    //   we will contruct variable sized chunks with max. size = maxListSize
	public AwaitableListAdapter(int maxListSize, int millisecTimeout)
	{
		_millisecTimeout = millisecTimeout;
		_maxListSize = maxListSize;
	}

	public bool TryGet(IConsumerBuffer<T> buffer, CancellationToken token, 
        out List<T> consumable)
	{
		consumable = default(List<T>);
		
        //NOTE: From our discussion, no user wants to consume 0-length list
        //      so we wait with INFINITE timeout, i.e.
        //      if item is already in buffer, we promptly receive it
        //      else we wait until an item is available... so we are good!

        if (!buffer.TryGet(Timeout.Infinite, token, out var value)) return false;

        //init list WITH first item
		consumable = new List<T>(_maxListSize) {value};

        //we choose what kind of list is required based on timeout.		
        return _millisecTimeout == Timeout.Infinite
			? TryFillFixedSize(buffer, token, consumable)
			: TryFillFixedDurationChunk(buffer, token, consumable);
	}

	private bool TryFillFixedSize(IConsumerBuffer<T> buffer, CancellationToken token,
		List<T> consumable)
	{
        //We loop until we fill the list

		while (consumable.Count < _maxListSize)
		{
            //we always wait for INFINITE time to be sure of having an item
            //     except
            // we are left with no item and production is over!

			if (buffer.TryGet(Timeout.Infinite, token, out var value))
			{
				consumable.Add(value);
			}
            else return true;
		} 
        return true;

        //our list already has at least 1 item, so we return TRUE!
	}

	private bool TryFillFixedDurationChunk(IConsumerBuffer<T> buffer, 
                                           CancellationToken token,
		List<T> consumable)
	{
		var timeRemains = _millisecTimeout;
		var sw = Stopwatch.StartNew();

        //using stopwatch we can measure elapsed time

		while (consumable.Count < _maxListSize)
		{
            //and we loop until 
            //     1. chunk is not full
            //     2. we receive item with-in remaining time

			if (buffer.TryGet(timeRemains, token, out var value))
			{
				consumable.Add(value);

				if (timeRemains != 0)
				{
                    //IMPORTANT:
                    //we put a lower limit to zero coz:
                    //   1. of course, we can't wait with -ve time
                    //   2. but we want to keep looping even if given timeout has over
                    //      and we can still recover items from buffer
                    //      indeed, with timeout=0, we either promptly receive an item
                    //      or buffer returns FALSE.
                    //      this way we can always be able to provide larger chunk
                    //      when possible
                    //      hence the IF does NOT has ELSE with break/return
                    //      but the OUTER IF does has!

					timeRemains = 
                    (int) Math.Max(0, _millisecTimeout - sw.ElapsedMilliseconds);
				}
			}
			else return true;
		}
		return true;

        //our list already has at least 1 item, so we return TRUE!
	}
}

2. 实现缓冲区

到目前为止,我们已经有了缓冲区接口和基于行为的实现要求。使用下面的代码片段,我们实现了这些要求:

//NOTE: Some explanation is provided as comments

//we implement both interfaces
public class PpcBuffer<T> : IProducerBuffer<T>, IConsumerBuffer<T>
{
    private readonly CancellationToken _token;
    private BlockingCollection<T> _collection;

    public PpcBuffer(int bufferSize, CancellationToken token)
    {
        //we say 0 represents unbounded buffer
        _collection = bufferSize.Equals(0) ? new BlockingCollection<T>()
                : new BlockingCollection<T>(bufferSize);
        _token = token;
    }

    //IProducerBuffer<T> IMPLEMENTATION >>>>>>>>>

    public void Add(T item, CancellationToken token)
    {
        //Add should wait even if buffer is FULL, so we
        //simply call TryAdd with INFINITE timeout

        TryAdd(item, Timeout.Infinite, token);
    }

    public bool TryAdd(T item, int millisecTimeout, CancellationToken token)
    {
        //either blocking collection will add it with in timeout
        //   or return false... so our requirement is satisfied
        //when timeout is INFINITE, this method would either
        //   finish with item being added or in an exception
        //         1. when either of cancellation token is cancelled
        //         2. buffer is closed
        //   so again we satisfy our requirements.

        using (var mergeToken = 
               CancellationTokenSource.CreateLinkedTokenSource(token, _token))
        {
           return _collection.TryAdd(item, millisecTimeout, mergeToken.Token);
        } 
    }

    //IConsumerBuffer<T> IMPLEMENTATION >>>>>>>>>

    public bool TryGet(int millisecTimeout, CancellationToken token, out T data)
    {
        //we do not create merge token, as user should be able to
        //extract queued items once pipeline is closed for addition.
        return _collection.TryTake(out data, millisecTimeout, token);
    }

    //shows together both... closed for adding and empty... so we are good.
    public bool Finished => _collection.IsCompleted;

    //we implement CloseForAdding method to support implementation of 
    //  detached mode >>>>>>>

    public void CloseForAdding()
    {
        _collection.CompleteAdding();
    }
}

通过这样的实现,我们能够涵盖上述所有要求。现在,只剩下将这些单独的工件连接起来。因此,我们分别对附加和分离管道进行了处理:

3. 附加管道

正如我们所讨论的,附加管道模式具有以下特征:

  • 一旦管道构建完成,就不能向其中添加或删除消费者。
  • 一旦管道构建完成,就不能向其中添加或删除生产者。
  • 管道可以两种方式形成:一致和不一致。
原始实现

由于我们的兴趣是创建producers.Pipe(consumers)的形式,我们首先需要设计一个原始实现,因为制造最终形式只是创建一个扩展方法的问题。我们将单独创建这个方法。我们原始实现的实现将围绕以下思想:

  1. 将所有生产者作为异步方法独立运行。
  2. 将所有消费者作为异步方法独立运行。
  3. 将适配器转换的项目馈送到消费者。
  4. 观察生产者完成生产。
  5. 在所有生产者完成后,向缓冲区发出信号。
  6. 生产者完成工作后将其处置。
  7. 让消费者消耗所有剩余的项目。
  8. 处置消费者。

注意:我们使用了一个自制的扩展方法来启动和等待任务(用于生产者/消费者)。

  • 签名:static Task WhenAll(this Func<int, CancellationToken, Task> func, int repeatCount, CancellationToken token = default(CancellationToken))
  • 实现细节:请参见此处

attached-pipeline-mind-map

以下方法实现了上述所有列出的步骤:

//NOTE: Some explanation is provided as comments

//We hide the implementation inside INTERNAL class to
//expose it through extension method
internal static class PipeImpl<TP, TC>
{
    public static Task Execute(CancellationToken token, 
        int bufferSize, 
        IDataAdapter<TP, TC> adapter,
        IReadOnlyList<IProducer<TP>> producers, 
        IReadOnlyList<IConsumer<TC>> consumers)
    {
        //instead of using await, we decided to create a new Task
        //so that caller func can await as per its convenience

        return Task.Run(async () =>
        {
            using (var localCts = new CancellationTokenSource())
            {
                using (var combinedCts = CancellationTokenSource
                    .CreateLinkedTokenSource(token, localCts.Token))
                {
                    //creating buffer as per required size
                    using (var ppcBuffer = new PpcBuffer<TP>(bufferSize, 
                                                      combinedCts.Token))
                    {
                        //span consumers
                        var rc = RunConsumers(consumers, ppcBuffer, adapter, 
                                              combinedCts.Token, localCts);
                        //span producers
                        var rp = RunProducers(producers, ppcBuffer, 
                                              combinedCts.Token, localCts);

                        //wait until all consumers and producers finish
                        await Task.WhenAll(rc, rp).ConfigureAwait(false);
                    }
                }
            }
        });
    }
    
    internal static Task RunConsumers(IReadOnlyList<IConsumer<TC>> consumers,
        IConsumerBuffer<TP> feed, IDataAdapter<TP, TC> adapter,
        CancellationToken token, CancellationTokenSource tokenSrc)
    {
        //following line span all consumers (RunConsumer method) in the list
        //as separate task

        return new Func<int, CancellationToken, Task>(async (i, t) =>
                await RunConsumer(consumers[i], feed, adapter, t, tokenSrc)
                                .ConfigureAwait(false))
            .WhenAll(consumers.Count, token);
        //our home-made WHENALL line waits on all created tasks
        // (i.e., waits on all consumers to finish)
    }

    private static async Task RunConsumer(IConsumer<TC> parallelConsumer,
        IConsumerBuffer<TP> feed, IDataAdapter<TP, TC> adapter,
        CancellationToken token, CancellationTokenSource tokenSrc)
    {
        try
        {
            //this would dispose the consumer once we have nothing left
            //to consume
            using (parallelConsumer)
            {
                //init consumers
                await parallelConsumer.InitAsync().ConfigureAwait(false);
                token.ThrowIfCancellationRequested();
          
                //we loop until adapter is capable to create a consumable
                //   instance
                while (adapter.TryGet(feed, token, out var consumable))
                {
                    //we feed the item to consumer and wait before
                    // supplying another item.          
                    await parallelConsumer.ConsumeAsync(consumable, token)
                                          .ConfigureAwait(false);
                }
            }
        }
        catch
        {
            //in case producer ends up in error
            // we cancel the token so that producer can intercept it
            if (!token.IsCancellationRequested) tokenSrc.Cancel();
            throw;
        }
    }

    private static Task RunProducers(IReadOnlyList<IProducer<TP>> producers,
        PpcBuffer<TP> buffer, CancellationToken token,
        CancellationTokenSource tokenSrc)
    {
        return Task.Run(async () =>
        {
            try
            {
                //following line spans all consumers (RunProducer method) in the list
                //as separate task

                await new Func<int, CancellationToken, Task>(async (i, t) =>
                        await RunProducer(producers[i], buffer, t, tokenSrc)
                                        .ConfigureAwait(false))
                    .WhenAll(producers.Count, token).ConfigureAwait(false);
                //our home-made WHENALL line waits on all created tasks
                // (i.e., waits on all producer to finish)
            }
            finally
            {
                //>>>>> IMPORTANT: No matter whether producers finish normally
                //                 or end-up in error
                //                 we close the buffer
                buffer.CloseForAdding();
            }
        });
    }

    private static async Task RunProducer(IProducer<TP> parallelProducer,
        IProducerBuffer<TP> feed, CancellationToken token,
        CancellationTokenSource tokenSrc)
    {
        try
        {
            //this would dispose the producer once we have nothing left
            //to produce
            using (parallelProducer)
            {
                //initialize producer
                await parallelProducer.InitAsync().ConfigureAwait(false);
                token.ThrowIfCancellationRequested();
                
                //we provide our buffer to producer
                //it will be producer responsibility to populate it
                //    and return from it once there is nothing left
                //    to produce.
                await parallelProducer.ProduceAsync(feed, token)
                                      .ConfigureAwait(false);
            }
        }
        catch
        {
            // in case producer ends up in error
            // we cancel the token so that consumer can intercept it
            if (!token.IsCancellationRequested) tokenSrc.Cancel();
            throw;
        }
    }
}
实现.Pipe Usage Form(语法糖)

我们拥有了制作扩展方法的所有原料,并提出了以下四种这样的方法来达到我们上面讨论的不同管道:

  1. 一致管道:生产者类型匹配消费者类型(即<TP> = <TC>)。通常,最终用户需要向其注入IDENTITY适配器,但我们可以在方法内部隐式完成,如下所示:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<T>> consumers,
                                    CancellationToken token = 
                                                      default(CancellationToken),
                                    int bufferSize = 256)
    {
        return PipeImpl<T, T>.Execute(token, bufferSize, 
                                      new IdentityAdapter<T>(), 
                                      producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<T>[]{ consumer1, ..., consumerM };
    //await producers.Pipe(consumers);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
  2. 固定大小块的不一致管道:如果生产者类型为<T>,则消费者类型为List<T>,并且最终用户寻求固定大小的块。通常,需要向其注入可等待列表适配器,但我们可以在方法内部隐式完成,如下所示:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<List<T>>> consumers,
                                    int listSize,
                                    CancellationToken token = 
                                                      default(CancellationToken),
                                    int bufferSize = 256)
    {
        //timeout is INFINITE, we will get FIXED-size chunks
        return PipeImpl<T, List<T>>.Execute(token, bufferSize, 
                                  new AwaitableListAdapter<T>
                                      (listSize, Timeout.Infinite),
                                  producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM };
    //await producers.Pipe(consumers, some_positive_int_for_chunk_size);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
  3. 固定持续时间块的不一致管道:如果生产者类型为<T>,则消费者类型为List<T>,并且最终用户寻求使用固定持续时间创建的可变大小块。通常,需要向其注入可等待列表适配器,但我们可以在方法内部隐式完成,如下所示:
    //IMPLEMENTATION
    public static Task Pipe<T>(this IReadOnlyList<IProducer<T>> producers,
                                    IReadOnlyList<IConsumer<List<T>>> consumers,
                                    int listMaxSize,
                                    int millisecondTimeout,
                                    CancellationToken token = 
                                                default(CancellationToken),
                                    int bufferSize = 256)
    {
        //timeout and MAX list size passed to adapter to avail chunks
        return PipeImpl<T, List<T>>.Execute(token, bufferSize, 
                                  new AwaitableListAdapter<T>
                                  (listMaxSize, millisecondTimeout),
                                  producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<T>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<List<T>>[]{ consumer1, ..., consumerM }; 
    //await producers.Pipe(consumers, some_positive_int_for_max_chunk_size,
    //                                some_positive_int_for_timeout);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
  4. 通用管道:生产者类型为<TP>,消费者类型为<TC>,并且IDataAdapter<TP, TC>实现对最终用户可用。
    //IMPLEMENTATION
    public static Task Pipe<TP, TC>(this IReadOnlyList<IProducer<TP>> producers,
                                         IReadOnlyList<IConsumer<TC>> consumers,
                                         IDataAdapter<TP, TC> adapter,
                                         CancellationToken token = 
                                                     default(CancellationToken),
                                         int bufferSize = 256)
    {
        //timeout and MAX list size passed to adapter to avail chunks
        return PipeImpl<TP, TC>.Execute(token, bufferSize, 
                                        adapter,
                                        producers, consumers);
    }
    
    //=========================
    //========= USAGE==========
    //=========================
    //var producers = new IProducer<TP>[]{ producer1, ..., producerN };
    //var consumers = new IConsumer<TC>[]{ consumer1, ..., consumerM };
    //IDataAdapter<TP, TC> adapter = ...end-user-adapter-creation-call...
    //await producers.Pipe(consumers, adapter);
    
    //producer1, ..., producerN denotes comma separated N producer instances
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    

4. 分离管道

分离管道略有不同,因为在管道构建时(即调用PipeImpl<TP,TC>.Execute时)我们没有可用的生产者实例,而对于附加模式我们有。由于缺少这些生产者,我们没有机制来填充缓冲区。此外,我们在最初的讨论中也提到过,此类管道的生产者可能会零星出现。因此,不幸的是,我们无法实现我们期望的producers.Pipe(consumers)用法形式,但是,我们尝试实现一种类似的简化用法形式,基于以下信息:

  • 实际生产者未知,并可能零星出现以向管道注入项目。
  • 一旦管道构建完成,就不能向其中添加或删除消费者。
  • 管道可以两种方式形成:一致和不一致。
原始实现

由于我们没有代码中的任何单一断点来await,我们需要制造一种方法来维持我们的管道在整个持续时间内运行,以便所有生产的项目(由临时零星出现的生产者或长期运行的生产者)都可以添加到其中。出于所有实际目的,我们将此类持续时间衡量为:“从构建此类管道的时刻开始,直到调用CloseForAdding方法的方法的时刻。”

在做出这些假设并声明意图后,我们继续进行分离管道接口设置:

//we adopt this interface as this nearly mimics
//all operations of RunProducers method of 
//PipeImpl<TP, TC> static class we used for attached mode
//i.e.
//    Add and TryAdd method
//    and Dispose method
//    we have nothing to Init.
public interface IPipeline<T> : IProducerBuffer<T>, IDisposable
{
}

detached-pipeline-mind-map

现在,有了IPipeline,我们将能够模拟我们在PipeImpl<TP, TC> static classRunProducers方法中进行的与生产者相关的所有操作。让我们看一下实现:

//NOTE: Some explanation are provided as comments

internal sealed class PipelineImpl<TP,TC> : IPipeline<TP>
{
    private readonly CancellationTokenSource _mergedCts;
    private readonly PpcBuffer<TP> _feed;
    private readonly Task _consumerTask;
    private CancellationTokenSource _localCts;

    public Pipeline(IReadOnlyList<IConsumer<TC>> consumers, 
                    IDataAdapter<TP, TC> adapter, 
                    CancellationToken token, 
                    int bufferSize)
    {
        _localCts = new CancellationTokenSource();
        _mergedCts = CancellationTokenSource.CreateLinkedTokenSource(token, 
                                                           _localCts.Token);
        _feed = new PpcBuffer<TP>(bufferSize, _mergedCts.Token);

        //in order to span and await on our consumer
        //    we simply call the existing implementation
        //        from PipeImpl class
        _consumerTask = PipeImpl<TP, TC>.RunConsumers(consumers, _feed, 
                                                adapter, token, _localCts);
    }

    public void Add(TP item, CancellationToken token)
    {
        TryAdd(item, Timeout.Infinite, token);
    }

    public bool TryAdd(TP item, int millisecTimeout, CancellationToken token)
    {
        //passing the item to buffer
        return _feed.TryAdd(item, millisecTimeout, token);
    }

    public void Dispose()
    {
        if (_localCts == null) return;
        try
        {
            using (_localCts)
            {
                using (_mergedCts)
                {
                    using (_feed)
                    {
                        //FIRST, we cancel our local token
                        _localCts.Cancel();
                        
                        //SECOND, we close the feed for addition
                        _feed.CloseForAdding();
                        
                        //Then, we wait for remaining items to be
                        //      consumed
                        _consumerTask.Wait();
                    }
                }
            }
        }
        finally
        {
            _localCts = null;
        }
    }
}
实例管理

与附加管道不同,在附加管道中,我们在代码中有一个单一的位置来await整个管道工作流;在分离模式下,我们没有这种便利。因此,最终用户需要维护IPipeline<TP>的实例,并在构建后将其某处保存,并显式调用Dispose。当然,这需要一些注意,但在拒绝使用此实现之前,我们需要思考以下几点:

  • 分离管道
    • 不需要在构建时完全了解所有可能的生产者。
    • 接受来自零星临时生产者和长期运行生产者的项目。
    • 提供消费者和生产者之间的松耦合。
    • 促进并发。
    • 提供线程安全。
  • 分离管道,其性质:
    • 在消费者比生产者寿命长得多的情况下很有用(大多数情况下消费者可能要活到应用程序的生命周期),例如:
      • 基于Web(WCFApiController等)的数据处理。
      • 后台文件处理。
      • 后台数据库操作。
      • 基于事件的处理。
      • 基于计时器的处理。
      • 异步批处理,等等……
    • 需要管理单个实例,该实例也可以方便地维护为:
      • 依赖注入容器中的单例。
      • 静态字段/属性等。
    • 可以根据需要处置:在应用程序关闭时,关闭网络接口后,等等。
实现.Pipeline Usage Form(语法糖)

上面,我们已经创建了.Pipe扩展方法。以类似的方式,我们可以获得以下.Pipeline方法:

  1. 一致管道:生产者类型匹配消费者类型(即<TP> = <TC>)。通常,最终用户需要向其注入IDENTITY适配器,但我们可以在方法内部隐式完成,如下所示:
    //Implementation
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<T>> consumers,
                               CancellationToken token = default(CancellationToken), 
                               int bufferSize = 256)
    {
        return new PipelineImpl<T, T>(consumers, 
                                  new IdentityAdapter<T>(),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<T>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline();
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instace.Dispose();
    
  2. 固定大小块的不一致管道:如果生产者类型为<T>,则消费者类型为List<T>,并且最终用户寻求固定大小的块。通常,需要向其注入可等待列表适配器,但我们可以在方法内部隐式完成,如下所示(重要:由于生产者是零散的,人们可能希望完全避免此适配器,因为如果长时间没有生产者出现,将观察到不必要的消费者端延迟……在分离模式下,固定持续时间块是首选)。
    //Implementation
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<T>> consumers,
                                   int listSize,
                                   CancellationToken token = 
                                               default(CancellationToken), 
                                   int bufferSize = 256)
    {
        return new PipelineImpl<T, T>(consumers, 
                                  new AwaitableListAdapter<T>
                                  (listSize, Timeout.Infinite),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<T>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline(some_positive_list_size);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instace.Dispose();
  3. 固定持续时间块的不一致管道:如果生产者类型为<T>,则消费者类型为List<T>,并且最终用户寻求使用固定持续时间创建的可变大小块。通常,需要向其注入可等待列表适配器,但我们可以在方法内部隐式完成,如下所示:
    //Implementation
    public static IPipeline<T> Pipeline<T>(this IReadOnlyList<IConsumer<T>> consumers,
                                   int listMaxSize,
                                   int millisecondTimeout, 
                                   CancellationToken token = 
                                      default(CancellationToken), 
                                   int bufferSize = 256)
    {
        return new PipelineImpl<T, T>(consumers, 
                                  new AwaitableListAdapter<T>
                                      (listMaxSize, millisecondTimeout),
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<T>[]{ consumer1, ..., consumerM };
    //var save_this_instance_somewhere = consumers.Pipeline(some_positive_list_size,
    //                                                      some_positive_timeout);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instace.Dispose();
    
  4. 通用管道:生产者类型为<TP>,消费者类型为<TC>,并且IDataAdapter<TP, TC>实现对最终用户可用。
    //Implementation
    public static IPipeline<TP> Pipeline<TP, TC>
                                (this IReadOnlyList<IConsumer<TC>> consumers,
                                IDataAdapter<TP, TC> adapter,
                                CancellationToken token = default(CancellationToken), 
                                int bufferSize = 256)
    {
        return new PipelineImpl<TP, TC>(consumers, 
                                  adapter,
                                  token, bufferSize);
    }
    
    //=========================
    //========= USAGE==========
    //========================= 
    //var consumers = new IConsumer<TC>[]{ consumer1, ..., consumerM };
    //IDataAdapter<TP, TC> adapter = ...end-user-adapter-creation-call...
    //var save_this_instance_somewhere = consumers.Pipeline(adapter);
    //consumer1, ..., consumerM denotes comma separated M consumer instances
    
    //===================================
    //=========Add/TryAdd USAGE==========
    //===================================
    //elsewhere (in API methods, Event Handlers etc)
    //saved_instance.Add(item, token) OR saved_instance.TryAdd(item, timeout, token)
    
    //===================================
    //=========Dispose USAGE=============
    //===================================
    //during App Shutdown Or after network close
    //saved_instace.Dispose();
    

评论

功能实现

到目前为止,我们已经实现了最初设定的所有要求。在我们结束这次讨论之前,我们想总结一下我们的功能:

功能实现
功能 实现
缓冲区大小 通过方法参数(bufferSize);0表示无界。
无损 使用Add/TryAdd期间的毫秒TimeoutTimeout.Infinite表示无损。
可中断性 使用CancellationToken
一致性 使用适配器。
可附加性

使用.Pipe扩展方法实现的附加模式。

使用.Pipeline扩展方法实现的分离模式。

我们还注意到:

  • 附加模式可以使用固定大小固定持续时间块适配器。
  • 分离模式应避免固定大小块适配器,以避免意外延迟。

原创作品(C# .NET)和Nuget包

注意:如果您不对此类C# .NET库感兴趣,可以完全忽略此部分。

在我们最初的作品(源代码链接NuGet包链接)中,我们进一步阐述了我们的实现,如下所示:

  • 适配器接口实现为abstract类,以便<TP><TC>的数据转换可以完全基于业务逻辑完成,而无需担心调用buffer.TryGet(请参阅AwaitableAdapter AwaitableListAdapter)。

    abstract-adapter-impl

    因此,继承AwaitableAdapter<TP, TC> abstract类(如果一次消费单个实例)或AwaitableListAdapter<TP, TC> abstract类(如果以块的形式消费数据)比实现IDataAdapter<TP, TC>接口要简单。这种abstract class类的继承将保持面向业务,因为我们仅在Adapt方法中编写数据转换逻辑,而不必担心缓冲区处理,从而进一步减少了样板代码。

  • .Pipe.Pipeline扩展方法也可用于Action(同步委托)和Func(任务返回的异步委托)。因此,当不需要Init/Dispose方法时,避免了继承IProducerIConsumer接口的需要。(请参阅PipeExtsPipelineExts)。
  • 我们当前的实现无法进行方法链式调用,就像我们在UNIX中看到的那样,我们可以链接多个管道,如下面的示例所示:
    ls -l | grep key | less      (3 operations with 2 pipes)
  • 我们当前的实现仅支持void消费者,即消费者不能有返回值。

用法

重要提示:我们建议您使用v1.4.0或更高版本;因为它包含与先前版本的一些重大更改。该库还包含其他一些有趣的扩展方法,我们可能会在CodeProject上未来的文章中介绍。但是,如果您有兴趣使用这些方法,您可以在此处找到信息

  • 如果有人想使用.Pipe实现,而不是一次性考虑整个解决方案,他必须这样组织他的想法:
    • 创建生产者(隔离地)。
      • 要么实现IProducer<TP>接口来填充缓冲区如果需要Init/Dispose方法。
      • 要么构建一个lambda。
        • 同步lambda签名:Action<IProducerBuffer<TP>, CancellationToken>
        • 异步lambda签名:Func<IProducerBuffer<TP>, CancellationToken, Task>
    • 创建消费者(隔离地)。
      • 要么实现IConsumer<TC>接口来填充缓冲区如果需要Init/Dispose方法。
      • 要么构建一个lambda。
        • 同步lambda签名:Action<TC, CancellationToken>
        • 异步lambda签名:Func<TC, CancellationToken, Task>
    • 创建适配器(如果现有适配器不符合要求)。
      • 要么继承自AwaitableAdapter<TP, TC>AwaitableListAdapter<TP, TC>,如果需求符合,并实现抽象方法:abstract TC Adapt(TP produced, CancellationToken token)
      • 或者实现IAdapter<TP, TC>接口。
    • 选择现有的producers.Pipe(consumers)扩展方法之一,并根据需要注入值。
  • 如果有人想使用.Pipeline实现,他必须这样组织他的想法:
    • 创建消费者和适配器,如上所述,用于.Pipe用法。
    • 维护IPipeline<TP>实例,根据需要。
      • 在依赖注入容器中作为单例。
      • 作为静态字段/属性等。
    • 根据需要调用实例的Dispose
      • 在应用程序关闭方法中。
      • 关闭网络连接后。
      • 注销事件处理程序后,等等。
    • 使用IPipeline<TP>实例,根据需要。
      • ApiController中。
      • WCF端点中。
      • EventHandler中。
      • 在批处理方法调用中。
      • Timer回调中……等等。

历史

  • 2018年7月4日:本构想的v1。
© . All rights reserved.