65.9K
CodeProject 正在变化。 阅读更多。
Home

使用并发和协调运行时实现的管道和过滤器并发设计模式

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.67/5 (3投票s)

2008年10月9日

Ms-PL

4分钟阅读

viewsIcon

38290

downloadIcon

409

一种用于并发消息传递的模块化方法。

引言

本文介绍了如何使用 Concurrency and Coordination Runtime (CCR) 来实现管道与过滤器设计模式。这个想法来源于 Bob Familiar 对 Stephen Tarmey 的一次采访 Bob Familiar had with Stephen Tarmey

免责声明:请注意,我不是并发专家,这是我第一次尝试使用 CCR。

背景

管道与过滤器设计模式用于描述一个消息流,消息由源对象创建,由过滤器对象转换,并由接收器对象消费。经典的传统多线程编程是一种命令式编程,开发者分配线程(每个对象一个线程,每个消息一个线程,或者采用异步线程池方案)。例如,一个命令式实现会使用 Queue<> 作为管道,使用 ReaderWriter 锁来保护访问,并以标准的异步操作在线程池上执行对象处理。

另一方面,声明式编程定义了“做什么”,但没有定义“怎么做”。例如,LINQ 允许使用条件过滤数组,而无需关心数组遍历的具体顺序。同样,Concurrency and Coordination Runtime (CCR) 允许开发者声明每个对象“应该做什么”,而无需担心线程、锁等(嗯,几乎)。

让我们深入了解一些基本的 CCR 声明

using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
   DispatcherQueue dq = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);
}

调度器是持有线程池的对象。第一个参数是使用的线程数,其中 0 表示将线程数设置为 Max(2,NumberOfProcessors) * ThreadPerCPU。调度器队列保存了可以立即执行的待处理委托列表,这些委托正在等待线程可用。有关更多信息,请参阅文章 Concurrency and Coordination Runtime

这是一个注册三个监听三个端口的方法的片段。端口持有消息,并持有对处理这些消息感兴趣的方法。

public void Initialize(
               DispatcherQueue dispatcherQueue,
               PortSet<correctpinmessage> inputPort,
               PortSet<openalarm> outputPort)
{
    ...
    Arbiter.Activate(
        dispatcherQueue,
        Arbiter.Interleave(
            new TeardownReceiverGroup(),
            new ExclusiveReceiverGroup(
                Arbiter.Receive(true, _timeoutPort, HandleTimeout)),
            new ConcurrentReceiverGroup(
                Arbiter.Receive(true, InputPort.P0, HandleSuccessMessage),
                Arbiter.Receive(true, InputPort.P1, HandleFailureMessage))));
}

这样阅读代码

  • 每次 inputPort.Port0<correctpinmessage> 收到 CorrectPinMessage 对象时,调用 HandleSuccessMessage
  • 每次 inputPort.Port1<wrongpinmessage> 收到 WrongPinMessage 对象时,调用 HandleFailureMessage
  • 每次 _timeoutPort<datetime> 收到 DateTime 对象时,调用 HandleTimeout 方法。

Arbiter.Receive 的第一个参数(persist = true)表示该方法在处理完第一条消息后会继续接收消息。

Arbiter.Interleave 声明定义了 CCR 使用的锁定机制。HandleSuccessMessageHandleFailureMessage 在并发组中定义(本质上类似于 ReaderWriterLock.AcquireReaderLock())。

并发处理程序的实现必须是线程安全的(通过使用 interlocked increment 实现)。

void HandleFailureMessage(WrongPinMessage wrongPinMessage)
{
    if (Interlocked.Increment(ref _wrongPinCount) == 1)
    {
       _firstWrongPinMessage = wrongPinMessage;
       _dispatcherQueue.EnqueueTimer(_pollingPeriod, _timeoutPort);
    }
}

void HandleSuccessMessage(CorrectPinMessage correctPinMessage)
{
    Interlocked.Increment(ref _correctPinCount);
}

请注意,当收到第一条 WrongPinMessage 时,定时器会启动。几毫秒后,_timeoutPort 收到一个 DateTime 对象,这将触发 HandleTimeout 方法的调用。

由于 HandleTimeout 在独占组中定义(本质上类似于 ReaderWriterLock.AcquireWriterLock()),因此处理程序的实现不需要是线程安全的。它在不使用任何锁或 interlocked 调用的情况下访问 _wrongPinCount 成员。

void HandleTimeout(DateTime time)
{
   CardStatus newStatus = _currentCardStatus;

   if (_correctPinCount > 0)
   {
       newStatus = CardStatus.Ok;
   }
   else if (_wrongPinCount > 3)
   {
        newStatus = CardStatus.Stolen;
   }
   else if (_wrongPinCount > 0)
   {
        newStatus = CardStatus.Warning;
   }
   ...
}

