65.9K
CodeProject 正在变化。 阅读更多。
Home

Azure Service Bus 推送器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.75/5 (8投票s)

2012年4月1日

CPOL

24分钟阅读

viewsIcon

52595

downloadIcon

483

本文介绍了Windows Azure Service Bus Messaging的推送服务的设计和实现。

目录

特点

  • 发布/订阅-推送模型(PushAndPush模型)
  • WorkerRole中订阅者和接收者的虚拟主机
  • 用于运行时和配置存储库的表存储
  • 用于管理WorkerRole的发布/订阅控制器
  • 无外部终结点
  • 代理消息的目标终结点
  • BasicHttpBinding (http/https) 的目标终结点
  • 跨命名空间、主题等推送消息
  • 发送BrokeredMessageProperty标头的选项
  • 消息版本选项(默认为Soap12)
  • 直接或间接推送寻址
  • Windows Azure 服务总线消息传递(2011年11月)
  • WCF 技术

引言

Windows Azure 服务总线表示WCF和其他服务终结点(包括REST终结点)之间的逻辑连接。基本上,有两种消息传递功能,如“中继”和“代理”消息传递。本文重点介绍代理消息传递。

代理消息传递仅提供单向持久异步通信,消息传递组件(如QueueTopicSubscription)具有发布/订阅(PubSub)功能。其基础设施基于PushAndPull松耦合分布式架构。

代理消息被推送到特定的组件实体(队列或主题),并暂时存储,直到接收方准备好拉取它。这就是为什么它被称为PushAndPull模型。事件兴趣的消费者通过服务总线组件(如队列或主题/订阅)逻辑连接到事件源。在这种情况下,事件兴趣的消费者代表一个服务总线的服务,用于从其组件中“拉取”消息。

从架构的角度来看,服务总线的消费者是消息的接收者,必须托管并通过终结点提供,服务总线传递部分可以消费该终结点。有关此技术的更多详细信息,请参阅我的文章Azure 服务总线测试器,这个小工具可以帮助您理解Azure 服务总线代理消息传递的用法。

使用服务总线组件的另一种架构风格是它们的完全封装。换句话说,事件兴趣的消费者不需要知道有关连接的任何细节。它的接收器是一个用于消息调解的通用服务,它不知道消息在哪里以及如何创建。它可以从服务总线、从第三方适配器、直接从客户端等创建。在这种情况下,接收器是一个常规消息服务,而不是服务总线的订阅者或接收者。

对于这种架构,我们需要扩展服务总线基础设施,提供一个服务来提供PushAndPush连接模式,其中事件兴趣被推送到服务总线,然后再次推送到事件兴趣的服务(接收器)。

本文详细介绍了如何为Azure 服务总线设计和实现此Pusher服务。

下图显示了它在Windows Azure中的位置

基本上,上图显示了一个由WorkerRole托管的Pusher服务,它作为服务总线组件的消费者,从服务总线组件中拉取消息,并根据表存储中存储的元数据将消息推送到目标服务。这是服务总线代理消息传递的标准场景,唯一的区别是有一个集中的消费者和事件兴趣的映射器,这些事件兴趣由发布者/发送者在服务总线上发布。

表存储是Pusher服务的元数据存储库。有用于创建订阅者/接收者的启动数据和用于将消息推送到特定目标的运行时数据。除此之外,还有一个订阅者用于控制Pusher。请注意,Pusher作为具有多个实例的WorkerRole在后台进程中运行,并共享相同的表存储。没有用于与Pusher通信的公共终结点,所有控制都通过服务总线完成,因此发送到主题Pusher的任何控制消息都会多播到所有WorkerRole实例。另一方面,Pusher内置了一个信息/警告/错误通知消息的发布者。

因此,对于任何已注册的服务总线组件,如队列、主题/订阅,我们都可以在表存储中分配(订阅)一个Pusher。当发布的消息在这些组件中找到匹配项时,消息的副本将传递给Pusher,并根据表存储中的运行时元数据推送到特定的目标终结点。

