65.9K
CodeProject 正在变化。 阅读更多。
Home

使用强类型消息与 Azure Service Bus 的最佳实践

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.73/5 (18投票s)

2013年6月6日

CPOL

8分钟阅读

viewsIcon

88418

本文将向您展示一种使用 Azure Service Bus + Worker 角色以强类型方式发送和接收消息的模式,同时利用了 Brokered Messages 内置的功能。

引言

在构建 Azure Web 应用程序,或任何 Web 应用程序时,通常会使用后台服务来执行耗时或依赖外部资源的任务。这有助于提高 Web 应用程序的性能和健壮性。Azure 实现这一点的一种方式是使用一个或多个 Worker 角色和 Azure Service Bus,其中您的 Web 应用程序将消息发送到 Azure Service Bus,然后由您的 Worker 角色检索这些消息。Worker 角色可以根据接收到的消息类型执行适当的任务,而不会影响您的网站性能。

本文将探讨我认为在使用 Worker 角色实现消息传递时的一些最佳实践,包括:

  • 使用模式发送和接收强类型消息
  • 异常日志记录
  • 死信队列处理有害消息
  • 当没有收到消息时自动退避轮询
  • 与 Ninject 结合使用 IOC

背景 

网上有很多教程展示了使用 BrokeredMessage 类(在发送前设置消息属性,然后在检索消息时读取这些属性)的示例。例如,您可能有一段用于发送消息的代码(如本文所示),它按如下方式发送消息:

 // Create message, passing a string message for the body
BrokeredMessage message = new BrokeredMessage("Test message " + i);
// Set some addtional custom app-specific properties
message.Properties["TestProperty"] = "TestValue";
message.Properties["Message number"] = i;   
// Send message to the queue
Client.Send(message); 

在接收端

BrokeredMessage message = Client.Receive();
Console.WriteLine("Body: " + message.GetBody<string>());
Console.WriteLine("MessageID: " + message.MessageId);
Console.WriteLine("Test Property: " + message.Properties["TestProperty"]); 

请注意接收器的第二行,其中调用了 GetBody<string> 方法。这仅在消息正文实际上是字符串时才有效。同样,当您在消息正文中序列化不同类型的对象时也适用,例如,本文展示了比萨订单作为消息正文序列化,如下所示:

[DataContract]
public class PizzaOrder
{
    [DataMember]
    public string Name { get; set; }
    [DataMember]
    public string Pizza { get; set; }
    [DataMember]
    public int Quantity { get; set; }
}
....
// Create a new pizza order.
PizzaOrder orderIn = new PizzaOrder()
{
    Name = "Alan",
    Pizza = "Hawaiian",
    Quantity = 1
};
 
// Create a brokered message based on the order.
BrokeredMessage orderInMsg = new BrokeredMessage(orderIn);
 
// Send the message to the queue.
queueClient.Send(orderInMsg); 

在接收端

BrokeredMessage orderOutMsg = queueClient.Receive();
 
