65.9K
CodeProject 正在变化。 阅读更多。
Home

管道、河流、轨道和绑定器模式 - 第一部分

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.43/5 (16投票s)

2014年12月8日

CPOL

5分钟阅读

viewsIcon

24751

downloadIcon

208

探讨可能的并行生产者-消费者模式。

动机

本文从 MSDN 的管道 中汲取灵感。在这篇 MSDN 文章中,使用 BlockingCollection 的并行生产者-消费者模型得到了很好的开发/解释。因此,在这里我将不再重复已经由领域专家解释清楚的内容。我将尝试提出一些技巧,以

  • 简化并行编程
  • 提高代码的可重用性
  • 集中处理错误

引言

本文(以及未来的部分)将介绍四(4)种构建并行生产者-消费者解决方案的不同方法。借鉴维基百科的 软件设计模式 定义,我称它们为模式(正如标题所示),但读者始终保留同意/不同意我的权利。

首先,考虑一个关于“出入”关系的定义

“在管道领域,出入关系定义在两个函数之间,其中前一个函数的输出是后一个函数的输入。那么这两个函数就被称为(至少在我看来)出入成员。”

本系列将讨论以下模式的实现(为命名不当之处我深表歉意)

  1. 管道:基于“出入”成员的简单设计。此外,还需要 Action<TInput, TOutput> 委托(们)作为最终结果处理程序。我将在本文中阐述此模式。
  2. 河流:对管道的改进,消除了对“出入”关系的需求。管道仅由Action<TInput, TOutput>委托组成,输入/输出对在管道中流动。因此,无需最终结果处理程序。我将在第二部分介绍此模式。
  3. 轨道:此模式类似于河流,但需要实现接口。输入数据是一个类,并实现了表示管道逻辑的接口。这种实现的优点是可以通过同一个管道处理多种输入类型。第二部分也将包含此模式和河流模式。
  4. 绑定器:对河流的扩展,它解耦了管道成员,并提供了为每个管道成员分配所需工作线程的手段。最后,我将在第三部分专门介绍此模式。

好了,现在我们可以逐个考虑每种实现及其示例。但是,我想先从一个非并行示例开始(如果您不感兴趣,请跳过此部分)。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Piper
{
    class Program
    {
        static void Main(string[] args)
        {
            var input = 1;
            SomeCode(input);
            SomeCodeInAnotherWay(input);
            Console.ReadLine();
        }
        static void SomeCode(int input)
        {
            var finalAnswer = PowerIt(DoubleIt(MultiplyIt(RandomizeIt(input))));
            Console.WriteLine("Answer: " + finalAnswer);
        }
        static void SomeCodeInAnotherWay(int input)
        {
            var a1 = RandomizeIt(input);
            var a2 = MultiplyIt(a1);
            var a3 = DoubleIt(a2);
            var finalAnswer = PowerIt(a3);
            Console.WriteLine("Answer: " + finalAnswer);
        }
        private static decimal PowerIt(double p)
        {
            return (decimal)Math.Pow(p, 2.7);
        }
        private static double DoubleIt(long p)
        {
            return Math.Log((double)p);
        }
        private static long MultiplyIt(int p)
        {
            return p * 200;
        }
        private static int RandomizeIt(int p)
        {
            return p + 56;
        }
    }
}

在上面的示例中,SomeCodeSomeCodeInAnotherWay 函数是相同的,并且产生相同的结果。唯一的区别是 SomeCodeInAnotherWay 写成了多行,而 SomeCode 函数在表示上有点混乱。总之,代码的编写方式不是问题,事实上,我想引起您对以下问题的注意

  • 在链中添加更多“出入”成员时,如何保持此代码片段的可读性?
  • 错误处理怎么办?我们应该为每一个“出入”成员创建try/catch,还是为 SomeCode(或SomeCodeInAnotherWay)创建一个单独的try/catch
  • 如果一个项目中有许多这样的“出入”链,有没有一种简单的方法来提高代码的可重用性/简化错误处理?
  • 我同意,在Parallel.For(或ForEach)中使用此函数并不难,但创建生产者-消费者模式怎么样?我们是否需要为每次使用而在解决方案中反复重写它?
  • ……以及其他许多类似的问题(嗯……我想不到别的了!)

在这一系列文章中,我们将考虑几种模式,这些模式将解决上述一个或多个问题。所以我们从第一个模式开始:管道

第一个模式:管道

