65.9K
CodeProject 正在变化。 阅读更多。
Home

WS-Eventing for WCF (Indigo)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.90/5 (33投票s)

2006年6月4日

CPOL

25分钟阅读

viewsIcon

237445

downloadIcon

920

本文介绍了由新的微软通信模型 WCF (Windows Communication Foundation) 驱动的分布式应用程序的 WS-Eventing 的设计、实现和使用。

目录

注意:本文由 .NET Framework 3.0 (RTM) 编写

引言

事件驱动的异步分布式架构需要一种通用的交互模式,用于跨服务边界传输工作流状态。Web 服务事件(WS-Eventing)代表了一种基于 SOAP 的发布/订阅通信模式规范。这些规范能够根据事件兴趣(称为订阅)将事件源与其消费者解耦。WS-Eventing 规范由 IBM、BEA Systems、TIBCO Software、Sun Microsystems、Computer Associates 和 Microsoft 共同创建。2006 年 3 月 15 日,它被提交给 W3C(HP、IBM、Intel 和 Microsoft)作为 Web 服务标准。有关更多详细信息,请参见此处

Web 服务标准在 WS-Eventing 之上引入了一些新的规范,例如 WS-EventingNotification 和 WS-ResourceTransfer。这些新的 WS-* 规范可以标准化面向服务架构 (SOA) 中 WS-Eventing 的交互,例如,使用 WS-ResourceTransfer 进行订阅存储,并使用 WS-EventingNotification 将通知消息传递到事件接收器。请注意,这些规范是新的,因此尚未公开发布。本文描述了 WS-Eventing 规范在基础层面的设计、实现和使用,并具备使用上述新规范进行未来扩展的能力。

概念与设计实现

为了更好地理解事件通知系统和消息交换模式的概念,让我们从下图所示的简单通知案例开始

在上面的图中,我假设一些端点希望从发生某种情况(事件)的地方获得通知。发生情况的地方在 WS-Eventing 规范中称为 Event Source,即消息的启动者(生产者)。该消息称为 Event Message,因为该消息与某种事件情况相关。事件消息被发送到等待此事件情况的服务,即 Event Sink。Event Sink 由地址、绑定和契约 (ABC) 描述,称为 ServiceEndpoint,当然也由行为描述。然而,这对于每个 Event Sink 来说都是局部主观的。

现在,接下来的问题是:“事件源如何知道事件接收器和传递机制?” 最低要求是知道事件接收器的基本地址。为了获取事件接收器端点的绑定信息,我们可以使用 WS-MEX (Web Service MetadataExchange) 消息模式。此外,事件源需要知道如何将事件消息传递到目标接收器服务。这种传递机制在 WS-Eventing 中称为交付模式。

根据以上分析,事件源需要一些元数据,以便消息生产者能够为每种情况正确生成事件消息。与事件接收器兴趣相关的这些元数据称为 Subscription。订阅可以根据事件情况,按与应用程序特定的事件源(例如:天气、股票市场、杂志等)相关的事件主题进行组织。

还有一件事:每个事件情况都代表一些事件值——即事件接收器和事件源之间的数据契约——可以作为事件接收器兴趣的条件,用于传递通知消息。这种模式可以在订阅中使用 Filter 元素来定义。请注意,WS-Eventing 规范不限制通知,它仅取决于契约。基本上,任何基于 SOAP 的消息都可以是事件消息。但是,如果事件接收器需要一些额外的应用程序特定头块,订阅将被保留。此主题将在本文后面介绍。

事件源

事件源隐藏了许多 WS-Eventing 功能。让我们详细看看它们。我们知道事件接收器及其源之间的逻辑连接(兴趣)在名为 Subscription 的资源中定义(抽象)。对于与事件接收器的消息交换模式,事件源包含一个依赖于一个或多个事件主题的消息生产者(服务)。此生产者(Notification Manager)将事件接收器从事件源封装起来。下图显示了其在事件源中的位置

我们假设以某种方式将订阅存储在事件存储中。这可以通过管理方式(手动)或以编程方式使用 DAL 层或 WS-Transfer 服务来完成。此外,通知管理器能够根据事件主题查询事件存储以获取订阅。订阅作为资源与资源描述符一起存储在存储中。以下代码片段显示了订阅及其描述符的示例

<ResourceDescriptor>
  <Name>Subscription for weather report</Name>
  <Key>uuid:770b3a90-27c8-419e-94ae-301313d52793</Key>
  <Topic>weather:report.storm</Topic>
  <Expires>2007-05-22T05:58:14.4838Z</Expires>
  <Created>2006-05-22T06:35:57.7556Z</Created>
  <Updated/>
</ResourceDescriptor>


<wse:Subscription>  
  <wse:Delivery wse:Mode=
    "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push">
    <wse:NotifyTo>
      <a:Address>net.tcp://:33333/OnStormWarning</Address>
      <a:ReferenceParameters xmlns="http://www.w3.org/2005/08/addressing">
        <MySubscription xmlns="urn:MyNamespace">1234567890</MySubscription>
      </a:ReferenceParameters>
    </wse:NotifyTo>
  </wse:Delivery>
  <wse:EndTo>
    <a:Address>net.tcp://:44444/Admin</Address>
    <a:ReferenceParameters>
      <MySubscription xmlns="urn:MyNamespace">1234567890</MySubscription>
    </a:ReferenceParameters>
  </wse:EndTo>
  <wse:Expires>2007-05-22T05:58:14.4838368Z</wse:Expires>
  <wse:Filter xmlns:ow="http://www.example.org/oceanwatch">
     //ow:WindReport/ow:Speed <= 65
  </wse:Filter>
</wse:Subscription>

上述订阅示例包含主要的元数据,即通知管理器服务的交付模式知识库。它代表了事件接收器的兴趣。当事件源中发生情况时,特定的事件主题会根据内部连接知识库传递给通知管理器服务,例如:weather:report.storm。遍历此订阅集合时,服务会查找过滤器。它将在情况值(消息负载)上执行其表达式。当过滤通过时,交付元素开始根据交付模式进行处理。

