使用 Windows Azure 服务总线消息。






4.94/5 (27投票s)
本文介绍如何通过 WCF 和 WF 技术使用 Windows Azure 服务总线。
目录
引言
Windows Azure AppFabric 服务总线 2011 年 9 月版(版本 1.5)引入了服务总线的增强功能,例如通过由队列、主题和订阅表示的命名空间实体实现的“代理”消息功能。代理消息基础设施能够构建事件驱动的分布式面向服务架构、分布式异步云通知和应用程序内部消息。基本上,它可以在任何需要松散耦合消息的场景中使用,例如跨本地和云目标的队列和发布-订阅消息。
从抽象的角度来看,Azure 服务总线使用松散耦合模型,表示事件源及其消费者之间的逻辑连接。消费者需要预先订阅其对服务总线的兴趣,才能接收到特定的事件。这些元数据(订阅)表示事件兴趣的逻辑连接;它们存储在云中,并通过工具和/或特定的客户端应用程序进行管理。
下图显示了 Azure 服务总线抽象
上图显示了非常高的抽象级别,以隐藏 AppFabric 基础设施、实体等,从而展示 Azure 服务总线的基本模型。基本上,有一个事件兴趣的发布者、一个接收此兴趣的订阅者以及一个以安全方式在发布者和订阅者之间分配逻辑连接的管理员。服务总线是一个高度可用、可扩展、多租户的服务,用于连接发布者、订阅者、管理员等客户端。逻辑连接模型存储在 SQL Azure 中。目前,Azure 服务总线模型支持命名空间实体,例如队列、主题和订阅。
下图显示了 WindowsAzurePlatform 服务总线管理工具的屏幕截图
如您所见,Subscription-1 帐户有一个名为 rk2011
的服务总线命名空间范围。正如我之前提到的,服务总线是多租户模型,其中命名空间是逻辑隔离的实体,换句话说,发布者、订阅者和管理工具根据命名空间范围彼此可见。
下图显示了服务总线抽象的下一层
基本上,上图显示了服务-客户端架构。Azure SDK 1.5 的当前发布版本包含许多“主力”类和方法,以简化对服务总线服务的访问。Azure 服务总线上的代理消息功能是建立在 WCF 范式之上的,它使用发送方和接收方(侦听器)之间的全双工面向连接会话,以及双向通信(Microsoft.ServiceBus.Messaging.SBmp
内部类)。从应用程序(业务)层来看,发送方和接收方之间的消息传递是单向的,支持单播和多播数据报分发。
本文重点介绍 WCF 和 WF 技术对 Azure 服务总线的消费。我将讨论它如何作为逻辑模型连接的一部分以透明的方式使用。在参考文献中,您可以找到有关服务总线实体及其使用 Microsoft.ServiceBus.Messaging 命名空间中的 API 方法和 RESTful 编程的更多详细信息。我强烈建议阅读服务总线代理消息性能 - 最佳实践指南。
让我们描述一下 Azure 服务总线的主要实体,重点关注 WCF/WF 的用法。
代理消息
代理消息(BM)是服务总线消息基础设施中的基本实体。其概念设计类似于 WCF 消息。BM 包含用于消息控制流的系统属性(如 Label
、TimeToLive
、CorrelationID
等)以及用户应用程序特定属性,当然还有消息正文(消息负载)。系统和用户属性打包在一个名为 BrokeredMessageProperty
(BMP)的包中。重要的是,此 BMP 包可以在业务层之间流动。下图显示了用于 ServiceBus 消息的 WCF 消息:
BrokeredMessageProperty
包通过其 Properties 集合附加到 WCF 消息。将 BMP 从消息正文中分离出来,使我们能够在不通过服务总线基础设施序列化/反序列化业务负载的情况下使用它。请注意,集合中键的名称是 BrokeredMessageProperty
。
将 BMP 添加到 WCF 消息 Properties 集合中非常简单,如以下代码片段所示
using (OperationContextScope scope = new OperationContextScope((IContextChannel)channel)) { var bmp = new BrokeredMessageProperty() { Label = "HelloCloud" }; bmp.Properties.Add("abcd", (long)12345); bmp.Properties.Add("id", Guid.NewGuid()); OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, bmp); channel.Message(text); }
以上示例是使用 OperationContextScope
以命令式方式为通道(代理)添加 BMP。下图以图形方式展示了此过程。关键是发送方/发布者是向 Azure 服务总线生成 BrokeredMessage
的一方。换句话说,WCF Message + BMP 被转换为 BrokeredMessage
。此任务由 Azure SDK 1.5 中包含的 NetMessagingTransport
的自定义通道实现。
上图背后是 ServiceBusOutputChannel
的实现,其中 wcfMessage 被转换为 BrokeredMessage 并由 SbmpMessageSender
发送到服务总线通道。以下来自 .NetReflector 的代码片段展示了 Microsoft.ServiceBus.Messaging.Channels.ServiceBusOutputChannel
中的此逻辑
public SendAsyncResult(ServiceBusOutputChannel outputChannel, Message message, TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state) { this.outputChannel = outputChannel; outputChannel.AddHeadersTo(message); outputChannel.TraceSendingMessage(message); BrokeredMessage message2 = null; try { message2 = outputChannel.ConvertToBrokerMessage(message, out this.bufferToReturn); IAsyncResult result = outputChannel.MessageSender.BeginSend( new BrokeredMessage[] { message2 }, timeout, base.PrepareAsyncCompletion(onSendComplete), this); if (base.SyncContinue(result)) { base.Complete(true); } } catch (MessagingException exception) { // ... throw FxTrace.Exception.AsError(exception2); } catch (Exception) { // ... throw; } }
太棒了。但是我需要为 Azure 服务总线消息修改我现有的 WCF 发送方/发布者吗?
好吧,这不是必需的,本文为您提供了一种解决方案,可以使用配置文件和自定义 EndpointBehavior
以声明方式添加 BrokeredMessageProperty
。以下代码片段展示了此功能
serviceEndpoint.Behaviors.Add(
new RKiss.Activities.Azure.Messaging.MessagePropertyActionEndpointBehavior()
{
Expression = "SET sys.Label='HelloCloud'; SET abcd=12345; SET id=newid()"
});
如您所见,此自定义终结点行为实现了基于 SqlExpression 的 Action 功能。上述示例将设置一个消息标签(系统属性)和几个用户属性,例如 abcd
和 id
,并使用新的 Guid 值。请注意,BMP 对于消息处理非常重要,例如,根据用户属性,消息可以转发给订阅者等。更多详细信息可以在本文的用法示例中找到。
队列实体
队列实体代表生产者和消费者端之间的逻辑连接。它是一种点对点模式连接。其概念类似于本地 MSMQ 技术。下图展示了这一基本概念
发送方将 BrokeredMessage
发送到特定队列,接收方可以从该队列拉取消息。还有一些高级消息功能,例如调度时间、生存时间、会话功能等。更多详细信息可以在参考文献中找到。基本上,这种消息传递可以用于以透明方式将 NetMsmqBinding
配置迁移到 NetMessagingBinding
(Azure 服务总线),只需更新配置文件即可。在某些情况下,在此迁移过程中我们可以使用更好的模式,例如,使用 SchedulerEnqueueTimeUtc
属性允许在特定时间将消息发送到队列,这对于重试机制可能很有用。
如上图所示,同一队列有多个 ReceiverN
。在此场景中,消息将以轮询方式在接收器之间进行负载均衡,这非常适合 IIS/WAS/WebRole 托管。请注意,托管环境上的接收器服务必须启用自动启动。
以下代码片段显示了队列“Queue1
”上接收器配置的示例
太棒了。如您所见,上述配置将 WCF 服务“插入”到从 Azure 服务总线队列接收消息。所有神奇的工作都通过 WCF 范式使用其扩展(如 netMessagingBinding
和 transportClientEndpointBehavior
)完成。此功能允许构建存储在存储库中的逻辑连接,并根据部署模型物理部署到目标。
主题实体
主题实体表示特定命名空间中事件兴趣的逻辑连接。这是一种发布/订阅(Pub/Sub
)模式连接,其中源将事件兴趣发布到称为 Topic
的逻辑通道,消费者将根据其兴趣接收它。这种连接模式使我们能够构建事件驱动的分布式架构,其中生产者和消费者是松散耦合和可断开的。
下图展示了基于主题的消息概念,以显示与之前基于队列的消息的基本区别:
如您所见,发布者以松散耦合的方式将 BrokeredMessage
发送到命名空间/主题虚拟通道,而无需了解其消费者。发布者可以决定发送消息的生命周期,例如 TimeToLive
、SchedulerEnquueTime
等。应用程序可以使用源事件兴趣标记消息以识别特定事件。在松散耦合的方式中,有订阅者注册到主题,这些订阅者代表事件兴趣的消费者。注册过程称为订阅主题上的兴趣。
订阅实体
在主题模型中,此抽象由 Subscription
表示。每个订阅者都必须有一个订阅。此实体表示发布者和订阅者之间在特定主题通道上的逻辑连接。基于订阅,订阅者表示对主题的兴趣,以接收消息(由 RuleDescription 描述)以及一些用于消息控制流的信息,以应对异常、批处理操作等情况。
正如我之前提到的,为了接收已发布消息的副本,订阅者需要将其订阅订阅到特定主题。订阅可以是空的、带有默认 RuleDescription
或带有多个 RuleDescription。
以下图片片段显示了此抽象,其中 Subscription 是 RuleDescription
实体集合
要从主题通道获取 BrokeredMessage 的副本,订阅内的 RuleDescription
必须成功。换句话说,根据订阅过滤器,消息副本将传递给订阅者。Azure 服务总线消息中的 RuleDescription
内置了两个功能,即过滤器和操作。两者都基于 SqlExpression,并且已针对 BrokeredMessage
实体实施了某些限制,例如:属性的范围可以是 sys 和/或 user
。
从主题的角度来看,只有在消息生命周期内我们拥有有效订阅时,消息才会被传递。基本上,如果我们没有任何有效的订阅者订阅主题,消息将根据其 TimeToLive
属性保留在那里。另一方面,主题可以有许多(基于 配额,例如每个主题 2000 个订阅)空订阅、默认订阅 - TrueFilter
(一个默认 RuleDescription)或带有多个 RuleDescriptions
的订阅。
正如我之前所说,主题通道是松散耦合模型,订阅者和订阅彼此独立。换句话说,订阅可以由订阅者或任何应用程序组件和/或管理门户站点创建和注册。订阅可以是静态的,也可以是动态创建的。
请记住,当订阅从主题取消注册(删除)时,其活动的订阅者将断开连接,并且需要重新连接才能续订订阅。在动态订阅的情况下,当订阅者创建自己的订阅时,这不是问题,但对于静态订阅,则需要进行处理。
空订阅与默认订阅
众所周知,订阅者需要订阅特定主题的兴趣。但有一种特殊情况是订阅者(或管理方)事先不知道这种兴趣。因此,使用空订阅是有益的,它将允许为主题创建订阅者,并在空订阅上创建其侦听器。
当订阅自动创建时,它会为一条(默认)RuleDescription
创建。如果没有过滤器表达式(默认构造函数),则应用 TrueFilter
作为默认订阅。在这种情况下,每个已发布的消息都会将其副本传递给此订阅者。当需要多播消息时,请记住此功能。
通过删除默认订阅中的默认 RuleDescription
,我们可以获得一个 Empty Subscription
,而不会对活动的订阅者造成任何影响。订阅中 RuleDescription 的删除可以通过其订阅者(一次性消息接收者)或其他组件完成。
在相反的过程中,RuleDescription
可以添加到 Subscription 中。请注意,RuleDescription
中的每个 Filter 都必须匹配才能将消息(副本)传递给其 Subscriber。
RuleDescription
主题通道接收到的代理消息会流经每个订阅以检查其兴趣。此兴趣由 Filter Entity 声明。服务总线消息内置了几个过滤器,例如 TrueFilter、FalseFilter、CorrelationFilter 和用于通用 SQL92 表达式的 SqlFilter。当过滤器匹配为真时,RuleDescription 会执行其 Action。默认操作是 EmtyRuleAction,对于自定义操作,可以使用 SQL92 表达式的 SqlRuleAction。
过滤器和动作的范围是 BrokeredMessage 实体。SqlExpression 仅支持 BrokeredMessage 的两个范围,例如其属性(范围名称为 sys)和在 BrokeredMessage.Properties 集合中声明的用户范围(默认范围)。
过滤器示例:“sys.Label='Order' AND TicketID=12345”
动作示例:“SET sys.Label='Order'; SET abcd=12345; REMOVE TicketID; SET flag=TRUE”
负载均衡 vs. 多播(多个订阅)
在队列中,我们看到通过共享队列实体,我们可以以轮询方式在队列接收器之间平衡消息。当订阅者使用相同的订阅时,我们在订阅中也具有相同的功能。这很好,并且对于负载平衡目的非常有用。但是等等,当订阅具有相同的过滤器表达式时会发生什么?
嗯,我们可以获得一种新的连接模式,例如多播。下图显示了这两种情况
它们的抽象如下图所示
基本上,我们可以说,“共享”订阅是为了平衡,而“共享”过滤器是为了消息多播。请注意,“共享”用于逻辑解释。如果有一个支持引用实体并在存储库中共享它们的 Topic Model,那将是很好的,但当前版本不支持它。
寻址队列、主题和订阅
以下是连接命名空间中寻址队列、主题和订阅的示例
寻址也可以使用分支结构,例如:/myTopics/myTopic
等。寻址的用法可以在以下接收器、发布者和订阅者的配置示例中看到
这是一个简单的标准终结点配置,有趣的是订阅者终结点。如您所见,基地址用于寻址主题实体,而 listenUri
用于寻址其订阅。每个终结点都必须具有用于凭据的 behaviorConfiguration
。
太棒了。我们可以在 Windows Azure 平台中为我们的订阅创建命名空间、主题和订阅。从业务模型的角度来看,我们可以将服务总线视为连接的抽象,其中业务实体通过元数据逻辑连接和驱动。我们也可以将服务总线(例如命名空间、队列、主题和订阅 (NQTS))视为此元数据的一部分。在逻辑模型中,我们可以声明逻辑订阅,而无需了解发布/订阅模型的物理拓扑。基本上,业务模型中对事件(消息)感兴趣的订阅者可以分解为多个主题等。我们可以通过为公共和私有范围声明 NQTS 来帮助我们的业务模型。但是,我们存储在存储库中的逻辑业务模型需要物理部署到目标,并且部署模型将需要为目标的特定物理实体进行映射过程。
最后,当前版本的 Azure 服务总线消息不支持订阅或发布者针对多个主题的“通配符”功能。在当前版本中,我们可以将每个订阅视为与特定主题紧密耦合。发布者端也是如此,当事件兴趣仅分发给特定主题,而不是跨多个主题时。如果能支持多个主题的订阅——通配符订阅,那将很好,它将简化我们从业务模型进行的映射。
但是,我们可以(在基本服务总线实体之上)构建高级层次结构,例如“通配符订阅”、跨多个主题、命名空间发布、创建公共和内部网关、桥接等。概念非常简单,订阅特定主题的兴趣并将其分发到其他主题,或将事件兴趣多播到多个主题。
在下一节中,我将描述使用 WCF RoutingService 的这种模式场景,它基本上代表了一个由配置文件中(终结点、路由表、过滤器等)元数据驱动的订阅者/发布者组件。请注意,此服务内置了远程动态更新路由表和出站客户端的功能。
服务总线上的 WCF4 路由服务
WCF RoutingService
是为服务总线构建网关和/或桥接的完美解决方案。对于此场景,它可以完全透明地进行声明式配置。RoutingService
支持服务总线的两种契约,即 ISimplexDatagramRouter
和 ISimplexSessionRouter
。
下图显示了一个可能的示例,其中一个命名空间/主题的事件兴趣必须传递到另一个主题通道(桥接模式),并且还有一些“遗留”生产者/消费者希望成为服务总线在发布/订阅场景中的一部分(网关模式)。
在上面的示例中,RoutingService
可以配置为接收两种消息,即 BrokeredMessage
和 Message
。这是一个简单的配置,在主题通道上设置地址,在订阅上设置侦听器。对于“遗留”发布者,我们可以使用标准寻址。这就是入站消息的全部。对于出站消息,例如“遗留”订阅者和主题通道 (2),我们可以根据目标(服务或主题)设置一个带有寻址的客户端终结点。
现在,我们需要配置一个路由表,它将“连接”——将入站消息转发到特定的出站通道。对于从发布者到订阅者(“遗留”连接)的简单转发消息,我们可以使用标准内置消息过滤器(Action、XPath、EndpointName 等),但对于 BrokeredMessage,我们需要一个基于 BrokeredMessageProperty 的自定义消息过滤器,并能够以声明方式为出站消息设置此属性包到服务总线。另一个选项是转发入站 BrokeredMessage,而不对 BMP 包进行过滤,例如:EndpointName、Action、Address 等。
好的,没有任何实现细节(稍后会描述),以下配置允许过滤来自服务总线的入站消息,并为发往服务总线的出站消息生成 BMP 包
messagePropertyAction
是一个自定义终结点行为,用于消息检查器,以更新传出消息的 BMP 包属性。另一个元素是一个自定义 MessageFilter,用于对带有 BMP 包的传入消息执行 SqlExpression
。
这很令人兴奋,我们可以拥有一个通用的解决方案,用于服务总线的网关和桥接,它可以非常轻松地部署到云端,并用作 Azure 存储 Blob 的路由表——有关此解决方案的更多详细信息,请参阅我之前的文章 Azure 上的 RoutingService。本文为您提供了一个如何使用 WF 技术调解进出服务总线消息的解决方案。
请注意,RoutingService 具有内置的备份消息传递功能,也可以在我们的场景中使用。
基于上述示例中描述的概念,RoutingService 可以配置为跨多个命名空间和主题桥接消息。RoutingService 可以配置多个终结点,用于不同的主题和订阅。下图显示了此抽象
RoutingService 用于服务总线的另一个高级功能。我们知道,当前发布的服务总线配额限制了主题上的订阅数量。下图显示了我们如何解决此限制
概念非常简单明了。RoutingService 可以订阅主题,并根据 FilterTable 规则以多播方式将消息转发给多个客户端。
服务总线上 RoutingService 的另一个高级功能。下图显示了订阅了包含多个 RuleDescriptions 的 Subscription 的 RoutingService。此 Subscription(来自每个 Rule)接收到的入站消息可以根据由路由(过滤)表驱动的 BrokeredMessageProperty 包的自定义消息过滤器转发到适当的出站通道。
出版社
发布者代表事件兴趣的发送者(生产者)。在发布/订阅模式中,发布者与消费者松散耦合,因此其发布地址指向虚拟通道,在 Azure 服务总线消息中,该虚拟通道由主题表示。发布者仅支持一种消息交换模式,即单向操作(Fire&Forget),不带 SessionMode.Required
,因此发布者 WCF 通道只能实现 IOutputChannel
通道。以下示例显示了发布者的通用(非类型化)契约
[ServiceContract] public interface IGenericOneWayContract { [OperationContract(IsOneWay = true, Action="*")] void Message(System.ServiceModel.Channels.Message msg); }
让我们看看如何为主题创建发布者。我将通过 WF 技术以声明方式演示它。基本上,在简单的情况下,当事件兴趣可以静态地在 BMP 包中签名时,代理通道(发送者)没有区别,BMP 包可以像前面描述的那样以声明方式创建,使用自定义终结点行为(messagePropertyAction
)。当 BMP 包根据业务模型动态创建时,我们需要一种机制将其添加到传出消息属性中。
以下示例展示了将 BMP 包添加到由基于契约优先模型创建的自定义活动 CreateMessage 生成的非类型化消息中,然后通过活动 Send 发送它
上图展示了两个自定义活动,以简化此消息中介。第一个可以在我的文章WF4 消息中介自定义活动中找到,另一个是本文的一部分,稍后将详细描述。因此,这两个自定义活动将创建一个完整的消息,用于通过 Send 活动发送,包括标头、属性、正文等,以完全声明的方式用于任何元素和实体。
另一种方法是,当我们想要使用强类型消息契约时,我们可以使用基于在消息检查器中注入 BMP 包的以下模式。下图显示了这种情况
同样,我们需要此扩展的一些帮助,这就是为什么这里有一个自定义活动 SendScope
,它基本上是 Send
活动的包装器。用于服务总线的消息调解(例如将 BMP 包添加到消息属性)非常简单明了。第一步是创建此 BMP 包,第二步是将其分配给 SendScope 活动中的 Properties。此自定义活动包含在本文下载中。
注意:WF4 Send Activity 不允许以声明方式添加自定义终结点行为。只有一种方法可以实现,即通过配置文件中客户端终结点的名称(例如 topic)显式设置 Send.EndpointConfigurationName 属性。如果能够在下一个版本 WF 4.5 中通过 Send Activity 的自定义包装器注入自定义终结点行为,那将是很好的。
AssignBMP<T>
AssignBMP<T> 是一个自定义活动,用于创建 BrokeredMessageProperty
对象的实例,并将其分配给 System.ServiceModel.Channels.Message.Properties
或 Microsoft.ServiceBus.Messaging.BrokeredMessageProperty
声明的变量。
此自定义活动的设计和实现基于 BMP 类类型,用于 sys 范围内的属性和用户参数的集合。此 BMP 包是服务总线基础设施所需的,用于消息控制流以及生产者和消费者之间的附加用户特定数据。自定义 AssignBMP 活动简化了消息中介过程。
订阅者
订阅者表示 BrokeredMessage 的接收端点(消费者)。在服务中介中没有有趣的 BMP 的情况下,订阅者是一个常规接收器。在另一种情况下,当服务中介需要 BMP 包(附加业务数据)时,我们需要从自定义活动 OperationContextScope
中获取它。这是一个接收器包装器,用于将 OperationContext.Current 的值注入到工作流线程范围,该范围将从 IncomingMessageProperties 集合中获取我们的 BMP 包。
服务总线支持订阅和队列两种消息交换模式,例如 IInputChannel
和 IInputSessionChannel
。一旦这些服务总线实体被声明为会话(必需),订阅者(接收者)需要实现一个会话感知契约。以下是会话模式的无类型契约示例
[ServiceContract(SessionMode = SessionMode.Required)] public interface IGenericOneWaySessionContract { [OperationContract(IsOneWay = true, Action="*")] void Message(System.ServiceModel.Channels.Message msg); }
以下图片显示了通过 xaml 声明的订阅者示例
本文下载包含一个 AzureMessagingActivityPack 解决方案,其中包含对 BrokeredMessageProperty
和 MessageProperties
类的扩展。上述 GetBrokeredMessageProperty
方法是其中之一,用于隐藏一些细节。
订阅者可以在配置文件中配置。上面的示例显示了此服务 Subscriber2 配置部分,其中其终结点具有指向主题通道的地址,并且侦听器寻址到订阅地址。
订阅者和多个发布者
以下场景展示了一个中介器,我们有一个订阅者和多个发布者。这个中介器是事件驱动虚拟存储的一个示例。中介器(StoragePublisher
)负责将资源放入虚拟存储(例如:Azure Blob 存储),将此事件发布到主题通道,并通过向队列发送事件消息来触发清理机制。此消息标记有 ScheduledEnqueueTimeUtc
属性,该属性表示资源的 TimeToLive。此消息的接收者将清理存储中的特定资源。
这是一个消费者从租赁存储中提取资源的模式示例。服务中介器不是多播大型业务负载(资源主体),而是通过主题通道向未知数量的消费者发送通知(短消息)。
下图显示了 BMP 包
对于事件驱动存储的演示,使用了自定义活动 Microsoft.Activities.Azure.Storage.PutBlob。如您所见,设计非常简单,使用自定义活动将资源放入 Azure 存储并将事件发布到 Azure 服务总线主题。请注意,此过程不受 TransactionScope 的管理,就像我们在本地环境 SQL/MSMQ 中可能拥有的那样,它应该由某种看门狗机制覆盖。
所有管道作业都在配置文件中完成。以下代码片段显示了此示例
<configuration> <appSettings> <add key="AzureStorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=MY_ACCOUNT;AccountKey=MY_ACCOUNT_KEY"/> </appSettings> <system.serviceModel> <services> <service name="Subscriber3"> <endpoint address="sb://MY_NAMESPACE.servicebus.windows.net/worker" listenUri="sb://MY_NAMESPACE.servicebus.windows.net/worker/subscriptions/image12" binding="netMessagingBinding" contract="IGenericOneWayContract" behaviorConfiguration="sharedSecretCredentials" /> </service> </services> <client> <endpoint name="TopicPublisher" address="sb://MY_NAMESPACE.servicebus.windows.net/worker" binding="netMessagingBinding" contract="IGenericOneWayContract" behaviorConfiguration="sharedSecretCredentials" /> <endpoint name="QueuePublisher" address="sb://MY_NAMESPACE.servicebus.windows.net/search" binding="netMessagingBinding" contract="IGenericOneWayContract" behaviorConfiguration="sharedSecretCredentials" /> </client> <behaviors> <endpointBehaviors> <behavior name="sharedSecretCredentials"> <transportClientEndpointBehavior credentialType="SharedSecret"> <clientCredentials> <sharedSecret issuerName="MY_ISSUER_NAME" issuerSecret="MY_ISSUER_SECRET" /> </clientCredentials> </transportClientEndpointBehavior> </behavior> </endpointBehaviors> <serviceBehaviors> <behavior> <serviceMetadata httpGetEnabled="true"/> <serviceDebug includeExceptionDetailInFaults="false"/> </behavior> </serviceBehaviors> </behaviors> <extensions> <behaviorExtensions> <add name="transportClientEndpointBehavior" type="Microsoft.ServiceBus.Configuration.TransportClientEndpointBehaviorElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/> </behaviorExtensions> <bindingElementExtensions> <add name="netMessagingTransport" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingTransportExtensionElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/> </bindingElementExtensions> <bindingExtensions> <add name="netMessagingBinding" type="Microsoft.ServiceBus.Messaging.Configuration.NetMessagingBindingCollectionElement, Microsoft.ServiceBus, Version=1.5.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/> </bindingExtensions> </extensions> </system.serviceModel> </configuration>
如您所见,有一个 appSettings 部分用于连接到 Azure 存储的连接字符串,其余部分是针对一个订阅者和两个发布者的配置,当然,我们必须添加服务总线的扩展,例如 transportClientEndpointBehavior
、netMessagingTransport
和 netMessagingBinding
。
介绍到此为止。我希望您能了解服务总线在基于 WCF/WF 技术的生产者和消费者之间的逻辑连接模型中的位置。我没有描述服务总线管理工具,创建实体(如队列、主题、订阅等)。有关这方面的更多信息,包括其 API 编程,可以在参考文献中找到。此外,我假设您对 Windows Azure 平台和 WCF/WF 技术有一定了解。
好的,让我们继续,在进入使用和测试部分之前还有很多东西要展示。
AzureMessagingActivityPack
AzureMessagingActivityPack 是一个小型包,包含自定义活动和类扩展,用于与 Azure 服务总线消息进行消息中介。其完整的实现解决方案包含在本文下载中。基本上,有两组,例如
1. 终结点和类扩展
此扩展旨在简化对 BrokeredMessageProperty
包的访问和操作。它可以在 xaml 或配置文件中以命令式或声明式方式使用。
有两个类扩展,即 MessagePropertiesExtension
和 BrokeredMessagePropertyExtesion
。以下示例展示了一些用法。
var bmp = OperationContext.Current.IncomingMessageProperties.GetBrokeredMessageProperty(); long prop1 = bmp.RequiredItem<long>("prop1"); string prop2 = bmp.RequiredItem("prop2", "Missing {0} in the bag", "prop2"); XElement xbmp = bmp.ToXml(); Trace.WriteLine(bmp.ToXml().ToString()); var bmp1 = message.Properties.GetBrokeredMessageProperty("Missing BMP bag"); string prop3 = message.Properties.GetBrokeredMessageProperty().RequiredItem("prop3"); bmp1.Put(message2.Properties);
消息属性操作
这是一个客户端终结点行为扩展,用于在配置文件中为出站消息注入 BrokeredMessageProperty 包。有了这个扩展,我们可以将“事件兴趣”从“传统”发布者发送到服务总线主题通道。
public class MessagePropertyActionClientInspector : IClientMessageInspector { public string Expression { get; set; } void IClientMessageInspector.AfterReceiveReply(ref Message reply, object correlationState) { // nothing to do } object IClientMessageInspector.BeforeSendRequest(ref Message request, IClientChannel channel) { if (request.Properties.ContainsKey(BrokeredMessageProperty.Name)) { var bmp = request.Properties[BrokeredMessageProperty.Name] as BrokeredMessageProperty; var sra = new SqlRuleAction(this.Expression); sra.Preprocess().Execute(bmp.Message); } else { var bmp = new BrokeredMessageProperty(); var sra = new SqlRuleAction(this.Expression); var bm = sra.Preprocess().Execute(bmp.Message); request.Properties.Add(BrokeredMessageProperty.Name, bmp); } return null; } }
上述检查器的核心是对 BrokeredMessage 对象执行 SqlExpression。
以下代码片段展示了它在配置文件中的用法
<behaviors> <endpointBehaviors> <behavior name="sharedSecretClientCredentials"> <transportClientEndpointBehavior credentialType="SharedSecret"> <clientCredentials> <sharedSecret issuerName="MY_ISSUER_NAME" issuerSecret="MY_ISSUER_SECRET" /> </clientCredentials> </transportClientEndpointBehavior> <!-- Set BrokeredMessageProperty Bag --> <messagePropertyAction enable="true" expression="SET sys.Label='Router';SET sys.TimeToLive='00:01:00';SET id=newid()"/> </behavior> </endpointBehaviors> </behaviors>
它也可以通过编程方式使用
serviceEndpoint.Behaviors.Add(
new RKiss.Activities.Azure.Messaging.MessagePropertyActionEndpointBehavior()
{
Expression = "SET sys.Label='Router'; SET sys.TimeToLive='00:01:00'; SET id=newid()"
});
代理消息过滤器
BrokeredMessageFilter 是为 RoutingService 创建的,用于自定义 BrokeredMessageProperty 包上的消息过滤。它是 RoutingService 订阅者将消息转发到适当的出站通道的宝贵扩展。
public class BrokeredMessageFilter : MessageFilter { public string Expression { get; set; } public BrokeredMessageFilter(string expression) { this.Expression = expression; } public override bool Match(System.ServiceModel.Channels.Message message) { if (message.Properties.ContainsKey(BrokeredMessageProperty.Name)) { var bmp = message.Properties[BrokeredMessageProperty.Name] as BrokeredMessageProperty; var filter = new SqlFilter(this.Expression); bool bMatch = filter.Preprocess().Match(bmp.Message); return bMatch; } else { return false; } } public override bool Match(System.ServiceModel.Channels.MessageBuffer buffer) { throw new NotImplementedException(); } }
上述过滤器的核心是在 BrokeredMessage 对象上执行 SqlFilter。以下代码片段展示了此自定义过滤器在配置文件(路由部分)中的用法
<filters> <clear /> <filter name="MatchForSubscriber1" filterType="Custom" filterData="image=111 AND sys.Label='Hello Cloud'" customType="RKiss.Activities.Azure.Messaging.BrokeredMessageFilter, RKiss.Activities.Azure.Messaging" /> </filters>
请注意,上述扩展对于通过修改配置文件将“遗留”发布/订阅生产者和消费者连接到 Azure 服务总线非常有帮助。
2. 自定义活动
AssignBMP<T>
AssignBMP<T> 是一个自定义活动,用于创建 BrokeredMessageProperty
包并将其分配给 T 类型,T 类型只能接受 System.ServiceModel.Channels.Message
或 Microsoft.ServiceBus.Messaging.BrokeredMessageProperty
类型。其设计基于 BrokeredMessageProperty 类,其中包含服务总线属性(sys 范围)和参数集合中的应用程序(用户)属性(user 范围)
参数集合可以在弹出对话框中声明。下图显示了声明不同类型用户属性的示例
AssignBMP<T> 根据所需类型 T 进行工厂化。以下代码片段显示了这些模板。它展示了一些如何在参数集合中预定义一些常见应用程序属性的示例
public class AssignBMPtoMessage : IActivityTemplateFactory { public Activity Create(DependencyObject target) { AssignBMP<Message> assign = new AssignBMP<Message>() { DisplayName = "AssignBMPtoMessage" }; assign.Parameters.Add(new KeyValuePair<string,InArgument>("myKey", InArgument<string>.FromValue("12345"))); return assign; } } public class AssignToBMP : IActivityTemplateFactory { public Activity Create(DependencyObject target) { return new AssignBMP<BrokeredMessageProperty>() { DisplayName = "AssignToBMP" }; } }
发送范围
SendScope 是 Send 活动的通用自定义活动,允许以声明方式创建 Send 活动的绑定并动态插入消息头和属性
此自定义活动允许为类型化消息契约添加 BrokeredMessageProperty 包,当工作流编排基于 CLR 类型(DataContract)时。请注意,对于非类型化消息,BMP 可以直接分配给消息属性集合。
Subscribe (订阅)
此自定义活动旨在订阅订阅者对主题通道的事件兴趣。此自定义活动的结果是创建具有一个 RuleDescription(名称 = $Default)的订阅,该订阅基于 Filter 和 Action SqlExpression。
订阅活动背后是对服务总线的 API 调用,用于检查现有订阅,然后删除(如果需要续订)并创建一个新订阅。此自定义活动的当前实现并非完全异步过程,只有创建订阅是以异步方式完成的,其他都是同步过程。
请注意,Azure SDK 1.5 不支持更新现有订阅。如果能有此功能,而不是总是删除和重新创建,那将是很好的。此过程将影响依赖于此续订订阅的所有活动订阅者。为了避免此问题,我建议使用空订阅并使用 API 调用 AddRule
到订阅,换句话说,保持订阅处于活动状态。
取消订阅
这是一个自定义活动,用于异步方式从主题通道删除订阅。它是一个非常轻量级的活动,需要配置、命名空间、主题和订阅名称。请注意,通过处理此活动,此订阅的所有活动订阅者都将逻辑上从主题通道断开连接。
操作上下文范围
此自定义活动对于订阅者(接收者)中介器非常有用,用于获取传入消息的操作上下文,我们需要它来获取服务总线发送的 BrokeredMessageProperty 包。这是一个通用自定义活动,最初在 WF Security Pack CTP 1 中创建,并在我的包中稍作修改。其概念非常简单明了,即在其范围内在工作流线程槽中创建 OperationContext
。为了获取运行时 OperationContext,使用了内置消息检查器接口 System.ServiveModel.Activities.IReceiveMessageCallback 和 System.ServiveModel.Activities.ISendMessageCallback 的实现。
这就是这个包的全部内容。我没有围绕服务总线 API 创建一个完整的 WF 包装器。上述扩展侧重于生产者和消费者方面,即发布和消费事件兴趣,创建订阅者及其订阅。对于管理服务总线,我们可以使用 Windows Azure 平台门户和/或第三方浏览器,例如即将推出的新版本 Service Bus Explorer。
好的,让我们休息一下,因为我们要经历一些困难的部分,例如 用法和测试
。我假设您对 Windows Azure 平台有一定了解。
用法与测试
首先,以下是先决条件:
- Visual Studio 2010 SP1
- Windows Azure SDK v1.5(2011 年 9 月)
- Azure 存储资源管理器(选项,用于一个示例 - 测试 3)
- WF Azure 活动包 CTP 1(可选,用于一个示例 - 测试 3)
- Windows Azure Platform 账户
- Http/s Internet 连接
- 下载本文的软件包
- 对 Windows Azure 平台、WCF 和 WF 的一些知识和经验
让我们从 Windows Azure 服务总线开始
步骤 1. 在服务总线上创建您的命名空间
步骤 2. 在您的命名空间范围内创建以下实体,例如队列和主题
步骤 3. 在 Azure 存储 Blob 中创建容器“box”(这是针对一个测试场景的可选项 - Test 3
)
步骤 4. 下载本文并使用 VS2010SP1 打开它。
您应该看到以下解决方案
基本上,该解决方案分为两部分,即 AzureMessagingAcitivtyPack
和 Test
。第一个实际上是两个程序集的包,用于简化服务总线的消费。这些程序集的使用在 Test 包中显示。如您所见,Test 文件夹包含不同的应用程序(场景),由 WCF 和 WF 技术驱动,使用命令式和声明式方式创建生产者和消费者。
请注意,该解决方案已在 Windows 7 上开发和测试。
步骤 5. 编译解决方案
在此步骤中,您应该已成功构建提供的 exe 程序,例如 ConsoleApplications、Management、Publishers 和 PubSubRouter。所有快捷方式都集中在 ServiceBusTest
文件夹中,可以添加到桌面。
步骤 6. 更新配置文件
请将 Test 文件夹中所有配置文件中的以下值替换为您 Azure 服务总线和存储帐户的值
MY_ISSUER_NAME, MY_ISSUER_SECREET, MY_NAMESPACE, MY_ACCOUNT and MY_ACCOUNT_KEY
步骤 7. 创建订阅
启动 SubscriptionManager
程序以创建我们的测试订阅
此程序是声明性创建的,使用带有自定义活动 Subscribe 的工作流。请看下图中的设计
要验证服务总线上的上述订阅,请启动控制台程序 ManagingSubscriptionRules
如您所见,我们有三个订阅,其中第一个有一个简单的测试 Action 表达式。请注意,这个测试程序能够在订阅 image12
中添加一个规则,我们稍后需要演示它。
好的,现在我们的测试环境已准备就绪。但是,还有一个评论。测试包还包含一个简单的消息检查器,用于在控制台上记录消息,包括 BrokeredMessageProperty 包。
测试 1. 向队列发送消息。
这是一个非常简单的测试,发送方将消息发送到队列(Search),接收方将从队列中检索消息。首先(但不是必需的)启动接收方控制台程序
然后是发送方控制台程序
您应该看到消息已由接收方接收
太棒了,它在工作。现在,您可以再启动一个接收器,然后回到发送器并按任意键再次发送消息。您将看到消息在接收器之间进行负载平衡。
此测试已在配置文件中以声明方式实现。请访问这些配置文件(发送方和接收方)以获取更多详细信息。如您所见,将 NetMsmqBinding
(MSMQ) 迁移到 NetMessagingBinding
(服务总线) 可以非常直接和简单。
以下代码片段显示了发送方配置文件的一部分。其中包含用于在客户端终结点行为中设置 BMP 包的 messagePropertyAction
元素
测试 2. 发布消息到主题
在此测试场景中,我们将在订阅者之前触发(发布)事件兴趣,因此,请启动发布控制台程序。以下屏幕片段将显示。您可以在控制台屏幕上看到 wcf 消息(黄色)和 BMP 包(绿色)。请注意,消息的 TTL 设置为 30 分钟。
现在,启动订阅者控制台程序。您应该在订阅者上看到以下结果
太棒了,我们可以看到带有 BMP 包的日志消息以及服务中反序列化的消息(白色)。
让我们为这个发布/订阅场景实现更高级的功能,其中订阅者订阅了 image12
订阅,该订阅带有一个 $Default 规则,例如 image=111 or image=222
,这就是为什么我们只收到一条消息。
现在,让我们向此订阅添加另一个规则。我们可以通过使用 ManagingSubscriptionRules 控制台程序来做到这一点,该程序内置了此功能(为测试目的而硬编码)。请返回 ManagingSubscriptionRules 程序并按“a”键。您应该看到与以下屏幕片段所示相同的结果
请注意,此规则(name=abcd
)已声明了一个 RuleAction
,用于设置用户属性 Label
。
现在,我们可以回到发布者控制台程序,通过按任意键向服务总线主题通道发送消息。这是发布者的第二条消息,因此计数器值为 1。此测试中有趣的是,订阅者根据其 RuleDescription 的成功表达式从该订阅接收到两条消息。您还可以看到第二条消息的 BMP 包与第一条消息不同。这是 RuleAction 的结果,例如 SET Label = 'Hello Roman'
。
这很酷,您以后可以回来修改您的 RuleDescription 的测试代码,启动更多的订阅者(可能在不同的机器上),等等,以查看 Azure 服务总线上发布/订阅的功能。
此测试是程序化创建的,以下代码片段显示了发布者的实现
try { // my Service Bus string issuerOwner = ConfigurationManager.AppSettings.Get("sbIssuerName"); string issuerSecret = ConfigurationManager.AppSettings.Get("sbIssuerSecret"); string namespaceAddress = ConfigurationManager.AppSettings.Get("sbNamespaceAddress"); string text = string.Format("[{0}] Hello Service Bus - {1}", counter, DateTime.Now); EndpointAddress topicAddress = new EndpointAddress(namespaceAddress + "/" + topicName); var binding = new NetMessagingBinding(); var securityBehavior = new TransportClientEndpointBehavior() { TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(issuerOwner, issuerSecret), }; var se = new ServiceEndpoint(ContractDescription.GetContract(typeof(INotify)), binding, topicAddress); se.Name = "TopicPublisher"; se.Behaviors.Add(securityBehavior); se.Behaviors.Add(new RKiss.Logger.LoggerAttribute()); factory = new ChannelFactory<INotify>(se); var channel = factory.CreateChannel(); // programmatically setup bmp using (OperationContextScope scope = new OperationContextScope((IContextChannel)channel)) { var bmp = new BrokeredMessageProperty() { Label = "RKiss", TimeToLive = TimeSpan.FromMinutes(30) }; bmp.Properties.Add("abcd", (long)12345); bmp.Properties.Add("image", (long)111); bmp.Properties.Add("id", Guid.NewGuid()); bmp.Properties.Add("counter", (long)counter); OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, bmp); channel.Message(text); } factory.Close(); Console.WriteLine("Message #{0} has been sent to Topic='{1}'", counter++, topicName); } catch (CommunicationException ex) { // ... } catch (Exception ex) { // ... } Console.WriteLine("Press any key to send message"); Console.ReadLine();
测试 3. 事件驱动存储
此测试是一个场景,您可以看到多种技术的使用,例如 WCF、WF、Azure Blob 存储、主题和队列。下图显示了此场景
基本上,我们在本地拥有 Publisher、Subscriber 和 Cleaner 组件。在 Azure 端,我们有一个 Blob 存储和带有队列和主题通道的服务总线。操作非常简单,Publisher 将资源放入 Blob 存储,向主题通道触发事件,并向队列发送一个计划消息以清理存储中的资源。此场景适用于避免多播传递大型消息。服务总线不是为每个订阅复制大型消息,而是向订阅者发送一个非常小的通知消息,以便从存储中拉取资源。
请注意,此测试需要先决条件部分中描述的选项,例如 azure 存储帐户、创建 blob 容器和 Azure 存储资源管理器。
首先,请启动 AzureStorageExplorer,连接您的存储帐户并显示容器“box”。以下屏幕显示了我的帐户名称(rkstorage)
接下来,启动 StorageReceiver 控制台程序
现在,我们可以启动发布者,请启动 StoragePublisher 程序。请注意,发布者将自动执行操作并等待下一个操作,因此请刷新 AzureStorageExplorer 以查看容器“box”中存储的资源。
好的,下一步是启动 StoragePublisher
要验证我们的资源是否存储在 Azure Blob 存储中,请按下以下 Explorer 工具上的“刷新”按钮。您应该看到以下结果
使用此工具,我们还可以在浏览器中查看实际资源。请注意,看门狗时间已设置为 1 分钟,之后,Storage Cleaner 接收器将从队列接收消息,以执行从 Blob 存储中删除特定资源的操作。
下图是此动作的屏幕片段
我知道,资源 TTL 时间太短(只有 1 分钟),但是,您可以返回 StoragePublisher 并再次触发事件,并关注特定操作等。您可以多次触发发布者并观察 Storage 容器“box”的内容。
我喜欢这个场景,它的实现非常直接和易读。以下屏幕片段显示了 StoragePublisher 的工作流。这里有完整的 Put and Notify
活动序列,简单如 1-2-3。
测试 4. 路由器
此测试将展示 服务总线上的 WCF4 路由服务 的功能。为了更好地理解此测试场景,它需要启动更多程序来生成和消费消息,下图将为您提供此测试场景的简要概述
如您所见,此测试场景比上一个更复杂。因此,让我们进一步讨论。此测试的目标是展示 WCF RoutingService 的功能,它既是服务总线的订阅者也是发布者。RoutingService 可以是服务总线的网关。对于此情况,有一个 WinForm 程序 SimulatorPub/Sub 来表示 WCF 生产者和订阅者或 Web 服务。它是一个非常简单的程序,用于发送或接收 Soap 消息。
另一方面,我们有用于发布/订阅消息的 WCF/WF 程序和一个用于从队列接收消息的程序。以下配置片段显示了所有终结点的声明,例如路由器的入站和出站终结点。请注意,路由器已将其兴趣订阅到主题通道,订阅为 image1
路由器和服务总线之间的逻辑连接在路由部分中声明。下图显示了其过滤器。如您所见,此路由表支持三种连接类型(优先级 9、10 和 11)。第一个具有针对 BMP 包(image=111)的自定义消息过滤器,第二个基于 EndpointName
,其中来自 endpointRouter
的任何入站消息都转发到出站通道 TopicPublisher1
。最后一个具有最高优先级 11,基于消息 Action
。如果此匹配为真,则消息将转发到服务总线队列。
请查看完整的配置文件 PubSubRouter.exe.config
以了解其内容,特别是终结点行为部分。
我希望您能理解这个配置文件,尤其是路由表,这对于我们的下一步(例如运行时用例)是必需的
请关闭所有已打开的程序并启动以下程序
PubSubRouter
控制台程序(这是路由服务的主机进程)SimulatorPubSub
Winform 程序模拟发布/订阅消息订阅者
控制台程序用于从服务总线主题接收消息接收器
控制台程序用于从服务总线队列接收消息
此时,您应该在所有控制台程序上看到侦听器的提示消息。我们的 SimulatorPubSub 已经预定义了一个发布消息,所以我们的测试场景正在等待一个事件。我们将执行三个用例,即事件由模拟器发送,来自主题的事件由模拟器接收,以及模拟器将消息发送到队列的另一个用例。
1. 模拟器正在触发 event_1 消息
请按下 SimulatorPubSub 上的 OneWay 按钮。您应该在 PubSubRouter 控制台程序和接收器上看到日志消息,因此,来自模拟器的消息已通过路由器发送到服务总线队列并由接收器控制台程序接收,请看以下屏幕片段
2. 模拟器正在触发 event_2 消息
在这种情况下,模拟器将发送不同的事件消息。请修改此消息的 Action,例如,将 Broadcaster
替换为 BroadcasterXYZ
单词,然后按下 OneWay 按钮发送此消息。您应该在 Subscriber 控制台程序上看到以下结果
并在 Simulator Sub 中,来自订阅 image1 的消息
太棒了。这里发生了什么?嗯,来自 Pub Simulator 的入站消息已被路由到主题通道,所有订阅者(也是 Sub Simulator)以及订阅 image1
都收到了来自服务总线主题的副本消息。
好的,让我们再看一个例子。请按 Clear 按钮清除 Simulator 上的 Sub 消息。
3. WorkflowPublisher1 正在触发事件消息
在这种情况下,我们将向服务总线主题发布一条消息。为了进行此测试,我们需要启动 WorkflowPublisher1 控制台程序。请执行此操作并观察控制台程序上的结果。
首先,我们可以看到订阅者收到了两条消息(不同的 BMP 包但相同的消息负载)。这是正确的,该订阅者订阅了 image12
订阅,该订阅此时有两个 RuleDescription,因此我们收到了两条消息副本
我们可以在模拟器上看到另一个结果,其中 Sub 消息已接收。您可以检查消息有效负载的 id
元素。
本次测试到此结束。另外请注意,请查看 PubSubRouter 控制台程序。您将看到入站和出站终结点的日志消息,包括它们的 BMP 包。
如果一切顺利,请尝试修改此场景中的路由表、发布者和/或订阅者,以创建新的用例,从而查看 WCF 和 WF 技术如何与 Azure 服务总线主题/队列协同工作。
测试 4. 托管在 IIS/WAS 上的订阅者
订阅者文件夹包含在测试包中。此文件夹包含托管在 IIS/WAS 服务器中的三个订阅者的实现。其中一个订阅者使用工作流 xaml 以声明方式实现。以下代码片段订阅者是通过代码以命令方式创建的:
public class Subscriber1 : IGenericOneWayContract { public void Message(System.ServiceModel.Channels.Message msg) { Trace.WriteLine("*** Subscriber1 ***"); if (msg.Properties.ContainsKey(BrokeredMessageProperty.Name)) { Trace.WriteLine("BrokeredeMessageProperty:"); var bmp = msg.Properties[BrokeredMessageProperty.Name] as BrokeredMessageProperty; foreach (var prop in bmp.Properties) { Trace.WriteLine(string.Format(" {0} = {1}", prop.Key, prop.Value)); } } Trace.WriteLine(msg + "\r\n"); } }
以下屏幕片段显示了由 WF 创建的 Subscriber3。您可以看到 OperationContextScope
的使用,以获取消息 BMP 包
与服务总线的完整插件设置在配置文件中完成。请访问此文件以了解每个由 IIS/WAS 托管的订阅者。请注意,此虚拟应用程序必须启用自动启动功能。
我将把这个测试留给您,希望您能在跟踪输出(例如 DebugView)中看到订阅者的结果。您可以使用 Publisher 控制台程序向主题通道触发事件。
高级用法
Windows Azure 服务总线消息提供了高级功能,例如以 PeekLock
模式接收消息,即在客户端发送完成消息的请求后,消息才从队列/订阅中移除,还有客户端批处理、会话消息等。这些高级功能不在本文中描述。更多详细信息可以在 服务总线代理消息性能 - 最佳实践指南 中找到。
在本文中,我们使用 .Net API 客户端(由 WCF 范例封装)访问服务总线,这些客户端在 Azure SDK 中实现。REST API 可以用于 .Not 应用程序,但有一些限制。请阅读 Service Bus REST API 参考 以获取更多信息。
结论
本文介绍了通过 WCF 和 WF 技术使用 Windows Azure 服务总线消息。Azure 服务总线消息构建在多租户环境中,它代表了其生产者和消费者(位于本地和/或 Azure 环境中)之间的逻辑连接。服务总线不是一项技术,而是一种服务解决方案,用于以透明的松散耦合方式处理事件驱动架构和应用程序内部消息交换的队列和发布/订阅消息。毫无疑问,Azure 服务总线在业务模型中扮演着重要角色,它将集中式逻辑连接映射到分散式物理连接。希望您喜欢本文。
参考文献:
[1] 服务总线
[2] Windows Azure AppFabric 服务总线代理消息
[4] 服务总线主题和队列 – 高级
[5] Windows Azure AppFabric 服务总线:主题
[6] 探索 Azure AppFabric 服务总线 V2 5 月 CTP:主题
[7] 利用 Windows Azure 服务总线代理消息 API 的最佳实践
[8] 如何将 BizTalk Server 应用程序与服务总线队列和主题集成
[11] 通过构建服务总线资源管理器工具探索主题和队列——第 1 部分
[12] Windows Azure AppFabric 示例
[13] Azure AppFabric 服务总线代理消息 GA & Rude CTP 差异
[14] 探索 Azure AppFabric 服务总线 V2 5 月 CTP:队列
[15] SqlExpression
[16] CodePlex 上的服务总线示例
[17] 将服务总线主题和订阅与 WCF 结合使用