65.9K
CodeProject 正在变化。 阅读更多。
Home

您需要了解的关于 Azure Service Bus 代理消息传递的所有信息(第三部分)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.88/5 (5投票s)

2015 年 1 月 22 日

CPOL

19分钟阅读

viewsIcon

20883

您需要了解的关于 Azure Service Bus 代理消息传递的所有信息(第三部分)

Connecting Distributed Systems in the cloud

如果您一路跟随我们,您现在应该知道 Azure Service Bus 托管消息传递方面的信息量很大。尽管我们已经涵盖了大部分基础知识,但关于 Service Bus 仍有相当多的内容需要了解。在本系列的第三部分(也是最后一部分)中,我们将探讨托管消息传递的一些高级功能和模式,以及安全性和最佳实践。您可以通过 第一部分第二部分轻松快速地了解已涵盖的内容。

我们已经打下了基础,现在可以轻松利用 Service Bus 队列以及主题和订阅的功能来分发信息给断开连接的系统。此外,我们还了解了如何根据订阅规则进一步控制信息流以及如何适当地转换数据。但是效率呢?事务呢?有问题的消息处理呢?这些以及许多其他常见场景都可以通过服务提供的功能来处理,我们接下来将介绍其中的一些领域。

批处理

在 Service Bus 队列和主题中,我们只关注了对消息进行单独处理。然而,一次处理多条消息以获得更高程度的运营效率是很常见的。您可能会认为这会产生经济影响,但根据其 文档,批处理不会影响计费事务。

另外需要注意的是,批处理仅通过 Service Bus 消息传递协议 (SBMP) 可用。在您因为没有提到 HTTP 而感到沮丧之前,请了解在使用 Service Bus 客户端库时,默认情况下您使用的是 SBMP,除非您明确将协议设置为 HTTP。

在大多数情况下,批处理被认为是严格的生产者或使用者操作。但在 Azure Service Bus 中,批处理可以指代其他内部 Service Bus 操作。因此,让我们依次看看所有这些不同的场景。

客户端

批量发送消息

提醒:为专注于当前主题,已省略异常处理。请注意 .Net 如何处理与 Tasks 相关的异常。

正如您所料,发送一批消息的操作并不复杂。

List<BrokeredMessage> messages = new List<BrokeredMessage>();
BrokeredMessage taylorSwift = new BrokeredMessage("Taylor Swift Request");
BrokeredMessage justinBieber = new BrokeredMessage("Justin Bieber Request");
BrokeredMessage britneySpears = new BrokeredMessage("Britney Spears Request");
 
messages.Add(taylorSwift);
messages.Add(justinBieber);
messages.Add(britneySpears);
 
MessageSender messageSender = messagingFactory.CreateMessageSender(_topicName);
 
await messageSender.SendBatchAsync(messages); 

但是,一批消息也受限于消息的最大大小限制(关于消息大小,请参阅 第一部分)。因此,批处理会将超出该限制的消息截断,并将其作为下一批消息的第一条。此外,在内部,批处理是基于 `MessagingFactory` 的 `BatchFlushInterval` 属性处理的,该属性默认为 20 毫秒。当情况需要时,可以通过在创建 `MessagingFactory` 时更改此属性来提高效率(有关微调场景,请参阅 最佳实践)。

这将影响所有从此 MessagingFactory 创建的客户端。

MessagingFactorySettings messagingFactorySettings = new MessagingFactorySettings
{
     NetMessagingTransportSettings = {BatchFlushInterval = TimeSpan.FromMilliseconds(100)}
};
 
MessagingFactory messagingFactory = MessagingFactory.Create(uriAddress, messagingFactorySettings );

现在,此 `MessagingFactory` 创建的任何客户端(`MessageSender`、`MessageReceiver`、`QueueClient`、`TopicClient`、`SubscriptionClient`)都将受到此设置的影响。回顾一下 `MessagingFactory`,请参阅 第一部分。您可能会在查看这些客户端时想:“等等,我们不是在谈论发送吗!” 这是正确的,但批处理刷新间隔会影响发送和完成消息的操作,这是一个查看接收的好时机。

批量接收消息