此实现背后的想法源自基于 UNIX 的管道 "|" 命令。有关更多信息,请参阅 Pipeline (Unix) 的维基链接。总之,在不深入理论的情况下,让我们看看代码本身

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Piper
{
    public sealed class Pipes<TInput, TOutput>
    {
        private BlockingCollection<InputWrapper> 
        _dataCollection = null; // Holds the data to process
        private CancellationTokenSource _cancelSource = null; // For task cancellation
        private Task _pipelineProcessor = null; // The task itself
        private Func<TInput, TOutput> 
        _currentPipe; //Current function of "out-in" relation
        private int _maxConcurrency;
        
        public Pipes(Func<TInput, TOutput> currentPipe,
                        int maxConcurrency = -1)
        {
            _currentPipe = currentPipe;
            _maxConcurrency = maxConcurrency;
        }

        // This function establishes the "out-in" 
        relationship among pipeline functions.
        public Pipes<TInput, TNewPipeOutput> 
        Pipe<TNewPipeOutput>(Func<TOutput, TNewPipeOutput> newPipe)
        {
            // Here we create new function as NewFunction(OldFunction(inputData))
            // REMEMBER : PowerIt(DoubleIt(MultiplyIt(RandomizeIt(input)))) from above
            return new Pipes<TInput, TNewPipeOutput>
            (inputValue => newPipe(_currentPipe(inputValue)), _maxConcurrency);
        }

        // Adding values in pipeline
        public bool AddValue(TInput inputValue, Action<TInput, TOutput> callbackAction)
        {
            return _dataCollection.TryAdd(new InputWrapper
            {
                InputValue = inputValue,
                CallBackFunction = callbackAction
            });
        }

        // Starts the processing
        public void StartProcessing(Action<TInput, string, Exception> errorHandler,
                                    string operationCode = "My_Operation_Name")
        {
            _dataCollection = new BlockingCollection<InputWrapper>();
            _cancelSource = new CancellationTokenSource();

            var option = new ParallelOptions
            {
                MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
                CancellationToken = _cancelSource.Token
            };

            _pipelineProcessor = Task.Factory.StartNew(() =>
            {
                try
                {
                    Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
                        option,
                        inputWrapper =>
                        {
                            try
                            {
                               option.CancellationToken
                                     .ThrowIfCancellationRequested();
                               //Here we just call the Final Action method with
                               //input and outcome of current (LAST) 
                               //pipe in the chain
                               //(which is THIS _currentPipe!)
                               inputWrapper.CallBackFunction(inputWrapper.InputValue,
                                                _currentPipe(inputWrapper.InputValue));
                            }
                        catch (Exception e)
                        {
                            errorHandler(inputWrapper.InputValue,
                                    "Error occurred inside " + 
                                    operationCode + " pipeline.",
                                    e);
                        }
                    });
                }
                catch (OperationCanceledException)
                {
                }
            });
        }

        public void StopProcessing(bool waitForProcessing)
        {
            _dataCollection.CompleteAdding();
            if (waitForProcessing)
                _pipelineProcessor.Wait();
        }

        public void AbortProcessing()
        {
            _cancelSource.Cancel();
            _dataCollection.CompleteAdding();
        }

        // Wrapper class to hold the InputData and Final Action handle pair
        private class InputWrapper
        {
            internal TInput InputValue;
            internal Action<TInput, TOutput> CallBackFunction;
        }
    }
}

查看上面的实现,所有的“魔力”都发生在下面函数中仅一行代码里

public Pipes<TInput, TNewPipeOutput> 
Pipe<TNewPipeOutput>(Func<TOutput, TNewPipeOutput> newPipe)
{
     return new Pipes<TInput, TNewPipeOutput>
     (inputValue => newPipe(_currentPipe(inputValue)), _maxConcurrency);
}