在我们的示例中,交付模式是这个其中事件消息以即发即弃(异步)方式强制推送到接收器。服务调度程序将为 NotifyTo EndpointAddress 启动一个输出通道目标。请注意,NotifyTo 的 ReferenceParameters 将添加到消息头中,并原样传递到接收器(接收器和订阅之间松散耦合的关联信息)。上述订阅包含附加信息,例如 EndTo EndpointAddress,这对于传递有关事件源状态(禁用、关闭等)的通知消息非常有用。此外,还可以向订阅添加附加信息,例如订阅策略、绑定密钥等。

注意

事件消息根据 ABC 描述传递给事件接收器。在用于 Web 服务的 WS-Eventing 中,绑定使用 BasicHttpBinding (Soap 1.1)。那么其他绑定呢,例如 tcp、msmq、namepipe 等?事件源如何知道特定事件接收器的绑定?或者,当事件源需要向数千个接收器传递通知消息时,可伸缩性如何?

WS-Eventing 规范没有指定这些问题中的任何一个。其中一些将需要向订阅添加额外信息或使用其他 WS-* 规范来获取。例如,WS-MetadataExchange 可以获取事件接收器的元数据以创建代理。当然,通知管理器的这一部分必须根据应用程序要求进行定制。本文描述了如何实现基本的事件消息推送传递。

基于以上描述,你可能会有一个根本性的问题。WS-Eventing 规范模式在哪里?事件源能否基于数据和操作契约向事件接收器发送消息?这是一个很好的问题。WS-Eventing 没有指定任何关于事件通知消息的内容,只有一个 Push (即发即弃) 传递模式。看起来通知管理器正在根据订阅执行路由工作,而无需了解事件情况。这是正确的。

例如:事件接收器可以通过电子邮件(或 FedEx)发送订阅,然后管理员将手动将其添加(订阅)到事件存储中,这将激活事件接收器以获取其通知兴趣。这种离线订阅过程被许多应用程序用于私有发布/订阅通知系统。这是基于事件源、订阅/通知管理器和事件接收器这三个参与者的 WS-Eventing 增量开发的第一步。

在线订阅 (WS-Eventing)

在事件源中添加一个参与者以订阅和管理事件存储中的订阅,可以实时控制发布/订阅通知。此服务(Subscription Manager)可以具有私有和/或标准化(由 W3C 组织)的操作契约,称为 WS-Eventing。下图显示了 WS-Eventing 模式中的参与者

上图在事件源中添加了一个订阅管理器服务。该服务允许名为 Subscriber 的服务请求者通过 WS-Eventing 操作(如 Subscribe、Unsubscribe、Renew 和 GetStatus)管理订阅资源。它看起来像一个 WS-Transfer 资源工厂和具有 Create、Delete、Put 和 Get 操作。订阅管理器和通知管理器共享包含订阅资源的事件存储。它们是独立的实体,可以托管在同一 appDomain 或跨网络上,并使用 WS-Transfer 访问事件存储。

请注意,WS-Eventing 未指定用于订阅订阅者兴趣的事件主题。组织发布/订阅通知系统的一种方式是根据事件主题启动订阅/通知管理器并分配其 ABC 端点地址。另一种方式是在订阅者兴趣中使用自定义私有标头(例如 Topic)。当然,此标头必须被订阅管理器识别。

交付模式

WS-Eventing 仅概述了推送通知模型。这种传递是基于异步推送(路由)通知消息到事件接收器,采用即发即弃的方式。其他可自定义的模式是

  • 批处理,事件源允许将多个通知批处理成一个通知消息。这将减少事件源和接收器之间的通知流量。
  • 拉取,事件接收器负责定期轮询事件源,并从事件源在订阅操作中给出的端点拉取通知。
  • 陷阱,事件源使用 UDP 多播地址将通知消息推送到事件接收器。
  • 封装,事件源向其消费者——事件接收器——发送唯一的封装通知消息。
  • 带确认的推送,事件源发送通知消息并等待其确认。

如前所述,通知管理器以透明的方式,根据订阅者的兴趣(订阅),将称为事件消息的事件情况消息推送到其消费者。这允许链接通知管理器并根据事件源以及位于不同事件存储中的订阅对它们进行分组。另一种情况是在特殊的接收器中拦截事件消息,如下图所示

在上图中,存储服务代表了三个参与者,即事件接收器、存储和事件源,用于处理批处理和拉取交付模式。Fire #1 消息在存储中累积,然后触发事件情况 Fire #2,并将其作为单个事件消息传递到目标事件接收器。存储中累积消息的数量可以根据数量、时间戳或持续时间进行配置。

上图还展示了拉取交付模式的一种模式。事件接收器可以定期轮询存储以拉取存储的事件消息 Fire#1。根据存储实现,两种类型的事件接收器可以并发工作。请注意,此模型由分布式订阅和“特殊”接收器服务驱动,其中本地事件源仅了解其自身周围的环境。

最后,我们来到了所有发布/订阅通知系统的主要参与者都已解释清楚的地方。现在我们可以专注于 WS-Eventing 规范及其通过微软 Windows Communication Foundation(即 Indigo)技术的实现。

WS-Eventing

WS-Eventing 是一个基于 WS-Addressing 的规范,用于事件接收器和事件源之间的消息交换,以注册对某些事件的兴趣。以下是主要亮点

  • 有三个参与者:事件接收器、事件源和订阅管理器。
  • 事件源负责控制事件接收器感兴趣的信息。
  • 事件接收器通过操作契约(如 Subscribe)向事件源订阅兴趣。
  • 订阅是表示事件接收器兴趣的资源。
  • 当订阅终止时(例如,生命周期到期,服务不可用等),事件源能够向事件接收器发送通知消息——称为 SubscriptionEnd
  • 订阅管理器负责使用操作契约(如 UnsubscribeRenewGetStatus)管理订阅的生命周期。
  • 交付模式是事件源向事件接收器发送通知消息的传输模式。默认模式是 push