批量接收消息的过程并不复杂。我们可以像在之前的示例中那样接收消息并进行处理,只是这次是批量处理。

MessageReceiver messageReceiver =  await _messagingFactory.CreateMessageReceiverAsync(queueName);
try
{
     while (!cancellationToken.IsCancellationRequested)
     {
         IEnumerable<BrokeredMessage> messages = await messageReceiver.ReceiveBatchAsync(batchCount).ConfigureAwait(false);
         if (messages != null)
         {
              foreach (var brokeredMessage in messages)
              {
                  if (brokeredMessage.LockedUntilUtc == DateTime.UtcNow) await brokeredMessage.RenewLockAsync();
                  await ProcessAndReleaseMessageAsync(brokeredMessage);
              }    
         }
                     
          await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
     }
}
catch (Exception ex)
{
     Console.WriteLine(ex.Message);
     throw;
}
 
private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)
{
     MessageProcessingAction action = MessageProcessingAction.Abandon;
 
    try
     {
         //Process Message
 
        action = MessageProcessingAction.Complete;
         await UpdateMessageState(message, action);
     }
     catch (Exception ex)
     {
         //log error
     }
     finally
     {
         //if something fails update with abandon    
         //C# 6.0 allows await calls in a finally blocks
         UpdateMessageState(message, action);
     }
}
 
private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)
{
     switch (action)
     {
         case MessageProcessingAction.Complete:
               await message.CompleteAsync();
               break;
         case MessageProcessingAction.Abandon:
               await message.AbandonAsync();
               break;
         case MessageProcessingAction.Deadletter:
               await message.DeadLetterAsync();
               break;
         default:
               await message.AbandonAsync();
               break;
     }
 
}time.

控制 Azure 存储批处理

到目前为止,我们所有的批处理都发生在客户端。然而,当 Service Bus 服务接收消息时,它有能力在发送到内部存储时对这些消息进行批处理。这仅适用于发送和完成操作。Service Bus 能够批量持久化其消息到内部存储,只需要在队列、主题或订阅上设置 `EnableBatchedOperations` 标志。

TopicDescription topicDescription = new TopicDescription(topicPath)
{
    EnableBatchedOperations = true
};

这可以在创建主题时提供(请参阅 创建主题)或作为对现有主题的更新来提供。同样,这只是为主题提供的一个示例,也是队列和订阅可用的属性。

您可以在此 MSDN 源中阅读更多关于批处理的信息。

批处理通常是提高应用程序效率的讨论领域之一。然而,并非所有形式的批处理都高效,我将在 最佳实践中讨论这一点。Service Bus 的另一项功能可以直接提高效率并在此领域做出重大贡献,那就是预取,它与我们一直在讨论的批处理密切相关。我喜欢将其视为一种更主动的批处理形式。

预取

除了我们的批处理示例外,我们从队列或订阅获取消息的所有示例都是按调用进行的,每次请求都会产生一次到 Azure 的往返成本。想象一下,当您请求一条消息时,将消息预加载到客户端的本地缓存中,并且后续的所有消息请求都将从本地缓存中拉取,直到本地缓存中的消息用完。这大致就是您启用预取时设置的内容。

您不仅会因为从本地缓存接收消息而立即看到处理可用消息的效率提升,而且在接收消息方面无需进行任何接口更改。唯一的更改是在创建接收器客户端(`QueueClient`、`SubscriptionClient` 或 `MessageReceiver`)时设置 `PrefetchCount` 属性。这也可以在 `MessagingFactory` 级别设置,并级联到工厂创建的所有客户端。

MessageReceiver messageReceiver = await _messagingFactory.CreateMessageReceiverAsync(queueName);
messageReceiver.PrefetchCount = 200;

`PrefetchCount` 不是一个任意数字,而应根据 Microsoft 推荐的公式(作为起点)进行计算。该公式是单个接收器每秒可处理的消息总数的 20 倍。所以,如果一个接收器每秒可以处理 5 条消息,则 `PrefetchCount` 为 20×5 = 100。如果在此 `MessagingFactory` 级别设置,则该数字应乘以工厂创建的客户端数量。因此,如果客户端数量未知,则在客户端级别设置可能更容易。