在服务总线上订阅兴趣在队列和订阅组件之间有所不同。以下屏幕截图显示了这些差异

正如您所看到的,Pusher被分配给队列和/或订阅的特定规则。表(存储库)中的每一行必须是唯一的,并由服务总线组件的地址定义。对于订阅,规则名称被添加用于唯一定义(表存储中的RowKey)。基于此,您可以在上图中看到队列只有一个Pusher,但订阅可以根据需要有多个Pusher。

请注意,Pusher始终只为每个订阅创建一个订阅者。例如,如果订阅者有一个或多个规则,它将始终只托管一个订阅者,因为它们在服务总线上具有相同的侦听器Uri。

另外一点,Pusher有责任将订阅者收到的消息推送到目标终结点。这是一种一对一的映射模式。因此,只有服务总线(已注册的组件,如订阅和规则)才能处理消息多播。例如,如果我们的场景需要消息多播,我们可以注册一个具有相同SqlExpression过滤器(最大限制为64)的多个规则的订阅,或者为多个订阅(最大限制为2000)使用相同的规则。从Pusher的角度来看,我建议使用第一种情况,即一个订阅(一个订阅者)具有多个规则。

好的,现在根据上述描述,我们可以抽象出我们的Pusher,如下图所示

上图显示,发布者消息通过映射表推送到目标服务。通过管理表存储,我们可以在运行时管理此过程,例如添加新的推送、启用/禁用推送、编辑目标终结点等。

Pusher允许灵活的选项来选择目标终结点地址。基本上,有两种地址选项,一种是直接通过代理消息,它具有优先级,另一种是通过表存储间接。根据代理消息属性To的值,Pusher将知道如何处理此推送传递。这是一个非常强大的概念,允许从客户端(发布者)、使用规则操作的订阅以及最终通过存储库寻址目标。

以下屏幕截图显示了用于设置sys.To属性(使用规则名称)的$Default规则操作。基于此别名,目标终结点地址在运行时从存储库中选择。

请注意,订阅规则有一个带有sql表达式的过滤器。只有真正的匹配才允许执行SqlAction并将消息副本传递给订阅者。我们还可以评估sys.To属性的值作为SqlExpression的一部分。此功能使我们能够根据需要动态更改目标终结点地址。

太棒了。我们还可以将目标终结点地址设置为服务总线组件,例如队列或主题吗?是的,例如,像SET sys.To = 'sb://rk2011.servicebus.windows.net/image'这样的操作会将消息推送到rk2011命名空间中的队列。

下图显示了一个场景,其中发布者将消息发布到队列,然后Pusher将消息推送到订阅,多个规则可以将消息多播到服务。此示例基本上显示了队列消息的多播

演示Pusher实用性的另一个场景是不同服务总线命名空间之间的“桥接”。在这种情况下,Pusher实际上是下一个服务总线的发布者

好的,让我们继续概念和设计。我假设您对Windows Azure平台有实际操作知识。

概念与设计

Azure Service Bus Pusher的概念基于WCF订阅者拉取代理消息并将其转发(推送)到目标终结点。目标终结点是根据代理消息属性To动态选择的。

目标寻址

基本上,目标终结点可以是服务总线组件(例如QueueTopic)或BasicHttpBinding终结点。此地址可以直接在消息中声明,也可以在存储库中声明。下图显示了目标寻址

在前两种情况下,BrokeredMessage.To属性具有终结点地址的直接值。在sb方案中,代理消息被转发到服务总线组件。在最后两种情况下,BrokeredMessage.To属性具有规则的名称(例如,$Default),可用于从存储库中查询目标终结点地址。

以下代码片段显示了将推送消息传递到服务总线组件(队列或主题)的私有方法