基于本文的一些补充要点

  • 订阅者是事件接收器的代表,负责处理事件源中的兴趣。
  • 通知管理器是事件源的代表,负责根据订阅(兴趣)向事件接收器发送通知消息。
  • 事件存储是用于存储订阅的资源驱动服务。

将上述所有要点围绕 WS-* 服务总线并根据 WS-Eventing 模式进行编排,我们可以得到下图所示的发布/订阅通知系统的逻辑模型

Screenshot - DrawingEventing5.jpg

该模型可以提供分布式通知,其中事件存储、事件源、事件接收器和管理器通过 WS-Transfer 和 WS-Eventing 模式逻辑连接。换句话说,有些公司可能只支持存储系统(例如 亚马逊 S3 - 简单存储服务),而其他公司则可以支持订阅管理器服务和通知管理器服务等。当您的应用程序希望收到某种情况的通知时,可以租用这些 WS-Eventing 服务来满足您的需求。

让我们描述应用程序如何向事件源订阅兴趣。在事件源中生成通知到事件接收器的工作流过程是什么?我将使用以下序列图来演示这个案例

假设订阅者(事件接收者代表)知道事件源的端点地址(ABC)。那么

  1. 为了从事件源接收事件情况,订阅者发起调用并使用 WS-Eventing Subscribe 操作向事件源代表服务发送兴趣(订阅)请求。此操作称为 Subscribe。以下是 Subscribe 请求的示例
<s:Envelope 
    xmlns:s="http://www.w3.org/2003/05/soap-envelope" 
    xmlns:a="http://www.w3.org/2005/08/addressing"
    xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing">
  <s:Header>
    <a:Action s:mustUnderstand="1">
      http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe
    </a:Action>
    <h:SubscriptionTopic xmlns:h=
    "http://www.rkiss.net/schemas/sb/2004/08/servicebus">
      weather:report.storm
    </h:SubscriptionTopic>
    <a:MessageID>urn:uuid:b373f5d9-d6e9-471d-af73-28544290f146<
      /a:MessageID>
    <a:ReplyTo>
      <a:Address>http://www.w3.org/2005/08/addressing/anonymous<
      /a:Address>
    </a:ReplyTo>
    <a:To s:mustUnderstand="1"
      >net.tcp://:11111/SubscriptionManager</a:To>
  </s:Header>
  <s:Body>
    <wse:Subscribe>
    <wse:EndTo>
      <a:Address>net.tcp://:44444/Admin</a:Address>
      <a:ReferenceParameters>
        <MySubscription xmlns="urn:MyNamespace">1234567890<
          /MySubscription>
      </a:ReferenceParameters>
    </wse:EndTo>
    <wse:Delivery>
      <wse:NotifyTo>
        <a:Address>net.tcp://:33333/OnStormWarning</a:Address>
        <a:ReferenceParameters>
          <MySubscription xmlns="urn:MyNamespace">1234567890<
            /MySubscription>
        </a:ReferenceParameters>
      </wse:NotifyTo>
    </wse:Delivery>
    <wse:Expires>PT5M</wse:Expires>
    <wse:Filter xmlns:ow="http://www.example.org/oceanwatch">
      //ow:WindReport/ow:Speed &gt;= 65
    </wse:Filter>
    </wse:Subscribe>
  </s:Body>
</s:Envelope>

头部块使用 WS-Addressing 规范描述了 Subscribe 请求的目的地以及其响应将回复到的位置。这种 MEP 方式允许向 replyTo 服务传递额外的应用程序特定信息,例如 applicationId 等。在我们的示例中,响应会返回给订阅者。

消息负载描述了应用程序对特定事件的兴趣。它包含用于传递事件消息的元数据,可以选择包含一些应用程序特定数据、通知服务的租赁时间、由 Filter 元素表示的情况值兴趣,以及订阅终止时的故障消息将传递到的位置。正如我之前提到的,WS-Eventing 是在 WS-Addressing 之上设计的。因此,任何附加到目标地址的信息都可以添加到 ReferenceProperties 中。没有限制。它可能是一个小字符串或复杂类型,可以被目标服务识别。

  1. 订阅管理器验证消息正文,例如交付模式、过滤模式和过期时间。如果没有匹配(支持),订阅管理器将发送一个应用程序特定的故障消息(例如 FilteringNotSupported)。当兴趣被接受时,管理器将创建一个订阅资源并请求存储将其持久化。存储可以通过 WS-Transfer 接口访问,或者像本文中实现的那样简单地使用内部适配器。请注意,每个订阅都有一个唯一的标识符,用于其服务操作目的。基本上,订阅可以像资源一样存储,并带有其自己的资源描述符,用于额外的例如键、主题、创建时间戳等信息。此模式将资源主体从其标识中封装起来。
  1. 订阅管理器向指定地址返回响应消息,该地址可以是请求的发起者,也可以是 ReplyTo 头部定义的任何目标。响应消息负载非常简单。它包含订阅管理器服务终结点地址,其中包括订阅标识符。基本上,任何拥有此终结点地址的人都将被允许管理事件源存储中的特定订阅。以下代码片段显示了 SubscribeResponse 消息的示例
<s:Envelope 
    xmlns:a="http://www.w3.org/2005/08/addressing" 
    xmlns:s="http://www.w3.org/2003/05/soap-envelope"
    xmlns:wse="http://schemas.xmlsoap.org/ws/2004/08/eventing">
  <s:Header>
    <a:Action s:mustUnderstand="1">
      http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscribeResponse
    </a:Action>
    <a:RelatesTo>urn:uuid:b373f5d9-d6e9-471d-af73-28544290f146<
      /a:RelatesTo>
  </s:Header>
  <s:Body>
    <wse:SubscribeResponse>
      <wse:SubscriptionManager>
        <a:Address>net.tcp://:11111/SubscriptionManager<
          /a:Address>
        <a:ReferenceParameters>
          <wse:Identifier>uuid:2c125232-93cd-4ea7-a9a1-ee2b163266fd<
            /wse:Identifier>
        </a:ReferenceParameters>
      </wse:SubscriptionManager>
      <wse:Expires>2006-06-04T04:24:46.5194432Z</wse:Expires>
    </wse:SubscribeResponse>
  </s:Body>
