65.9K
CodeProject 正在变化。 阅读更多。
Home

Azure Service Bus 测试器

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.96/5 (15投票s)

2011年11月20日

CPOL

28分钟阅读

viewsIcon

92796

downloadIcon

2931

本文档描述了一个小型工具的设计和实现,该工具是用于 Windows Azure Service Bus Messaging 的测试工具。

 

注意 文章已更新 - 请参阅附录 D - 版本 1.4

 

目录

特性

  • Windows Azure Service Bus Messaging (2014年11月
  • WCF 技术
  • 虚拟订阅者/接收器 (SessionMode=Allowed, Required; ReceiveMode=ReceiveAndDelete)
  • 虚拟发布者/发送者
  • 每个订阅者或接收器都托管在其自己的自定义 AppDomain 中
  • 发送 BrokeredMessages (非类型化契约)
  • 显示接收到的 BrokeredMessages (非类型化契约)
  • 加载/保存和复制/粘贴消息
  • ContentTypes: application/soap+msbin1, application/soap+msbinsession1, application/soap+xml, application/xml,  application/json, text/plain
  • REST 发布者
  • Relay 发布者
  • ServiceBusTester REST 端点
  • 带 XSLT 消息转换的虚拟发布者

引言

本文档是我最近文章 使用 Windows Azure Service Bus Messaging 的续篇,在该文章中我详细描述了通过 WCF 和 WF 技术使用 Azure Service Bus Messaging。ServiceBusTester 是一个用于 Azure Service Bus Messaging 的小型工具。它模拟了 Azure Service Bus 命名空间中的队列和主题的发布者、发送者和消费者。基本上,通过这个工具,您可以向主题通道发布消息,创建用于接收 BrokeredMessages 的订阅者,等等。

下图显示了它在 Windows Azure 中的位置

ServiceBusTester 无法更改 Windows Azure Service Bus 中的任何实体。它的位置类似于著名的、流行的 WCFTestClient 程序,用于测试 WCF 服务。我们可以为特定主题创建一个发布者,使用非类型化的 BrokeredMessage 并将其发送到 Azure Service Bus。另一方面,我们可以为特定订阅创建一个虚拟订阅者(服务),并接收 BrokeredMessages 的副本。对于队列,虚拟发送者和接收者也可以以同样的方式进行操作。

使用这个小工具,我们可以排查 Service Bus 提供者和使用者之间的逻辑连接问题。它还有助于模拟发布者/发送者或评估订阅(过滤器和操作)/队列。我推荐使用 Service Bus Explorer 来创建命名空间、队列、主题、订阅和规则等 Service Bus 实体。

好的,让我们来描述它的概念和设计。我假设您对 Windows Azure 平台有一定的了解。

概念和设计

ServiceBusTester 的概念是基于向 Azure Service Bus 提供和从中消耗非类型化消息。发布者/发送者和订阅者/接收者基于 WCF 技术,并使用以下契约进行消息传递

[ServiceContract(Namespace = "urn:rkiss.sb/tester/2011/11")]
public interface IGenericOneWayContract
{
    [OperationContract(IsOneWay = true, Action = "*")]
    void ProcessMessage(System.ServiceModel.Channels.Message msg);
}

请注意,订阅者/接收者契约是针对 SessionMode.Required 以编程方式设置的。

另一个概念性特性是,每个到 Service Bus 的消费者(虚拟服务)都托管在自己的私有 AppDomain 中。此功能使我们能够在不相互影响的情况下动态地连接和断开托管的订阅者或接收者。从工具的角度来看,Azure Service Bus 代表了生产者(发布者/发送者)和消费者(订阅者/接收者)之间的逻辑连接模型。

下图展示了这个概念

正如您所见,ServiceBusTester 进程有一个默认(主)AppDomain,其中运行 WindowsForm、发布者、发送者以及一个用于处理来自所有订阅者/接收者的传入消息的服务。NetNamedPipeBinding 传输用于连接不同 AppDomain 中托管的虚拟服务与位于测试器服务中的默认 AppDomain,并在它们之间传递消息。

此概念中的另一个“设计技巧”是如何在默认 AppDomain 中显示 BrokeredMessageProperty 包。众所周知,BMP 包存储在消息属性中,仅供内部通道层使用,因此有必要将其复制并传输到默认 AppDomain 作为消息头。

ServiceHostActivator

ServiceHostActivator 是一个远程对象(派生自 MarshalByRefObject),在自定义 AppDomain 中初始化,其职责是在其域内托管和控制虚拟服务。默认 AppDomain 基于 UI 操作以编程方式管理这些自定义 AppDomain,例如创建、打开、关闭等。有关此技术的更多详细信息,请参阅我的文章 VirtualService for ESB,我从中借用并修改了一些类来进行此实现。

例如,以下代码片段展示了如何为自定义 AppDomain 创建 ServiceHostActivator

public static ServiceHostActivator Create(AppDomain appDomain, ConsumerConfigData config)
{
    string _assemblyName = Assembly.GetAssembly(typeof(ServiceHostActivator)).FullName;
    ServiceHostActivator activator = appDomain.CreateInstanceAndUnwrap(_assemblyName, 
      typeof(ServiceHostActivator).ToString()) as ServiceHostActivator;
    activator.SetHost(config);
    return activator;
}

private void SetHost(ConsumerConfigData config)
{
  try
  {
    if (_host == null)
    {
      var binding = new CustomBinding();
      
      if (config.ContentType.StartsWith("application/soap+xml"))
        binding.Elements.Add(new TextMessageEncodingBindingElement(MessageVersion.Soap12WSAddressing10, Encoding.UTF8));
      else if (config.ContentType.StartsWith("text/plain"))
        binding.Elements.Add(new TextMessageEncodingBindingElement(MessageVersion.None, Encoding.UTF8));
      else if (config.ContentType.StartsWith("application/xml") || config.ContentType.StartsWith("application/json"))
        binding.Elements.Add(new WebMessageEncodingBindingElement(System.Text.Encoding.UTF8));
      
      binding.Elements.Add(new NetMessagingTransportBindingElement());
      
      var securityBehavior = new TransportClientEndpointBehavior()
      {
          TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(config.IssuerName, config.IssuerSecret),
      };

      EndpointAddress ea = new EndpointAddress(config.TopicAddress);
      var se = new ServiceEndpoint(ContractDescription.GetContract(typeof(IGenericOneWayContract)), binding, ea);
      
      if(string.IsNullOrEmpty(config.SubscriptionAddress) == false)
          se.ListenUri = new Uri(config.SubscriptionAddress);
          
      if (config.RequiresSession)
          se.Contract.SessionMode = SessionMode.Required;
          
      se.Behaviors.Add(securityBehavior);

      _host = new ServiceHost(typeof(VirtualService));
      _host.AddServiceEndpoint(se);
      _host.Extensions.Add(config);
        
      _host.Faulted += new EventHandler(_host_Faulted);
      _host.Closed += new EventHandler(_host_Closed);
      _host.Opened += new EventHandler(_host_Opened);
      _name = config.Name;

      this.AddToStorage(this);
    }
    else
    {
        throw new InvalidOperationException("The ServiceHost has been already setup");
    }
  }
  finally
  {
    // ...
  }
}

主机管理器(位于默认 AppDomain 中)将创建一个新的域和虚拟服务的配置数据。之后,它将调用上述静态方法以获取 ServiceHostActivator 的引用。一旦管理器拥有此引用,我们就可以控制在自定义域中托管虚拟服务。

以下代码片段展示了一个用于在自定义域中打开主机的远程方法

public void Open()
{
  if (_host != null)
  {
    try
    {
      if (_host.State == CommunicationState.Created)
      {
          _host.Open();
          Trace.WriteLine(string.Format("ServiceHostActivator '{0}' opened", this.Name));
      }
    }
    catch (Exception ex)
    {
      RemoveFromStorage(this);
      Trace.WriteLine(string.Format("Opening ServiceHostActivator '{0}' failed: '{1}'", 
                      this.Name, ex.Message));
      throw ex;
    }
  }
}

用于接收来自 Subscription/Queue 消息的虚拟服务非常简单直接。基本上,它是 TesterService 的一个代理,通过 NamedPipe 传输将从 Service Bus 接收到的消息转发到默认 AppDomain。以下代码片段展示了它的实现

public class VirtualService : IGenericOneWayContract
{
  public void ProcessMessage(System.ServiceModel.Channels.Message msg)
  {
    ChannelFactory<IGenericOneWayContract> factory = null;

    try
    {
      var config = OperationContext.Current.Host.Extensions.Find<ConsumerConfigData>();
      if (config == null)
          throw new Exception("Fatal error: Missing ServiceConfigData extension object");

      var binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
      var se = new ServiceEndpoint(ContractDescription.GetContract(
          typeof(IGenericOneWayContract)), binding, new EndpointAddress(config.TesterAddress));
        
      factory = new ChannelFactory<IGenericOneWayContract>(se);              
      var channel = factory.CreateChannel();

      using (var scope = new OperationContextScope((IContextChannel)channel))
      {
        if (msg.Properties.ContainsKey(BrokeredMessageProperty.Name))
        {
          OperationContext.Current.OutgoingMessageHeaders.Add(
            MessageHeader.CreateHeader(BrokeredMessageProperty.Name, 
            BrokeredMessagePropertyExtension.XName.NamespaceName, 
            (msg.Properties[BrokeredMessageProperty.Name] as BrokeredMessageProperty).CopyTo()));
        }
        
        OperationContext.Current.OutgoingMessageHeaders.Add(MessageHeader.CreateHeader(
            ConsumerConfigData.XName.LocalName, 
            ConsumerConfigData.XName.NamespaceName, config));

          channel.ProcessMessage(msg);
      }
      factory.Close();
    }
    catch (CommunicationException ex)
    {
       // ...
    }
    catch (Exception ex)
    {
       // ...
    }
  }
}

您可以看到,BrokeredMessageProperty 包和 ConsumerConfigData 通过消息头传递。

另一方面,在默认 AppDomain 中,我们有一个 TesterService 用于接收来自自定义域(如订阅者/接收者)的所有消息。接收到的消息被封装并清理了临时头(BMP 和 ConsumerConfigData),然后这三个对象被发送到 WinForm 以供其 UserControls 使用。

以下代码片段展示了此实现

public class TesterService : IGenericOneWayContract
{
  public void ProcessMessage(System.ServiceModel.Channels.Message message)
  {

    Form1 form = OperationContext.Current.Host.Extensions.Find<Form1>();
    string action = OperationContext.Current.IncomingMessageHeaders.Action;
    int indexBMP = message.Headers.FindHeader(BrokeredMessageProperty.Name, 
                           BrokeredMessagePropertyExtension.XName.NamespaceName);
    int indexConfig = message.Headers.FindHeader(ConsumerConfigData.XName.LocalName, 
                              ConsumerConfigData.XName.NamespaceName);
                
    try
    {
      var config = message.Headers.GetHeader<ConsumerConfigData>(indexConfig);
      message.Headers.RemoveAt(indexConfig);

      var bmp = message.Headers.GetHeader<BrokeredMessageProperty2>(indexBMP);
      message.Headers.RemoveAt(indexBMP);

      form.AddMessageToTreview(message, bmp, config);                 
    }
    catch (Exception ex)
    {
       // ...
    }               
  }
}

请注意,此过程作为后台任务在跨域执行,用户界面会实时更新。另一方面,作为用户操作(也使用多任务技术实现),用户可以通过选择一个 TreeNode 并单击其上下文菜单来调用特定操作。对于每个操作,都会实现一个对话框来执行任务。在下一节中,我将描述用法。

例如,以下代码片段展示了一个重新连接订阅者的操作

private void toolStripMenuItemReconnectSubscriber_Click(object sender, EventArgs e)
{
  ThreadPool.QueueUserWorkItem(delegate(object state)
  {
    try
    {
      TreeNode node = null;
      this.InvokeEx(() => node = this.treeViewServiceBus.SelectedNode);
      var config = node.Tag as ConsumerConfigData;

      using (var progressnode = new ProgressNode(this, node, 6, 3))
      {
        HostServices.Current.Close(config.HostName);
        Thread.Sleep(1000);
        HostServices.Current.Add(config.HostName, config);
        HostServices.Current.Open(config.HostName);
      }
    }
    catch (Exception ex)
    {
       // ...
    }
  });
}

再举一个在 Topic 通道上发送消息的例子。以下代码片段展示了这个虚拟发布者

private void toolStripMenuItemFirePublisher_Click(object sender, EventArgs e)
{
  ThreadPool.QueueUserWorkItem(delegate(object state)
  {
    IOutputChannel channel = null;
    try
    {
      // winform properties
      TreeNode node = null;
      string xmlmessage = null;
      BrokeredMessageProperty bmp = null;
      this.InvokeEx(delegate()
      {
        node = this.treeViewServiceBus.SelectedNode;
        xmlmessage = this.richTextBoxMessage.Text;

        this.propertyGridBMP.Item.Cast<CustomProperty>().FirstOrDefault(
          p => p.Name == "MessageId" && p.Category == "sys").Value = Guid.NewGuid().ToString();
        bmp = this.CreateBrokeredMessageProperty(this.propertyGridBMP.Item);
        this.propertyGridBMP.Refresh();
      });

      using (var progressnode = new ProgressNode(this, node, 6, 3))
      {
        var config = node.Tag as ProviderConfigData;

        EndpointAddress topicAddress = new EndpointAddress(config.EntityAddress);
        
        var binding = new CustomBinding();
        if (bmp.ContentType.StartsWith("application/soap+xml"))
          binding.Elements.Add(new TextMessageEncodingBindingElement(MessageVersion.Soap12WSAddressing10, Encoding.UTF8));
        else if (bmp.ContentType.StartsWith("text/plain"))
          binding.Elements.Add(new TextMessageEncodingBindingElement(MessageVersion.None, Encoding.UTF8));
        else if (bmp.ContentType.StartsWith("application/json") || bmp.ContentType.StartsWith("application/xml"))
          binding.Elements.Add(new WebMessageEncodingBindingElement(System.Text.Encoding.UTF8));
        binding.Elements.Add(new NetMessagingTransportBindingElement());

        var securityBehavior = new TransportClientEndpointBehavior()
        {
          TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(config.IssuerName, config.IssuerSecret),
        };

        var se = new ServiceEndpoint(ContractDescription.GetContract(typeof(IOutputChannel)), binding, topicAddress);
        se.Name = "TopicPublisher";
        se.Behaviors.Add(securityBehavior);

        var factory = new ChannelFactory<IOutputChannel>(se);
        channel = factory.CreateChannel();

        using (var message = MessageExtension.CreateMessage(xmlmessage))
        using (OperationContextScope scope = new OperationContextScope((IContextChannel)channel))
        {
          // add BMP bag
          OperationContext.Current.OutgoingMessageProperties.Add(BrokeredMessageProperty.Name, bmp);
          
          // fire a message to the topic channel
          channel.Send(message);
        }

        ((IClientChannel)channel).Close();
        factory.Close();
       }
      }
      catch (CommunicationException ex)
      {
        // ...
      }
      catch (Exception ex)
      {
        // ...
    }
  });
}

如上面的实现所示,我们需要从 WinForms 控件中获取消息文本和 BMP 包,然后创建安全行为、服务终结点、通用通道工厂和操作范围,以便为传出消息添加 BMP 包。

在本章末尾,我想感谢第三方支持,例如 CodeProject 的 PropertyGridEx.dll对 RichTexBox 进行着色,以及 ControlExtensions 类,它们简化了我为这个小型工具实现的 UI。

好了,该展示了。让我们来描述一下这个小型工具能为您做什么。

用法

首先,以下是先决条件

  • Visual Studio 2013 Update 4(用于创建程序集的选项)
  • Windows Azure SDK v2.5(2014年11月)
  • Windows Azure Platform 账户
  • 到互联网的 HTTP/S 连接
  • 下载本文档的包(源代码和/或可执行文件)(用于创建程序集的选项)
  • 一些关于 Windows Azure 平台和 WCF 的知识和经验

负载均衡与多播

在将 ServiceBusTester 连接到 Azure Service Bus 之前,我想回顾一下 Topic 通道上负载均衡订阅者和多播订阅者之间的区别。

将一个订阅者订阅到同一个订阅(例如,Subscription 1),消息将在该订阅的所有订阅者之间进行负载均衡。根据上面的例子,它将在 S1Tester 之间进行。从业务角度来看,应用程序将“丢失”发送到虚拟订阅者 Tester 的那些消息。

为了进行测试(和监控)目的,我们需要以多播方式传递消息。如上图所示,我们需要共享一个 RuleDescription(过滤器)而不是订阅。Azure Service Bus 的当前版本(2011年9月)不支持共享过滤器或订阅到订阅克隆或其他任何机制。然而,在设计 Service Bus 实体(如命名空间、主题和订阅)以进行逻辑连接模型及其与物理模型的映射时,我们可以考虑一些实践和模式。

例如,可以应用以下实践

  • 根据环境范围创建命名空间,例如 dev、staging、QA、demo、production 等。
  • 克隆 Topic 用于测试和监控目的,其中原始 Topic 将消息转发到克隆 Topic(多播 Topic)
  • 克隆 Subscription,消息将多播到所有订阅(多播 Subscription)
  • 将 RuleDescription(过滤器、操作)克隆到一个 Subscription 中;在这种情况下,每个 Topic 将有一个特殊的 Subscription,其中包含其他 Subscription 的有趣 RuleDescription 集合(共享规则)

好的,让我们继续使用 ServiceBusTester,按照以下步骤操作

步骤 1. 连接到 Azure Service Bus

右键单击 ServiceBus 根节点,屏幕上将弹出以下上下文菜单

选择 Connect,NamespaceDialog 将显示您与 Azure Service Bus 的连接,通过命名空间。只有通过此对话框插入的命名空间才能在整个工具中使用。

您可以输入所有感兴趣的命名空间,然后按 Refresh 按钮,或者逐个输入并进行刷新。按 OK,所有有效的命名空间将持久化到 ApplicationSettings 中,因此我们不必在每次启动 ServiceBusTester 时都重复此操作。完成此步骤后,ServiceBus 根节点将显示四个节点,按 Queue 和 Topic 实体分组。

步骤 2. 添加订阅者

此步骤允许您为特定的 Namespace/Topic/Subscription 添加虚拟订阅者。

在 Subscribers 节点上选择上下文菜单 Add,屏幕上将弹出以下 SubscriberDialog。

此对话框允许您选择特定的订阅并显示其规则。上面的例子显示了我的订阅 rk2011/worker/image2,它有两个规则:$Defaultabcd。请注意,这个工具不允许您更新订阅,这只是一个简单的实现任务,但它基于这个小型工具的概念和设计。当然,您可以修改包含的源代码来实现此功能,这取决于您。另一个选择是使用 Service Bus Explorer

一旦我们点击 OK,TreeView 将为该操作进行更新。下图显示了发生的情况。Subscribers 节点显示了我们的第一个新的虚拟订阅者,已准备好接收 BrokeredMessages,所以如果您在命名空间中已经有一个发布者,那么接收到一些消息不要感到惊讶。

步骤 3. 添加发布者

此步骤允许您为特定的 Namespace/Topic 通道添加虚拟发布者。

在 Publishers 节点上选择上下文菜单 Add,屏幕上将弹出以下 PublisherDialog。

在 ServiceBusTester 中添加的每个发布者都应该有一个唯一的名称。此名称可以被覆盖为您自己的名称,以便于区分发布者。

按下 OK,我们可以在 TreeView 中看到此操作,如下图所示

如您所见,Publishers 节点有一个用于 rk2011 命名空间中 worker 主题的第一个虚拟发布者。该工具还生成了一个模板消息和 BrokeredMessageProperty 包。

现在,根据我们的需求,消息和/或包可以进行更改。您可以拖放您的消息,加载其他消息等。BMP 包有两个类别:应用程序范围(用户)和系统范围。系统范围内的属性是预定义的,因此我们只能更改其值。其他类别,例如 application,完全由应用程序需求驱动。此属性的名称、类型和值可以添加和修改。模板只有三个预定义的应用程序属性,仅用于测试目的。

应用程序属性值的编辑可以直接通过 PropertyGrid 进行,但要删除、添加或更改属性类型,我们必须按如下屏幕截图所示的按钮

使用 Properties 对话框,我们可以创建、删除或编辑 BMP 包中的应用程序属性。

此时,我们的工具已注册一个订阅者和一个发布者,并带有模板消息和 BMP 包,因此我们可以进行发布/订阅场景,其中发布者将发送消息,订阅者将接收此兴趣。

好的,让我们执行此场景。右键单击发布者以获得此选项

步骤 4. 发送发布者

选择 Fire 以处理此操作,这将发送此消息及其 BMP 包到 Topic 通道。如果一切顺利,消息应该被订阅者接收,如下图所示,以我的命名空间为例

太棒了。我们的虚拟订阅者收到了消息(基于订阅)。在我的例子中,订阅 image12 有两个规则,我们可以看到消息的两个副本。每条收到的消息都会在其订阅者父节点下创建一个自己的子节点,标签为系统属性,例如 {Label}/{MessageId}。当然,有一个地方可以修改代码,并通过任何 BMP 属性扩展此节点命名功能。请注意,消息和 BMP 包是只读的。

步骤 5. 显示订阅

此步骤允许您显示订阅属性,包括其规则

在 Subscriber 节点上选择上下文菜单 Show Subscription,屏幕上将弹出以下 SubscriptionDialog。

以上就是关于发布/订阅测试的主要步骤。对于 Sender/Receiver 节点也存在类似的过程,例如添加接收器、发送器以及向队列发送消息。

发布者消息

正如我之前提到的,Connect 操作会将您的连接持久化到 ApplicationSettings 中。当前版本的此实现不允许持久化所有 TreeView 对象,包括发布者等,但是,存在加载/保存和复制/粘贴消息的功能,适用于虚拟发布者。此功能允许您从文件系统加载特定消息到工具中,进行修改,然后将其保存回文件。

下图显示了发布者消息节点上的此上下文菜单

订阅者收到的消息可以被 Copy 到剪贴板并 Paste 到发布者中。

Copy/Paste 功能允许我们通过虚拟发布者再次重发此消息。在这种情况下,我们还需要根据应用程序需求考虑 BMP 包。基本上,此功能节省了为虚拟发布者生成非类型化消息的时间。

 

结论

本文档提供了一个 Azure Service Bus Messaging 的小型测试工具。它可以帮助您评估 Service Bus 或开发 Service Bus 的消费者和/或生产者。它基于 WCF 技术,并支持 Windows Azure SDK 1.6(2011年11月)。

 

附录 A - 版本 1.1

测试器的状态可以随时保存到 XML 格式的文件中,以便以后打开。下图显示了此功能的上下文菜单

可以通过右键单击所选文件来从列表中删除该文件

 

 

附录 B - 版本 1.2

此更新增加了更多用于发送和/或接收消息的内容类型,例如 application/soap+xmlapplication/xmltext/plain。之前的版本已实现为基本 WCF 绑定,由 NetMessagingTransportBinding 表示,例如带有内容类型 application/soap+msbin1application/soap+msbinsession1 的二进制消息编码器。

处理消息内容类型的概念基于正确的绑定和编码器选择。以下代码片段展示了发布者/发送者实现的示例

 var binding = new CustomBinding();
 if(bmp.ContentType.StartsWith("application/soap+xml"))
   binding.Elements.Add(new TextMessageEncodingBindingElement(MessageVersion.Soap12WSAddressing10, Encoding.UTF8));
 else if (bmp.ContentType.StartsWith("application/xml") || bmp.ContentType.StartsWith("text/plain"))
   //binding.Elements.Add(new WebMessageEncodingBindingElement(Encoding.UTF8));
   binding.Elements.Add(new TextMessageEncodingBindingElement(MessageVersion.None, Encoding.UTF8));
 binding.Elements.Add(new NetMessagingTransportBindingElement());

基本上,我们可以使用 NetMessagingTransportBinding 将 xml/soap 消息作为二进制数据传输,而其他绑定(在此版本中)例如 TextMessageEncodingBinding 将数据作为文本传输。在这种情况下,文本可以是 xml 格式的 POX/Soap 或纯文本。换句话说,消息编码器在 Soap12WSAddressing10None 版本之间切换。

您可以看到,绑定是根据 brokered message(bmp.ContentType 属性)中请求的内容类型选择的。可以通过组合框选择特定值,如下图所示

 

 

下图显示了 POX 内容类型和纯文本的发布者。

 

 

发送具有特定内容类型的 brokered message 非常直接。发布者或发送者发布的每条消息都可以有自己的内容类型。订阅者/接收者的情况则不同,因为虚拟测试器服务必须在打开之前进行配置才能接收 brokered message。这是 Service Bus Tester 的设计使然。

为了在订阅者或接收者处选择内容类型,在对话框中添加了一个额外的组合框。下图显示了一个用于创建 Subscription rk2014.servicebus.windows.net/test/Subscriptions/SantaAna 的虚拟订阅者的 SubscriberDialog 示例。

 

请注意,树中指向的订阅者的工具提示将显示其内容类型,例如

[application/xml] sb://rk2014.servicebus.windows.net/test/Subscriptions/SantaAna

 

高级示例

如果您熟悉 WABS(Windows Azure BizTalk Services),您可以使用以下示例来测试此新版本的 Service Bus Tester。

这是一个非常简单的示例,用于将 brokered message 从源 Topic 传递到基于自定义属性路由器(名称为 a)的特定目标。我们只关注将消息传递给 Topic1、Queue1、Queue2 和 Queue3,因为其他目标(Workflow 和 AzureBlob1)来自我即将发布的文章。

如您所见,每个目标都配置不同。

基本上,Subscription Source TopicTest 通过 Pass-Through Bridge(Fire)将接收到的 brokered message 传递到基于过滤器 a 的目标。

 

 

基于上述目标,我们可以在 Service Bus Tester 中创建虚拟接收器和订阅者。

让我们以 Topic1 作为第一个例子。此目标由 a=topic1 过滤,Topic 代理发送器配置为二进制传输,因此测试器订阅者的 content-type=application/soap+msbin1。虚拟发布者将消息发布到 sb://rk2014.servicebus.windows.net/test 主题,如下图所示

虚拟订阅者将接收 content-type=application/xml 的 brokered message:

 

另一个测试用例是发送 content-type=text/plain 的 brokered message,其中一个 xml 格式的 POX 文本被发送到 queue3

下图显示了此示例,消息被发布到 sb://rk2014.servicebus.windows.net/test 主题,由 WABS 订阅者捕获,然后通过 Pass-Through Bridge 将消息传递到目标 Queue3,该 Queue3 配置为 sb://rk2014.servicebus.windows.net/Queue/test3

 

 

下图显示了 sb://rk2014.servicebus.windows.net/Queue/test3 的虚拟接收器

 

就是这样。

使用新版本的 Service Bus Tester,您将能够使用此测试器来处理更多基于 brokered message 内容类型的虚拟发布者/发送者和订阅者/接收者。对于快速开发 Windows Azure BizTalk Services(WABS)来说,这是一个非常出色的小型工具,特别是当源和/或目标是 Windows Azure Service Bus(WASB)时,尤其是在真实发布者/发送者和/或订阅者/接收者不存在的情况下。

 

再说一句。请修改由版本 1.0 或 1.1 保存的旧文件,通过在每个 ConsumerConfigData 对象中插入新属性 ContentType。请参阅下图中的示例

 

 

附录 C - 版本 1.3

此次更新的原因是为了增强虚拟发布者,使其能够通过 REST API 向 Service Bus 实体(如队列、主题和中继)发送消息。此功能允许基于地址和 SAS/ACS 密钥发布消息,而无需提前注册 Service Bus 连接。有关 SAS/ACS 的更多详细信息,请参阅 Service Bus 身份验证和授权此处

发布者和订阅者是完全透明的,因此发布者可以通过 REST 发送 brokered message,而订阅者可以使用 WCF 堆栈(NetMessagingTransportBinding)进行创建。

让我们介绍 Service Bus Tester 在 1.3 版本中实现的新功能。

1. REST 发布者

PublisherDialog 已略作更改,通过地址工具箱来实现 REST 功能。通过复选框在 REST 和 WCF 绑定之间切换。默认选择是 REST 选项。

下图显示了此版本的 PublisherDialog

 

正如我之前提到的,Address 文本框允许使用 SAS/ACS 密钥包含在查询路径中的 Service Bus 实体(如队列、主题或中继)的地址。当您将 Service Bus 实体的 SAS 或 ACS 的连接字符串拖放到 Address 文本框控件中时,PublisherDialog 逻辑会将其转换为带查询字符串的 uri 地址。此时实体名称未知,因此您需要将 XXX 文本替换为实体的实际名称。如上图所示,此 uri 地址的一部分已高亮显示。

Azure Service Bus 没有像 Azure Storage 那样的 SAS/ACS 密钥的快捷名称;因此我创建了一些快捷名称。以下是它们的映射

对于 SAS:SharedAccessKeyName=skn  SharedAccessKey=skk,对于 ACS:SharedSecretIssuer=ssi  SharedSecretValue=ssv

带 SAS 的 uri 地址示例  https://mynamespace.servicebus.windows.net/appevents?skn=ae_abc2014&skk=mXXXXXXDDDddjkflsfllXWfHLr4w=

带 ACS 的 uri 地址示例  https://mynamespace.servicebus.windows.net/appevents?ssi=owner&ssv=XcCyH+12345dwp3DW444nEUptTYYYYhhK/u5ve/nWA=

如果 Azure 中能有一个通用的设计模式(格式)来使用 SAS/ACS 密钥寻址云实体就太好了。

无论如何,我创建了一个静态帮助类来处理 SAS/ACS Token、通过 REST 发布消息等。请详细查看 ServiceBusHelper 类。它位于 Extensions.cs 文件中。

以下代码片段展示了该类的一个静态方法

public static HttpResponseMessage SendMessageToSB(string addressWithKey, HttpContent content, dynamic brokerProperties, 
  dynamic userProperties = null)
{
  addressWithKey = HttpUtility.HtmlDecode(HttpUtility.UrlDecode(addressWithKey));
  string qAddress = GetResourceAddress(addressWithKey);
  string token = GetToken(addressWithKey);

  var httpClient = new HttpClient() { BaseAddress = new Uri(qAddress) };
  httpClient.DefaultRequestHeaders.TryAddWithoutValidation("Authorization", token);

  if (brokerProperties != null && brokerProperties is string)
    httpClient.DefaultRequestHeaders.TryAddWithoutValidation("BrokerProperties", brokerProperties);
  else if (brokerProperties != null && brokerProperties is object)
    httpClient.DefaultRequestHeaders.
      TryAddWithoutValidation("BrokerProperties", JsonConvert.SerializeObject(brokerProperties));          

  if (userProperties != null)
  {
    if (userProperties as NameValueCollection != null)
    {
      foreach (string key in (userProperties as NameValueCollection).AllKeys)
          httpClient.DefaultRequestHeaders.TryAddWithoutValidation(key, (userProperties as NameValueCollection)[key]);
    }
    else if (userProperties as Dictionary<string, string> != null)
    {
      foreach (string key in (userProperties as Dictionary<string, string>).Keys)
        httpClient.DefaultRequestHeaders.TryAddWithoutValidation(key, (userProperties as Dictionary)[key]);
    }
    else
    {
      // anonymous type
      foreach (var property in userProperties.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance))
      {
        if (property.CanRead)
          httpClient.DefaultRequestHeaders.TryAddWithoutValidation(property.Name, property.GetValue(userProperties, null));
      }
    }
  }
  return httpClient.PostAsync("messages", content).Result;
}

 