private void PushMessageToServiceBus(string address, Message message)
{
    ChannelFactory<IGenericOneWayContract> factory = null;

    try
    {
        var ns = address.Substring("sb://".Length).Split(new char[] { '.' }, 2)[0];

        var query = Repository.CreateQuery<AccountEntity>()
                        .AddQueryOption("$filter", string.Format("(PartitionKey eq '{0}' and RowKey eq '$Namespace')", ns))
                        .AddQueryOption("$top", "1");

        AccountEntity[] accounts = query.Execute().ToArray();

        if (accounts.Count() == 0)
            throw new EntryPointNotFoundException(string.Format("Account {0} doesn't exist in the Repository", ns));

        var securityBehavior = new Microsoft.ServiceBus.TransportClientEndpointBehavior()
        {
            TokenProvider = Microsoft.ServiceBus.TokenProvider.CreateSharedSecretTokenProvider(accounts[0].AccountName, accounts[0].AccountKey),
        };

        var binding = new NetMessagingBinding();
        var se = new ServiceEndpoint(ContractDescription.GetContract(typeof(IGenericOneWayContract)), binding, new EndpointAddress(address));
        se.Behaviors.Add(securityBehavior);

        factory = new ChannelFactory<IGenericOneWayContract>(se);
        var channel = factory.CreateChannel();
        channel.ProcessMessage(message);

        factory.Close();
    }
    catch (Exception ex)
    {
        if (factory != null)
        {
            if (factory.State == CommunicationState.Faulted)
                factory.Abort();
            else
                factory.Close();
            factory = null;
        }

        throw ex;
    }
}

HTTPS和基本身份验证

推送到目标的消息可以使用https传输和基本身份验证进行保护。客户端凭据(如UserNamePassword)存储在存储库表中,用于每个推送器条目,或通过$Credential行单独存储。

以下代码片段显示了http/https客户端代理的安全部分

if (binding.Security.Mode == BasicHttpSecurityMode.TransportWithMessageCredential)
{
    if (string.IsNullOrEmpty(entity.UserName) || string.IsNullOrEmpty(entity.Password))
    {
        #region Get Credential entity from the Repository
        var url = new Uri(entity.Url);
        string key = (url.Host + url.AbsolutePath).Replace("/", "%");

        var query = Repository.CreateQuery<CredentialEntity>()
                        .AddQueryOption("$filter", string.Format("(PartitionKey eq '{0}' and RowKey eq '$Credential')", key))
                        .AddQueryOption("$top", "1");

        CredentialEntity[] credential = query.Execute().ToArray();
        if (credential.Count() == 0)
            throw new EntryPointNotFoundException(string.Format("Credential Entity {0} doesn't exist in the Repository", key));

        entity.UserName = credential[0].UserName;
        entity.Password = credential[0].Password;
        #endregion
    }

    factory.Credentials.UserName.UserName = entity.UserName;
    factory.Credentials.UserName.Password = entity.Password;
    binding.Security.Transport.ClientCredentialType = HttpClientCredentialType.Basic;

    ServicePointManager.ServerCertificateValidationCallback = new RemoteCertificateValidationCallback(delegate { return true; });
}              

如您所见,当PusherEntity行中的凭据为空时,表将查询所有PartitionKey = key$Credential行。$Credential行的PartitionKey基于此格式:(url.Host + url.AbsolutePath).Replace("/", "%")

请注意,目标端(服务)必须启用https绑定并接受客户端凭据。在这种情况下,您需要在IIS上安装SSL证书并配置虚拟目录以要求SSL。

<basicHttpBinding>
  <binding name="secure">
    <security mode="TransportWithMessageCredential">
      <transport clientCredentialType="Basic" />
    </security>
  </binding>
</basicHttpBinding> 

订阅者/接收者

订阅者或接收者是托管服务,用于根据订阅或队列名称从服务总线(例如主题或队列)拉取事件兴趣。在Pusher的情况下,订阅者/接收者不是最终(业务)消息消费者;其设计可以通过一个通用服务进行虚拟化。感谢NetMessagingBinding简化了此拉取过程并由WCF模型抽象。