</s:Envelope>
  1. 订阅阶段完成。兴趣(订阅)已存储在事件存储中,事件接收器正在等待通知消息(默认交付模式 = 推送)。当然,我们也可以开始管理订阅,例如取消订阅、续订或查询状态。所以,让我们在事件源中制造相同的情况,这将触发“发送内部消息”事件到通知管理器。
  1. 通知管理器从特定主题源接收事件。此事件必须根据已注册的、作为订阅持久化的兴趣,传递给所有事件接收器。因此,管理器会向事件存储请求此主题的所有订阅。如果管理器未配置特定事件主题,存储将返回所有可用的订阅。遍历所有订阅后,管理器将检查过滤器的匹配交付,并将工作委托给 ThreadPool 以异步即发即弃的方式将消息传递到 NotifyTo 目标。

请注意,WS-Eventing 规范仅支持一项操作,用于特定地更新事件存储中已订阅的兴趣。此续订操作仅允许更改订阅的过期时间。对于其他更改,例如过滤器、交付模式等,事件接收器需要执行两个步骤。第一步是执行取消订阅操作,然后创建一个新的订阅。

实现

WCF 模型的 WS-Eventing 实现概念基于将 Eventing 存储从服务层解耦到可配置适配器中。这个概念在我之前的文章 WCF 的 WS-Transfer 中已详细描述。可配置适配器允许使用不同的存储,例如内存、SQL、WS-Transfer 等。本文中实现的默认适配器是内存适配器。请注意,用于插入自定义服务适配器的 behaviorExtensionSection 是从 WCF 的 WS-Transfer 解决方案中使用的。请参阅 ServiceAdapterBehaviorExtension.cs 源文件。WS-Eventing 实现需要构建以下内容

  • 用于 WS-Eventing 消息的 SOAP 堆栈(Xml/CLR 类集)
  • 用于订阅和管理事件存储中事件接收器兴趣的订阅管理器服务
  • 用于处理事件的通知管理器
  • 用于事件存储的内存适配器

服务合同

让我们从接口契约开始。基于 WS-Eventing 规范,有 4 个 Web 服务操作。第一个是 Subscribe,用于创建订阅,其他用于管理订阅。因此,我创建了 2 个服务契约。以下代码片段显示了这些契约

[ServiceContract(Namespace = WSEventing.NamespaceUri)]
[XmlSerializerFormat(Style = OperationFormatStyle.Document)] 
public interface IWSEventing : IWSEventingFactory, IWSEventingOperation
{
}

[ServiceContract(Namespace = WSEventing.NamespaceUri)]
[XmlSerializerFormat(Style = OperationFormatStyle.Document)]   
public interface IWSEventingFactory
{
  [OperationContract(Action = WSEventing.SubscribeAction, 
    ReplyAction = WSEventing.SubscribeResponseAction)]
  [FaultContract(typeof(SupportedDeliveryMode))]
  [FaultContract(typeof(SupportedDialect))]
  [TransactionFlow(TransactionFlowOption.Allowed)]
  SubscribeResponse Subscribe(SubscribeRequest request);
}

[ServiceContract(Namespace = WSEventing.NamespaceUri)]
[XmlSerializerFormat(Style = OperationFormatStyle.Document)]
public interface IWSEventingOperation
{
  [OperationContract(Action = WSEventing.GetStatusAction, 
    ReplyAction = WSEventing.GetStatusResponseAction)]
  [TransactionFlow(TransactionFlowOption.Allowed)]
  GetStatusResponse GetStatus(GetStatusRequest request);

  [OperationContract(Action = WSEventing.RenewAction, 
    ReplyAction = WSEventing.RenewResponseAction)]
  [TransactionFlow(TransactionFlowOption.Allowed)]
  RenewResponse Renew(RenewRequest request);

  [OperationContract(Action = WSEventing.UnsubscribeAction, 
    ReplyAction = WSEventing.UnsubscribeResponseAction)]
  [TransactionFlow(TransactionFlowOption.Allowed)]
  UnsubscribeResponse Unsubscribe(UnsubscribeRequest request);

}

订阅请求/响应消息合同

操作契约的模式通过使用请求/响应消息契约非常容易地创建。请求响应类表示带有头部和主体属性的消息信封样板。实际上,XML 序列化/反序列化逻辑被封装在基类中。以下代码片段显示了用于消息契约的 SubscribeRequest 和 SubscribeResponse 类。请注意,请求可以选择通过订阅者添加 SubscriptionTopic,以便在事件存储中进行分类订阅。

/// WS-Eventing Subscribe request message class
[MessageContract]
public class SubscribeRequest : Subscribe
{
  #region DataMembers
  private string _subscriptionTopic;
  #endregion

  // option: 
  [MessageHeader(Name = WSEventing.Extension.SubscriptionTopic, 
    Namespace = WSEventing.Extension.NamespaceUri)]
  public string SubscriptionTopic
  {
    get { return _subscriptionTopic; }
    set { _subscriptionTopic = value; }
  }

  #region Properties
  [MessageBodyMember(Name = WSEventing.ElementNames.Subscribe, 
    Namespace = WSEventing.NamespaceUri, Order = 0)]
  public Subscribe Body   
  {
    get { return this; }
    set { this.Copy(value); }
  }
  #endregion

  #region Constructors
  public SubscribeRequest()
  {
  }
  public SubscribeRequest(Subscribe subscribe)
  {
    Body = subscribe;
  }
  #endregion
}

/// WS-Eventing Subscribe response message class
[MessageContract]
public class SubscribeResponse : SubscribeResult
{
  #region Properties
  [MessageBodyMember(Name = WSEventing.ElementNames.SubscribeResponse, 
    Namespace = WSEventing.NamespaceUri, Order = 0)]
  public SubscribeResult Body
  {
    get { return this; }
    set { this.Copy(value); }
  }
  #endregion