需要非常注意的是,服务器端每条消息的默认锁定持续时间为 60 秒,最长可延长至 5 分钟。如果一条消息在到期之前未被处理,它将可供另一客户端处理。然而,缓存该消息的客户端将不知道这一点,并且会收到该消息(从本地缓存中),如果尝试在消息的生存时间 (TTL) 过期后进行处理,则会收到异常。

不带预取的 3 条消息接收时间(毫秒)示例

1

2

3

获取消息花费的时间:1126 毫秒。

获取消息花费的时间:79 毫秒。

获取消息花费的时间:157 毫秒。

但设置 PrefetchCount 后

1

2

3

获取消息花费的时间:750 毫秒。

获取消息花费的时间:0 毫秒。

获取消息花费的时间:0 毫秒。

我无法解释为什么第一个批次中第一个消息的开销会更高,而没有发生预取。但是,当启用预取时,由于消息大小增加,第一个消息的开销会更高。但显然,我们可以看到接收到的后续消息数量的下降。想象一下,这发生在大量消息上。

事务

我们可以将 Service 操作包装在事务中,以确保在完成一系列操作时实现“全有或全无”的场景。例如,需要确保所有消息都发送或都不发送。例如,如果我们通过设置消息的 `CorrelationId` 属性并结合订阅的 `CorrelationFilter` 使用相关性模式,我们可能希望确保所有消息一起发送或都不发送。

就像您有一系列消息的情况一样

MessagingFactory factory = MessagingFactory.CreateFromConnectionString(AccountInfo.ConnectionString);
MessageSender messageSender = factory.CreateMessageSender(_topicName);
 
BrokeredMessage justinBieber = new BrokeredMessage() {CorrelationId = "Music Awards"};
justinBieber.Properties.Add("RequestedSinger", "Justin Bieber");
 
BrokeredMessage taylorSwift = new BrokeredMessage() {CorrelationId = "Music Awards"};
taylorSwift.Properties.Add("RequestedSinger", "Taylor Swifht");
 
BrokeredMessage britneySpears = new BrokeredMessage() {CorrelationId = "Music Awards"};
britneySpears.Properties.Add("RequestedSinger", "Britney Spears");
 
List<BrokeredMessage> messages = new List<BrokeredMessage>() {justinBieber, taylorSwift, britneySpears};

并且您希望它们全部发送或都不发送;

using (var transaction = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))  {
    foreach (BrokeredMessage message in messages)
    {
        messageSender.SendAsync(message);
    }
 
     transaction.Complete();
}

如果发生异常,则不会在主题上发布任何消息。请注意 `TransactionScope`,它接受 `TransactionScopeAsyncFlowOption` 枚举。这允许环境事务在线程延续之间流动,如此 MSDN 页面所述。

死信处理

当消息格式错误、消息的生存时间已过期、规则筛选器评估出错或由于任何其他原因导致托管消息未能在队列或订阅中处理时,我们可能希望捕获该消息以进行进一步分析。Service Bus 队列和订阅为此目的提供了死信故障转移。但是,默认情况下未启用。

实际上,关于死信消息的处理模式有很多可以讨论的。通常,这些模式有所不同,要么是 Service Bus 服务根据条件将消息设为死信,要么是使用者应用程序显式将消息移至死信队列。

我们可以将这个主题深入探讨,讨论所有细微之处,但让我们保持简单。您只需要了解几个基本要点即可利用 Service Bus 的死信功能。

隐式服务死信处理

为了由 Service Bus 进行隐式死信处理,必须在主题、队列或订阅上启用死信处理。这可以在创建时完成,也可以通过更新这些实体之一来完成。

SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName) {
    DefaultMessageTimeToLive = TimeSpan.FromSeconds(30),
    LockDuration = TimeSpan.FromSeconds(90),
    EnableDeadLetteringOnMessageExpiration = true,
    EnableDeadLetteringOnFilterEvaluationExceptions = true
};

注意:显示的是 SubscriptionDescription,但也适用于 QueueDescription 和 TopicDescription