以下代码片段显示了使用WCF托管技术创建订阅者主机的实现

public static ServiceHost CreateSubscriberHost(AccountEntity account, string addressTopic, string addressSubcription, bool bRequiresSession = false)
{
    if (account == null)
        throw new ArgumentNullException("account");

    var securityBehavior = new TransportClientEndpointBehavior()
    {
        TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(account.AccountName, account.AccountKey),
    };

    var binding = new NetMessagingBinding();
    var cd = ContractDescription.GetContract(typeof(IGenericOneWayContract));
    cd.SessionMode = bRequiresSession ? SessionMode.Required : SessionMode.Allowed;
    
    EndpointAddress topicEndpointAddress = new EndpointAddress(addressTopic);
    var se = new ServiceEndpoint(cd, binding, topicEndpointAddress);
    se.ListenUri = new Uri(addressSubcription);
    se.Behaviors.Add(securityBehavior);

    var host = new ServiceHost(typeof(PusherService));
    host.AddServiceEndpoint(se);

    return host;
}

如您所见,这是创建ServiceHost实例的直接实现,它将侦听addressTopic。不要混淆ListenerUri属性。此属性用于服务总线代理,需要它来侦听特定的订阅。此“技巧”由NetMessagingBinding逻辑涵盖。

订阅者/接收者的通用服务是PusherService,具有通用(无类型)单向合同。

[ServiceContract(Namespace = "urn:rkiss.sb/pusher/2012/03")]
public interface IGenericOneWayContract
{
    [OperationContract(IsOneWay = true, Action="*")]
    void ProcessMessage(System.ServiceModel.Channels.Message msg);
}

请注意,此服务也用于内部控制器,例如更新、重启等,稍后将详细介绍。

PusherService

PusherService是IGenericOneWayContract服务合同的实现。这是从服务总线拉取(接收)消息的地方。

以下代码片段显示了其样板逻辑,其中基本上有两个分支,例如控制器和推送消息。根据目标寻址,推送消息有其自己的实现和处理。用于从特定主机(使用OperationContext.Current)查询存储库以获取附加元数据的pusherKey。在直接目标寻址中,可以从bmp.To属性获取终结点地址。更多实现细节,例如http/httpssb代理,可以在包含的源代码中找到。

public class PusherService : IGenericOneWayContract
{
  public void ProcessMessage(System.ServiceModel.Channels.Message msg)
  {
    Guid? hostId = ((ServiceHost)OperationContext.Current.Host).Id();
    Uri listener = OperationContext.Current.EndpointDispatcher.ChannelDispatcher.Listener.Uri;
    string pusherKey = (listener.Host + listener.AbsolutePath).Replace('/', '%');
    bool IsSubscription = pusherKey.IndexOf("%subscriptions%") > 0;

    if (msg.Properties.ContainsKey(Microsoft.ServiceBus.Messaging.BrokeredMessageProperty.Name) == false)
    {
       Trace.TraceWarning("Only brokered message is supported. " + pusherKey);
       return;
    }

    if (msg.Headers.Action.StartsWith("urn:rkiss.sb/pusher/2012/03/IPusher/", StringComparison.CurrentCultureIgnoreCase))
    {
       // Controller IPusher 
    }
    else
    {
       var bmp = msg.Properties[Microsoft.ServiceBus.Messaging.BrokeredMessageProperty.Name] as Microsoft.ServiceBus.Messaging.BrokeredMessageProperty;

       try
       {
          #region Push this message           
          if (string.IsNullOrEmpty(bmp.To) == false && bmp.To.IndexOf("://") > 0)
          {
             // delivery message based on the message's url                   
          }
          else
          {
             // delivery message based on the Repository
          }
          #endregion
        }
        catch (Exception ex)
        {
            // Notification
        }
    }
  }
}

托管