  #region Constructors
  public SubscribeResponse()
  {
  }
  public SubscribeResponse(SubscribeResult result)
  {
     Body = result;
  }
  #endregion
}

所有基类都使用 IXmlSerializable 接口,根据 WS-Eventing 规范写入/读取元素/属性,以便更好地控制 XML 序列化。IXmlSerializable 接口的实现直接且轻量,使用公共 EndpointAddress 方法进行 XML 读取器和写入器。其他一些类,如 Delivery 和 Expires,也采用相同的技术风格创建。以下代码片段显示了 Subscribe 基类的实现

[MessageContract]
[XmlSchemaProvider(null, IsAny = true)]
public class Subscribe : IXmlSerializable
{
  #region DataMembers
  EndpointAddress _endTo;
  Delivery _delivery;
  Expires _expires;
  XPathMessageFilter _filter;
  string _filterDialect;
  #endregion

  #region Properties
  //...
  #endregion

  #region Constructors
  //...
  #endregion

  #region IXmlSerializable Members
  public System.Xml.Schema.XmlSchema GetSchema()
  {
      return null;
  }
  public void ReadXml(XmlReader reader)
  {
    reader.ReadStartElement(WSEventing.ElementNames.Subscribe, 
      WSEventing.NamespaceUri);
    while (reader.NodeType != XmlNodeType.EndElement)
    {
      if(reader.IsStartElement(WSEventing.ElementNames.EndTo,
        WSEventing.NamespaceUri))
      {
          EndTo = EndpointAddress.ReadFrom(AddressingVersion.WSAddressing10, 
            reader);
      }
      else if(reader.IsStartElement(WSEventing.ElementNames.Delivery,
        WSEventing.NamespaceUri))
      {
          Delivery = new Delivery(reader);
      }
      else if(reader.IsStartElement(WSEventing.ElementNames.Expires,
        WSEventing.NamespaceUri))
      {
          Expires = new Expires(reader);
      }
      else if(reader.IsStartElement(WSEventing.ElementNames.Filter,
        WSEventing.NamespaceUri))
      {
        string dialect = reader.GetAttribute("Dialect");
        if (dialect == null || dialect == WSEventing.XPathDialect)
        {
           Filter = new XPathMessageFilter(reader);
        }
        else
        {
          _filterDialect = dialect;
          reader.Skip();
        }                      
      }
      reader.MoveToContent();
    }
    reader.ReadEndElement();
  }
  public void WriteXml(XmlWriter writer)
  {
    writer.WriteStartElement(WSEventing.NamespacePrefix, 
      WSEventing.ElementNames.Subscribe, WSEventing.NamespaceUri);
    if (EndTo != null)
    {
      EndTo.WriteTo(AddressingVersion.WSAddressing10, writer, 
        WSEventing.ElementNames.EndTo, WSEventing.NamespaceUri);
    }
    if (Delivery != null)
    {
      Delivery.WriteXml(writer);
    }
    if (Expires != null)
    {
      Expires.WriteXml(writer);
    }
    if (Filter != null) 
    {
      Filter.WriteXPathTo(writer, WSEventing.NamespacePrefix, 
        WSEventing.ElementNames.Filter, WSEventing.NamespaceUri, true);
    }
    writer.WriteEndElement();
  }
  #endregion
}

我将跳过所有其他消息契约,因为它们使用与上述 Subscribe 操作相同的请求/响应样式。现在我将进入 WS-Eventing 服务。

订阅经理

订阅管理器服务已解耦为两层。第一层,接近“线路”,实现 IWSEventing 服务契约。另一层实现事件存储的可配置适配器。通过使用请求/响应类和存储适配器,接口实现轻量且可读

public class SubscriptionManagerService : 
  SubscriptionManagerService<ConfiguredAddapter> { }

[ServiceBehavior(ReturnUnknownExceptionsAsFaults = true)]
public class SubscriptionManagerService<T> : 
  ServiceAdapterBase<T, IStorageAdapter>, IWSEventing where T: class
{
  List<string> _delivering = new List<string>();
  List<string> _filtering = new List<string>();
  
  public SubscriptionManagerService()
  {
    // supported features
    // filtering
    _filtering.Add(WSEventing.XPathDialect);

    // delivery modes (Note that that the push mode is a mandatory mode)
    _delivering.Add(WSEventing.PushDeliveryMode);
    MessageProperties mp = OperationContext.Current.IncomingMessageProperties;
    if (mp.ContainsKey("EventingPublishers"))
    {
      Publishers publishers = mp["EventingPublishers"] as Publishers;
      if (publishers != null && publishers.Count > 0)
      {
        string[] modes = new string[publishers.Count];
        publishers.Keys.CopyTo(modes, 0);
        foreach (string mode in modes)
        {
          if (mode != WSEventing.PushDeliveryMode)
              _delivering.Add(mode);
        }
      }
    }         
  }
  
  public SubscribeResponse Subscribe(SubscribeRequest request)
  {
    // validate expiration type and value
    if (request.Expires != null)
    {
      request.Expires.IsValidTime(true);
    }
    
    // validate delivery mode
    SupportedDeliveryMode sdm = new SupportedDeliveryMode(_delivering);
    sdm.IsSupported(request.Delivery.DeliveryMode, true);
    
    // validate filtering 
    SupportedDialect sd = new SupportedDialect(_filtering);
    sd.IsSupported(request.FilterDialect, true);
    
    // response values
    Identifier identifier = new Identifier();
    Expires expires = request.Expires;
    Uri address = OperationContext.Current.IncomingMessageHeaders.To;
    SubscriptionManager manager = 
      new SubscriptionManager(address, identifier);
    
    // storage resource descriptor
    ResourceDescriptor rd = 
      new ResourceDescriptor(identifier.Value, expires.Value);
    
    // option: subscription topic
    rd.Topic = request.SubscriptionTopic;
    
    // Subscription - storage resource
    Subscription subscription = new Subscription(request, manager);
    
    // storage action 
    Adapter.Create<Subscription>(rd, subscription);
    
    // response
    SubscribeResponse response = new SubscribeResponse();
    response.SubscriptionManager = manager;
    response.Expires = expires; 
    return response;
  }
  
  public UnsubscribeResponse Unsubscribe(UnsubscribeRequest request)
  {
    // storage action
    Adapter.Delete<Subscription>(new ResourceDescriptor(
      request.Identifier.Value));
    
    UnsubscribeResponse response = new UnsubscribeResponse();
    return response;
  }
  
  public RenewResponse Renew(RenewRequest request)
  {
    // ...
  }
  
  public GetStatusResponse GetStatus(GetStatusRequest request)
  {
    // ...
  } 
}