if (orderOutMsg != null)
{
    // Deserialize the message body to a pizza order.
    PizzaOrder orderOut = orderOutMsg.GetBody<PizzaOrder>();
 
    Console.WriteLine("Received order, {0} {1} for {2}",
        orderOut.Quantity, orderOut.Pizza, orderOut.Name);
        
... 

同样,使用 .GetBody<T> 方法允许我们以强类型方式检索消息正文,但此方法仅在消息正文类型为 PizzaOrder 时才有效。

您可能已经注意到,如果使用上述模式,Worker 角色将只能处理单一消息类型。如果我们想序列化多种不同类型的对象发送到后台 Worker 角色,并让它根据它们的类型自动反序列化并执行操作,而无需提前知道消息类型,该怎么办?

本文将展示如何实现这一点,使 Worker 角色不局限于只处理单一消息类型。

撰写本文的动机

如上所述,网络上的所有示例似乎都只处理发送/接收单一类型的消息。当您需要发送/接收多种类型时,您必须考虑其他选项,例如拥有多个 Worker 角色,或使用 NServiceBus 或 MassTransit 等消息框架。

本文的灵感来源于 NServiceBus,它(我几年前最后一次使用时)似乎正在使用类似的模式来序列化/反序列化消息。我还发现 Azure 教程 http://www.windowsazure.com/en-us/develop/net/tutorials/multi-tier-web-site/1-overview/ 对此非常有帮助。我想我会实现类似的代码,并将其打包在一个基本解决方案中供任何人使用。

基本模式涉及向 Azure 服务发送一个包含序列化消息体的 BrokeredMessage,以及一个指定消息体完整类型的附加属性。在接收端,Worker 角色使用 Brokered Message 中的类型信息动态确定 Brokered Message 中包含的序列化类型,并以强类型方式检索它。然后,它调用为该消息类型指定的预定义处理程序。

使用代码

发送消息

发送消息的代码相当简单。您只需要确保所有消息都通过一个集中点发送,该点包含以下代码:

const string QueueName = "ProcessingQueue";
protected void SendQueueMessage<T>(T message)
{
    // Create the queue if it does not exist already
    string connectionString = 
      CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
    var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
    if (!namespaceManager.QueueExists(QueueName))
    {
        namespaceManager.CreateQueue(QueueName);
    }
    // Initialize the connection to Service Bus Queue
    var client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
    // Create message, with the message body being automatically serialized
    BrokeredMessage brokeredMessage = new BrokeredMessage(message);
    brokeredMessage.Properties["messageType"] = message.GetType().AssemblyQualifiedName;
    client.Send(brokeredMessage);
} 

注意最后三行

  • 第一行创建了一个代理消息,并使用内置功能传入一个将作为消息体序列化的对象。
  • 第二行在代理消息上设置了一个属性 (messageType),指定消息体中包含的对象的完整类型。
  • 第三行将消息发送到队列。

注意:我使用 Visual Studio 2012 开发本文代码,并在我的 Web 项目中添加了对 Azure Service Bus 的引用,以便使用 Azure 消息队列功能。

Install-Package WindowsAzure.ServiceBus  

接收消息

在接收端,我们需要读取 messageType 属性,然后使用反射根据该类型提取消息体。然后我们需要为该消息类型调用适当的消息处理程序。

为了实现这一点,我们首先定义一个名为 IMessageHandler 的接口,如下所示:

public interface IMessageHandler<T> where T : class
{
    void Handle(T message);
} 

然后我们定义一个表示可能发送的消息的 DTO 类。例如:

public class PersonMessage
{
    public string Name { get; set; }
    public int Age { get; set; }
} 

现在我们需要为这种类型的消息定义一个处理程序,例如:

public class PersonMessageHandler : IMessageHandler<PersonMessage>
{
    public void Handle(PersonMessage message)
    {
        //Do something useful with the message here
    }
}

最后,在我们的 WorkerRole.cs 类中,我们执行以下操作:

  • 在我们的 OnStart 方法中,我们配置了我们的 IOC 容器(在本例中是 Ninject,但您也可以使用其他容器),以自动将 IMessageHandler 接口的所有实现连接到正确的实现。
  • 我们还为我们的 Worker 角色配置了诊断,因此任何异常都会自动记录到我们的存储帐户中进行分析。
  • 当我们从队列中接收到消息时,我们读取 messageType,使用反射提取消息体并确定正确的处理程序类型。然后我们使用 Ninject 找到适当的处理程序,并调用该处理程序。(请注意,我在这里不必使用 Ninject - 我可以使用另一个 IOC 容器,或者只是使用反射来创建消息处理程序的实例。但是我已经在使用 Ninject,所以我认为我会利用它)。
  • 处理程序完成后,我们将消息标记为已完成,以便将其从队列中移除。

我们还负责记录异常、处理死信消息以及在当前轮询间隔内没有收到消息时自动退避。完整的源代码如下所示。

public class WorkerRole : RoleEntryPoint
{
    private IKernel NinjectKernel;
    // The name of your queue
    const string QueueName = "ProcessingQueue";
    int _currentPollInterval = 5000;
    int _minPollInterval = 5000;
    int _maxPollInterval = 300000;
    // QueueClient is thread-safe. Recommended that you cache 
    // rather than recreating it on every request
    QueueClient Client;
    bool IsStopped;
    private volatile bool _returnedFromRunMethod = false;
    public override void Run()
    {
        while (!IsStopped)
        {
            BrokeredMessage msg = null;
            try
            {
                // Receive as many messages as possible (to reduce the number of storage transactions)
                var receivedMessages = Client.ReceiveBatch(32);
                if (receivedMessages.Count() == 0)
                {
                    Thread.Sleep(_currentPollInterval);
                    //No messages, so increase poll interval
                    if (_currentPollInterval < _maxPollInterval)
                    {
                        _currentPollInterval = _currentPollInterval * 2;
                    }
                    continue;
                }
                //At least one message, so reset our poll interval
                _currentPollInterval = _minPollInterval;
                foreach (var receivedMessage in receivedMessages)
                {
                    msg = receivedMessage;
                    // Process the message
                    Trace.WriteLine("Processing", receivedMessage.SequenceNumber.ToString());
                    //If it's a poison message, move it off to the deadletter queue
                    if (receivedMessage.DeliveryCount > 3)
                    {
                        Trace.TraceError("Deadlettering poison message: message {0}", 
                                     receivedMessage.ToString());
                        receivedMessage.DeadLetter();
                        continue;
                    }
                    //Get actual message type
                    var messageBodyType = 
                      Type.GetType(receivedMessage.Properties["messageType"].ToString());
                    if (messageBodyType == null)
                    {
                        //Should never get here as a messagebodytype should
                        //always be set BEFORE putting the message on the queue
                        Trace.TraceError("Message does not have a messagebodytype" + 
                          " specified, message {0}", receivedMessage.MessageId);
                        receivedMessage.DeadLetter();
                    }
                    //Use reflection to figure out the type
                    //of object contained in the message body, and extract it
                    MethodInfo method = typeof(BrokeredMessage).GetMethod("GetBody", new Type[] { });
                    MethodInfo generic = method.MakeGenericMethod(messageBodyType);
                    var messageBody = generic.Invoke(receivedMessage, null);
                    //Process the message contents
                    ProcessMessage(messageBody);
                    //Everything ok, so take it off the queue
                    receivedMessage.Complete();
                }
            }
            catch (MessagingException e)
            {
                if (!e.IsTransient)
                {
                    Trace.WriteLine(e.ToString());
                }
                Thread.Sleep(10000);
            }
            catch (Exception ex)
            {
                string err = ex.ToString();
                if (ex.InnerException != null)
                {
                    err += "\r\n Inner Exception: " + ex.InnerException.ToString();
                }
                if (msg != null)
                {
                    err += "\r\n Last queue message retrieved: " + msg.ToString();
                }
                Trace.TraceError(err);
                // Don't fill up Trace storage if we have a bug in either process loop.
                System.Threading.Thread.Sleep(1000 * 60);
            }
        }
        // If OnStop has been called, return to do a graceful shutdown.
        _returnedFromRunMethod = true;
        Trace.WriteLine("Exiting run method");
    }
    public override bool OnStart()
    {
        // Set the maximum number of concurrent connections 
        ServicePointManager.DefaultConnectionLimit = 12;
        //Diagnostics
        ConfigureDiagnostics();
        // Create the queue if it does not exist already
        string connectionString = 
          CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        if (!namespaceManager.QueueExists(QueueName))
        {
            namespaceManager.CreateQueue(QueueName);
        }
        // Initialize the connection to Service Bus Queue
        Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
        IsStopped = false;
        ConfigureNinject();
        return base.OnStart();
    }
    private void ConfigureDiagnostics()
    {
        DiagnosticMonitorConfiguration config = DiagnosticMonitor.GetDefaultInitialConfiguration();
        config.ConfigurationChangePollInterval = TimeSpan.FromMinutes(1d);
        config.Logs.BufferQuotaInMB = 500;
        config.Logs.ScheduledTransferLogLevelFilter = LogLevel.Verbose;
        config.Logs.ScheduledTransferPeriod = TimeSpan.FromMinutes(1d);
        DiagnosticMonitor.Start(
               "Microsoft.WindowsAzure.Plugins.Diagnostics.ConnectionString",
               config);
    }
    private void ConfigureNinject()
    {
        var kernel = new StandardKernel();
        kernel.Bind(x => x.FromThisAssembly()
              .SelectAllClasses().InheritedFrom(typeof(IMessageHandler<>))
              .BindAllInterfaces());
        NinjectKernel = kernel;
    }
    public override void OnStop()
    {
        // Close the connection to Service Bus Queue
        IsStopped = true;
        while (_returnedFromRunMethod == false)
        {
            System.Threading.Thread.Sleep(1000);
        }
        Client.Close();
        base.OnStop();
    }
    /// <summary>
    /// Locates the correct handler type, and executes it using the current message
    /// </summary>
    /// <typeparam name="T">The type of message</typeparam>
    /// <param name="message">The actual message body</param>
    public void ProcessMessage<T>(T message) where T : class
    {
        //Voodoo to construct the right message handler type
        Type handlerType = typeof(IMessageHandler<>);
        Type[] typeArgs = { message.GetType() };
        Type constructed = handlerType.MakeGenericType(typeArgs);
		//NOTE: Could just use reflection here to locate and create an instance
		// of the desired message handler type here if you didn't want to use an IOC container...
        //Get an instance of the message handler type
        var handler = NinjectKernel.Get(constructed);
        //Handle the message
        var methodInfo = constructed.GetMethod("Handle");
        methodInfo.Invoke(handler, new[] { message });
    }
} 

如您所见,这段代码做了很多工作,但所有这些都解决了前面提到的各种最佳实践。如果您有兴趣在您选择的解决方案中应用此模式,我建议从 GitHub 下载完整的源代码。

注意:我使用 Visual Studio 2012 开发本文代码,并在我的 worker 项目中添加了对 Azure Service Bus 的引用,以便使用 Azure 消息队列功能。由于我使用了 Ninject,我还必须添加相关的 NuGet 包,如下所示:

Install-Package WindowsAzure.ServiceBus
Install-Package ninject.extensions.conventions -Version 3.0.0.11 

源代码在哪里?

本文的所有源代码都可以在我的 YouConf 网站的 GitHub 仓库中找到:https://github.com/phillee007/youconf。需要查看的相关部分是:

  • YouConfWorker 项目,其中包含用于接收和处理消息的 Azure Worker 角色代码。
  • YouConf.Common 项目,其中包含将序列化在代理消息正文中的消息类型。
  • YouConf Web 项目中的 BaseController 类,负责以之前描述的方式发送消息。

其他关注点

反射和其他 IOC 容器

如前面评论中所述——尽管本示例使用 Ninject 自动为给定消息类型找到正确的消息处理程序,但您也可以同样容易地使用普通的反射来完成相同的任务,或者您自己选择的 IOC 容器。我个人建议使用 IOC 容器,因为这正是它们可以提供帮助的情况,并且可以省去您本来需要编写的大量管道代码。

多线程

如果您的 Worker 角色执行的任务是 I/O 密集型的(例如,依赖于长时间运行的外部 Web 服务调用或类似操作),或者您正在多核实例(中/大型角色)上运行,您可能需要考虑使您的 Worker 角色成为多线程的。这可以使用 任务并行库 相当容易地实现,并将对 ProcessMessage 的调用包装在对 Task.Run (.Net 4.5) 或 TaskFactory.StartNew (.Net 4.0) 的调用中。只需确保除了可能生成的其他异常之外,还要捕获和处理任何 AggregateExceptions

请注意,您是否需要或想使用多线程将主要取决于您的具体情况和线程知识,因此,在您确信它会有帮助之前,最好不要尝试。

注意 - 服务级别协议 (SLA)

如果您的 Worker 角色必须符合特定消息类型的特定 SLA(例如,每秒处理 x 条消息),那么最好不要使用这种方法,而是为每种不同的消息类型使用单独的 Worker 角色。这将允许您更准确地测量特定消息类型的吞吐量,这在例如您的系统每个用户/客户端都有不同的消息类型,并且您已向他们保证可以达到给定级别的吞吐量时可能很重要。

您的评论

如果您对如何改进本文有任何评论或反馈,请告诉我,因为随着我对 Azure 知识的提高,本文仍在不断完善。

最后,如果您觉得本文有用,请务必查看我的主要 CodeProject 文章 - https://codeproject.org.cn/Articles/584534/YouConf-Your-Live-Online-Conferencing-Tool,其中包含许多类似本文的技巧,可帮助您构建一个坚如磐石的 Azure 应用程序。这只是我在 Azure 开发者挑战中众多发现之一。

感谢您的阅读,希望您学到了一些有用的东西!

参考文献 

历史

  • 2013年6月6日 - 初版文章。
  • 2013年6月14日 - 轻微语法修正。
© . All rights reserved.