管道、河流、轨道和绑定器模式 - 第一部分






4.43/5 (16投票s)
探讨可能的并行生产者-消费者模式。
动机
本文从 MSDN 的管道 中汲取灵感。在这篇 MSDN 文章中,使用 BlockingCollection 的并行生产者-消费者模型得到了很好的开发/解释。因此,在这里我将不再重复已经由领域专家解释清楚的内容。我将尝试提出一些技巧,以
- 简化并行编程
- 提高代码的可重用性
- 集中处理错误
引言
本文(以及未来的部分)将介绍四(4)种构建并行生产者-消费者解决方案的不同方法。借鉴维基百科的 软件设计模式 定义,我称它们为模式(正如标题所示),但读者始终保留同意/不同意我的权利。
首先,考虑一个关于“出入”关系的定义
“在管道领域,出入关系定义在两个函数之间,其中前一个函数的输出是后一个函数的输入。那么这两个函数就被称为(至少在我看来)出入成员。”
本系列将讨论以下模式的实现(为命名不当之处我深表歉意)
管道
:基于“出入”成员的简单设计。此外,还需要 Action<TInput, TOutput> 委托(们)作为最终结果处理程序。我将在本文中阐述此模式。河流
:对管道
的改进,消除了对“出入”关系的需求。管道仅由Action<TInput, TOutput>
委托组成,输入/输出对在管道中流动。因此,无需最终结果处理程序。我将在第二部分介绍此模式。轨道
:此模式类似于河流
,但需要实现接口。输入数据是一个类,并实现了表示管道逻辑的接口。这种实现的优点是可以通过同一个管道处理多种输入类型。第二部分也将包含此模式和河流
模式。绑定器
:对河流
的扩展,它解耦了管道成员,并提供了为每个管道成员分配所需工作线程的手段。最后,我将在第三部分专门介绍此模式。
好了,现在我们可以逐个考虑每种实现及其示例。但是,我想先从一个非并行示例开始(如果您不感兴趣,请跳过此部分)。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Piper
{
class Program
{
static void Main(string[] args)
{
var input = 1;
SomeCode(input);
SomeCodeInAnotherWay(input);
Console.ReadLine();
}
static void SomeCode(int input)
{
var finalAnswer = PowerIt(DoubleIt(MultiplyIt(RandomizeIt(input))));
Console.WriteLine("Answer: " + finalAnswer);
}
static void SomeCodeInAnotherWay(int input)
{
var a1 = RandomizeIt(input);
var a2 = MultiplyIt(a1);
var a3 = DoubleIt(a2);
var finalAnswer = PowerIt(a3);
Console.WriteLine("Answer: " + finalAnswer);
}
private static decimal PowerIt(double p)
{
return (decimal)Math.Pow(p, 2.7);
}
private static double DoubleIt(long p)
{
return Math.Log((double)p);
}
private static long MultiplyIt(int p)
{
return p * 200;
}
private static int RandomizeIt(int p)
{
return p + 56;
}
}
}
在上面的示例中,SomeCode
和 SomeCodeInAnotherWay
函数是相同的,并且产生相同的结果。唯一的区别是 SomeCodeInAnotherWay
写成了多行,而 SomeCode
函数在表示上有点混乱。总之,代码的编写方式不是问题,事实上,我想引起您对以下问题的注意
- 在链中添加更多“出入”成员时,如何保持此代码片段的可读性?
错误处理
怎么办?我们应该为每一个“出入”成员创建try/catch
,还是为SomeCode
(或SomeCodeInAnotherWay
)创建一个单独的try/catch
?- 如果一个项目中有许多这样的“出入”链,有没有一种简单的方法来提高代码的可重用性/简化错误处理?
- 我同意,在
Parallel.For
(或ForEach
)中使用此函数并不难,但创建生产者-消费者模式怎么样?我们是否需要为每次使用而在解决方案中反复重写它? - ……以及其他许多类似的问题(嗯……我想不到别的了!)
在这一系列文章中,我们将考虑几种模式,这些模式将解决上述一个或多个问题。所以我们从第一个模式开始:管道
。
第一个模式:管道
此实现背后的想法源自基于 UNIX 的管道 "|"
命令。有关更多信息,请参阅 Pipeline (Unix) 的维基链接。总之,在不深入理论的情况下,让我们看看代码本身
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Piper
{
public sealed class Pipes<TInput, TOutput>
{
private BlockingCollection<InputWrapper>
_dataCollection = null; // Holds the data to process
private CancellationTokenSource _cancelSource = null; // For task cancellation
private Task _pipelineProcessor = null; // The task itself
private Func<TInput, TOutput>
_currentPipe; //Current function of "out-in" relation
private int _maxConcurrency;
public Pipes(Func<TInput, TOutput> currentPipe,
int maxConcurrency = -1)
{
_currentPipe = currentPipe;
_maxConcurrency = maxConcurrency;
}
// This function establishes the "out-in"
relationship among pipeline functions.
public Pipes<TInput, TNewPipeOutput>
Pipe<TNewPipeOutput>(Func<TOutput, TNewPipeOutput> newPipe)
{
// Here we create new function as NewFunction(OldFunction(inputData))
// REMEMBER : PowerIt(DoubleIt(MultiplyIt(RandomizeIt(input)))) from above
return new Pipes<TInput, TNewPipeOutput>
(inputValue => newPipe(_currentPipe(inputValue)), _maxConcurrency);
}
// Adding values in pipeline
public bool AddValue(TInput inputValue, Action<TInput, TOutput> callbackAction)
{
return _dataCollection.TryAdd(new InputWrapper
{
InputValue = inputValue,
CallBackFunction = callbackAction
});
}
// Starts the processing
public void StartProcessing(Action<TInput, string, Exception> errorHandler,
string operationCode = "My_Operation_Name")
{
_dataCollection = new BlockingCollection<InputWrapper>();
_cancelSource = new CancellationTokenSource();
var option = new ParallelOptions
{
MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
CancellationToken = _cancelSource.Token
};
_pipelineProcessor = Task.Factory.StartNew(() =>
{
try
{
Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
option,
inputWrapper =>
{
try
{
option.CancellationToken
.ThrowIfCancellationRequested();
//Here we just call the Final Action method with
//input and outcome of current (LAST)
//pipe in the chain
//(which is THIS _currentPipe!)
inputWrapper.CallBackFunction(inputWrapper.InputValue,
_currentPipe(inputWrapper.InputValue));
}
catch (Exception e)
{
errorHandler(inputWrapper.InputValue,
"Error occurred inside " +
operationCode + " pipeline.",
e);
}
});
}
catch (OperationCanceledException)
{
}
});
}
public void StopProcessing(bool waitForProcessing)
{
_dataCollection.CompleteAdding();
if (waitForProcessing)
_pipelineProcessor.Wait();
}
public void AbortProcessing()
{
_cancelSource.Cancel();
_dataCollection.CompleteAdding();
}
// Wrapper class to hold the InputData and Final Action handle pair
private class InputWrapper
{
internal TInput InputValue;
internal Action<TInput, TOutput> CallBackFunction;
}
}
}
查看上面的实现,所有的“魔力”都发生在下面函数中仅一行代码里
public Pipes<TInput, TNewPipeOutput>
Pipe<TNewPipeOutput>(Func<TOutput, TNewPipeOutput> newPipe)
{
return new Pipes<TInput, TNewPipeOutput>
(inputValue => newPipe(_currentPipe(inputValue)), _maxConcurrency);
}
实际上,我们只是通过创建当前和新管道函数之间的“出入”关系,创建一个具有新定义的输出类型 TNewPipeOutput
的 Pipes
类的新实例。就是这样!如果我们再次对这个实例执行Pipe
操作,那么又会建立一个新的“出入”关系!!!现在,让我使用 Pipes 重新实现SomeCode
函数(如引言中的示例所示)。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Piper
{
class Program
{
static void Main(string[] args)
{
SomeCode();
SomeErroredCode();
Console.ReadLine();
}
static void SomeCode()
{
var myPipes = new Pipes<int, int>(RandomizeIt, -1)
.Pipe(MultiplyIt) // Add Pipe function
.Pipe(DoubleIt) // More pipe
.Pipe(PowerIt); // And more...
//Step 1. Start the pipeline
myPipes.StartProcessing(HandleError, "SomeCode");
//Step 2. Add Input values with Final Handler
Parallel.For(1, 5,
input => myPipes.AddValue(input, ShowMyAnswer));
//Step 3. Wait until processing is done!
//(or call it without waiting)
//IMPORTANT: Do make a call to this
//function else the Loop will wait indefinitely!!!
myPipes.StopProcessing(true);
}
static void SomeErroredCode()
{
var myPipes = new Pipes<int, int>(RandomizeIt, -1)
.Pipe(MultiplyIt) //Add pipe...
.Pipe(DoubleIt) //Another...
.Pipe(PowerIt) // Another...
//Notice, how easily we can extend the pipeline
//with new functions, without much of code refactoring!!!
//Adding one more pipe without changing anything else!!!
.Pipe(ThrowError);
myPipes.StartProcessing(HandleError, "SomeErroredCode");
Parallel.For(1, 5, input => myPipes.AddValue(input, ShowMyAnswer));
myPipes.StopProcessing(true);
}
private static void ShowMyAnswer<TIn, TOut>(TIn input, TOut output)
{
Console.WriteLine("For Input value: " + input +
", Output is: " + output +
", Executed On Thread Id: " +
System.Threading.Thread.CurrentThread.ManagedThreadId);
}
private static decimal PowerIt(double p)
{
return (decimal)Math.Pow(p, 2.7);
}
private static double DoubleIt(long p)
{
return Math.Log((double)p);
}
private static long MultiplyIt(int p)
{
return p * 200;
}
private static int RandomizeIt(int input)
{
return input + 56;
}
private static decimal ThrowError(decimal p)
{
throw new Exception("Test Exception");
}
private static void HandleError<Tin>(Tin i, string mess, Exception e)
{
Console.WriteLine("Error On Thread Id: " +
System.Threading.Thread.CurrentThread.ManagedThreadId +
", Input was: " + i +
", Error In: " + mess +
", Message: " + e.Message);
}
}
}
从上面的例子中,我们注意到以下几点
- 构建管道非常简单,一旦“出入”对准备好进行管道传输。
- 使用此类只需要 3 个步骤
- 调用
StartProcessing
- 添加输入值,然后
- 调用
StopProcessing
- 调用
- 在尾部添加新管道非常容易,不需要重构(然而,移除/更改中间管道可能需要一些重构,如果“出入”关系发生变化)。
- 所有错误处理都是单独完成的,没有任何“出入”成员需要
try/catch
。
此设计可以在以下方面进一步改进
- 签名
HandleError<Tin, Tout>(Tin i, Tout o, string mess, Exception e)
的错误处理程序可以实现以建立管道级别的错误处理(类似于为每个成员函数添加try/catch
)。在这种情况下,可以通过在Pipe()
方法中替换管道 lambda 为多行函数来提供此类错误处理程序。我将此实现留给用户(原因是:河流
也采用了类似实现,我将在第二部分向您展示)。 - 其他小改进,例如
- 发生错误时停止处理
- 通过返回 bool 来向
StopProcessing
的调用者通知错误的存在 - 将输入/输出对的集合返回给
StopProcessing
的调用者,而不是 Action 处理程序等,都可以实现。我个人根据项目需求开发了一些这些变体。
以下是上述示例代码输出的快照
重要提示
如果您正在构建一个将在应用程序生命周期内运行的管道(通过创建单例类或其他方式),请考虑将 StartProcessing
方法中的 Parallel.ForEach
(当输入数据到达速率慢于数据处理时间时非常有用)更改为非并行 foreach
。如果这不符合您的要求,请考虑 GetConsumingPartitioner
BlockingCollection
扩展(Stephen Toub 的一篇精彩文章,它很好地解释了将 BlockingCollection
与 Parallel.ForEach
结合使用的风险)。
再见
希望您喜欢这篇文章,希望它有一天能帮助您进行开发。请随时发表您的评论,并分享您开发/使用的其他模式。我诚挚地向您道歉,我无法在本篇文章中发布所有模式。事实上,组织文字以良好地沟通需要大量时间来准备代码+示例。但我会尽我最大的努力尽快发布其他部分,并通过公告通知您。所以我会说“À bientôt”(再见)!
历史
这是建议实现的 V1 版本。