上述方法中的关键部分是基于带 SAS/ACS 密钥的 uri 地址获取 token。一旦我们有了 token,就可以向 HttpClient 实例添加头和内容。请注意,BrokerProperties 必须以 JSON 格式的文本传递。其他属性(如用户)以名称/值对的形式作为单个头添加。

让我们举一个简单的用法示例

 

示例

此示例展示了如何创建一个 REST 发布者。

右键单击 Publisher 节点并选择 Add。下图显示了结果

 

 

粘贴 Service Bus 实体的 SAS 连接字符串。将 XXX 文本替换为实体的实际名称(注意,此版本仅支持队列、主题或中继实体)。获得地址后,请按 Refresh 按钮以验证此地址是否通过 Service Bus 进行身份验证。如果地址有效,OK 按钮将被启用。

 

 

通过按 OK 按钮,PublisherDialog 将关闭,并且 Publisher 将在 TreeView 中显示一个新的发布者。请注意,此 REST 发布者具有不同的图标,表示这是一个 REST 发布者。

 

 

现在,我们可以通过 REST 发布者发送消息。请注意,完整的 REST 地址可以复制到剪贴板。

 

 

太好了,现在您可以为每个 Service Bus 实体(如队列、主题和中继)创建三个 REST 发布者,并使用不同的内容类型向它们发送消息。

 

