管道、河流、轨道和绑定器模式 - 第二部分





5.00/5 (4投票s)
探讨可能的并行生产者-消费者模式(第二部分)
回顾
在本系列文章中,我曾向您(读者)承诺将讨论以下四种并行生产者-消费者模式的实现:
管道 (Pipes)
:一种基于“出入”成员的简单设计。此外,还需要 Action<TInput, TOutput> 委托作为最终结果处理程序。河流 (River)
:对Pipes
的改进,消除了对“出入”关系的需求。管道仅由Action<TInput, TOutput>
委托组成,输入/输出对在整个管道中流动。因此,无需最终结果处理程序。轨道 (Rails)
:此模式与River
类似,但需要实现接口。输入数据是一个类,并实现所需的接口,其中接口代表管道逻辑。此实现的好处在于,可以使用同一个管道处理多种输入类型。绑定器 (Binder)
:对River
的扩展,它解耦了管道成员,并提供了为每个管道成员分配所需工作线程的机制。
在第一部分(管道、河流、轨道和绑定器模式 - 第一部分)中,我们已经了解了 Pipes
的实现,希望您喜欢它;如果您觉得它对您有任何帮助,我将非常高兴收到您的反馈。在本文中,我们将通过示例讨论 River
和 Rails
的实现。最后,按照我的承诺,我将在本系列的最后一篇中为您介绍 Binder
。
为什么不使用管道?
当管道成员只需要最后一个计算值时,实现 Pipes
是有用的。然而,在实践中,它过于受限,而且大多数情况下,我们需要的不只是最后一个计算值(例如,其他输入以及/或最后的输出)。考虑以下情况:
- 每个管道需要访问一些其他输入以及/或前一个管道的输出。
- 最终输出是每个管道的增量输出。
- 管道可以被描述为工作流。
- 创建“出入”关系过于繁琐。
在这些情况下,我们需要超越 Pipes
(或 MSDN Pipelines)的东西。因此,我提出了两种新模式:River
和 Rails
。让我们来看看它们。
第二种模式:河流 (River)
我将直接向您展示 River
的代码,然后我们将了解它是如何以及为何与 Pipes
不同。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace PipelinePartTwo
{
public sealed class River<TInput, TOutput>
where TOutput : new()
{
private Action<TInput, TOutput> _currentFlow;
private int _maxConcurrency;
private BlockingCollection<TInput> _flowCollection = null;
private CancellationTokenSource _cancelSource = null;
private Task _flowProcessor = null;
public River(Action<TInput, TOutput> currentFlow,
int maxConcurrency = -1)
{
_currentFlow = currentFlow;
_maxConcurrency = maxConcurrency;
}
public River<TInput, TOutput> Flow(Action<TInput, TOutput> newFlow)
{
return new River<TInput, TOutput>((inputValue, outPutValue) =>
{
_currentFlow(inputValue, outPutValue);
newFlow(inputValue, outPutValue);
}, _maxConcurrency);
}
public bool AddValue(TInput inputValue)
{
return _flowCollection.TryAdd(inputValue);
}
public void StopProcessing(bool waitForProcessing = false)
{
_flowCollection.CompleteAdding();
if (waitForProcessing)
_flowProcessor.Wait();
}
public void AbortProcessing()
{
_flowCollection.CompleteAdding();
_cancelSource.Cancel();
}
public void StartProcessing(Action<TInput, TOutput, string, Exception> errorHandler,
string operationCode = "My_Operation_Name")
{
_flowCollection = new BlockingCollection<TInput>();
_cancelSource = new CancellationTokenSource();
var option = new ParallelOptions
{
MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency),
CancellationToken = _cancelSource.Token
};
_flowProcessor = Task.Factory.StartNew(() =>
{
try
{
Parallel.ForEach(_flowCollection.GetConsumingEnumerable(),
option,
flowInput =>
{
var outPut = new TOutput();
try
{
option.CancellationToken.ThrowIfCancellationRequested();
_currentFlow(flowInput, outPut);
}
catch (Exception e)
{
errorHandler(flowInput,
outPut,
"Error occurred inside " +
operationCode + " pipeline.",
e);
}
});
}
catch (OperationCanceledException)
{
}
});
}
}
}
如果您一直关注本系列,那么您现在一定已经猜到 Pipes
和这种(River
)实现之间的主要区别只有一行代码。是的,您说对了!这取决于 Flow
函数内部构造管道的方式。在 Pipes
中,我们建立了“出入”关系,而在 River
中,我们依次调用 Action
方法。
public River<TInput, TOutput> Flow(Action<TInput, TOutput> newFlow)
{
return new River<TInput, TOutput>((inputValue, outPutValue) =>
{
_currentFlow(inputValue, outPutValue); //First call the current member
newFlow(inputValue, outPutValue); //Now we will call the new member
}, _maxConcurrency);
}
这种设计使我们能够自由地访问输入/输出变量中的信息,并在流中对其进行修改。可以轻松删除简单的约束“where TOutput : new()
”,而改为创建一个包装器,就像我们在实现 Pipes
时所做的那样。以下是一个简单的用法示例及其输出屏幕截图:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace PipelinePartTwo
{
class Program
{
static void Main(string[] args)
{
SomeCode();
Console.WriteLine();
SomeCodeWithError();
Console.ReadLine();
}
private static void SomeCode()
{
var r = new River<MyInput, MyOutput>(First)
.Flow(Second)
.Flow(Third)
.Flow(Forth)
.Flow(FinalHandle);
r.StartProcessing(ErrorHandler, "MyRiverExample");
r.AddValue(new MyInput());
r.StopProcessing(true);
}
private static void SomeCodeWithError()
{
var r = new River<MyInput, MyOutput>(First)
.Flow(Second)
.Flow(Third)
.Flow(ThrowError)
.Flow(Forth)
.Flow(FinalHandle);
r.StartProcessing(ErrorHandler, "MyRiverExampleWithError");
r.AddValue(new MyInput());
r.StopProcessing(true);
}
static void FinalHandle(MyInput i, MyOutput p)
{
Console.WriteLine("MyRiverExample's Output is " + p.CalledCount);
}
private static void Forth(MyInput i, MyOutput p)
{
Console.WriteLine("Inside Forth: Computing based on MyInput and MyOutput");
p.CalledCount++;
}
private static void Third(MyInput i, MyOutput p)
{
Console.WriteLine("Inside Third: Computing based on MyInput and MyOutput");
p.CalledCount++;
}
private static void Second(MyInput i, MyOutput p)
{
Console.WriteLine("Inside Second: Computing based on MyInput and MyOutput");
p.CalledCount++;
}
private static void First(MyInput i, MyOutput p)
{
Console.WriteLine("Inside First: Computing based on MyInput and MyOutput");
p.CalledCount++;
}
private static void ThrowError(MyInput i, MyOutput p)
{
Console.WriteLine("Inside ThrowError: I will throw error");
throw new Exception("My Exception");
}
private static void ErrorHandler<TOne, TTwo>(TOne first, TTwo second, string name, Exception e)
{
Console.WriteLine(name + "." + Environment.NewLine +
"When Input: " + first.ToString() + Environment.NewLine +
"And Output: " + second.ToString() + Environment.NewLine +
"Error Details: " + e.Message);
}
}
public class MyOutput
{
public int CalledCount = 0;
//##### All Required OUTPUT Values
public override string ToString()
{
return "MyOutput Count is " + CalledCount;
}
}
public class MyInput
{
//##### All Required INPUT Values
public override string ToString()
{
return "I am MyInput";
}
}
}
第三种模式:轨道 (Rails)
Pipes
和 River
这两种模式都需要专门的管道实例来实现值处理。现在,想象一下,如果一个 Pipeline
实例可以适用于所有情况,那么我们就可以构建一个全局实例并在应用程序生命周期的任何地方使用它。我承认这有点过于想象了,然而,Rails
的实现基于这个想法。正如,一旦 rail
(运输线路)可用,任何兼容的火车都可以运行在上面;根据同样的类比,一旦 Rails
实例可用,任何兼容的管道都可以被处理。在 C# 世界中,一种实现兼容性的方法是通过使用 Interface
,因此,此模式基于一个特定的(且简单的)Interface
,名为 IRail
(当然),可以这样编写:
public interface IRails
{
void ProcessData();
void HandleError(Exception e);
}
是的,您猜对了!!!有了这种模式,只需要执行以下步骤:
- 构建
Rails
的单例实例。 - 准备您的类并实现
IRail
,将所有计算逻辑放在ProcessData()
中。 - 创建这些类的实例并将它们作为输入传递给管道。
- ……就是这样!
Rails
的实现如下(我已移除 AbortProcessing()
,因为它对于这种应用程序生命周期的实例来说没有意义,但是,您可以根据自己的需求决定是否包含它)。
public sealed class Rails
{
private int _maxConcurrency;
private BlockingCollection<IRail> _dataCollection = null;
private Task _dataProcessor = null;
public Rails(int maxConcurrency = -1)
{
_maxConcurrency = maxConcurrency;
}
public bool AddValue(IRail inputValue)
{
return _dataCollection.TryAdd(inputValue);
}
public void StopProcessing(bool waitForProcessing = false)
{
_dataCollection.CompleteAdding();
if (waitForProcessing)
_dataProcessor.Wait();
}
public void StartProcessing()
{
_dataCollection = new BlockingCollection<IRail>();
var option = new ParallelOptions{MaxDegreeOfParallelism = Math.Max(-1, _maxConcurrency)};
_dataProcessor = Task.Factory.StartNew(() =>
{
Parallel.ForEach(_dataCollection.GetConsumingEnumerable(),
option,
inputData =>
{
try
{
inputData.ProcessData();
}
catch (Exception e)
{
inputData.HandleError(e);
}
});
});
}
}
以下是 Rails
模式的简单用法示例和示例输出屏幕截图:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace PipelinePartTwo
{
class Program
{
private static readonly Rails Singleton = new Rails();//Creating Instance
static void Main(string[] args)
{
Singleton.StartProcessing(); // This would go to app start method
Singleton.AddValue(new RailOne()); // This can be called anywhere
Singleton.AddValue(new RailTwo()); // This can be called anywhere
Singleton.StopProcessing(true); // This would go to app stop method
Console.ReadLine();
}
public static void HandleError(string persoMess, Exception e)
{
Console.WriteLine(persoMess + Environment.NewLine +
"Error Details: " + e.Message);
}
}
public class RailOne : IRail
{
//private MyOverallInput EverythingRequiredAsInput;
//private MyOverallOutput EverythingRequiredAsOutput;
public RailOne(/*My Ctor Input*/)
{
//Init my inputs
//Init my outputs
}
public void ProcessData()
{
ProcessDataOne();
ProcessDataTwo();
ProcessDataThree();
}
private void ProcessDataOne()
{
Console.WriteLine("RailOne: ProcessDataOne");
}
private void ProcessDataTwo()
{
Console.WriteLine("RailOne: ProcessDataTwo");
}
private void ProcessDataThree()
{
Console.WriteLine("RailOne: ProcessDataThree");
}
public void HandleError(Exception e)
{
Program.HandleError("Error Occurred In RailOne", e);
}
}
public class RailTwo : IRail
{
//private MyOverallInput EverythingRequiredAsInput;
//private MyOverallOutput EverythingRequiredAsOutput;
public RailTwo(/*My Ctor Input*/)
{
//Init my inputs
//Init my outputs
}
public void ProcessData()
{
ProcessDataOne();
}
private void ProcessDataOne()
{
Console.WriteLine("RailTwo: ProcessDataOne");
ProcessDataTwo(true);
}
private void ProcessDataTwo(bool error)
{
Console.WriteLine("RailTwo: ProcessDataOne throws exception...");
if(error)
throw new Exception("RailTwo is in exception");
ProcessDataThree();
}
private void ProcessDataThree()
{
Console.WriteLine("RailTwo: ProcessDataThree");
}
public void HandleError(Exception e)
{
Program.HandleError("Error Occurred In RailTwo", e);
}
}
}
到目前为止…
到目前为止,我们已经了解了以下 4 种模式中的 3 种:
Pipes
:当管道成员之间存在“出入”关系时很有用。但否则则无用。River
:当执行管道中的计算需要多种(通用)信息时。管道中的成员可以轻松添加/删除(根据需求变化),因为管道成员的签名是完全相同的。为每个功能单元创建一个管道成员可能是明智的。Rails
:与River
相同,但当需要处理多种类型的数据时非常有用。所有计算逻辑都可以包装在IRail
类中,并包含所需的输入/输出,然后将其传递给此管道进行处理。
希望您喜欢这个系列,我保证会尽快推出最后一篇。
历史
- 建议解决方案 V1