您需要了解的关于 Azure Service Bus 代理消息传递的所有信息(第一部分)





5.00/5 (9投票s)
您需要了解的关于 Azure Service Bus 代理消息传递的所有信息(第一部分)。
引言
在解耦系统之间共享数据的能力并非易事。分布式系统并非新概念,但随着共享数据需求的不断增长,它已成为一个持续受欢迎的热门话题。随着信息不断成为我们应用程序的核心,共享信息的需求以及信息的分发方式将继续带来挑战。
Microsoft Azure 的服务总线是一项云服务,可帮助促进解耦系统之间的数据共享。在本文中,我们将学习如何利用 Azure 服务总线的中介消息在系统之间分发数据。但是,如果您熟悉 Azure 支持分布式系统的服务,您就会知道服务总线并非同类服务中的唯一。Azure 的队列存储服务也提供类似的功能,并促进了分布式系统之间的数据共享。那么,哪种队列服务适合您呢?这些问题以及以下主题都是我们将涵盖的领域。
什么是服务总线?
简单来说,服务总线是 Azure 构建的第二个消息队列平台,提供中继和中介消息功能。它是一项功能丰富且成熟的服务,可以为解耦的系统提供独立交换信息的方式。
Azure 的服务总线是众多平台即服务 (PaaS) 服务之一,可以很简单地只包含一个队列,也可以是非常复杂的消息工作流,包含近乎无限数量的相互关联的队列、主题和订阅。
正如我之前指出的,服务总线并非 Azure 提供的唯一消息队列服务,也不是第一个。但是,服务总线与 Azure 的第一个消息队列服务——队列存储之间存在显著的区别。我们将深入探讨服务总线中介消息提供的功能,但了解何时使用哪种服务至关重要。
Microsoft 提供了一份比较文档来帮助您做出决定。然而,鉴于使用队列存储服务的理由有限,我们可以快速总结何时队列存储是最佳选择。如果您需要最简单的方法,并且您的应用程序满足以下需求,那么队列存储将是最佳选择:
- 需要保留超过 80 GB 的数据在队列中
- 消息生存时间小于 7 天
- 能够跟踪队列中的消息处理*
正如您所见,消息存储大小和生存时间是 Azure 存储队列服务的两个关键区别点。很自然,存储队列服务的此数据保留方面是由于底层存储服务。因此,除非其中一点是关键要求,否则服务总线服务将是功能更丰富、更通用的选项。
然而,在服务总线服务中,不止一种消息传递功能。本文侧重于服务总线的中介消息。但是,中介消息并非服务总线提供的唯一消息传递功能。中继是您在研究 Azure 服务总线时会了解到的另一种选项,因此让我们花一点时间来区分这两者。
中介消息与中继消息
到目前为止,我们只提到了 Azure 服务总线中的“中介”消息。但这并非服务总线提供的唯一消息传递功能。与我们一直暗示的消息排队模式不同,中继消息允许消息“弹跳”到一个服务,然后发送给已连接的接收者。它要求期望接收消息的接收者在线并且可用。中继消息的一个优点是能够暴露服务的终结点,而无需通常的网络防火墙和基础设施配置的繁琐步骤即可使其可供外部客户端使用。
然而,与中介消息相比,中继在持久性方面并非强项。中介消息支持真正的暂时解耦的系统场景,在这种场景下,消息生产者或消费者的可用性无法得到保证。因此,未能立即传递的消息必须存放在某处,这就是“中介”发挥作用的地方。通过中介消息,队列是中介,它保留生产者创建的消息,消费者可以在准备好时检索该消息。
因此,虽然暴露服务终结点是中继消息的优点之一,而队列提供了中介消息的持久性,但队列有多种形式。因此,在深入了解服务总线中介消息的实现之前,我们需要查看不同的队列选项。
队列与主题和订阅
对于刚接触服务总线中介消息的人来说,这可能会令人困惑,所以我希望尽量说清楚。首先,不要忘记我们始终在谈论队列。服务总线队列提供最简单的消息传递选项。队列中的消息按先进先出 (FIFO) 的顺序排列,每个消息预计由单个使用者处理。我喜欢将队列可视化为一根管子,消息被送入管子,并在另一端被单个消费者消费。
然而,主题和订阅构成了发布/订阅模式,允许同一条消息被 N 个消费者处理。订阅规则和自动转发等功能允许以树状结构可视化此过程的工作原理。
我们将详细介绍以上内容,但从高层次来看,可以将一条消息添加到主题,并且对于满足的每个订阅规则,都会将该消息的一个副本添加到该订阅。在这种情况下,每个订阅都成为队列,消费者可以单独处理订阅上的消息。
入门:构建块
我们最终的目标是生成供消费者消费的消息。但要实现这一点,我们必须从构建块开始。您将用于与服务总线进行大量直接和间接交互的主要构建块之一是 `NamespaceManager` 对象。
NamespaceManager 提供了管理和创建队列、主题和订阅等核心实体并通过工厂方法(仅举几例)进行的能力。但是,NamespaceManager 有几个我们需要的依赖项,所以我们先讨论它们。
我们需要实现的依赖项如下:
- 服务总线命名空间(由服务总线服务 URI 使用)
- 令牌提供程序
- 服务总线服务 URI
服务总线命名空间正是其听起来的样子,它最终定义了我们在服务总线服务中的个人命名空间。第一个依赖项需要从 Azure 门户的“服务总线”下创建。
您需要指定命名空间(请注意这一点),您的服务总线服务终结点将在其中为人所知,以及例如托管它的区域等信息。最后,还有一个重要的注意事项:基本与标准消息传递层。要了解它们,您可以查看此 Azure 文档,这会影响主题和订阅的托管能力。
创建命名空间后,我们可以专注于最后两个依赖项;令牌提供程序和服务总线服务 URI。令牌提供程序提供 `NamespaceManager` 将使用的身份验证机制。有几个现成的提供程序可以使用,当我们讨论安全性时,我们将更详细地研究其中一些。目前,我们将使用共享访问签名令牌提供程序,并指定默认的共享访问签名 (SAS) 策略名称和密钥。这两个信息可以在 Azure 门户中新创建的服务总线命名空间的“配置”部分找到。有了策略名称和密钥,我们就可以组合一个令牌提供程序。
TokenProvider tokenProvider =
TokenProvider.CreateSharedAccessSignatureTokenProvider(AccountInfo.PolicyName,
AccountInfo.Key);
注意:`AccountInfo` 仅提供正确的共享访问签名策略名称和密钥。
服务总线 URI 为 `NamespaceManager` 提供了操作将要操作的服务终结点。这就是我们将利用之前创建的命名空间的地方。可以使用 `ServiceBusEnvironment` 类中的一个 `static` 方法,通过指定我们的命名空间和协议来创建诸如 `mynamespace.servicebus.windows.net` 这样的服务终结点。
//”sb” defines the scheme of the service Uri
Uri serviceBusUri = ServiceBusEnvironment.CreateServiceUri("sb", "shopit", string.Empty);
借助 Visual Studio Azure 工具,通过使用 Azure Service Bus Configuration NuGet 包,许多 Azure Service Bus 项目模板都已配置好。但是,理解这些底层依赖关系将有助于您在需要时进行自己的自定义实现。
创建命名空间管理器
有了依赖项,我们现在可以生成我们的 `NamespaceManager`。
NamespaceManager namespaceManger = new NamespaceManager(serviceBusUri, tokenProvider);
另外,还有一个重载允许我们指定一个 `NamespaceManagerSettings` 对象,该对象允许我们进一步指定其他设置,例如操作超时时间和重试策略(例如指数重试策略或自定义重试策略)。
NamespaceManagerSettings settings = new NamespaceManagerSettings
{OperationTimeout = new TimeSpan(0, 1, 0), TokenProvider = tokenProvider};
NamespaceManager namespaceManger = new NamespaceManager(serviceBusUri, settings);
我在本文中讨论了实现自定义重试策略以跟踪重试发生的时间,该文章利用了 Enterprise Transient Fault Application Block。我们稍后将介绍重试策略。
异步方法
高度推荐的最佳实践之一是利用异步开发方法来处理服务总线操作。这可以通过利用您在大多数操作中提供的异步方法轻松完成。您会发现该库提供了 2 种不同的异步模式供您选择,包括异步编程模型或基于任务的异步模式。我们将在最佳实践部分更多地讨论这一点。为了尽量坚持最佳实践,并且由于大多数演示都是同步进行的,因此此处的大多数示例都将异步提供(故意双关)!
为了拦截和处理异步操作抛出的异常,您至少需要将调用包装在 `Try`/`Catch` 中。但是,除非演示错误处理,否则我们将排除该代码,以便专注于演示的操作。
服务总线队列
通过创建 `NamespaceManager`,我们为我们将要查看的关于服务总线中介消息的大部分操作奠定了基础。我已经解释了服务总线提供的两种消息传递选项,我们将从最简单的方法开始,即使用服务总线队列。
虽然队列不支持使主题和订阅如此独特的消息过滤,但它们确实支持会话、重复检测、消息延迟、死信队列和消息过期等功能。尽管主题和订阅也支持其中许多功能;我决定分别回顾队列和主题/订阅。
创建队列
创建队列是允许我们存储和检索消息的第一步。队列名称始终以小写字母创建,无论您为队列名称提供什么大小写。
QueueDescription queueDescription = await namespaceManager.CreateQueueAsync(queueName);
为了演示其他一些实用方法,我们可以先尝试验证队列是否已存在,如果存在则返回现有队列信息。
QueueDescription newQueueDescription = null;
if (!await _namespaceManager.QueueExistsAsync(queueName))
{
newQueueDescription = await _namespaceManager.CreateQueueAsync(queueName);
}
QueueDescription queueDescription =
newQueueDescription ?? await _namespaceManager.GetQueueAsync(queueName);
`CreateQueueAsync` 重载采用一个 `QueueDescription` 对象,该对象允许您指定队列默认构造之外的值。我一直不愿意在这个阶段讨论许多不同的队列属性,但由于其中一些属性在队列创建后无法更新,因此我们最好现在讨论它们。
队列属性
队列包含许多直接影响队列的属性,而其他属性可能与它们将托管的中介消息有关。并非所有属性在队列创建后都可以更新或更改,因此如果需要更新,请注意这些限制。下面的列表并非详尽无遗,而是您会发现您感兴趣的一些更常见的属性。
- `DefaultMessageTimeToLive` – 这是一个有趣的属性。它决定了消息在两种情况下的生存时间:
- 如果消息没有设置其直接的 `TimeToLive` 属性值,或者
- 当消息的 `TimeToLive` 大于队列的 `DefaultMessageTimeToLive` 属性时。但是,如果消息的 `TimeToLive` 值较低,则消息的 `TimeToLive` 将是消息过期的时间。
- `RequiresDuplicationDetection` – 允许您启用消息重复检测。这与 `DuplicationDetectionHistoryTimeWindow` 协同工作。这是创建时必须设置且之后无法更新的值之一。
- `DuplicateDetectionHistoryTimeWindow` – 您可以指定一个时间段,队列将在此期间保留消息 ID 以执行消息重复检测。时间不能超过消息在队列中可以生存的最长时间,即 7 天。
请注意,此功能会产生一些开销。必须由队列保留的消息 ID 会产生 64 字节的开销。因此,一个简单的例子是,一个处理 1000 条消息的队列将需要 64,000 字节的开销。这可能听起来不多,但如果您每秒发送 30 条消息并尝试将 `DuplicateDetectionHistoryTimeWindow` 保留最长 7 天,那么开销将接近 1GB。
- `EnableDeadLetteringOnMessageExpiration` – 当队列的 `DefaultMessageTimeToLive` 或消息的 `TimeToLive` 先达到且消息过期时,允许将消息移动到死信队列。
- `LockDuration` – 当我们讨论消息处理时,我们将更多地讨论 `PeekLock`,但这是设置消息在被释放回队列之前锁定以供处理的时间。
- `MaxSizeInMegabytes` – 队列的总大小。默认值为 1GB。这受到 `DuplicationDetectionHistoryTimeWindow` 产生的开销的不利影响。
- `AutoDeleteOnIdle` – 这实际上是一个时间跨度,表示队列在闲置后可以保持活动多长时间然后被自动删除。最短时间为 5 分钟。
更新队列
某些队列属性可以在其创建后更新,而另一些则不能。下面我们可以看到如何利用 `QueueDescription` 对象来更新现有队列的某些属性。
QueueDescription newQueueDescription = new QueueDescription("TestOperationQueue")
{
DefaultMessageTimeToLive = TimeSpan.FromDays(3),
AutoDeleteOnIdle = TimeSpan.FromHours(1),
MaxSizeInMegabytes = 2048
};
QueueDescription queueDescription =
await _namespaceManager.UpdateQueueAsync(newQueueDescription );
在这里,我们已将队列属性更新为不允许消息生存期超过 3 天,将队列自身的空闲生存期设置为 1 小时,并将最大队列大小设置为 2GB。
请注意,即使队列中有消息,具有 `AutoDeleteOnIdle` 的队列删除也确实会执行删除。阅读:队列的空闲时间从最后一次入队或出队消息开始。
删除队列
如果队列不存在,删除不会抛出异常。
_namespaceManager.DeleteQueueAsync(queuePath);
我们可以将 `AutoDeleteOnIdle` 队列属性设置为在队列空闲并经过设定的时间段后自动删除。
发送消息
现在我们的队列已经就绪,我们可以练习向队列发送消息了。为了做到这一点,我们需要生成一个用于发送消息的工具。当严格讨论服务总线队列时,我们只有 2 个选项(`QueueClient` 和 `MessageSender`)。
理解 `MessageSender` 是 `QueueClient` 的一个抽象,并且除非绝对有必要获取特定于 `QueueClient` 的功能,否则应该使用它。当我们讨论主题时,我们将看到使用 `MessageSender` 而不是 `QueueClient` 的好处。
我们可以从 `MessagingFactory` 的工厂方法之一创建一个 `MessageSender`。但是,队列的创建者并不总是发送消息到队列的实体。因此,我们的 `MessagingFactory` 也需要一个正确的服务总线终结点和令牌提供程序,就像我们的 `NamespaceManager` 所需的一样。
之前,当我们生成 `NamespaceManager` 时,我们可以通过连接字符串来生成它,而不是手动生成服务总线 URI 和令牌提供程序这两个依赖项。因此,为了演示目的,我们将研究如何生成一个连接字符串,该连接字符串可以从我们的 Azure 门户获取。您可以通过选择“服务总线”服务图标后,在 Azure 门户底部单击“连接信息”来获取您的服务总线命名空间的连接字符串。
连接字符串被解析,以便在我们使用 `FromConnectionString` 方法时为 `MessageFactory` 提供服务总线 URI(终结点)和令牌提供程序(共享访问签名)。
MessagingFactory factory = MessagingFactory.CreateFromConnectionString(AccountInfo.ConnectionString);
MessageSender messageSender = factory.CreateMessageSender(QueueName);
有了 `MessageSender`,我们就可以准备发送消息了。
//Some object
SingingTelegram singingTelegram = new SingingTelegram
{
RecipientFirstName = "Elvis",
RecipientLastName = "Presley",
SongName = "Won't you come back again?"
};
BrokeredMessage message = new BrokeredMessage(singingTelegram);
await messageSender.SendAsync(message);
但等等!
您发送的是哪种消息?
在我们能够执行“分布式”系统中的“分发”并开始向队列发送消息之前,我们需要了解什么是消息。在我们的例子中,很简单。我们发送的是 `BrokeredMessage`。
但是,`BrokeredMessage` 中有很多需要了解的东西,可以分解为特性和功能。很多特性将在特定领域讨论,但现在我想停下来讨论一些您需要了解的关于 `BrokeredMessages` 的特性。
首先,消息的核心是由正文和属性组成。消息的总最大大小为 256kb。所有属性的最大大小为 64kb。正文的最大大小是消息最大大小与属性当前大小的剩余部分。有办法增加最大大小(例如会话),但我们目前不讨论。
由于某些协议限制,例如 HTTP 标头大小,建议将自定义属性保持在 2-4 kb 之间。
正文
正文只是任何可序列化的对象,构成消息的有效负载。是否需要使用消息正文取决于您。您会看到有许多不同的 `BrokeredMessage` 构造函数。这些通常都设置了消息的正文,例如:
BrokeredMessage msg1 = new BrokeredMessage("My Message Body");
BrokeredMessage msg2 = new BrokeredMessage(new SingingTelegram());
属性
属性由两部分组成:系统属性和自定义属性。自定义属性只是一个键/值对集合,可以是任何字符串/对象。它们允许我们在消息的标头级别传输特定的自定义属性。然而,当我们将讨论到主题和订阅时,自定义属性才真正发挥作用。
思考消息属性的另一种方式是当讨论消息在不同协议(HTTP/S、AMQP、SMB)之间流动时。来自 HTTP/S 请求的消息将最终将请求标头映射到消息属性。
当我们想将数据提升到消息的标头级别时,我们可以轻松地利用自定义 `Properties` 属性,例如:
BrokeredMessage message = new BrokeredMessage(singingTelegram);
foreach (object info in importantInformation)
{
message.Properties.Add(info.Key, info.Value);
}
`BrokeredMessage` 上有相当数量的重要系统属性可以调整。其中许多将在特定功能中介绍。但是,有几个值得一提的属性:
- `ScheduledEnqueueTimeUtc` – 如果您想延迟消息在队列上的可见性,可以设置一个未来的 UTC 时间,在此时间之前消息将不可用。
- `ExpiresAtUtc` – 如果设置,将是一个特定的 UTC 时间,消息将在此时过期。
- `TimeToLive` – 类似于 `ExpiresAtUtc`,这是消息入队后必须经过的时间量,消息在此时间后将过期一次。此属性和 `ExpiresAtUtc` 都直接影响消息何时过期,一个在设定的时间,另一个在一段时间后。
同样,这不是一个详尽的列表,了解其他一些有影响力的属性会对您有所帮助,例如 `MessageId` 如何用于消息重复,或者 `Label` 和 `ContentType` 等可选属性的用途。
接收消息
命名空间已建立。检查。队列已创建。检查。消息已发送。检查。这个简单场景中明显最后一步是分布式系统执行我们入队的消息。与 `MessageSender` 一样,我们也希望使用 `MessageReceiver` 抽象来获取消息。
要生成 `MessageReceiver`,我们需要一个 `MessagingFactory`,您可以参考发送消息部分以获取更多信息。
MessageReceiver messageReceiver =
await _messagingFactory.CreateMessageReceiverAsync(queueName);
从这里开始,我们可以开始接收消息。我们可以利用 `ReceiveAsync` 的 `Timespan` 参数来指定轮询多长时间然后放弃。此外,下面的示例将执行一个连续循环,直到取消标记被取消。
while (!cancellationToken.IsCancellationRequested)
{
//Wait up to 1 minute for a message
var msg = await messageReceiver.ReceiveAsync(TimeSpan.FromMinutes(1)).ConfigureAwait(false);
await ProcessAndReleaseMessageAsync(msg);
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
}
private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)
{
MessageProcessingAction action = MessageProcessingAction.Abandon;
try
{
//Process message
action = MessageProcessingAction.Complete;
await UpdateMessageState(message, action);
}
catch (Exception ex)
{
//log
}
finally
{
//C# 6.0 allows await calls in a finally blocks
UpdateMessageState(message, action);
}
}
private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)
{
switch (action)
{
case MessageProcessingAction.Complete:
await message.CompleteAsync();
break;
case MessageProcessingAction.Abandon:
await message.AbandonAsync();
break;
case MessageProcessingAction.Deadletter:
await message.DeadLetterAsync();
break;
default:
await message.AbandonAsync();
break;
}
}
简单来说,我们最多等待 1 分钟来从队列中拉取一条消息。成功拉取消息后,消息将被处理,然后我们通过调用 `CompleteAsync` 或 `AbandonAsync` 来明确通知 Azure 服务总线消息的状态。这是因为队列的默认模式是消息以 `PeekLock` 模式接收。
`PeekLock` 指定消息仅暂时被签出一段时间以供处理。除非消息被签入为完成状态,否则它将不会从队列中删除。还有其他后果会直接影响消息从队列中删除,例如队列或消息的生存时间设置。但是,就我们的接收者处理消息而言,我们需要完成、放弃或死信该消息。我们稍后将更多地讨论死信队列以及将消息移至那里。
就上面的示例而言,有几点;显然,`switch` 语句可以更优雅地重构,但也会模糊我想演示的关于更新消息状态的内容,以便队列能够正确处理它。无论这意味着在我们的处理尝试失败的情况下将其保留在队列上,还是将其设置为完成以供队列删除。
`PeekLock` 并非消息接收的唯一模式。让我们看看消息接收模式以及我一直保留到现在的其他队列行为和功能。
再次处理队列!
我特意尝试不让队列过于复杂,以免涉及所有不同的功能。但在结束服务总线的第一部分之前,我想回顾一下几个值得注意的重要观点。
正如我已经提到的,有两种消息检索模式:`PeekLock` 和 `ReceiveAndDelete`(技术上还有第三种,会话+Peek,但不在本次讨论范围之内)。我们已经讨论了 `PeekLock`,因此 `ReceiveAndDelete` 是一种不太持久的选项,因为它会在检索消息时将其从队列中删除。因此,正如您可能预料到的,您未处理的任何消息处理异常都可能导致该消息丢失。创建 `MessageReceiver` 时,我们可以更改这一点。
MessageReceiver messageReceiver =
await _messagingFactory.CreateMessageReceiverAsync(queueName, ReceiveMode.ReceiveAndDelete);
正如在队列属性下提到的,当在 `PeekLock` 模式下接收消息时,我们可以设置消息的 `LockDuration`,它决定了接收者可以保留消息多长时间,然后才能将其释放回队列进行处理。默认值为 1 分钟,但最多可设置为 5 分钟。如果消息的 `LockUntilUtc` 属性显示其即将过期,我们可以请求续订消息锁定。
await msg.RenewLockAsync();
队列有两种传递模式:拉取和转发。到目前为止,我们看到了拉取,即我们的 `MessageReceiver` 根据请求拉取队列以接收消息。但是,我们可以通过设置队列的 `ForwardTo` 属性来转发消息,这样任何传递的消息都将自动转发到另一个队列。
QueueDescription queueDescription =
await namespaceManager.CreateQueueAsync(new QueueDescription(QueueName)
{
ForwardTo = destinationQueueName
});
第一部分总结
总而言之,我们了解了构成 Azure 服务总线的许多差异,如何管理队列的各个方面,以及如何通过队列发送和接收消息。现在,我们已经涵盖了很多关于服务总线的内容,但还有更多内容需要涵盖。别担心,我会让您休息一下,并在第二部分中介绍主题和订阅、死信队列、自动转发、事务、批量处理、安全和服务总线最佳实践等主题。
关于 Azure 服务总线中介消息(第一部分)你需要知道的一切 最早出现在 LockMeDown.com。