2. Relay 发布者

Relay 发布者是发布者的一项新功能,它使用 WCF BasicHttpRelayBinding。要创建 Relay 发布者,我们需要取消选中复选框并在组合框中选择 Relay 项。

下图显示了这些选择

 

 

请注意,必须注册 Relay 服务才能在组合框中看到终结点名称。一旦我们选择了 Relay 服务终结点,OK 按钮将被启用以进行确认和退出。

以下代码片段显示了一个 WCF 客户端,它使用 BasicHttpRelayBinding。其实现非常直接,是典型的 WCF 编程

 

#region using Relay
EndpointAddress topicAddress = new EndpointAddress(config.EntityAddress);

var binding = new BasicHttpRelayBinding();
binding.Security.Mode = EndToEndBasicHttpSecurityMode.Transport;
binding.Security.RelayClientAuthenticationType = RelayClientAuthenticationType.None;

var securityBehavior = new TransportClientEndpointBehavior()
{
    TokenProvider = TokenProvider.CreateSharedSecretTokenProvider(config.IssuerName, config.IssuerSecret),
};

ServiceEndpoint se = 
  new ServiceEndpoint(ContractDescription.GetContract(typeof(IOutputChannel)),binding,topicAddress){Name="RelayPublisher"};
 
se.Behaviors.Add(securityBehavior);

