65.9K
CodeProject 正在变化。 阅读更多。
Home

您需要了解的关于 Azure Service Bus 代理消息传递的所有信息(第二部分)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.94/5 (6投票s)

2014年12月21日

CPOL

13分钟阅读

viewsIcon

16374

downloadIcon

7

您需要了解的关于 Azure Service Bus 代理消息传递的所有信息。

引言

这是 Azure 服务总线中介消息的第二部分,您可以在 这里 找到第一部分。在第一部分中,我们严格关注了如何使用服务总线队列。在第二部分中,由于信息量巨大,我们将主要关注服务总线主题和订阅的功能和优势,您现在应该知道它们与队列是不同的。

主题

现在,由于主题和订阅的密切关系,很难不谈论它们,而且在几乎所有示例中,您都会看到它们。但是,尽管它们存在差异,主题与服务总线队列也有许多相似之处。因此,从这个角度来看,既然您已经熟悉了服务总线队列,我们将先讨论主题,然后介绍订阅。

与队列一样,主题提供了一个发送消息的目的地。但主题与订阅相结合,提供了一种发布/订阅模式来分发消息。这与队列的孤立视图不同,队列既是接收者也是其接收消息的保留者。

简而言之,主题提供了发布/订阅模式的发布者端。从 30,000 英尺的高度来看,它们与队列似乎并没有太大区别,因为它们都提供了一个发送消息的目的地。然而,主题与队列一样,仍然需要创建、管理和发送消息。

主题的许多管理操作的用法和执行方式与队列相同,我们可以使用我们在 第一部分 中看到的相同 NamespaceManager 来方便我们的 CRUD 操作。

创建主题

创建主题可以很简单,只需为其命名即可。

NamespaceManager namespaceManager = 
    NamespaceManager.CreateFromConnectionString(AccountInfo.ConnectionString);
TopicDescription newTopicDescription = await _namespaceManager.CreateTopicAsync(topicName);

或者,可以从 TopicDescription 创建主题,我们在其中定义主题的特征。

TopicDescription description = new TopicDescription(topicPath)
{
    SupportOrdering = true,
    AutoDeleteOnIdle = TimeSpan.FromHours(8),
    DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0)
};

TopicDescription newTopicDescription = null;

//If not exists, we'll create it
if (!await namespaceManager.TopicExistsAsync(description.Path))
{
    newTopicDescription = await namespaceManager.CreateTopicAsync(description);
}

//Or we'll retrieve it if it already exists
TopicDescription topicDescription = newTopicDescription ?? 
    await namespaceManager.GetTopicAsync(description.Path);

通过一个更复杂的示例,我们可以演示一些可用的实用方法,用于检查主题是否存在以及检索有关现有主题的详细信息。

更新主题

与队列一样,我们也可以通过提供 TopicDescription 来更新现有主题的属性。但是,有些特定属性在创建后无法更改,例如 RequiresDuplicateDetection

TopicDescription description = new TopicDescription(topicPath)
{
    SupportOrdering = true,
    AutoDeleteOnIdle = TimeSpan.FromHours(8),
    DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0)
};

await namespaceManager.UpdateTopicAsync(description);

删除主题

当我们删除一个不存在的主题时,它不会抛出异常。

await namespaceManager.DeleteTopicAsync(topicName);

发送消息到主题

现在,任何对主题和订阅工作原理有所了解的人都会想,在我介绍订阅之前,我为什么要谈论发送消息到主题呢?很简单,到目前为止,包括发送消息到主题,很多东西都没有改变,而且可以在不被它们的不同之处所困扰的情况下轻松了解服务总线主题。请继续关注我,我们很快就会讲到。

您可能还记得,我在 第一部分 中提到过 MessageSenderQueueClient 的抽象,并且使用这个抽象会有回报?嗯,它也是 TopicClient 的抽象。TopicClient 是发送消息到主题的直接客户端。因此,如果您可以使用 MessageSender 而不是相关的客户端,那么您的应用程序就不必关心它是向队列还是主题发送消息,这就是使用 MessageSender 的回报所在。

