65.9K
CodeProject 正在变化。 阅读更多。
Home

管道、河流、Rails 和 Binder 模式 - 第三部分

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.79/5 (9投票s)

2015 年 1 月 3 日

CPOL

5分钟阅读

viewsIcon

22047

downloadIcon

203

探讨可能的并行生产者-消费者模式。(最终部分)

<< 管道、河流、Rails 和 Binder 模式 - 第二部分

开始

终于,我能够撰写本系列的第三个(也是最后一个)最终部分了。坦率地说,当我开始着手这篇文章时,我只想将 Pipes 设计作为一个技巧/窍门来发布,但是,在我能够完成它的时候,我决定包含其他实现,如 (River & Binder)。但是,我又一次未能将其他模式包含在文章中,因为我一直没有时间完成它。所以,我决定将其分为 3 部分,并添加 Rails。到目前为止,这就是本系列背后我乏味的故事。无论如何,让我们快速回顾一下我们在第一部分和第二部分看到的内容。

  1. Pipes:一种基于“进出”成员的简单设计。还需要 Action<TInput, TOutput> 委托作为最终结果处理器。(参见:管道、河流、Rails 和 Binder 模式 - 第一部分
  2. River:对 Pipes 的改进,消除了对“进出”关系的需求。管道仅由 Action<TInput, TOutput> 委托组成,输入/输出对在整个管道中流动。因此,不需要最终结果处理器。(参见:管道、河流、Rails 和 Binder 模式 - 第二部分
  3. Rails:此模式类似于 River,但需要接口实现。输入数据是一个类,并实现所需的接口,其中接口代表管道逻辑。此实现的好处在于,同一种管道可以处理多种输入类型。(参见:管道、河流、Rails 和 Binder 模式 - 第二部分
  4. BinderRiver 的扩展,解耦了管道成员,并提供了为每个管道成员分配所需工作线程的机制。本部分将全部介绍它。

当我们构建一个管道时,有时我们希望为某个成员函数分配一些线程(出于结构、战术、性能或其他一些奇怪的原因)。因此,我向您提出了 Binder 的设计。如果您一直关注本系列,您会发现此实现的用法相似,即:

  1. 创建管道。
  2. 调用 StartProcessing
  3. 通过 AddValue 向管道添加值。
  4. 调用 StopProcessing

第四种模式:Binder

在实现 River 时,我们已经看到了如何构建一个允许输入和输出一起流动的管道。现在,我们将扩展这个想法,允许独立地为管道成员函数分配工作线程。为了获得这种控制,我决定为每个成员函数创建一个专用的管道实例,并将它们链接在一起;而不会让代码使用者担心管理这些实例。并为用户提供一个简单且熟悉的(与其他 3 种实现相同)使用结构。

让我们先看看 Binder 类的代码

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Piper
{
    public class Binder<TInput, TOutput>
        where TOutput : class, new()
    {
        private BlockingCollection<DataWrapper> _dataCollection = null;
        private ManualResetEvent _completionEvent;
        private CancellationTokenSource _cancelTokenSource;
        private Binder<TInput, TOutput> _nextBind = null;
        private Action<TInput, TOutput> _currentAction;
        private int _maxConcurrency;
        private string _opCode;

//This Ctor will create the seed instance
        public Binder(Action<TInput, TOutput> currFunc,
                      int maxConcurrency = -1,
                      string opCode = "MyBinder") 
              : this(currFunc, maxConcurrency, opCode, new CancellationTokenSource())
        {
        }

//This Ctor is to create the remaining pipes
        private Binder(Action<TInput, TOutput> currFunc,
                      int maxConcurrency = -1,
                      string opCode = "MyBinder",
                      CancellationTokenSource tokenSource = null)
        {
            _currentAction = currFunc;
            _maxConcurrency = maxConcurrency;
            _completionEvent = new ManualResetEvent(false);
            _dataCollection = new BlockingCollection<DataWrapper>();
            _opCode = opCode;
            _cancelTokenSource = tokenSource;
        }

        public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
                                            int maxConcurrency = -1,
                                            string opCode = "MyBinder1")
        {
            if (_nextBind == null)
            {
                _nextBind = 
                   new Binder<TInput, TOutput>
                            (nextFunc, maxConcurrency, opCode, _cancelTokenSource);
            }
            else
            {
                _nextBind.Bind(nextFunc, maxConcurrency, opCode);
            }
            return this;
        }

        public bool AddValue(TInput inputValue)
        {
            return AddValue(new DataWrapper
            {
                InputVal = inputValue,
                OutputVal = new TOutput()
            });
        }

        public void StopProcessing(bool waitForProcessing = false)
        {
            _dataCollection.CompleteAdding();
            if (waitForProcessing)
                _completionEvent.WaitOne();
        }

        public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
        {
            if (_nextBind != null)
                _nextBind.StartProcessing(errorHandler);
            var option = new ParallelOptions
            {
                MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
                CancellationToken = _cancelTokenSource.Token
            };

            Task.Factory.StartNew(() =>
            {
                try
                {
                    Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                       option,
                       currData =>
                       {
                           try
                           {
                               option.CancellationToken
                                     .ThrowIfCancellationRequested();

                               _currentAction(currData.InputVal,
                                              currData.OutputVal);

                               if (_nextBind != null)
                                  _nextBind.AddValue(currData);
                           }
                           catch (Exception e)
                           {
                               errorHandler(currData.InputVal,
                                    currData.OutputVal,
                                    "Error occurred inside " + 
                                            _opCode + " pipeline.",
                                    e);
                           }
                       });
                }
                catch (OperationCanceledException)
                {
                }
                finally
                {
                    if (_nextBind != null)
                    {
                        _nextBind._dataCollection.CompleteAdding();
                        _nextBind._completionEvent.WaitOne();
                    }
                    _completionEvent.Set();
                }
            });
        }

        public void AbortProcessing()
        {
            _dataCollection.CompleteAdding();
            _cancelTokenSource.Cancel();
        }

        private bool AddValue(DataWrapper currData)
        {
            return _dataCollection.TryAdd(currData);
        }

        private class DataWrapper
        {
            internal TInput InputVal;
            internal TOutput OutputVal;
        }
    }
}

Binder 解析

现在,为了理解它是如何工作的,我们首先需要知道,在 C# 中,我们有类级别的隐私,即类的实例的私有成员在该类内部是可访问的。让我们看一个例子

public class MyInput
{
    private int a = 1;
    public MyInput GetInstanceWithValue(int b)
    {
        var newInstance = new MyInput();
        //This is legal in C#
        //accessing private variable of new instance inside the same class
        newInstance.a = b;
        return newInstance;
    }
}

理解构造

利用类级别的隐私功能,我可以私有地持有 Binder 类的实例,并访问所有属性/成员。而这正是我正在做的,但是是以链式的方式进行的(请遵循代码注释)

public class Binder<TInput, TOutput>
        where TOutput : class, new()
{
    private Binder<TInput, TOutput> _nextBind = null;

    public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
                                        int maxConcurrency = -1,
                                        string opCode = "MyBinder1")
    {
//If child pipe is not defined, then we define it as current pipe
        if (_nextBind == null)
        {
//Here we use the private Ctor and pass the same cancellation token source
//this way, cancelling on seed pipe's token will cancel all pipes in chain
            _nextBind = new Binder<TInput, TOutput>
                  (nextFunc, maxConcurrency, opCode, _cancelTokenSource);
        }
        else
        {
//Otherwise, we pass the current pipe to child pipe
//Then, child pipe will check its child pipe and so on...
//thus, given pipe will become the child of the last pipe in the chain
//and for the next Bind() call, current pipe will be the last pipe in the chain
            _nextBind.Bind(nextFunc, maxConcurrency, opCode);
        }

//Mind it, I am returning the seed instance for every call, thus,
//Consumer of this class can call this method ONLY on SEED pipe.
//This is also required, coz user must call Start/Stop/Abort Processing on seed
//As Seed is the first pipe to process input/output pair and
//we DO not want to overwhelm the user with new instant per BIND.
        return this;
    }
}