您会发现,实际上除了一个之外,还有更多与死信相关的属性可以设置。`EnableDeadLetteringOnMessageExpiration` 允许 Service Bus 服务移动根据其 `TimeToLive` 属性过期的消息。而 `EnableDeadLetteringOnFilterEvaluationExceptions` 允许捕获订阅规则条件在评估消息时引发异常的消息。服务在消息传递次数等于队列、主题或订阅的 `MaxDeliveryCount` 属性时,还会隐式地将消息移动到死信队列。

显式使用者死信处理

除了服务将消息移至死信队列外,使用者应用程序还可以确定是否应将消息移至死信队列。这可以通过调用消息的 Abandon/AbandonAysnc 方法轻松完成。在我们接收和处理消息的多个不同示例中,您已经看到过这一点。

await message.DeadLetterAsync();

或提供原因和描述

await message.DeadLetterAsync("Popularity", 
 "Number of request for singer has not reached minimum");

检索死信消息

最后,当我们想检索已移至死信队列的消息时,我们很幸运,因为这与我们过去创建接收器的方式没有区别。唯一的区别是我们需要获取死信队列的路径,并将其作为创建方法的 *路径* 提供。

string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicPath, subscriptionName);
MessageReceiver messageReceiver = await _messagingFactory.CreateMessageReceiverAsync(deadLetterPath);

请注意,我们正在调用 `SubscriptionClient` 上的静态 `FormatDeadLetterPath` 方法。

获取 `MessageReceiver` 应该看起来很熟悉,从这里我们可以获取死信队列中的任何消息。

自动转发

吞吐量和可伸缩性,这两个非常重要的领域恰好是自动转发功能旨在解决的问题。这是最强大的功能之一,可以用最少的努力对高流量系统产生最大的影响。

看,实现自动转发非常简单。只需设置队列、主题和订阅的 `ForwardTo` 属性即可。它正如其名称所示,会自动将接收到的消息转发到目标。然而,在您的架构中利用自动转发才能真正发挥此功能的力量。让我们更仔细地研究一下,以便更好地理解。

TopicDescription topic = new TopicDescription(topicPath);
await _namespaceManager.CreateTopicAsync(topic);
 
SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName)
{
    ForwardTo = topic.Path,
};
 
await _namespaceManager.CreateSubscriptionAsync(subscription, ruleDescription);

需要从上述示例中记住的一点是,目标必须存在,然后您才能使用定义的 `ForwardTo` 字段更新订阅,或创建具有该字段定义的新订阅。

首先,您需要了解主题的吞吐量受其在任何给定时间可以处理的消息数量的限制。自然,如果您可以分散消息的处理,就可以提高吞吐量,而这正是自动转发隐式允许的。让我们看一个例子。

想象您是一家货运公司,并且您管理着全国卡车的地理位置。您将国家/地区划分为 5 个区域,并根据卡车当前的地理坐标将路线信息路由到区域分发中心订阅。反过来,该区域订阅会将消息自动转发到一个主题,该主题将有 *n* 个本地分发中心订阅。

在我们的示例中,我们可以有成百上千个本地分发中心。目前,主题的订阅限制为 2,000。因此,与其要求单个主题处理高达 2,000 个订阅的消息,从而严重降低吞吐量,不如将工作负载分散到 5 个订阅,然后这些订阅会将消息转发到一个主题,该主题将负责处理更小范围的本地分发中心订阅。我们不仅将因为减少了单个主题需要处理的订阅数量而获得指数级的吞吐量提升,而且还将消除 2,000 的订阅限制。

这还可以说明自动转发提供的可伸缩性能力。如果有新的本地或区域分发中心上线,将该订阅添加到相应主题并更新/添加规则以适应新分发中心将非常简单。

安全

第一部分中,我们已经初步了解了 Service Bus 开箱即用的一些安全协议,即 `TokenProvider`。`TokenProvider` 是允许我们实现所需安全控制形式的管控件。无论是通过我们已经详细讨论过的用于其他服务的共享访问签名 (SAS),还是通过 访问控制服务(例如身份提供者(Windows Live ID、Google、Yahoo 等)或联合身份验证服务(如 AD FS 2.0))。