var factory = new ChannelFactory(se);
channel = factory.CreateChannel();

using (var message = MessageExtension.CreateMessage(xmlmessage))
{
    channel.Send(message);
}

((IClientChannel)channel).Close();
factory.Close();
#endregion


请注意,此版本仅支持具有 BasicHttpRelayBinding 的 Relay 服务。

 

示例

此简单示例展示了如何基于 Relay 服务终结点(在此示例中,终结点名为 Bulk)创建 Relay 发布者

 

一旦 Relay 终结点被验证,OK 按钮将被启用,可以确认创建过程。

下图显示了 Relay 发布者的右键单击上下文菜单。请注意,Relay 发布者有自己的图标和工具提示。

 

 

如上图所示,没有 BrokeredMessageProperties。这是正确的,因为 Relay 服务终结点具有 BasicHttpRelayBinding

请注意,消息必须是 Soap 1.1 版本。

 

3. 高级 GUI 功能

在 TreeView 中添加了两个 GUI 功能,如重命名和拖放节点。

下图显示了 Publishers 节点中两个重命名的发布者

 

TreeView 中的拖放功能允许在同一组内重新排列组和节点。换句话说,我们不能在发布者、订阅者等节点之间混合。

下图显示了 Publishers 节点作为 ServiceBus TreeView 中的第一个节点

 