理解处理

一旦管道构建完成(如上所述),我们的下一个目标就是实现 StartProcessing 方法,该方法可以处理 Binder 实例的这种链。为了实现我们的目标,我们需要牢记几点:

  • 我们必须以相反的顺序(自底向上)启动管道,即在父管道可以将数据传递给它之前,每个子管道的 GetConsumingEnumerable() 循环都必须初始化。这将确保平稳的数据处理而不会造成任何阻塞。自顶向下也是可能的,但在这种情况下,BlockingCollection 可能会开始膨胀。
  • 在处理完每个输入/输出对之后,我们需要确保该对流向链中的下一个管道(因为一切都是异步的)。
  • 当在父管道上调用 CompleteAdding() 时,我们必须确保子管道也已完成处理所有剩余项。
  • 调用 AbortProcessing() 必须中止链中的每个管道:这很容易实现,因为我们共享相同的 CancellationTokenSource

我将向您展示上述要点是如何集成到代码中的(请遵循代码注释)

public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
{
//If child pipe exist, call its StartProcessing in chain to have bottom-up initialization
    if (_nextBind != null)
        _nextBind.StartProcessing(errorHandler);

//each pipe will use its own maxConcurrency value
//and share the same cancellation token
    var option = new ParallelOptions
    {
        MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
        CancellationToken = _cancelTokenSource.Token
    };

    Task.Factory.StartNew(() =>
    {
        try
        {
            Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
               option,
               currData =>
               {
                   try
                   {
                       option.CancellationToken
                                     .ThrowIfCancellationRequested();

//Execute the current pipe method with the data of OWN BlockingCollection
//NOTE: if this is inside SEED, then data pair is what USER supplied,
//      else it is the processed pair from previous pipe in chain.
                       _currentAction(currData.InputVal,
                                              currData.OutputVal);

//Pass the pair to child pipe
//NOTE: this call is NOT recursive => it just populate the
//                        BlockingCollection of immediate child.
//this pair will be available to child ForEach loop
//and then from there to its own child and so on...
                       if (_nextBind != null)
                          _nextBind.AddValue(currData);
                   }
                   catch (Exception e)
                   {
                       errorHandler(currData.InputVal,
                            currData.OutputVal,
                            "Error occurred inside " + 
                                            _opCode + " pipeline.",
                            e);
                   }
               });
        }
        catch (OperationCanceledException)
        {
        }
        finally
        {

//If child is not null
            if (_nextBind != null)
            {

//Close BlockingCollection of Child for data-adding
//Reason: We are here means, parent is done, hence child shouldn't receive data
//        All data adding was done above in Try clause
//        Or by the user (SEED case) and he called StopProcessing.
                _nextBind._dataCollection.CompleteAdding();

//Wait until child gives completion signal
                _nextBind._completionEvent.WaitOne();
            }

//Give OWN completion signal
//Note: Last child in the chain will give this signal without waiting for
//      any other signal, thus, 
//      its parent could give its OWN signal to its parent and so on...
            _completionEvent.Set();
        }
    });
}

