65.9K
CodeProject 正在变化。 阅读更多。
Home

通用管道

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.85/5 (15投票s)

2013年12月28日

CPOL

5分钟阅读

viewsIcon

45447

downloadIcon

949

计算管道的实现,附带设计说明和代码示例

介绍  

本文 介绍了在 C# 中实现线程管道。该管道可以帮助优化存在“瓶颈”的进程。我第一次使用它来加速一个三步过程:第一步(“瓶颈”)- 从网页获取数据,第二步 – 解析 HTML 代码 查找关键字,第三步 – 进行一些分析。最初在我的项目中,我使用了 3 个 Task 对象(每步一个)并配合缓冲区和同步,然后创建了一个可重用的库。

基础

Wiki 中可以找到对管道概念的一个非常简单的解释

管道是一系列串联的数据处理单元(阶段),其中一个单元的输出是下一个单元的输入。管道的单元通常以并行或分时的方式执行;在这种情况下,通常会在单元之间插入一些数量的 存储 缓冲区

管道的主要优点是通过在处理数据流时并行执行多个操作来提高系统的 吞吐量

如果一个管道包含 k 个阶段(k>1),它们处理 1 个项目需要 t1, t2, … tk 个时钟周期,那么令 T = MAX(t1, t2, … tk)。如果我们有 N 个输入值,管道将需要 T*(N-1) + t1 + t2 +...+ tk 个时钟周期来为所有输入值生成结果。

类图和设计

乍一看,主类 Pipeline 与 .NET 类 Tuple 有些相似之处,因为它有不同数量类型参数的变体。

N 阶段的 Pipeline 有 N+1 个类型参数和 N 个私有字段,每个阶段对应一个(PipelineStageBase _stage1, _stage2 … _stageN),这些字段可以在构造函数中创建,也可以通过 MountStageX() 方法设置。

管道是工作的组织者:它调用阶段初始化,启动它们的执行并为客户端提供反馈。管道阶段(PipelineStageBase<TIn,TOut> 类)则负责其余工作。让我们仔细看看它们。

布尔属性 CanBeStarted, IsRunning, IsCompleted, IsCancelled, HasResults 描述了阶段的当前状态。该阶段从 PreviousStage 接收一个 Tin 项目,然后为下一个阶段生成一个 TOut 项目,并将其临时结果放入缓冲区(Results 属性)。管道的最后一个阶段(IsLastStage = true)保存最终结果。

当管道启动时,它会为每个阶段调用 Initialize() 和 Run() 方法。记住:管道并行运行阶段,并为每个阶段创建一个 Task!阶段初始化基本上意味着 IsCompletedIsCancelled IsLastStage 属性被设置为 false。Run 方法的不同重载接受第一个阶段的 IEnumerable<TIn> 参数,或者其他阶段的 IProducer<TIn> 参数,以及可选的 CancellationToken 参数以支持取消。

阶段操作在 Run() 方法中执行,只要前一个阶段有结果或有输入值。当生成结果时,阶段会报告成功(StageCompleted 方法)或异常(StageFailed 方法)。如果管道被客户端停止,正在工作的阶段会报告取消(StageCancelled 方法)。

阶段使用 ItemProgress 对象作为容器来传递结果。ItemProgress.CurrentValue 是最新生成的结果,ItemProgress.Results 属性包含所有先前阶段的结果。管道的最后一个阶段将此对象报告给管道。管道属性 State 帮助客户端监控进程。

Pipeline 本身继承了 PipelineStageBase ,它是创建长主管道的关键,可以将其他管道对象安装在阶段的位置。Pipeline 类重写了 IsRunning, IsCompleted, IsCancelled, Results 属性,从最后一个阶段获取它们。它还重写了 Initialize() 和 Run() 方法以正确启动阶段。

使用代码

使用公式很简单

  • 挂载所有管道阶段
  • 启动管道
  • [可选 – 偶尔进行监控]
  • 监听 Completed 事件

private void CreatePipeline()
{
    _demoPipeline=new Pipeline<string,int,double,bool>();
    _demoPipeline.MountStage1(IndexOfA);
    _demoPipeline.MountStage2(Sqrt);
    _demoPipeline.MountStage3(IsCorrect);
    _demoPipeline.Completed+=DemoPipelineOnCompleted;
}