每个订阅者/接收者都有自己的托管实例。基本上,有两种托管类型,例如用于控制器和用于应用程序推送消息的托管。控制器是为Pusher控制目的(更新、重启、通知状态等)而内置的强制订阅者,它通过配置文件中的设置进行配置。其他主机用于希望将其消息推送到特定目标的订阅者/接收者。对于托管概念,在WorkerRole实例进程中使用标准WCF技术。

下图显示了这个场景。

Pusher控制器

Pusher本身也利用服务总线的功能进行控制和通知。换句话说,Pusher正在使用Azure服务总线本身的PubSub功能。Pusher行为通知有内置的发布者,也有用于接收请求命令(例如,重启WorkerRole)的订阅者。

Pusher可以以并行方式在多个WorkerRole实例中运行,共享相同的存储库数据。对于这种设计模式,控制器可以使用服务总线的特性,例如多播主题/订阅。

WorkerRole中的每个控制器将(如果不存在)在Pusher主题上创建自己的订阅,并使用一个$Default规则与管理工具配对。下图显示了Service Bus Explorer的屏幕截图,显示Pusher主题有两个订阅(实例0和1)。

主题上具有配对键/值的任何发布者都可以向Pusher控制器发送消息。当使用Service Bus Tester时,您将在使用部分看到有关此场景的更多详细信息。

WorkerRole

订阅者、接收者和控制器的ServiceHosts必须不间断地24/7运行,以从服务总线拉取和推送消息。因此,将所有组件保持在可运行进程中的最佳选择是WorkerRole,就像本地的Windows NT服务一样。基本上,WorkerRole是一个单例实例,用于保存ServiceHosts引用并使其保持活动状态。

以下代码片段显示了WorkerRole.OnStart的重写方法。这是为每个订阅者、接收者和推送器控制器创建托管的地方。

public override bool OnStart()
{
    // Set the maximum number of concurrent connections 
    ServicePointManager.DefaultConnectionLimit = 12;

    CloudStorageAccount.SetConfigurationSettingPublisher((configName, configSetter) => configSetter(configName));

    // mark this start
    Guid? sessionId = Guid.NewGuid();

    // subscriber for Controller
    string subscriptionName = RoleEnvironment.CurrentRoleInstance.Id.Split('.').Reverse().FirstOrDefault();
    SubscriptionDescription  sd = ServiceBusExtension.CreateSubscriptionIfNotExist(subscriptionName);
    if (sd != null)
    {
        // create subscriber for this subscription
        hostWorkerRoleSubscriber = Hosting.CreateDefaultSubscriberHost(subscriptionName);
        hostWorkerRoleSubscriber.Extensions.Add(new HostExtension { SessionId = sessionId });
        hostWorkerRoleSubscriber.Open();
        hostWorkerRoleSubscriber.Faulted += delegate(object source, EventArgs e)
        {
            ServiceHost sh = source as ServiceHost;
            Trace.TraceError("Host_{0} for Controller {1} failed", sh.Id(), sh.PusherName());
            sh.Abort();
            hostWorkerRoleSubscriber = null;
        };

        Trace.TraceInformation("WorkerRole Host_{0} for Controller {1} has been opened.", sessionId, hostWorkerRoleSubscriber.PusherName());
    }
    else
    {
        Publisher.PublishWarning("Creating Subscriber for Pusher Control failed. The Pusher is not ready to received any control message");
    }

    // Load all Application Subscribers from Repository
    Hosting.Load(sessionId);

    RoleEnvironment.Changing += RoleEnvironmentChanging;
    return base.OnStart();
}

第一个订阅者是为控制器创建的,并根据设置配置数据同步打开。下图显示了其存储、服务总线、订阅和发布者的配置设置

其他订阅者/接收者是根据存储库元数据异步创建的。因此,每个WorkerRole启动都有自己的ID,以区分关闭ServiceHost的后处理。

存储库(表存储)

存储库代表Pusher的启动和运行时知识库。这是Pusher及其消费者之间的共享位置。存储库可以通过管理工具或客户端通过REST API以编程方式更新。