您可以参考我们创建 MessageSender 来发送消息的 Queue 示例,发送消息就像发送到 Queue 一样简单。

await messageSender.SendAsync(new BrokeredMessage("SimpleMessage"));

然而,如果您正在测试这一点,您将面临一个巨大的惊喜,所以我将为您节省一些头发(这是我擅长失去的东西!)。当主题没有注册的订阅时,消息会自动被丢弃。因此,现在似乎是介绍订阅的绝佳时机。

订阅

作为提醒,主题和订阅一起代表发布/订阅模式(参见之前的主题/订阅图)。主题代表发布者,订阅代表订阅者。当创建一个订阅时,会提供一个它将订阅的主题,以便接收订阅感兴趣的消息。最简单形式是,发送到主题的消息会被复制并转发到每个满足其订阅策略(我们称之为规则)的订阅。

我喜欢将主题和订阅之间的这种关系比作机场到达场景。请想象一下,一个主题代表机场。您,作为消息,到达机场,经过行李认领,然后到达礼宾车司机站立的地方,他们手中拿着写有不同人名字的牌子。这些司机代表订阅策略(规则)。假设您找到了一个拿着写有您名字的牌子的司机,他/她会护送您到他们的礼宾车(订阅),您将在那里待在礼宾车里,然后被护送去度一个全包式的豪华假期,不再停留在机场。那么,如果您没有找到一个拿着写有您名字的牌子的司机呢?

您看过汤姆·汉克斯主演的电影《幸福终点站》吗?维克多·纳沃斯基(汤姆·汉克斯饰)因为被拒绝入境美国,同时又因为革命无法返回祖国,被困在肯尼迪国际机场?嗯,不像维克多有一个住宿的地方(终点站),当一条消息到达一个主题但又没有满足任何关联订阅的规则时,这条消息就会被丢弃。

所以,让我们更仔细地看看订阅的所有这些特性以及它们如何与主题关联。

创建订阅

当我们创建一个订阅时,最少也要将其关联到一个主题。

SubscriptionDescription switftSubscription = 
    await namespaceManager.SubscriptionExistsAsync(“singingtelegrams”, “TaylorSwift”);

此外,我们可以通过传递 SubscriptionDescription 来设置订阅的一些属性。

SubscriptionDescription subscription = new SubscriptionDescription((“singingtelegrams”, “TaylorSwift”)
{
    EnableDeadLetteringOnFilterEvaluationExceptions = true,
    EnableDeadLetteringOnMessageExpiration = true,
    EnableBatchedOperations = true,
};

SubscriptionDescription bieberSubscription = 
    await namespaceManager.CreateSubscriptionAsync(subscription);

在上面的示例中,我们提供了现有主题的路径以及我们要创建的订阅的名称。通过这种方式,这些订阅没有明确的规则来定义它们应该接收什么消息。因此,它们订阅了接收发送到“singingtelegrams”的所有消息。当任何消息发送到“singingtelegram”主题时,TaylorSwift 和 JustinBieber 订阅都会在其虚拟队列中收到一份副本。

目前,订阅数量的限制是 2000 个。

删除订阅

与删除队列和订阅之间唯一的细微差别是,我们还必须提供订阅关联的主题路径以及订阅的名称。

await namespaceManager.DeleteSubscriptionAsync(topicPath, subscriptionName);

更新订阅

同样,与队列和主题一样,有些属性必须在创建时设置,例如 RequireSession,但我们可以提供一个 SubscriptionDescription 实例来更新我们 Subscription 的属性。

SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName)
{
    DefaultMessageTimeToLive = TimeSpan.FromHours(8),
    LockDuration = TimeSpan.FromSeconds(90)
};

SubscriptionDescription updatedSubscription = 
    await namespaceManager.UpdateSubscriptionAsync(subscription);