拖放功能使用方便。如上图所示,Publishers 节点已被拖放,然后放在 Receivers 节点中,它将插入到该节点之前。同样,发布者 BasicHttpRelayBinding 节点也以同样的方式进行,例如插入到 REST-Relay 节点之前。

 

4. 高级示例

这个特殊的进阶示例演示了如何使用 Service Bus Tester 来模拟触发计划看门狗的事件。为了更好地理解,让我们描述一下事件驱动分布式架构的发布/订阅拓扑。Azure Service Bus 允许创建许多发布/订阅模式。

下图展示了一个基于 Windows Azure Service Bus 实体及其配额的发布/订阅设计拓扑模式示例(Azure Service Bus 配额)。基于这些实体配额和环境范围(如开发、暂存、QA、演示、生产等),我们可以创建适合持续部署的敏捷模型。模型敏捷性在命名空间级别完成,其中每个环境范围由服务命名空间表示。模型根位于账户级别,其中基本上只有两个支柱,如暂存和生产(这与我们对云服务的处理方式相同)。

命名空间内的发布/订阅(Topic/Subscription)拓扑在设计和部署时对每个环境范围都相同,因此我们可以使用工具进行持续部署(将部署从开发推送到生产范围)。

在 Azure Service Bus 中,Topic 实体可用于隔离客户端,其中每个客户端只能看到自己的主题以订阅服务。这个概念允许将应用程序连接的内部逻辑模型与多租户外部逻辑模型连接解耦。

 

 