在WorkerRole启动期间,存储库中描述的所有已启用的推送器都必须订阅服务总线。此步骤将允许我们从主题和/或队列中拉取消息。当PusherService收到消息时,根据运行时存储库,消息可以传递到目标终结点。请注意,此运行时存储库数据可以随时更改,而无需更新WorkerRole。在添加新的订阅者/接收者的情况下,WorkerRole将更新其状态(关闭或打开特定订阅者/接收者的ServiceHost)

下图显示了存储库支持的三个实体。第一个用于保存访问服务总线所需的Service Bus帐户。第二个用于订阅者/接收者推送消息,最后一个用于HTTPS Pusher凭据。

这些实体的上述属性用于运行时决策,决定如何以及在何处转发/推送收到的消息。启动数据存储在存储库中的TableServiceEntity中,使用PartitionKeyRowKey。这两个属性在存储库中表示为唯一的逻辑Pusher条目。

对于Pusher存储库,表存储用于其管理,我正在使用流行的免费Azure Storage Explorer。对于生产版本,我建议构建一个Pusher管理工具来管理此存储库并隐藏其设置属性。

下图显示了我的Azure Storage Explorer制作的Push Storage的屏幕截图

如前所述,存储库中的Pusher条目可以用于两个实体,例如AccountEntityPusherEntity。此外,PusherEntity必须针对服务总线组件队列和主题完成。为了优化表存储上的查询,RowKey用于识别行的类型(Pusher条目)。

RowKey中有以下预定义值

  • $Default,这是服务总线声明的订阅第一个规则的默认名称
  • $Namespace,这是帐户条目的键
  • $Credential,这是客户端凭据(用户名/密码)的键
  • $Queue,这是队列条目的键
  • 订阅规则的任何有效名称。

以下屏幕截图显示了指定命名空间中帐户条目的示例,位于PartitionKey

除了$Namespace之外的其他条目的PartitionKey将表示订阅地址或队列地址。由于url地址的格式不能插入到PartitionKey中,因此需要进行一些更改,例如:删除带分隔符的方案并将字符'/'替换为'%'。

以下屏幕截图显示了队列条目的这些更改

根据上述条目,我们可以为地址为sb://rk2011.servicebus.windows.net/search的队列注册一个Pusher。收到的消息被推送到由Url地址、消息版本定义的目标,并可以选择将BrokeredMessageProperty作为自定义Soap Header传递。如果消息无法传递到目标,通知消息将发布到Service Bus Topic。Pusher条目可以通过属性Enabled禁用。

以下两张图片显示了存储库中订阅条目的示例。第一个用于$Default规则,第二个用于bridgeByPass规则

代理消息属性标头(BMP标头)

在将消息推送到Soap Target的情况下,我们可以启用IncludeBMP选项,将BrokeredMessageProperty插入到Soap Headers部分。此功能允许过滤这些属性,而无需反序列化消息负载,例如,使用WCF RoutingService。

以下屏幕截图显示了Soap11消息中BMP标头的示例

好的,是时候展示了。让我们描述一下这个服务总线推送器能为您做什么。

用法与测试

首先,以下是先决条件:

Pusher需要构建和部署Azure应用程序、表存储以及使用Azure服务总线消息传递的知识和经验。因此,我假设您对Windows Azure平台有实际操作知识。

Pusher解决方案

Pusher解决方案只包含一个WorkerRole,其中实现了所有基于表存储中存储的元数据的功能。Test文件夹中包含用于Push表和虚拟Publisher Tester的测试资源。

以下屏幕截图显示了此解决方案

请注意,以下步骤基于配置设置,例如表名、主题名等。我建议暂时保留这些名称。当然,它们可以根据需要进行更改。在以下步骤中,我将使用第三方工具来模拟和探索Azure服务总线推送器的用法。

第一步(A)将为我们的Pusher准备管理,例如发送控制消息、接收通知等。