回想一下第一部分,我曾说过,最终我们仍然是在处理队列?嗯,微软确实将 Subscription 定义为虚拟队列。发送到 subscription 的消息副本就驻留在那里,并且像服务总线队列一样进行检索。

在订阅中检索消息

就像使用 abstract MessageSender 一样,如果我们使用 abstract MessageReceiver,我们就不必担心是从服务总线队列还是订阅中拉取消息。下面是从第一部分 接收消息 的代码,与队列示例完全相同,除了传递给 CreateMessageReceiverAsync 方法的单个参数。

MessageReceiver messageReceiver =  await _messagingFactory.CreateMessageReceiverAsync
    ("singingtelegram/subscriptions/TaylorSwift");

try
{
    while (!cancellationToken.IsCancellationRequested)
    {
        var msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
        if (msg.LockedUntilUtc == DateTime.UtcNow) await msg.RenewLockAsync();
        await ProcessAndReleaseMessageAsync(msg);
        await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
    }
}
catch (Exception ex)
{
    //log error;
    throw;
}

private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)
{
    MessageProcessingAction action = MessageProcessingAction.Abandon;

    try
    {
        //Process message
        action = MessageProcessingAction.Complete;
    }
    catch (Exception ex)
    {
        //log
    }
    finally
    {
        //if something fails update with abandon
        //C# 6.0 allows await calls in a finally blocks
        UpdateMessageState(message, action);
    }
}

private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)
{
    switch (action)
    {
        case MessageProcessingAction.Complete:
            await message.CompleteAsync();
            break;
        case MessageProcessingAction.Abandon:
            await message.AbandonAsync();
            break;
        case MessageProcessingAction.Deadletter:
            await message.DeadLetterAsync();
            break;
        default:
            await message.AbandonAsync();
            break;
    }
}

我们确实需要关注的是我们传递给 MessagingFactory.CreateMessageReceiverAsync(entityPath) 方法的 string。在我们的 Queue 示例中,我们只提供队列路径的 string。但是,在这种情况下,我们需要订阅路径,格式为 <主题路径>/subscriptions/<订阅名称>,如“singingtelegram/subscriptions/TaylorSwift”所示。

在接收到任何消息后,我们可以按照与我们在 Queue 示例中更新 Message 相同的流程,通过调用 AbondonCompleteDeadLetter。我将让您回顾第一部分中关于消息接收的详细信息。

到目前为止,使用主题和订阅而不是队列的唯一引人注目的原因是能够将单个消息的副本提供给多个感兴趣的方(订阅)。但这只是主题和订阅的一个引人注目的功能。另一个引人注目的功能是消息路由以及控制哪些消息路由到哪些订阅的能力。

我们可以通过规则来实现消息路由。现在,即使我们没有为已经创建的订阅定义规则,并不意味着规则不存在。实际上,所有订阅至少有一个规则。

规则、过滤器和操作,我的天哪!

正是通过规则,我们才达到了主题和订阅如此强大的顶峰。在订阅的世界中,规则定义了应该将哪些消息复制到特定的订阅。每个订阅都可以访问主题接收的所有消息,并单独过滤出属于它的消息。这种过滤是通过规则的过滤器完成的,我们将对此进行深入探讨。

除了过滤之外,当一条消息被订阅过滤掉,因为它满足规则的条件时,我们有机会对该消息执行操作。这些操作包括添加、删除和更新自定义属性以及更改系统属性值。所有这些都由规则的操作来定义。因此,实际上是过滤器和操作构成了规则。

从主观的方面来说,规则更像是策略而不是规则。规则不仅定义了条件(过滤器),还定义了满足条件时应该发生什么,这更像是策略。我之所以这么说,是因为有些人更容易将规则概念化为策略。

另一种陈述上述冗长解释的方式是,订阅会过滤掉满足规则条件的的消息,并在复制消息到订阅时,可选地对消息的属性执行操作。所以,让我们看看所有这些的示例。