private int IndexOfA(string str)
{
    Thread.Sleep(Interval1);
    return Math.Max(str.IndexOf('a'),0);
}
 
private double Sqrt(int i)
{
    if(i>28)
        throw new IndexOutOfRangeException("fail test");
    Thread.Sleep(Interval2);
    return Math.Sqrt(i);
}
 
private bool IsCorrect(double d)
{
    Thread.Sleep(Interval3);
    return d>3;
}

private int Interval1 {get{return (int)nudDuration1.Value*1000;}}
private int Interval2 {get{return (int)nudDuration2.Value*1000;}}
private int Interval3 {get{return (int)nudDuration3.Value*1000;}}

private void StartPipelineClick(object sender, EventArgs e)
{
    var items=new List<String>();
    for(inti=0;i<ItemsCount;i++)
    items.Add(Guid.NewGuid().ToString());
 
    timerState.Start();
    _cts=newCancellationTokenSource();
    _demoPipeline.Start(items,_cts.Token);
}

private void DemoPipelineOnCompleted(object sender,CancelEventArgs e)
{
    timerState.Stop();
    DisplayPipelineState();
}

private void DisplayPipelineState()
{
    int t;
    for(int i=0;i<_demoPipeline.State.Results.Count;i++)
    {
        var status=_demoPipeline.State.Results[i];
        for(t=0;t<status.Results.Count;t++)
        {
            // display results
        }
        if(status.Error!=null)
        {
            // display error
        }
        else if(status.Cancelled)
        {
            // display cancellation
        }
    }
}  

演示

为管道设置项目计数,为每个阶段设置超时时间,然后单击“开始”按钮。

DemoSettings 可以监控可用结果

时间测量

在演示中,使用了带有内部 2 阶段管道的 2 阶段管道。我进行了测试,将 ItemCount 设置为 1000,并将所有 Intervals 设置为 100 毫秒。估计时间为 100 秒,而在 4 核处理器上,总时间为 101.4 秒 – 延迟小于 1.5 秒(1.5%)。

新的勇敢阶段

文章的这一部分出现的原因是: 

1. One-T 及其设计建议  

2. Paulo Zemek 及其激励性的问题和阶段缓冲区思想

改进的实现只需要 1 个泛型接口(IStage<TIn>)和 1 个泛型类(Stage<TIn, TOut> : IStage<TIn>)。与 Pipeline 的主要区别在于阶段之间的连接。阶段保留对管道下一个阶段的引用(IStage<TOut> Next 属性),并且没有指向前一个阶段的链接。

当 Pipeline 的第一个阶段启动时,它会通过调用 Init() 方法来初始化第二个阶段。第二个阶段初始化第三个阶段,依此类推。初始化意味着每个阶段将 IsRunning 属性设置为 true,创建一个输入值缓冲区(Queue<TIn>),禁用对其状态(函数、缓冲区容量和下一个阶段的链接)的修改,并启动一个新的 Task 以并行执行操作。

当一个阶段生成一个项目时,它必须使用 Next.Enqueue() 方法将该项目发送到下一个阶段。但是,只有当实际缓冲区容量小于 BufferCapacity 属性时,该项目才会被接受。每个阶段都必须等待生成的项目被入队。管道的最后一个阶段将其结果放入 StageContext 对象中,供客户端访问。

当一个阶段处理完所有输入值后,它会使用 Next.SendTerminationSignal() 方法允许下一个阶段在处理完缓冲区中的所有项目后停止。

代码使用

public override void Start(IEnumerable<string> args)
{
      _first = new Stage(IndexOfA);
      _first.Add(Sqrt).Add(IsCorrect);

      var _ctx = new StageContext();
      _ctx.Completed += (sender, e) => OnCompleted(e);
      _cts = new CancellationTokenSource();
      _ctx.Token = _cts.Token;            
      _ctx.Results.Clear();
      _first.Start(args, _ctx);
}

代码更少,性能相同 

历史 

原文(管道实现):2013 年 12 月 27 日

更新(阶段实现):2014 年 1 月 21 日

© . All rights reserved.