使用并发和协调运行时实现的管道和过滤器并发设计模式






3.67/5 (3投票s)
一种用于并发消息传递的模块化方法。
引言
本文介绍了如何使用 Concurrency and Coordination Runtime (CCR) 来实现管道与过滤器设计模式。这个想法来源于 Bob Familiar 对 Stephen Tarmey 的一次采访 Bob Familiar had with Stephen Tarmey。
免责声明:请注意,我不是并发专家,这是我第一次尝试使用 CCR。
背景
管道与过滤器设计模式用于描述一个消息流,消息由源对象创建,由过滤器对象转换,并由接收器对象消费。经典的传统多线程编程是一种命令式编程,开发者分配线程(每个对象一个线程,每个消息一个线程,或者采用异步线程池方案)。例如,一个命令式实现会使用 Queue<>
作为管道,使用 ReaderWriter 锁来保护访问,并以标准的异步操作在线程池上执行对象处理。
另一方面,声明式编程定义了“做什么”,但没有定义“怎么做”。例如,LINQ 允许使用条件过滤数组,而无需关心数组遍历的具体顺序。同样,Concurrency and Coordination Runtime (CCR) 允许开发者声明每个对象“应该做什么”,而无需担心线程、锁等(嗯,几乎)。
让我们深入了解一些基本的 CCR 声明
using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
DispatcherQueue dq = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);
}
调度器是持有线程池的对象。第一个参数是使用的线程数,其中 0 表示将线程数设置为 Max(2,NumberOfProcessors) * ThreadPerCPU
。调度器队列保存了可以立即执行的待处理委托列表,这些委托正在等待线程可用。有关更多信息,请参阅文章 Concurrency and Coordination Runtime。
这是一个注册三个监听三个端口的方法的片段。端口持有消息,并持有对处理这些消息感兴趣的方法。
public void Initialize(
DispatcherQueue dispatcherQueue,
PortSet<correctpinmessage> inputPort,
PortSet<openalarm> outputPort)
{
...
Arbiter.Activate(
dispatcherQueue,
Arbiter.Interleave(
new TeardownReceiverGroup(),
new ExclusiveReceiverGroup(
Arbiter.Receive(true, _timeoutPort, HandleTimeout)),
new ConcurrentReceiverGroup(
Arbiter.Receive(true, InputPort.P0, HandleSuccessMessage),
Arbiter.Receive(true, InputPort.P1, HandleFailureMessage))));
}
这样阅读代码
- 每次
inputPort.Port0<correctpinmessage>
收到CorrectPinMessage
对象时,调用HandleSuccessMessage
。 - 每次
inputPort.Port1<wrongpinmessage>
收到WrongPinMessage
对象时,调用HandleFailureMessage
。 - 每次
_timeoutPort<datetime>
收到DateTime
对象时,调用HandleTimeout
方法。
Arbiter.Receive
的第一个参数(persist = true
)表示该方法在处理完第一条消息后会继续接收消息。
Arbiter.Interleave
声明定义了 CCR 使用的锁定机制。HandleSuccessMessage
和 HandleFailureMessage
在并发组中定义(本质上类似于 ReaderWriterLock.AcquireReaderLock()
)。
并发处理程序的实现必须是线程安全的(通过使用 interlocked increment 实现)。
void HandleFailureMessage(WrongPinMessage wrongPinMessage)
{
if (Interlocked.Increment(ref _wrongPinCount) == 1)
{
_firstWrongPinMessage = wrongPinMessage;
_dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);
}
}
void HandleSuccessMessage(CorrectPinMessage correctPinMessage)
{
Interlocked.Increment(ref _correctPinCount);
}
请注意,当收到第一条 WrongPinMessage
时,定时器会启动。几毫秒后,_timeoutPort
收到一个 DateTime
对象,这将触发 HandleTimeout
方法的调用。
由于 HandleTimeout
在独占组中定义(本质上类似于 ReaderWriterLock.AcquireWriterLock()
),因此处理程序的实现不需要是线程安全的。它在不使用任何锁或 interlocked 调用的情况下访问 _wrongPinCount
成员。
void HandleTimeout(DateTime time)
{
CardStatus newStatus = _currentCardStatus;
if (_correctPinCount > 0)
{
newStatus = CardStatus.Ok;
}
else if (_wrongPinCount > 3)
{
newStatus = CardStatus.Stolen;
}
else if (_wrongPinCount > 0)
{
newStatus = CardStatus.Warning;
}
...
}
HandleTimeout
方法“假定”自调用第一个 WrongPinMessage
以来已经过去了 X 毫秒。它使用该时间段内接收到的消息总数来更改状态机的内部状态。
有关 CCR 基本概念的更多信息,请参阅视频 CCR Programming - Jeffrey Richter and George Chrysanthakopoulos,以及 关于 CCR 的 OOPSLA/SCOOL 论文。Ccr.Core.dll 可作为 Microsoft Robotics Studio 的一部分下载。
使用代码
本文中的代码定义了一个管道抽象:源(有一个输出端口 - 绿色)、过滤器(有一个输入端口和一个输出端口 - 蓝色)和接收器(只有一个输入端口 - 黄色)。
演示项目描述了一个拥有带有秘密 PIN(个人识别码)的卡片的工人。CorrectPinMessage
描述了工人输入正确 PIN 的事件,类似地,WrongPinMessage
代表错误的 PIN 号码。“工人模拟源”对象创建新消息,状态机通过计数来合并相似的事件。每次状态更改都会生成一个 OpenAlert
消息或一个 CloseAlert
消息,这些消息由警报接收器显示在控制台窗口中。
每个管道对象实现 IMessageSink
、IMessageFilter
或 IMessageSink
。Initialize
接口方法用于调用 CCR 的 Arbiter.Activate()
方法。在所有对象都初始化完成后,使用 Start
方法开始生成消息。
interface IMessageSink<tinputport> where TInputPort : IPort
{
TInputPort InputPortSet { get; }
void Initialize(DispatcherQueue dispatcherQueue, TInputPort inputPortSet);
}
interface IMessageSource<toutputport> where TOutputPort: IPort
{
TOutputPort OutputPortSet { get; }
void Initialize(DispatcherQueue dispatcherQueue, TOutputPort outputPortSet);
void Start();
}
interface IMessageFilter<tinputport,toutputport>
{
TInputPort InputPort { get; }
TOutputPort OutputPort { get; }
void Initialize(DispatcherQueue dispatcherQueue,
TInputPort inputPort, TOutputPort outputPort);
}
扩展方法用于提供这些对象的串行和并行连接。串行连接是自然的源 ==> 过滤器 ==> 接收器消息流连接。并行连接允许 N 个源将消息发布到同一个输出端口。
static public IMessageSource<toutputport> ConnectTo<tinputport,>(
this IMessageSource<tinputport> source,
IMessageFilter<tinputport,> filter)
where TInputPort : IPort,new()
where TOutputPort : IPort
{ ... }
public static IMessageSource<tinputport> ConnectInParallel<tinputport>(
this IMessageSource<tinputport> source1,
IMessageSource<tinputport> source2)
where TInputPort : IPort
{ ... }
当源连接到接收器时,就创建了一个完整的管道。
interface IMessagePipeLine
{
void Start(DispatcherQueue dispatcherQueue);
}
public static IMessagePipeLine ConnectTo<tport>(
this IMessageSource<tport> source,
IMessageSink<tport> sink)
where TPort : IPort,new()
{ ... }
每个工人的状态机
为了保存每个工人的状态机,我们需要将消息解复用到新的状态机中。
免责声明:以下实现并非最优。我很乐意听取您对此的意见。
第一个过滤器将类型为 T
的消息转换为 KeyValuePair<key,t>
消息。键是输入 PIN 的工人的姓名。
class WorkerKeyValueFilter :
IMessageFilter<PortSet<CorrectPinMessage>,
Port<KeyValuePair<string,CorrectPinMessage>>>
{
void HandleMessage<T>(T message) where T : WorkCardSwipeMessageBase
{
OutputPort.Post(new KeyValuePair<string,T>(message.Name, message));
}
}
第二个过滤器使用键将消息发布到专用端口。
class DemuxMessageFilter<TMessageFilter, TInputPort,TOutputPort,TKey>
: IMessageFilter<TInputPort,TOutputPort>
where TMessageFilter : IMessageFilter<TInputPort,TOutputPort>, new()
where TInputPort : IPortSet,new()
where TOutputPort : IPort
{
void HandleMessage(KeyValuePair<TKey,object> message)
{
var messageFilter = GetMessageFilter(message.Key);
messageFilter.InputPort.PostUnknownType(message.Value);
}
TMessageFilter GetMessageFilter(TKey key)
{
TMessageFilter filter;
if (!_messageFilters.TryGetValue(key, out filter))
{
filter = new TMessageFilter();
filter.Initialize(_dispatcherQueue, new TInputPort(), OutputPort);
_messageFilters.Add(key, filter);
}
return filter;
}
}
历史
- 2008 年 10 月 10 日:初始版本。