规则过滤器

在 .NET 环境中,订阅规则以 RuleDescription 的形式存在,并定义了规则的 FilterAction。当我们创建订阅时,有一个可用的重载来传递 RuleDescription

RuleDescription rule = new RuleDescription
{
    Name = "ForwardToTaylorSwift",
    Filter = new SqlFilter("RequestedSinger = 'Justin Bieber’ OR RequestedSinger = 'Taylor Swift’")
};

SubscriptionDescription subscription = 
    new SubscriptionDescription(topicPath, subscriptionName);
SubscriptionDescription description = 
    await namespaceManager.CreateOrRetrieveSubscriptionAsync(subscription, rule);

所以,这是我们第一次接触 Rule 及其过滤器。正如过滤器的类型所示,它允许在其构造函数中定义 SQL92 标准表达式,该表达式将决定哪些消息被过滤掉。我们可以将其解读为,在“singingtelegram”主题接收的所有消息中,如果具有自定义属性“RequestedSinger”且值为“Justin Bieber”或“Taylor Swift”,则会将它们过滤到“TaylorSwift”订阅。

所以,如果我们向“singingtelegram”主题发送一个与上述规则匹配的 BrokeredMessage

BrokeredMessage message = new BrokeredMessage();
message.Properties.Add("RequestedSinger", "Justin Bieber");

await messageSender.SendAsync(message);

但是,这并不排除其他订阅也接收此消息。请记住,我之前提到过,所有订阅都有一个规则。当我们不显式定义规则时(正如我们最初创建订阅的示例那样),会为我们提供一个默认规则,该规则有一个默认过滤器。由于规则条件是在规则过滤器中定义的,让我们看看可用的开箱即用的过滤器。

  • SQLFilter – 许多其他过滤器(如 TrueFilterFalseFilter)都从此过滤器派生。
  • TrueFilter – 这是通过默认规则提供的默认过滤器,当我们创建订阅时没有显式提供规则或过滤器时,会为我们生成此规则。最终,它生成 SQL92 表达式 1=1,并订阅接收关联主题的所有消息。
  • FalseFilter – 与 TrueFilter 相反,它生成 SQL92 表达式 1=0;具有此过滤器的订阅将永远不会订阅关联主题的任何消息。
  • CorrelationFilter – 此过滤器订阅订阅特定消息的 CorrelationId 属性的所有消息。

请注意,SQL 表达式中的比较值是区分大小写的,而属性名则不区分(例如,“RequestedSinger = ‘Taylor Swift’” 与 “RequestedSinger = ‘taylor swift’” 不同)。

添加规则

然而,订阅的数量不限于一条规则。我们可以向现有订阅添加其他规则。但是,这是一个完美的例子,说明 MessageReceiver 抽象类无法满足我们的需求,我们将需要转向 SubscriptionClient

SubscriptionDescription subscription = 
    await _operations.CreateOrRetrieveSubscriptionAsync(topicPath, subscriptionName);

//MessagingFactory as demo'd in part 1
SubscriptionClient subscriptionClient = 
    messagingFactory.CreateSubscriptionClient(subscription.TopicPath, subscription.Name);

RuleDescription ruleDescription = 
    new RuleDescription("NeverGrowUpSongRule", new SqlFilter("RequestedSong = 'Never Grow Up'"));

await subscriptionClient.AddRuleAsync(ruleDescription);

第一个操作只是一个返回已验证的 SubscriptionDescription 的个人包装器。经过验证后,我们可以获取一个 MessagingFactory(参见第一部分 发送消息),这将允许我们创建一个 SubscriptionClient,进而允许我们向现有订阅添加规则。