上述类展示了 Subscribe 操作契约的实现,其中请求开始处理验证,例如交付模式、过期时间和过滤。之后,创建订阅资源及其描述符并将其传递给适配器工厂以进行持久化。请注意,每个资源(订阅)都可以通过非 WS-Eventing 头部根据订阅主题进行分类,或者可以通用地用于配置文件中所有已订阅的资源。

服务适配器

我们通过服务扩展中的适配器概念,将与事件存储相关的服务行为封装到通用虚拟资源目标中。此概念在我关于 WCF 的 WS-Transfer 的文章中已更详细地描述。服务适配器通过 WCF 扩展模型插入到服务行为层中。

以下代码片段显示了服务配置文件的一部分

<behaviors>
  <serviceBehaviors>
    <behavior name="ManagerExtention" >
      <storage name="LocalStorage" 
        type="RKiss.WSEventing.Adapters.MemoryStorageAdapter" 
        maxsubscriptions="10" 
        topic="weather:report.storm"/>
    </behavior>
  </serviceBehaviors>
</behaviors>

<extensions>
  <behaviorExtensions>
    <add name="storage" 
      type="RKiss.WSLib.ServiceAdapterBehaviorElement, WSEventing
      Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"/>
  </behaviorExtensions>
</extensions>

上述配置段将事件存储配置为 MemoryStorageAdapter。存储中相同 weather:report.storm 主题的最大订阅数限制为 10。请注意,主题可以通过订阅请求覆盖。本文仅包含使用以下接口实现的内存存储。基于此实现,可以构建其他存储,例如 SQL、WS-Transfer 等,并以松散耦合的方式将其插入到服务中。

public interface IStorageAdapter
{
  // basic
  ResourceDescriptor Create<T>(ResourceDescriptor rd, 
    T resource) where T : class;
  T Get<T>(ResourceDescriptor rd) where T : class;
  void Put<T>(ResourceDescriptor rd, T resource) where T : class;
  T Delete<T>(ResourceDescriptor rd) where T : class;
  
  // advanced methods
  void CreateOrUpdate<T>(ResourceDescriptor rd, T resource) where T : class;
  IList<T> Get<T>(string topic) where T : class;
  IList<T> Delete<T>(string topic) where T : class;
  T Delete<T>(UniqueId id);
  ResourceDescriptor GetResourceDescriptor(ResourceDescriptor rd);
  ResourceDescriptor[] GetResourceDescriptors(ResourceDescriptor rd);
  void Expires(ResourceDescriptor rd); 
}

以上就是 WS-Eventing 规范的全部实现。接下来,我将重点关注事件消息传递的实现。正如我前面提到的,事件消息是事件接收器和事件源之间的契约。事件源为未知接收器生成事件消息负载,而无需了解传递和应用程序特定数据。事件将内部消息发送到通知管理器,通知管理器根据事件接收器的兴趣(订阅)负责消息传递。这对事件源来说是一个完全透明的事件。没有 WS-Eventing 特定操作,因此我设计了以下服务契约(仅限本文)

通知管理器

通知管理器具有与订阅管理器相同的设计实现,基于可插拔的事件存储适配器。我们使用的是 MemoryStorage 适配器;因此,两个管理器必须托管在同一个 appDomain 上。在其他情况下,需要插入 SqlStorage 或 WS-Transfer 适配器。

通知管理器内置了一个用于默认交付模式的推送发布器。有关详细信息,请参阅下一个代码片段。对于其他交付模式(自定义),例如拉取、包装、批处理等,通知管理器服务能够根据配置以松散耦合的方式插入它们。使用 RKiss.WSEventing.PublishersBehaviorSection 类,可以配置服务行为,如下图所示

上图显示了通知管理器服务解耦为事件存储适配器和发布器。请注意,内置的默认推送发布器可以通过可配置的推送发布器禁用。可配置(自定义)发布器通过抽象 PublisherBase 类中的回调 DeliveryMessage 方法插入到发布器线程池中

public abstract class PublisherBase
{
  #region Properties
  HybridDictionary _properties;
  public HybridDictionary Properties
  {
      get { return _properties; }
  }
  #endregion

  #region Constructors
  protected PublisherBase()
    : this(null)
  {
  }
  protected PublisherBase(HybridDictionary properties)
  {
     _properties = properties;
  }
  #endregion

  public void DeliveryMessage(object state)
  {
    FireState firestate = state as FireState;
    if (firestate != null)
    {
        OnDeliveryMessage(firestate.Message, firestate.Subscription);
    }
  }

  public virtual void OnDeliveryMessage(Message message, 
    Subscription subscription)
  {
  }
}

您可以通过在派生类中重写 OnDeliveryMessage 方法来为特定订阅实现自定义交付模式。请注意,发布器以即发即弃的方式在线程池中运行。以下代码片段显示了一个虚拟自定义发布器(测试器)

public class PublisherTester : PublisherBase
{
  #region Constructors
  public PublisherTester() : base() { }
  public PublisherTester(HybridDictionary properties) : base(properties) { }
  #endregion

  #region PublisherBase override
  public override void OnDeliveryMessage(Message message, 
    Subscription subscription)
  {
    Console.WriteLine("\n=============== tester =========================");
    Console.WriteLine(message);
    Util.ShowMe(subscription);
    Console.WriteLine("\n================================================");
  }
  #endregion
}