HandleTimeout 方法“假定”自调用第一个 WrongPinMessage 以来已经过去了 X 毫秒。它使用该时间段内接收到的消息总数来更改状态机的内部状态。

有关 CCR 基本概念的更多信息,请参阅视频 CCR Programming - Jeffrey Richter and George Chrysanthakopoulos,以及 关于 CCR 的 OOPSLA/SCOOL 论文Ccr.Core.dll 可作为 Microsoft Robotics Studio 的一部分下载。

使用代码

本文中的代码定义了一个管道抽象:源(有一个输出端口 - 绿色)、过滤器(有一个输入端口和一个输出端口 - 蓝色)和接收器(只有一个输入端口 - 黄色)。

演示项目描述了一个拥有带有秘密 PIN(个人识别码)的卡片的工人。CorrectPinMessage 描述了工人输入正确 PIN 的事件,类似地,WrongPinMessage 代表错误的 PIN 号码。“工人模拟源”对象创建新消息,状态机通过计数来合并相似的事件。每次状态更改都会生成一个 OpenAlert 消息或一个 CloseAlert 消息,这些消息由警报接收器显示在控制台窗口中。

每个管道对象实现 IMessageSinkIMessageFilterIMessageSinkInitialize 接口方法用于调用 CCR 的 Arbiter.Activate() 方法。在所有对象都初始化完成后,使用 Start 方法开始生成消息。

interface IMessageSink<tinputport> where TInputPort : IPort
{
   TInputPort InputPortSet { get; }
   void Initialize(DispatcherQueue dispatcherQueue, TInputPort inputPortSet);
}

interface IMessageSource<toutputport> where TOutputPort: IPort
{
    TOutputPort OutputPortSet { get; }
    void Initialize(DispatcherQueue dispatcherQueue, TOutputPort outputPortSet);
    void Start();
}

interface IMessageFilter<tinputport,toutputport>
{
    TInputPort InputPort { get; }
    TOutputPort OutputPort { get; }
    void Initialize(DispatcherQueue dispatcherQueue, 
         TInputPort inputPort, TOutputPort outputPort);
}

扩展方法用于提供这些对象的串行和并行连接。串行连接是自然的源 ==> 过滤器 ==> 接收器消息流连接。并行连接允许 N 个源将消息发布到同一个输出端口。

static public IMessageSource<toutputport> ConnectTo<tinputport,>(
            this IMessageSource<tinputport> source,
            IMessageFilter<tinputport,> filter)
            where TInputPort : IPort,new()
            where TOutputPort : IPort
{ ... }

public static IMessageSource<tinputport> ConnectInParallel<tinputport>(
            this IMessageSource<tinputport> source1, 
            IMessageSource<tinputport> source2)
            where TInputPort : IPort
{ ... }

当源连接到接收器时,就创建了一个完整的管道。

interface IMessagePipeLine
{
     void Start(DispatcherQueue dispatcherQueue);
}
    
public static IMessagePipeLine ConnectTo<tport>(
            this IMessageSource<tport> source, 
            IMessageSink<tport> sink)
            where TPort : IPort,new()
{ ... }

每个工人的状态机

为了保存每个工人的状态机,我们需要将消息解复用到新的状态机中。

免责声明:以下实现并非最优。我很乐意听取您对此的意见。

第一个过滤器将类型为 T 的消息转换为 KeyValuePair<key,t> 消息。键是输入 PIN 的工人的姓名。

class WorkerKeyValueFilter : 
   IMessageFilter<PortSet<CorrectPinMessage>, 
   Port<KeyValuePair<string,CorrectPinMessage>>>
{
   void HandleMessage<T>(T message) where T : WorkCardSwipeMessageBase
   {
      OutputPort.Post(new KeyValuePair<string,T>(message.Name, message));
   }
}

第二个过滤器使用键将消息发布到专用端口。

class DemuxMessageFilter<TMessageFilter, TInputPort,TOutputPort,TKey> 
        : IMessageFilter<TInputPort,TOutputPort>
        where TMessageFilter : IMessageFilter<TInputPort,TOutputPort>, new() 
        where TInputPort : IPortSet,new() 
        where TOutputPort : IPort
{

   void HandleMessage(KeyValuePair<TKey,object> message) 
   { 
      var messageFilter = GetMessageFilter(message.Key); 
      messageFilter.InputPort.PostUnknownType(message.Value); 
   } 

   TMessageFilter GetMessageFilter(TKey key) 
   { 
      TMessageFilter filter; 
      if (!_messageFilters.TryGetValue(key, out filter)) 
      { 
         filter = new TMessageFilter(); 
         filter.Initialize(_dispatcherQueue, new TInputPort(), OutputPort); 
         _messageFilters.Add(key, filter); 
      } 
      return filter; 
   }
}

历史

  • 2008 年 10 月 10 日:初始版本。
© . All rights reserved.