请理解,每个满足的规则都会生成一个消息副本。这意味着,如果一个订阅有 3 条独立的规则,一条消息如果满足所有 3 条规则的条件,将收到该消息的 3 个单独副本。如果您想要基于各种条件的消息(一次),则需要将这些条件指定在单个规则过滤器中(例如,“RequestedSinger = ‘Justin Bieber’ OR RequestedSinger = ‘TaylorSwift’”)。

关于 CorrelationFilter 的快速说明

CoorelationFilter 是一个用于特定用途的特殊过滤器。它是实现相关性模式的一种可用方式,在该模式中,订阅可以检索绑定到同一消息的 CoorelationId 的所有消息。我们将在第三部分讨论消息模式时对此进行研究。

规则操作

每个规则都必须有一个过滤器;这是定义规则条件的。但是,规则不一定有一个操作。如前所述,规则的操作允许我们操作满足规则条件的的消息的属性(系统属性和自定义属性)。

如前所述,规则是过滤器和操作的容器,我们可以在 RuleDescriptionAction 属性中定义一个操作。

RuleDescription ruleDescription = new RuleDescription("ChangeRequestedSinger")
{
    Filter = new SqlFilter("RequestedSinger = 'Justin Bieber'"),
    Action = new SqlRuleAction("SET RequestedSinger = 'Taylor Swift'")
};

在这里,我们创建了一个 RuleDescription,它将过滤掉任何 RequestedSinger 值为“Justin Bieber”的消息,并将该属性的值更改为“Taylor Swift”,如“SET RequestedSinger = ‘Taylor Swift’”操作所示。

如果我们添加此规则,或者如下所示,使用此规则创建订阅。

SubscriptionDescription newSubscriptionDescription = 
    await _namespaceManager.CreateSubscriptionAsync(topicPath, subscriptionName, ruleDescription );

然后发送一条满足订阅条件的消息(即,Filter = new SqlFilter(“RequestedSinger = ‘Justin Bieber'”)。

BrokeredMessage message = new BrokeredMessage();
message.Properties.Add("RequestedSinger", "Justin Bieber");

await messageSender.SendAsync(message);

当我们接收到消息时,我们会发现 RequestedSinger 自定义属性的值将不再是“Justin Bieber”,而是“TaylorSwift”。

BrokeredMessage msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
message.Properties["RequestedSinger"] // ="Taylor Swift"

操纵自定义属性只是 Action 的可用功能之一。还可以添加自定义属性或更改系统属性。想象一下,您想要一个捕获所有没有定义 RequestedSinger 的消息的通用订阅,并创建 RequestedSinger 属性并更新系统属性 ContentType

var ruleDescription = new RuleDescription("ChangeRequestedSinger")
{
    Filter = new SqlFilter("RequestedSinger IS NULL"),
    Action = new SqlRuleAction("SET sys.ContentType = 'audio/mpeg'; 
            SET RequestedSinger = 'Taylor Swift'")
};

使用具有上述 RuleDescriptionSubscription,任何被过滤的消息都将具有 System 属性 ContentType 更新为 audio/mpeg,并创建一个新的自定义属性 RequestedSinger

请注意 SQLRuleAction 中的前缀“sys”,它对于将属性范围设置为 System 属性是必需的。默认情况下,范围设置为用户属性(自定义属性),由 user.<property_name> 表示。

因此,正如您所见,规则操作具有很大的威力。您可以在 Microsoft 的 MSDN 页面 上了解更多关于操作的语法和可用选项。

结论

正如您所清楚看到的,主题是一个很大的“主题”,并且与订阅密不可分。我们了解到,主题和订阅最强大的两个功能是能够将消息分发给多个感兴趣的方(订阅),并且这些方能够过滤出它们特别感兴趣的消息。关于 Azure 服务总线的主题,仍有相当多的内容需要涵盖。因此,在最后一篇文章中,我们将深入探讨一些更高级的功能、模式、安全性和最佳实践。

关于 Azure 服务总线中介消息的所有知识(第二部分)首次出现在 LockMeDown.com

© . All rights reserved.