步骤A. 更新您的存储帐户和服务总线命名空间的设置

步骤A1. 在服务总线上创建pusher主题和Notifications订阅

使用服务总线浏览器在Azure服务总线上创建主题和订阅。

以下屏幕截图显示了服务总线上的pusher主题和Notifications订阅

请注意,pusher主题将具有由WorkerRole的每个实例创建的附加订阅。

步骤A2. 创建虚拟管理工具

使用Azure 服务总线测试器创建一个虚拟订阅者,用于接收Pusher的通知。

使用Azure 服务总线测试器创建虚拟发布者,用于向Pusher发送控制消息。消息模板可以从解决方案文件夹Test中加载。

步骤A3. 编译本文中的AzurePushService解决方案

在此步骤中,我们将使用Windows Azure计算模拟器运行程序包,并观察服务总线测试器上的所有跟踪通知。

以下屏幕截图显示了模拟器上的结果

在测试器上,我们应该看到来自每个实例的两个通知。消息通知Pusher已启动。

步骤A4. 从测试器触发NotifyStatus

触发虚拟发布者pusher.NotifyStatus,控制消息将发送到pusher主题并由两个WorkerRole实例接收。如果消息被PusherService识别,Pusher将发布托管订阅者列表。由于此时Push表为空;以下状态也为空

同样,我们可以触发其他发布者以查看其通知。

太棒了。我们完全控制了Pusher。现在,是时候向存储库(Push表存储)添加一些推送条目了。

让我们举一些例子。

步骤B1. 队列多播示例

这是一个将事件兴趣推送到队列并在主题上跨命名空间多播的示例。下图显示了此场景

为了演示发送方和订阅者,我将使用Azure 服务总线测试器,用于编辑表存储,我将使用Azure Storage Explorer,用于在服务总线上创建组件,我将使用服务总线浏览器。此示例需要在服务总线上创建两个命名空间。

请在您的第一个命名空间中创建队列search(在我的示例中是rk2011)。在第二个命名空间中,创建主题topic/worker,并在该主题下创建两个订阅,例如image1imageN

下图是每个命名空间的服务总线浏览器屏幕截图

下一步是将Push条目添加到存储库。这是从队列到主题的BrokeredMessage的Pusher,因此我们只需要为此实体条目设置以下属性

PartitionKeyRowKey是条目行的唯一标识符。基本上,有一个映射(路由)信息,用于根据PartitionKey和Url将消息从队列传递到主题。请注意,Pusher需要用于此连接的两个帐户,例如跨命名空间的拉取和推送过程。这些帐户也必须存储在Push表中。

Azure Storage Explorer允许从文件系统上传表。Test文件夹包含此文件,下图显示了其内容

Pusher设置和元数据管道到此为止。现在我们可以使用带有虚拟发送方和订阅者的测试器了。请为搜索队列添加发送方,并为Image1和imageN订阅添加订阅者。

一旦我们在测试器上有了这些组件,我们就可以进行测试了。右键单击发送方并选择发送以将消息发送到队列。消息将由Pusher从队列中拉出并推送到topic/worker,其中有两个订阅者。

您应该在测试器上看到多播接收到两条消息的结果,如下图所示

太棒了。但是等等,让我们做一些额外的实验。为队列搜索添加虚拟接收器并再次重复测试。您应该会看到Pusher和虚拟接收器之间的消息平衡。

再举一个例子,但这次我想演示将消息推送到旧版http Web服务,请参阅下一步。

步骤B2. HTTP Pusher示例。

此测试的用例非常简单。我们需要在之前的示例中添加一个多播目标。此目标是一个旧版BasicHttpBinding Web服务,其地址例如为https://:8982/Service

第一步是添加一个新的订阅,例如imageNplusOne,并将Action设置为sys.To='$Default'。请注意,此RuleAction对于存储库中的查询非常重要,请参阅前面的描述。

下图显示了带有新订阅imageNplusOne的服务总线浏览器