但是,话虽如此,您可以阅读 Microsoft 关于 Service Bus 安全性的观点,以及他们最近(2014 年 9 月)如何对其进行更改,以大力支持共享访问签名 (SAS),可以在 此处找到。因此,我们将安全讨论的范围也限定在共享访问签名。在本节结束时,我将提供一些其他信息,供对其他安全控件感兴趣的人员参考。

Creating Shared Access Policies

如果您一直关注我之前关于 Azure 服务的主题,例如 BlobTable Storage 等,您应该对共享访问签名有一定的了解。如果您不了解,请阅读 MSDN 版本。共享访问签名 (SAS) 提供对特定资源的细粒度访问控制。它们允许您指定使用者在给定时间内对特定资源拥有哪些访问权限。与其他 Azure 服务一样,共享访问签名是一个字符串,它由访问详细信息以及 Azure 服务用于验证传递给服务的共享访问签名所使用的哈希组成。

为了避免重复定义签名授予共享访问签名使用者的具体权限,我们可以使用存储在 Azure 上的预定义策略。这些策略充当为特定 Service Bus 资源提供权限的模板。还可以通过删除单个存储策略来轻松撤销访问。我们可以提供访问的资源包括 Service Bus 的两个级别:命名空间和实体。

命名空间资源是我们已经在 第一部分中定义的,而实体包括主题和队列等资源。然而,由于在 Service Bus 命名空间中以编程方式创建存储策略需要使用 Azure 门户上传证书,因此我主要关注为实体创建共享访问策略。但是,如果您有兴趣了解如何上传证书并创建命名空间共享访问策略,可以查看这篇 MSDN 文章及其相关链接。

但是,您可以通过转到您创建的 Service Bus 命名空间下的“配置”部分,轻松地通过门户创建命名空间级别的共享访问策略,如下所示。

 

在此门户中,您可以创建命名空间级别的策略,这些策略可以提供“管理”、“发送”或“侦听”特权的任何组合。外部客户端。

 

创建存储策略

遵循最小权限原则,让我们看看如何在实体级别创建和保存存储策略。然后,我们可以查看如何使用这些策略创建共享访问签名,该签名可以用于或提供给使用者。在创建策略方面,并没有太多内容。