如上图所示,所有事件都发布到根 Topic(命名为 appevents)。从这个“总线”(主题)中,事件通过订阅分发到其服务或根据应用程序模型连接转发到其他 Topic。正如您可能知道的,Microsoft Azure Service Bus 添加了一个特殊的订阅功能,允许创建发布/订阅模型连接的拓扑,例如 ForwardTo 属性。目前,我们只能将事件转发到同一命名空间内的 Topic 和 Queue 实体,但我希望很快我们就能将消息转发到 Relay 终结点以及跨命名空间或账户。

请注意,ForwardTo 属性代表了构建发布/订阅拓扑的概念性功能。如上例所示, appevents 主题有几个订阅。第一个订阅非常直接,允许捕获 appevents 主题上的任何事件(消息),并将其传递给 AppLog 服务进行故障排除、后处理、报告等。第二个订阅代表了一组客户端订阅,其中与每个客户端相关的特定事件被转发到它们的主题,例如 Client1 等。

最后一个订阅是 ScheduledWatchdog 订阅。通过此订阅,我将演示 Service Bus Tester 的用法,其中测试器将事件发送到 appevents 主题,此订阅会将计划消息(延迟 45 秒)转发到 Watchdogs Topic。这是事件驱动架构中的典型模式,其中工作流过程由“看门狗服务”在特定时间内监视其完成情况。

