管道、河流、Rails 和 Binder 模式 - 第三部分






4.79/5 (9投票s)
探讨可能的并行生产者-消费者模式。(最终部分)
<< 管道、河流、Rails 和 Binder 模式 - 第二部分
开始
终于,我能够撰写本系列的第三个(也是最后一个)最终部分了。坦率地说,当我开始着手这篇文章时,我只想将 Pipes
设计作为一个技巧/窍门来发布,但是,在我能够完成它的时候,我决定包含其他实现,如 (River
& Binder
)。但是,我又一次未能将其他模式包含在文章中,因为我一直没有时间完成它。所以,我决定将其分为 3 部分,并添加 Rails
。到目前为止,这就是本系列背后我乏味的故事。无论如何,让我们快速回顾一下我们在第一部分和第二部分看到的内容。
Pipes
:一种基于“进出”成员的简单设计。还需要 Action<TInput, TOutput> 委托作为最终结果处理器。(参见:管道、河流、Rails 和 Binder 模式 - 第一部分)River
:对Pipes
的改进,消除了对“进出”关系的需求。管道仅由Action<TInput, TOutput>
委托组成,输入/输出对在整个管道中流动。因此,不需要最终结果处理器。(参见:管道、河流、Rails 和 Binder 模式 - 第二部分)Rails
:此模式类似于River
,但需要接口实现。输入数据是一个类,并实现所需的接口,其中接口代表管道逻辑。此实现的好处在于,同一种管道可以处理多种输入类型。(参见:管道、河流、Rails 和 Binder 模式 - 第二部分)Binder
:River
的扩展,解耦了管道成员,并提供了为每个管道成员分配所需工作线程的机制。本部分将全部介绍它。
当我们构建一个管道时,有时我们希望为某个成员函数分配一些线程(出于结构、战术、性能或其他一些奇怪的原因)。因此,我向您提出了 Binder
的设计。如果您一直关注本系列,您会发现此实现的用法相似,即:
- 创建管道。
- 调用
StartProcessing
。 - 通过
AddValue
向管道添加值。 - 调用
StopProcessing
。
第四种模式:Binder
在实现 River
时,我们已经看到了如何构建一个允许输入和输出一起流动的管道。现在,我们将扩展这个想法,允许独立地为管道成员函数分配工作线程。为了获得这种控制,我决定为每个成员函数创建一个专用的管道实例,并将它们链接在一起;而不会让代码使用者担心管理这些实例。并为用户提供一个简单且熟悉的(与其他 3 种实现相同)使用结构。
让我们先看看 Binder
类的代码
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Piper
{
public class Binder<TInput, TOutput>
where TOutput : class, new()
{
private BlockingCollection<DataWrapper> _dataCollection = null;
private ManualResetEvent _completionEvent;
private CancellationTokenSource _cancelTokenSource;
private Binder<TInput, TOutput> _nextBind = null;
private Action<TInput, TOutput> _currentAction;
private int _maxConcurrency;
private string _opCode;
//This Ctor will create the seed instance
public Binder(Action<TInput, TOutput> currFunc,
int maxConcurrency = -1,
string opCode = "MyBinder")
: this(currFunc, maxConcurrency, opCode, new CancellationTokenSource())
{
}
//This Ctor is to create the remaining pipes
private Binder(Action<TInput, TOutput> currFunc,
int maxConcurrency = -1,
string opCode = "MyBinder",
CancellationTokenSource tokenSource = null)
{
_currentAction = currFunc;
_maxConcurrency = maxConcurrency;
_completionEvent = new ManualResetEvent(false);
_dataCollection = new BlockingCollection<DataWrapper>();
_opCode = opCode;
_cancelTokenSource = tokenSource;
}
public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
int maxConcurrency = -1,
string opCode = "MyBinder1")
{
if (_nextBind == null)
{
_nextBind =
new Binder<TInput, TOutput>
(nextFunc, maxConcurrency, opCode, _cancelTokenSource);
}
else
{
_nextBind.Bind(nextFunc, maxConcurrency, opCode);
}
return this;
}
public bool AddValue(TInput inputValue)
{
return AddValue(new DataWrapper
{
InputVal = inputValue,
OutputVal = new TOutput()
});
}
public void StopProcessing(bool waitForProcessing = false)
{
_dataCollection.CompleteAdding();
if (waitForProcessing)
_completionEvent.WaitOne();
}
public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
{
if (_nextBind != null)
_nextBind.StartProcessing(errorHandler);
var option = new ParallelOptions
{
MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
CancellationToken = _cancelTokenSource.Token
};
Task.Factory.StartNew(() =>
{
try
{
Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
option,
currData =>
{
try
{
option.CancellationToken
.ThrowIfCancellationRequested();
_currentAction(currData.InputVal,
currData.OutputVal);
if (_nextBind != null)
_nextBind.AddValue(currData);
}
catch (Exception e)
{
errorHandler(currData.InputVal,
currData.OutputVal,
"Error occurred inside " +
_opCode + " pipeline.",
e);
}
});
}
catch (OperationCanceledException)
{
}
finally
{
if (_nextBind != null)
{
_nextBind._dataCollection.CompleteAdding();
_nextBind._completionEvent.WaitOne();
}
_completionEvent.Set();
}
});
}
public void AbortProcessing()
{
_dataCollection.CompleteAdding();
_cancelTokenSource.Cancel();
}
private bool AddValue(DataWrapper currData)
{
return _dataCollection.TryAdd(currData);
}
private class DataWrapper
{
internal TInput InputVal;
internal TOutput OutputVal;
}
}
}
Binder 解析
现在,为了理解它是如何工作的,我们首先需要知道,在 C# 中,我们有类级别的隐私,即类的实例的私有成员在该类内部是可访问的。让我们看一个例子
public class MyInput
{
private int a = 1;
public MyInput GetInstanceWithValue(int b)
{
var newInstance = new MyInput();
//This is legal in C#
//accessing private variable of new instance inside the same class
newInstance.a = b;
return newInstance;
}
}
理解构造
利用类级别的隐私功能,我可以私有地持有 Binder
类的实例,并访问所有属性/成员。而这正是我正在做的,但是是以链式的方式进行的(请遵循代码注释)
public class Binder<TInput, TOutput>
where TOutput : class, new()
{
private Binder<TInput, TOutput> _nextBind = null;
public Binder<TInput, TOutput> Bind(Action<TInput, TOutput> nextFunc,
int maxConcurrency = -1,
string opCode = "MyBinder1")
{
//If child pipe is not defined, then we define it as current pipe
if (_nextBind == null)
{
//Here we use the private Ctor and pass the same cancellation token source
//this way, cancelling on seed pipe's token will cancel all pipes in chain
_nextBind = new Binder<TInput, TOutput>
(nextFunc, maxConcurrency, opCode, _cancelTokenSource);
}
else
{
//Otherwise, we pass the current pipe to child pipe
//Then, child pipe will check its child pipe and so on...
//thus, given pipe will become the child of the last pipe in the chain
//and for the next Bind() call, current pipe will be the last pipe in the chain
_nextBind.Bind(nextFunc, maxConcurrency, opCode);
}
//Mind it, I am returning the seed instance for every call, thus,
//Consumer of this class can call this method ONLY on SEED pipe.
//This is also required, coz user must call Start/Stop/Abort Processing on seed
//As Seed is the first pipe to process input/output pair and
//we DO not want to overwhelm the user with new instant per BIND.
return this;
}
}
理解处理
一旦管道构建完成(如上所述),我们的下一个目标就是实现 StartProcessing
方法,该方法可以处理 Binder
实例的这种链。为了实现我们的目标,我们需要牢记几点:
- 我们必须以相反的顺序(自底向上)启动管道,即在父管道可以将数据传递给它之前,每个子管道的
GetConsumingEnumerable()
循环都必须初始化。这将确保平稳的数据处理而不会造成任何阻塞。自顶向下也是可能的,但在这种情况下,BlockingCollection
可能会开始膨胀。 - 在处理完每个输入/输出对之后,我们需要确保该对流向链中的下一个管道(因为一切都是异步的)。
- 当在父管道上调用
CompleteAdding()
时,我们必须确保子管道也已完成处理所有剩余项。 - 调用
AbortProcessing()
必须中止链中的每个管道:这很容易实现,因为我们共享相同的CancellationTokenSource
。
我将向您展示上述要点是如何集成到代码中的(请遵循代码注释)
public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler)
{
//If child pipe exist, call its StartProcessing in chain to have bottom-up initialization
if (_nextBind != null)
_nextBind.StartProcessing(errorHandler);
//each pipe will use its own maxConcurrency value
//and share the same cancellation token
var option = new ParallelOptions
{
MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
CancellationToken = _cancelTokenSource.Token
};
Task.Factory.StartNew(() =>
{
try
{
Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
option,
currData =>
{
try
{
option.CancellationToken
.ThrowIfCancellationRequested();
//Execute the current pipe method with the data of OWN BlockingCollection
//NOTE: if this is inside SEED, then data pair is what USER supplied,
// else it is the processed pair from previous pipe in chain.
_currentAction(currData.InputVal,
currData.OutputVal);
//Pass the pair to child pipe
//NOTE: this call is NOT recursive => it just populate the
// BlockingCollection of immediate child.
//this pair will be available to child ForEach loop
//and then from there to its own child and so on...
if (_nextBind != null)
_nextBind.AddValue(currData);
}
catch (Exception e)
{
errorHandler(currData.InputVal,
currData.OutputVal,
"Error occurred inside " +
_opCode + " pipeline.",
e);
}
});
}
catch (OperationCanceledException)
{
}
finally
{
//If child is not null
if (_nextBind != null)
{
//Close BlockingCollection of Child for data-adding
//Reason: We are here means, parent is done, hence child shouldn't receive data
// All data adding was done above in Try clause
// Or by the user (SEED case) and he called StopProcessing.
_nextBind._dataCollection.CompleteAdding();
//Wait until child gives completion signal
_nextBind._completionEvent.WaitOne();
}
//Give OWN completion signal
//Note: Last child in the chain will give this signal without waiting for
// any other signal, thus,
// its parent could give its OWN signal to its parent and so on...
_completionEvent.Set();
}
});
}
其他函数,如 AbortProcessing()
、StopProcessing()
等,都是微不足道的,我将留给读者自行探索。因此,我认为我已经完成了这个三部分系列中的第四个也是最后一个模式。
Binder 示例
让我向您展示一个快速(简单且乏味)的上述模式的用法
using System;
using System.Threading.Tasks;
namespace Piper
{
class Program
{
static void Main(string[] args)
{
SomeCode();
GC.Collect();
GC.WaitForPendingFinalizers();
Console.WriteLine("###### DONE ######");
Console.ReadLine();
}
static void SomeCode()
{
var myPipes = new Binder<MyInput, MyOutput>(One, 1, "One")
.Bind(Two, 1, "Two")
.Bind(Three, 2, "Three")// => 2 threads for THREE
.Bind(Four, 1, "Four");
myPipes.StartProcessing(HandleError);
Parallel.For(0, 4, input => myPipes.AddValue(new MyInput()));
myPipes.StopProcessing();
}
static void HandleError<T1, T2>(T1 a, T2 b, string code, Exception e)
{
Console.WriteLine(code + Environment.NewLine + e.Message);
}
static void One(MyInput a, MyOutput b)
{
Console.WriteLine("One on T:" +
System.Threading.Thread.CurrentThread.ManagedThreadId);
}
static void Two(MyInput a, MyOutput b)
{
System.Threading.Thread.Sleep(1000);
Console.WriteLine("Two on T:" +
System.Threading.Thread.CurrentThread.ManagedThreadId);
}
static void Three(MyInput a, MyOutput b)
{
System.Threading.Thread.Sleep(2000);
Console.WriteLine("Three on T:" +
System.Threading.Thread.CurrentThread.ManagedThreadId);
//throw new Exception("Mine");
}
static void Four(MyInput a, MyOutput b)
{
Console.WriteLine("Four (or LAST)");
Console.WriteLine("A=" + a + ",B=" + b +
",On T:" + System.Threading.Thread.CurrentThread.ManagedThreadId);
}
}
public class MyInput
{
//All my Input values
}
public class MyOutput
{
//All my output values
}
}
在上面的示例中,采用了以下策略来模拟处理时间,以展示每个方法专有线程的好处
- 函数
One()
和Four()
没有等待,是单线程的, - 函数
Two()
等待 1 秒,是单线程的。 - 函数
Three()
等待 2 秒,有 2 个线程。
使用这样的示例,我们期望得到以下结果:
- 对
One()
的所有调用将立即执行,因此,Two()
的BlockingCollection
将立即被填充。 - 在前两次执行
Two()
之后,我们必须看到一次Three()
的执行(因为当Three()
等待 2 秒时,可以执行两次Two()
调用)。 - 然后对于
Two()
的每次输出,我们必须看到一次Three()
的输出(双倍时长,双倍线程效果)。 - 我们必须看到
Four()
的输出,几乎与Three()
的输出同时出现,因为Four()
没有等待。
在我个人配备 2 个核心的笔记本电脑上,我可以看到下面列出的结果。我让您准备并运行其他示例/测试。
2015 年新年快乐!
在本系列的最后一部分,我们看到了第四个也是最后一个模式。我希望您(读者)喜欢这个系列,尽管如此,如果您仍有任何问题/评论/建议/改进,请告诉我。最后,我祝大家新年快乐!愿上帝保佑……享受并继续分享有趣的 [代码]!2015 年新年快乐!!!哟呼!!!
<< 管道、河流、Rails 和 Binder 模式 - 第二部分
历史
这是建议解决方案的 V1。
逻辑上,没有改变,但是,添加了一些用于可视化的图表。