实际上,我们只是通过创建当前和新管道函数之间的“出入”关系,创建一个具有新定义的输出类型 TNewPipeOutputPipes 类的新实例。就是这样!如果我们再次对这个实例执行Pipe操作,那么又会建立一个新的“出入”关系!!!现在,让我使用 Pipes 重新实现SomeCode函数(如引言中的示例所示)。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Piper
{
    class Program
    {
        static void Main(string[] args)
        {
            SomeCode();
            SomeErroredCode();
            Console.ReadLine();
        }
        static void SomeCode()
        {
            var myPipes = new Pipes<int, int>(RandomizeIt, -1)
                                 .Pipe(MultiplyIt) // Add Pipe function
                                 .Pipe(DoubleIt) // More pipe
                                 .Pipe(PowerIt); // And more...
            //Step 1. Start the pipeline
            myPipes.StartProcessing(HandleError, "SomeCode");
            //Step 2. Add Input values with Final Handler
            Parallel.For(1, 5, 
                input => myPipes.AddValue(input, ShowMyAnswer));
            //Step 3. Wait until processing is done!
            //(or call it without waiting)
            //IMPORTANT: Do make a call to this
            //function else the Loop will wait indefinitely!!!
            myPipes.StopProcessing(true);
        }
        static void SomeErroredCode()
        {
            var myPipes = new Pipes<int, int>(RandomizeIt, -1)
                                    .Pipe(MultiplyIt) //Add pipe...
                                    .Pipe(DoubleIt) //Another...
                                    .Pipe(PowerIt) // Another...
//Notice, how easily we can extend the pipeline
//with new functions, without much of code refactoring!!!
//Adding one more pipe without changing anything else!!!
                                    .Pipe(ThrowError); 

            myPipes.StartProcessing(HandleError, "SomeErroredCode");
            Parallel.For(1, 5, input => myPipes.AddValue(input, ShowMyAnswer));
            myPipes.StopProcessing(true);
        }
        private static void ShowMyAnswer<TIn, TOut>(TIn input, TOut output)
        {
            Console.WriteLine("For Input value: " + input +
                ", Output is: " + output +
                ", Executed On Thread Id: " + 
                System.Threading.Thread.CurrentThread.ManagedThreadId);
        }
        private static decimal PowerIt(double p)
        {
            return (decimal)Math.Pow(p, 2.7);
        }
        private static double DoubleIt(long p)
        {
            return Math.Log((double)p);
        }
        private static long MultiplyIt(int p)
        {
            return p * 200;
        }
        private static int RandomizeIt(int input)
        {
            return input + 56;
        }
        private static decimal ThrowError(decimal p)
        {
            throw new Exception("Test Exception");
        }
        private static void HandleError<Tin>(Tin i, string mess, Exception e)
        {
            Console.WriteLine("Error On Thread Id: " + 
            System.Threading.Thread.CurrentThread.ManagedThreadId +
                ", Input was: " + i +
                ", Error In: " + mess +
                ", Message: " + e.Message);
        }
    }
}

从上面的例子中,我们注意到以下几点

  • 构建管道非常简单,一旦“出入”对准备好进行管道传输。
  • 使用此类只需要 3 个步骤
    1. 调用 StartProcessing
    2. 添加输入值,然后
    3. 调用 StopProcessing
  • 在尾部添加新管道非常容易,不需要重构(然而,移除/更改中间管道可能需要一些重构,如果“出入”关系发生变化)。
  • 所有错误处理都是单独完成的,没有任何“出入”成员需要 try/catch

此设计可以在以下方面进一步改进

  • 签名HandleError<Tin, Tout>(Tin i, Tout o, string mess, Exception e) 的错误处理程序可以实现以建立管道级别的错误处理(类似于为每个成员函数添加try/catch)。在这种情况下,可以通过在 Pipe() 方法中替换管道 lambda 为多行函数来提供此类错误处理程序。我将此实现留给用户(原因是:河流也采用了类似实现,我将在第二部分向您展示)。
  • 其他小改进,例如
    1. 发生错误时停止处理
    2. 通过返回 bool 来向 StopProcessing 的调用者通知错误的存在
    3. 将输入/输出对的集合返回给 StopProcessing 的调用者,而不是 Action 处理程序等,都可以实现。我个人根据项目需求开发了一些这些变体。

以下是上述示例代码输出的快照

重要提示

如果您正在构建一个将在应用程序生命周期内运行的管道(通过创建单例类或其他方式),请考虑将 StartProcessing 方法中的 Parallel.ForEach(当输入数据到达速率慢于数据处理时间时非常有用)更改为非并行 foreach。如果这不符合您的要求,请考虑 GetConsumingPartitioner BlockingCollection 扩展(Stephen Toub 的一篇精彩文章,它很好地解释了将 BlockingCollectionParallel.ForEach 结合使用的风险)。

再见

希望您喜欢这篇文章,希望它有一天能帮助您进行开发。请随时发表您的评论,并分享您开发/使用的其他模式。我诚挚地向您道歉,我无法在本篇文章中发布所有模式。事实上,组织文字以良好地沟通需要大量时间来准备代码+示例。但我会尽我最大的努力尽快发布其他部分,并通过公告通知您。所以我会说“À bientôt”(再见)!

管道、河流、轨道和绑定器模式 - 第二部分>>

历史

这是建议实现的 V1 版本。

© . All rights reserved.