通知管理器 - 服务合同

通知管理器支持简单的服务契约。第一个操作用于触发将调度事件消息的事件,另一个操作用于发送 SubscriptionEnd 消息以通知 WS-Eventing 规范要求的事件接收器。以下代码片段显示了此服务契约。

[ServiceContract]
public interface INotificationManager
{
  [OperationContract(Action = "*", IsOneWay = true)]
  void FireSubscription(Message message);

  [OperationContract(Action = "CancelSubscription", IsOneWay = true)]
  void CancelSubscription(Uri code, string reason, string culturename);
}

请注意,FireSubscription 操作已指定任何 Action (*) 以接收消息,以便根据订阅交付进行重新路由。让我们看看 FireSubscription 操作契约的实现。根据事件主题,我们可以获取此情况的所有订阅列表。遍历列表,我们将消息的副本委托给工作线程,以异步方式进行消息传递。

public void FireSubscription(Message message)
{
  // configuration
  string topic = Properties["topic"] as string;
  MessageProperties mp = OperationContext.Current.IncomingMessageProperties;
  Publishers publishers = mp.ContainsKey("EventingPublishers") ?
      mp["EventingPublishers"] as Publishers : null;
 
  using (TransactionScope ts = new TransactionScope(
    TransactionScopeOption.Required))
  {
    // Get subscriptions from Event Storage
    IList<Subscription> subscriptions = Adapter.Get<Subscription>(topic);
    
    // Create buffered message
    using (MessageBuffer buffer = message.CreateBufferedCopy(0xffffff))
    {
      // walk through all subscriptions
      foreach (Subscription subscription in subscriptions)
      {
        //Util.ShowMe(subscription);

        // filter message
        if (subscription.Filter == null || subscription.Filter.Match(buffer))
        {
          FireState state = new FireState(subscription,buffer.CreateMessage(),
            Properties);
          Publisher publisher = null;

          // delivery mode
          if (publishers != null && 
            publishers.TryGetValue(subscription.Delivery.DeliveryMode, 
              out publisher))
          {
            // loosely coupled Publisher
            Properties["Publisher"] = publisher;
            object[] args = new object[] { Properties };
            PublisherBase driver = 
              (PublisherBase)Activator.CreateInstance(publisher.Type, args );
            ThreadPool.QueueUserWorkItem(new WaitCallback(
              driver.DeliveryMessage), state);
          }
          else if (subscription.Delivery.DeliveryMode == null || 
            subscription.Delivery.DeliveryMode == WSEventing.PushDeliveryMode)
          {
            // default built-in Push Publisher
            ThreadPool.QueueUserWorkItem(new WaitCallback(
              FireSubscriptionWorker), state);
          }
        }
      }
    }
  }
}

这是消息传递的工作人员

public void FireSubscriptionWorker(object state)
{
  // state
  FireState firestate = state as FireState;

  try
  {
    // destination
    EndpointAddress epa = firestate.Subscription.Delivery.NotifyTo;
    
    // create config name for binding 
    string epname = (string)firestate.Properties["epname"] ?? string.Empty;
    string configname = string.Concat(epname, epa.Uri.Scheme);
    
    // action
    using (ChannelFactory<IOneWay> cf = new ChannelFactory<IOneWay>(
      configname,epa))
    {
      IOutputChannel channel = (IOutputChannel)cf.CreateChannel();
      channel.Send(firestate.Message);
      channel.Close();
    }  
  }
  catch (Exception ex)
  {
    try
    {
      Adapter.Delete<Subscription>(
        firestate.Subscription.Manager.Identifier.Value);
      SubscriptionEnd subscriptionEnd = new SubscriptionEnd(
          firestate.Subscription.Manager,
          SubscriptionEndCode.DeliveryFailure,
          ex.Message);
      CancelState cancelstate = 
        new CancelState(firestate.Subscription.EndTo,subscriptionEnd,
        firestate.Properties);
      CancelSubscriptionWorker(cancelstate);

      Trace.WriteLine(ex.Message);
    }
    catch (Exception ex2)
    {
        Trace.WriteLine(ex2.Message);
    }
  }
}

FireSubscription 工作程序拥有关于如何传递消息的所有信息(订阅),除了一点。WS-Eventing for Web Services 期望使用基本绑定将通知传递到事件接收器(ASMX 服务)。在其他绑定情况下——例如,TCP 绑定——我们没有订阅中接收器的绑定描述。如果接收器不支持 WS-MetadataExchange,我们必须找到一些配置机制。其中一种解决方案是在通知管理器服务中有一个客户端配置部分。根据 NotifyTo EndpointAddress 模式,我们可以映射正确的绑定以进行通知传递。请注意,我计划在下一版本中更改此机制。以下代码片段显示了用于交付绑定的客户端部分的示例

<client>
  <!-- Notification binding -->
  <endpoint name ="http"
    address="http://tbd" binding="basicHttpBinding" 
    contract="RKiss.WSEventing.IOneWay" />
  <endpoint name ="net.tcp" 
    address="net.tcp://tbd" binding="customBinding" 
    contract="RKiss.WSEventing.IOneWay" 
    bindingConfiguration="binding1"/>
  <endpoint name ="net.msmq" 
    address="net.msmq://tbd" binding="netMsmqBinding" 
    contract="RKiss.WSEventing.IOneWay" />
</client>

请注意,应用程序可以为特定事件主题配置多个通知管理器。

测试

WS-Eventing 解决方案分为 WSEventing 核心库和测试项目

WSEventing 程序集必须包含在您的事件驱动应用程序中,才能使用订阅和通知管理器。Test 文件夹包含一个项目,用于托管生成和消费事件以及订阅者(客户端应用程序)的管理器。所有进程都是控制台程序。启动 ServiceHost(服务器)程序和 ClientApplication(订阅者)。在 ClientApplication 上按 Enter 键以订阅应用程序兴趣。以下屏幕截图显示了这种情况

Screenshot - DrawingEventing9.jpg