下图显示了 ScheduledWatchdog 订阅

 

您可以看到,设置了 ForwardTo 属性以转发到 watchdogs 主题。此设置非常简单直接。在 $Default 规则的 Action 的 SqlExpression 文本框中设置 ScheduledEnqueueTimeUtc 时,实现了更多的技巧。我们需要做的是向当前时间添加 45 秒,以延迟转发到 watchdogs 主题的消息。SqlRuleAction 支持 SQL 92 表达式的子集,因此特定的延迟已作为用户属性在 brokered message 中创建(请参阅下图中的应用程序属性 watchdogScheduledTimeUtc)。

我们可以使用 Service Bus Explorer 工具创建上述发布/订阅拓扑。Service Bus Tester 工具可用于测试看门狗事件。下图显示了将事件发送到 appevents 主题,这将触发一个 ScheduledWatchdog 转发消息到 Watchdogs 主题。brokered message 将由 Watchdog_AppA 订阅者接收。

 

 

请注意,应用程序属性 watchdogScheduledTimeUtc 的值是 UTC 日期格式的文本,例如:8/19/2014 2:12:11 PM

 

 

附录 D - 版本 1.4

此版本增强了 ServiceBusTester 以实现以下功能

1. 使用连接字符串

自 Azure SDK 2.5(2014年11月)以来,Azure 门户在创建命名空间时不再支持默认的 ACS 身份验证。Service Bus 命名空间默认使用 SAS 身份验证。为了适应 Azure 门户命名空间的更改,在 ServiceBusTester 中进行了以下更改:

您可以看到,上面的 NamespaceDialog 也允许通过连接字符串进行命名空间身份验证。在这种情况下,InssuerName 属性必须为空值。

 

2. ServiceBusTester REST 端点