下一步是在存储库中添加以下推送条目。请注意,RowKey是规则名称,MessageVersion是Soap11,Url是我们的目标服务的终结点地址

现在,回到测试器。从虚拟发送方发送消息,我们可以像前面的示例一样在image1imageN订阅者中看到消息,但在Pusher订阅者中看到错误消息。此错误消息表明Pusher无法将消息传递到此目标终结点。

如果您的机器上运行此服务并在控制台程序中自托管,则屏幕上应显示以下消息

我展示这张图片的原因是Soap11消息中的BMP Header。当然,这是存储库中的一个可选属性。

步骤B3. HTTPS Pusher示例

在此场景中,推送消息通过安全通道发送到目标,并且客户端必须使用其基本凭据(例如用户名和密码)由服务授权。此示例向您展示了此场景必须完成的工作

  1. 使用目标服务的UserNamePassword编辑PusherEntity
  2. 将传输更改为https,例如:https://myMachine/Service
  3. 在IIS上安装SSL证书并配置虚拟目录以要求SSL。
  4. IIS公开可用
  5. 您的服务需要配置您的自定义UserNamePasswordValidator

下图显示了上述a)和b)点中描述的Pusher端的所有更改

将Pusher移动到安全传输非常简单,在服务端设置HTTPS和基本身份验证将占用大部分时间。在向同一目标(终结点)发送多个推送消息的情况下,我们可以为此终结点使用$Credential实体行。下图显示了这些条目。请注意,PusherEntity行必须为空(null)。

注意:我建议在本地机器上测试此场景,包括模拟器,然后将Pusher和您的服务都移到云端。此链接如何配置HTTPS终结点上的SSL证书也可以帮助您。

示例到此为止,您可以根据需要创建您的推送条目,但是如果我们需要将消息推送到具有不同绑定协议的目标等怎么办?这是一个很好的问题。解决方案称为虚拟桥,并在本文的附录A中描述,我很快就会在codeproject上发布它。

步骤C1. 在Azure上部署。

前面的步骤是在模拟器上完成的,所有跟踪消息都可以在实例屏幕上看到。现在,是时候将其移至Azure了。退出模拟器,创建包并将其部署到云。您也可以使用手动过程,例如创建托管服务并上传包和配置文件。

一旦云上的服务准备就绪,可以重复上述测试步骤,包括最后一个带有终结点地址错误(https://:8982/Service)的步骤。

结论

本文介绍了服务总线组件的推送服务。Pusher支持使用推送模型从服务总线组件(如队列和主题)传递事件兴趣。表存储(存储库)允许我们在推送模型驱动架构中创建订阅者或接收者到目标终结点的运行时映射。希望您喜欢它。

附录 A

Pusher内置了一个简单的BasicHttpBinding终结点代理,我们可以在其中推送代理消息。在其他情况下,例如消息调解和定位,我们需要一个bridge。基本上,Bridge是一个特殊的服务,用于以预定义的模式验证-丰富-转换-丰富-路由(VETER)处理消息。下图显示了此场景

上图显示了发布者将事件兴趣推送到服务总线(队列或主题),Pusher将代理消息推送到Bridge,在那里消息可以被调解(转换)并分派到特定目标。

此过程如何实现并为Windows Azure基础设施进行抽象(虚拟化)并托管在WebRole中,您可以在我即将发布的文章中看到。它基于我之前的文章Azure上的路由服务。此外,我建议查看Windows Azure 服务总线EAI & EDI实验室发布

参考文献:

[0] 服务总线

[1] 服务总线浏览器

[2] Azure 服务总线测试器

[3] Azure 存储浏览器

[4] AppFabric开发人员指南

[5] 使用Windows Azure 服务总线消息传递

[6] Windows Azure 服务总线EAI & EDI

[7] 操作方法:使用自定义用户名和密码验证器

[8] 如何配置HTTPS终结点上的SSL证书

© . All rights reserved.