接下来的步骤是启动 EventSink 和 EventSource 控制台程序。您应该在 5 分钟内完成此操作;这是此示例中设置的过期时间。否则,通过 ClientApplication 创建另一个订阅。现在按 EventSource 控制台上的任意键以生成 EventSink 的天气报告。您应该会看到以下屏幕

Screenshot - DrawingEventing10.jpg

用法

终于到了这里。我将展示使用 WSEventing 程序集将任何(无响应)Web 服务消息传输到 WS-Eventing 驱动消息的步骤。

步骤 1. 事件源 - 事件接收器

假设我们有一个标准的 OneWay 服务契约,如以下示例所示。此示例可以在我的测试器中找到。它是一个非常简单的示例,只有一个操作用于生成 WindReport。

[ServiceContract]
[XmlSerializerFormat]
public interface IWeather
{
  [OperationContract(IsOneWay=true,
    Action="http://www.example.org/oceanwatch/WindReport")] 
  void WindReport(WindReportRequest request);
}

[MessageContract]
public class WindReportRequest
{
  [MessageBodyMember]
  public WindReport WindReport;
  public WindReportRequest() { }
  public WindReportRequest(WindReport report) { WindReport = report; }
}

下图显示了一个连接系统,包含事件接收器和事件源两端。事件接收器是基于 IWeather 服务契约的 OneWay 消息的消费者。

Screenshot - DrawingEventing11.jpg

请注意,事件源和事件接收器已根据 ABC 描述逻辑连接。请参阅上述事件源配置文件的一部分。另请注意,地址是相同的:net.tcp://:33333/OnStormWarning

步骤1中没有涉及任何特殊之处。事件源以即发即弃的方式向已知事件接收器发送一条消息。一条消息被传递到一个目标端点。如果应用程序将保持这种配置模式(一对一),我们不需要知道任何关于发布/订阅通知的信息。当事件源希望根据未知事件接收器的兴趣向其发送消息时,情况将开始。对于这种交付模式(一对多),我们需要步骤2并将事件源的地址重定向到通知管理器。在我们的示例中,它是 address="net.tcp://:22222/NotificationManager"

步骤 2. 事件接收器/订阅者

对于此步骤,我们需要一个托管服务器,用于加载来自 WSEventing 程序集的 WS-Eventing 管理器。我们可以使用 Test 包中的 ServerHost 控制台宿主程序。所以,让我们启动这个控制台程序。屏幕将显示两个监听器:订阅管理器和通知管理器。由于事件源已寻址到通知管理器,我们可以通过事件源生成一个事件,并看到消息到达管理器,但仅此而已。事件接收器仍将等待事件。

为什么?发生了什么?为什么通知管理器没有将消息传递到事件接收器?答案很简单。通知管理器的知识中没有这样的指令。这是订阅管理器根据事件接收器的兴趣创建指令(订阅)的过程。以下代码片段是客户端应用程序如何订阅兴趣以接收事件的示例。诸如交付、筛选等兴趣需要填充到订阅请求中并发送给订阅管理器。更多详细信息可在测试包 ClientApplication (Subscriber) 项目中找到。

using(SubscriptionManagerClient sm = 
  new SubscriptionManagerClient("SubscriptionManager"))
{

  #region Operation Subscribe
  // Prepare request for subscribing interest (subscription)
  SubscribeRequest subscribeRequest = new SubscribeRequest();
 
  // option: Organize subscriptions in the Storage based on the Topic
  subscribeRequest.SubscriptionTopic = "weather:report.storm";
 
  // Push Delivery
  Uri uri = new Uri("net.tcp://:33333/OnStormWarning");
  subscribeRequest.Delivery.NotifyTo = new EndpointAddress(uri);

  // option: EndTo
  uri = new Uri("net.tcp://:44444/Admin");
  subscribeRequest.EndTo = new EndpointAddress(uri);

  // option: Filter
  XmlNamespaceManager nsmgr = new XmlNamespaceManager(
    new XmlDocument().NameTable);
  nsmgr.AddNamespace("ow", "http://www.example.org/oceanwatch");
  string expression = "//ow:WindReport/ow:Speed >= 65";
  subscribeRequest.Filter = new XPathMessageFilter(expression, nsmgr);

  // option: Lifetime of the Subscription
  subscribeRequest.Expires = new Expires(TimeSpan.FromSeconds(300));
    
  // Action - Subscribe this interest
  SubscribeResponse subscribeResponse = sm.Subscribe(subscribeRequest);
  #endregion

}

请注意,交付部分中上述突出显示的地址是事件接收器的地址。一旦订阅者订阅了兴趣,通知管理器将把事件源消息传递到 NotifyTo 地址。当然,如果事件存储的看门狗不会终止此订阅,请参阅上面设置的 300 秒。下图显示了发布/订阅通知场景中的连接系统。我们可以为不同的事件主题、过滤器等订阅更多的事件接收器,并检查 WS-Eventing 如何处理它。

结论

在本文中,我描述了 WS-Eventing 规范的设计和实现。它是一个基本(轻量级)规范,可与 WS-Notification 系列(基于主题的发布/订阅模式)相媲美。发布/订阅通知在事件驱动的面向服务架构中扮演着重要角色。例如,长时间运行的业务工作流或更新业务状态需要使用事件模式来高效利用资源。这种场景通常分为前处理器、处理器和后处理器(同步-异步-通知)工作流。后处理器可以使用 WS-Eventing 通知客户端工作流已完成。另一个非常常见的场景是数据驱动存储,其中客户端正在等待存储中的情况。例如,在房地产业务中,买家希望在市场上找到他们感兴趣的房地产时收到通知。还有许多其他场景,但它们的共同模式是事件驱动模型。WS-Eventing 和 WS-Transfer 在 WS-* 服务总线连接系统中实现了此模型。

参考文献

历史

  • 2007 年 6 月 4 日 -- 文章编辑并发布到 CodeProject.com 主文章库
  • 2006 年 11 月 23 日 -- 文章更新
  • 2006 年 6 月 4 日 -- 原始版本发布
© . All rights reserved.