其他函数,如 AbortProcessing()StopProcessing() 等,都是微不足道的,我将留给读者自行探索。因此,我认为我已经完成了这个三部分系列中的第四个也是最后一个模式。

Binder 示例

让我向您展示一个快速(简单且乏味)的上述模式的用法

using System;
using System.Threading.Tasks;

namespace Piper
{
    class Program
    {
        static void Main(string[] args)
        {
            SomeCode();
            GC.Collect();
            GC.WaitForPendingFinalizers();
            Console.WriteLine("###### DONE ######");
            Console.ReadLine();
        }
        static void SomeCode()
        {
            var myPipes = new Binder<MyInput, MyOutput>(One, 1, "One")
                .Bind(Two, 1, "Two")
                .Bind(Three, 2, "Three")// => 2 threads for THREE
                .Bind(Four, 1, "Four");
            myPipes.StartProcessing(HandleError);
            Parallel.For(0, 4, input => myPipes.AddValue(new MyInput()));
            myPipes.StopProcessing();
        }
        static void HandleError<T1, T2>(T1 a, T2 b, string code, Exception e)
        {
            Console.WriteLine(code + Environment.NewLine + e.Message);
        }
        static void One(MyInput a, MyOutput b)
        {
            Console.WriteLine("One on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
        static void Two(MyInput a, MyOutput b)
        {
            System.Threading.Thread.Sleep(1000);
            Console.WriteLine("Two on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
        static void Three(MyInput a, MyOutput b)
        {
            System.Threading.Thread.Sleep(2000);
            Console.WriteLine("Three on T:" +
                System.Threading.Thread.CurrentThread.ManagedThreadId);
            //throw new Exception("Mine");
        }
        static void Four(MyInput a, MyOutput b)
        {
            Console.WriteLine("Four (or LAST)");
            Console.WriteLine("A=" + a + ",B=" + b +
                ",On T:" + System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
    }
    public class MyInput
    {
        //All my Input values
    }
    public class MyOutput
    {
        //All my output values
    }
}

在上面的示例中,采用了以下策略来模拟处理时间,以展示每个方法专有线程的好处

  1. 函数 One()Four() 没有等待,是单线程的,
  2. 函数 Two() 等待 1 秒,是单线程的。
  3. 函数 Three() 等待 2 秒,有 2 个线程。

使用这样的示例,我们期望得到以下结果:

  1. One() 的所有调用将立即执行,因此,Two()BlockingCollection 将立即被填充。
  2. 在前两次执行 Two() 之后,我们必须看到一次 Three() 的执行(因为当 Three() 等待 2 秒时,可以执行两次 Two() 调用)。
  3. 然后对于 Two() 的每次输出,我们必须看到一次 Three() 的输出(双倍时长,双倍线程效果)。
  4. 我们必须看到 Four() 的输出,几乎与 Three() 的输出同时出现,因为 Four() 没有等待。

在我个人配备 2 个核心的笔记本电脑上,我可以看到下面列出的结果。我让您准备并运行其他示例/测试。

2015 年新年快乐!

在本系列的最后一部分,我们看到了第四个也是最后一个模式。我希望您(读者)喜欢这个系列,尽管如此,如果您仍有任何问题/评论/建议/改进,请告诉我。最后,我祝大家新年快乐!愿上帝保佑……享受并继续分享有趣的 [代码]!2015 年新年快乐!!!哟呼!!!

<< 管道、河流、Rails 和 Binder 模式 - 第二部分

历史

这是建议解决方案的 V1。

逻辑上,没有改变,但是,添加了一些用于可视化的图表。

© . All rights reserved.