ServiceBusTester 存储四个实体,如接收器、发送器、订阅者和发布者消息。消息动态地存储在接收器和订阅者容器中。通过向 ServiceBusTester 添加 REST 端点,我们可以根据业务需求查询这些容器中的任何资源。下图的上下文菜单显示了如何从每个容器检索资源

 

通过单击选定的菜单,我们可以在浏览器中看到所选容器的内容。ServiceBusTester REST 端点 URL 是 https://:10100/sb。请注意,端口号是从配置文件动态创建的,用于 ServiceBusTester 的每个实例。

通过在浏览器中键入 URL 地址,例如: https://:10100/sb/Subscribers,您将看到所有创建的订阅者,包括接收到的消息。

以下是一些内置查询的示例

https://:10100/sb/Subscribers/AppLog 显示 AppLog 订阅者的所有消息
https://:10100/sb/Subscribers/AppLog?$xpath=ServiceBusTester-2 显示 AppLog 订阅者中的 ServiceBusTester-2 消息
https://:10100/sb/Subscribers/AppLog?$xpath=ServiceBusTester-2/BrokerMessage/Body/s:Envelope&$ns={http://www.w3.org/2003/05/soap-envelope}s 在查询中使用 xpath

 

我们可以使用简单的查询,例如 https://:10100/sb/*,以查看存储在 ServiceBusTester 中的所有消息。

下图显示了该结果的一个示例(通过网页浏览器)

太棒了。通过在我们的测试工具中拥有此功能,它将使我们能够创建更复杂的工具、监控、故障排除过程等,并以声明式方式进行。通过 REST 端点,我们可以获取虚拟接收器或订阅者接收到的特定资源,并触发虚拟发布者。请注意,树节点的可重命名值必须是有效的 xname 值,否则查询将显示该节点的错误。

以下新功能展示了 ServiceBusTester REST 端点的用法   

 

3. 通过 XSLT 转换生成发布者/发送者消息

这是一项非常强大的功能,它使我们能够基于接收到的消息动态生成 brokered message。现在,我们不再使用显式格式化的消息,而是可以使用 XSLT 转换来创建 brokered message。XslCompiledTransform 类允许添加自定义函数来处理 brokered message 属性并从 REST 端点获取任何资源。现在您应该能理解为什么之前的特性需要添加到 ServiceBusTester 中了。

以下代码片段显示了向 XSLT 编译器添加自定义对象的片段

 XsltArgumentList xsltArgList = new XsltArgumentList();
 xsltArgList.AddExtensionObject("urn:rkiss.sb/tester/2014/09", new XslBMPExtensions(){ 
   EndpointAddress = endpointAddress, 
   UserProperties = userProperties, 
   SysProperties = sysProperties, 
   BMP = bmp 
   });

 

自定义函数,如 XslBMPExtensions,有两个方法用于 xslt 资源。第一个是 ThisEndpoint,用于获取当前 ServiceBusTester REST 端点,另一个方法是 Set,用于设置 brokered message 属性。

下图显示了这个自定义对象。请注意,为了在 xslt 资源中引用它,该对象的名称已被硬编码为 urn:rkiss.sb/tester/2014/09

最后,下图显示了自定义 XslBMPExtensions 对象在 xslt 资源中的用法

<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" 
  xmlns:msxsl="urn:schemas-microsoft-com:xslt" 
  xmlns:bmp="urn:rkiss.sb/tester/2014/09"
  exclude-result-prefixes="msxsl bmp">
<xsl:output method="xml" encoding="UTF-8" indent="no" omit-xml-declaration="yes" />
<xsl:strip-space elements="*" />

<xsl:variable name="apos">'</xsl:variable>
<xsl:variable name="uri" select="concat(bmp:ThisEndpoint(), '/Subscribers/AppLog?$xpath=/*[starts-with(local-name(),', $apos, 'HelloWorld', $apos, ')]/*[1]')"/>

<xsl:variable name="input">
<xsl:copy-of select="document(msxsl:node-set($uri))"/>
</xsl:variable>

<xsl:variable name="bmp"> 
  <bmp>
    <application> 
      <AppName>Test</AppName>
    </application>
    <sys>
      <Label>HelloWorld</Label>
        <CorrelationId>
          <xsl:value-of select="12345"/>
        </CorrelationId>
    </sys>
  </bmp> 
</xsl:variable>

<xsl:template match="/">
  <xsl:value-of select="bmp:Set($bmp)"/>
  <Job xmlns="urn:rkiss/bulk2014" id="1234" ctrl="Run" msg="{bmp:ThisEndpoint()}">
   <xsl:copy-of select="$input"/>
  </Job>
 </xsl:template>
</xsl:stylesheet>

 

正如您所见,首先我们将此自定义对象声明为 xmlns:bmp,然后连接 REST 端点的变量 URI。在此示例中,我们希望获取 Subscriber AppLog 接收到的第一条消息的 HelloWorld 元素。一旦我们有了 REST 端点的查询地址,我们就可以使用 Microsoft 扩展函数 document 来获取资源值并将其分配给变量。在此示例中,它是一个 input 变量。当然,根据需要,变量元素可以使用 xpath 查询特定属性/元素。

自定义函数 bmp:Set 带有一个参数 XPathNavigator(允许我们传递变量),用于设置 BrokeredMessageProperties。请注意,变量的结构必须如上例所示,请参阅变量 bmp

您可以将上面的 xslt 示例复制到剪贴板,并在测试期间粘贴到发布者中。第一次发送消息时,您将看到简短的接收到的有效负载(没有 HelloWorld 消息),但再次发送,您将看到不同的结果。

以上是本次更新的内容。希望很快会有下一次更新,因为在使用 ServiceBusTester 时,我将提出另一个业务需求。

 

 

参考文献

[0] Service Bus

[1] Service Bus Explorer

[2] The Developers Guide to AppFabric

© . All rights reserved.