TopicDescription topicDescription = await _operations.CreateOrRetrieveTopicAsync(topicPath);
topicDescription.Authorization.Add(new SharedAccessAuthorizationRule(“SendSingerRequests”,
SharedAccessAuthorizationRule.GenerateRandomKey(), new[] { AccessRights.Send ));
 
await _namespaceManager.UpdateTopicAsync(topicDescription );

同样,`CreateOrRetrieveTopicAsync` 只是一个辅助方法,用于在 `TopicDescription` 不存在时创建它,或在存在时返回它。然后,我们只需将一个新的 `AuthorizationRule` 作为 `SharedAccessAuthorizationRule` 添加到 `TopicDescription` 的 `Authorization` 属性。此 `SharedAccessAuthorizationRule` 是通过提供策略名称以及 `AccessRights` 枚举数组来创建的。

在这种情况下,我们正在创建一个名为“SendSingerRequests”的存储策略,该策略具有在主题上*发送*的权限,如果我们导航到 Azure 门户中 Service Bus 命名空间下的该主题,可以看到这一点。

Creating Stored Policies

现在我们已经为主题创建了存储策略,我们可以轻松生成共享访问签名,并将其提供给使用者进行身份验证。

从策略生成共享访问签名

生成共享访问签名只需要服务知道终结点 URI、策略名称以及主密钥或辅助密钥,以及一个指定共享访问签名有效期限的时间跨度。

Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", "shopit", string.Empty);
string generatedSaS = SharedAccessSignatureTokenProvider.GetSharedAccessSignature(
        policyName,
        policyKey,
        serviceUri.ToString().Trim('/'),
        TimeSpan.FromHours(8));

在第一部分中,我们展示了创建 `NamespaceManager` 的各种方法,其中许多内容应该已经让您感到熟悉。从这里,我们可以将共享访问签名字符串提供给使用者,以便他们在创建 `TokenProvider` 时使用。

使用共享访问签名

获得生成的共享访问签名后,我们可以将其提供给可能想要向我们的主题发送消息的使用者,当他们生成 `TokenProvider` 时,他们就可以开始使用它。

TokenProvider tokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(generatedSaS);
 
MessagingFactory factory = MessagingFactory.Create("sb://<yourNamespace>.servicebus.windows.net", tokenProvider);
MessageSender messageSender = factory.CreateMessageSender(_topicName);

从这里开始,他们可以向主题发送消息,只要共享访问签名在上面生成时设置了生存时间(例如 8 小时)。

额外资源

如前所述,还有其他可用的访问控制服务,例如身份提供者、联合身份验证服务等。以下是一些关于这些扩展访问控制领域附加资源的链接:

  1. http://blogs.msdn.com/b/jimoneil/archive/2012/04/24/fun-with-the-service-bus-part-1.aspx
  2. http://azure.microsoft.com/en-us/documentation/articles/active-directory-dotnet-how-to-use-access-control/
  3. http://blogs.msdn.com/b/servicebus/archive/2014/09/03/change-to-azure-service-bus-portal-default-authentication-mechanism-for-service-bus-namespaces-now-sas.aspx
  4. http://www.developerfusion.com/article/121561/integrating-active-directory-into-azure/

最佳实践

协议

Service Bus 可以运行在多种协议下。默认情况下,SDK 使用 Service Bus 消息传递协议,该协议提供最佳性能和功能。除非您的或您的消费者的操作仅限于 HTTP/S,否则请使用 SBMP。有关可用协议的更多信息,请参阅 本文

工厂、客户端和抽象

我之前没有提到过,但在创建 `MessagingFactory` 和 `QueueClient`、`TopicClient`、`SubscriptionClient` 等客户端以及它们的 `MessageSender` 和 `MessageReceiver` 抽象时,存在大量的开销。因此,在创建这些资源时,请尽量充分利用它们,避免不必要的重复创建。例如,为每条需要发送的消息都重新创建 `MessageSender`。

此外,请注意,当您终止 `MessagingFactory` 等工厂时,它也会关闭其负责创建的所有实体。

抽象

当您绝对不需要使用 `QueueClient`、`TopicClient` 和 `SubscriptionClient` 等直接客户端时,请使用它们的抽象 `MessageSender` 和 `MessageReceiver`。这样做,您无需担心是发送到队列还是主题,是从队列还是订阅接收。

操作

在本系列开头我提过,但我觉得有必要重申一下,尽可能利用 SDK 提供的异步操作的重要性。使用提供的异步方法可以显著提高性能。

消息大小

最后,消息大小的默认值为 256kb,包括正文和系统/自定义属性,最大属性大小为 64kb。虽然您可能可以增加此大小,但请谨慎行事。如果出于任何原因或可能性,未来的需求或当前客户端需要使用 HTTP 作为其协议,这可能会产生负面影响。Service Bus 消息旨在提高效率,并且 Service Bus 消息设计为与 HTTP 标头大小规范兼容。

批处理

关于批处理和提高效率,它并非非黑即白,并非绝对。批处理仅适用于发送和完成操作。但这里有一些性能注意事项。

低吞吐量 - 将批处理刷新间隔设置为 0 以禁用批处理。

高吞吐量 - 将 `BatchFlushInterval` 设置为 50ms。

多个发送者/接收者 - 使用多个发送者/接收者时,将 `BatchFlushInterval` 设置为 100ms。

要调整批处理的刷新间隔,请在 `MessagingFactorySettings` 对象上设置 `BatchFlushInternval` 属性,然后将其传递给 `MessagingFactory` 的创建过程(请参阅批处理):

结论

这已经是一个关于 Azure Service Bus 的非常庞大的系列了,我希望能涵盖该服务的每一个角落,但事实是,有太多内容需要讨论,以至于一个三部分的系列不足以做到。然而,请放心,您已经了解了 Azure Service Bus 的绝大多数功能。现在,还有一些密切相关的、构建在 Azure Service Bus 之上的服务,例如事件中心和通知中心,以及未涵盖的服务总线模式,例如请求-响应模式等等。但是,希望您从三部分系列中获得的这些信息将教会您超越 Azure Service Bus 的基础知识。

“关于 Azure Service Bus,您需要了解的一切(第三部分)”首次出现在 LockMeDown.com

© . All rights reserved.