65.9K
CodeProject 正在变化。 阅读更多。
Home

了解NServiceBus Saga

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.88/5 (42投票s)

2014年2月20日

CPOL

19分钟阅读

viewsIcon

160831

downloadIcon

1379

一篇关于如何使用NServiceBus创建工作流的小文章。

fHo 这里有一个小型演示应用程序,可在此处获取

引言

本文主要介绍如何使用进程间服务总线来执行可能长时间运行的工作流。我选择使用 NServiceBus(以下简称 NSB)来演示这一点,因为它提供了以下属性,我个人认为这些属性对于任何消息层都非常重要:

  • 事务性
  • 持久性
  • 可靠性
  • 分布式
  • 可扩展性

在本文中,我们将不会涉及所有这些要点,但我们会介绍其中一些。我还将逐步介绍如何创建可能长时间运行的工作流(NSB 中的 saga),并讨论工作流状态如何在状态之间持久化和管理。

我还应该提到,NSB 并非唯一可以做到这一点的进程间总线,您可能还可以使用其他总线,例如:

  • Mass Transit
  • Rabbit MQ
  • Azure 服务总线(尽管我没有使用经验,所以可能说错了)

必备组件

NServiceBus 支持多种数据持久化机制,例如:

  • MSMQ
  • Raven DB
  • SQL Server,通过使用 NHibernate

我选择在本文中使用 SQL Server,因为我一直想尝试一下。但是,我还包含了 RavenDB 版本的需要存储的进程,如果您希望下载 RavenDB 服务器并尝试使用它。

由于我使用的是 SQL Server,因此您需要访问它(我想如果您正在阅读本文,您可能已经拥有)。然后您需要执行以下 2 件事:

  1. 创建一个名为“NServiceBusPersistence”的新数据库
  2. 更改“SQLSubscriber”项目的 App.config 中的连接字符串,使其指向您自己的数据库

 

NServiceBus 简介

NServiceBus 实际上是一个分布式消息层。它允许进程之间进行各种通信,您可以选择:

  • 点对点
  • 发布/订阅
  • 发送给自己
  • 广播到所有

然而,最终实现这一切的都是 NServiceBus 的核心主干。如今,NServiceBus 还提供了基于云的产品。不过,在本文中,我将更侧重于常规的 NServiceBus 安装(我认为它更常见),即在组织内使用 MSMQ 的 NServiceBus

 如前所述,NServiceBus 提供了以下属性:

事务性

这使用标准分布式事务协调器,允许进程间事务发生。NServiceBus 框架所做的是,只有在消息(或完整的 Saga)完成时,它才会提交事务。如果在消息处理程序中遇到异常,NServiceBus 可能会执行 n 次重试(您必须配置此项),之后它将回滚事务。

持久性

由于 NServiceBus 使用 MSMQ 作为传输(或 Azure,但如我所述,本文中我们不讨论此问题)。所有消息都是安全的,即使断电后也会保留。MSMQ 的常见优点都可用。

可靠性

这部分得益于 MSMQ 的使用,也得益于 NServiceBus 在其消息管道的各个阶段持久化数据。如前所述,这可以使用 MSMQ/Raven 或 SQL 存储。本文将讨论使用 SQL 存储进行持久化。

分布式

我们可以将处理程序/发布者分布到整个网络中,这使得我们能够完全分布式我们的消息层。

可扩展性

NServiceBus 提供了一个名为“The Distributor”的负载均衡器。我不会在本文中介绍它,但如果您认为需要对消息组件进行负载均衡,“The Distributor”可能适合您。

 

演示应用程序设置

如前所述,NServiceBus 提供了一个通用的消息层接口。对于演示应用程序,我们将研究如何使用 NServiceBus 接受命令,然后使用消息。此图可能有助于进一步说明这一点。

可以看出,NServiceBus 提供了通用的消息层骨干(姑且这样称呼),并且每个处理消息的进程(在 NSB 中称为端点)都有一个 MSMQ。

 

需要注意的是,这只是使用 NServiceBus 的一种可能配置。您确实可以根据自己的需求进行配置。上面的图只是我为演示应用程序选择的配置方式。

 

消息

可以说,NServiceBus 都是关于消息的。如果您想向 NServiceBus 端点发送命令或事件,它将以消息的形式出现。

那么消息到底是什么?简单来说,它是一个所有参与方都知道如何处理的共享契约。NServiceBus 将消息序列化为您选择的序列化格式(这里有很多选择:Xml、Json、Bson、Binary 等等),并通过处理消息的端点(通过配置和预期的处理程序)进行反序列化。

如果您来自 WCF 世界,可以将消息视为客户端和服务器之间共享的 `[DataContract]` 对象。

“命令”消息和“消息”消息之间存在区别,但我们稍后会讨论。现在让我们看看两者的代码示例:

命令

namespace Messages.Commands
{
    public class CreatePurchaseOrderCommand
    {
        public int PurchaseOrderId { get; set; }
        public string Description { get; set; }
    }
}
Namespace Commands
    Public Class CreatePurchaseOrderCommand

        Public Property PurchaseOrderId() As Integer
        Public Property Description() As String

    End Class
End Namespace

Message

namespace Messages.Mess
{
    public class CheckPurchaseOrderStatusMessage
    {
        public int PurchaseOrderId { get; set; }
        public string ConfirmationDescription { get; set; }
    }
}
Namespace Mess
    Public Class CheckPurchaseOrderStatusMessage

        Public Property PurchaseOrderId() As Integer
        Public Property ConfirmationDescription() As String

    End Class
End Namespace

如您所见,Command 和 Message 类只是标准的 .NET 类,它们没有任何特别之处。我们稍后会讨论命令和消息之间的重要性,但现在请注意,这些消息只是常规的 .NET 类,允许您向其中添加任何您想要的数据。您可以放心,NServiceBus 会确保这些消息的值在需要时存储起来。

 

命令与消息

好的,那么命令到底有什么区别?为了进一步理解这一点,让我们考虑以下场景。

我们希望设计一个类似亚马逊的订购系统,允许用户下单。当客户下单后,应该发货并向客户发送一封电子邮件通知他们购买成功。

现在这是一个相当简单的工作流,但是“命令”和“消息”如何融入其中呢?

嗯,如果我们把它分解,我们可以想象出以下内容:

操作 命令还是消息?
下单 启动工作流的命令
发货 可能推进工作流的消息 
向客户发送订单确认邮件  可能推进工作流的消息
 

还不清楚?

 

好的,让我们尝试用一些词语。我发现以下思维方式很有帮助:

命令:将要发生的事情。通常这将是 1..1 类型消息。

消息:可能继续当前长时间运行的进程的事情。这通常会发送到当前端点,并且很可能是一个 1..1 类型的消息。

 

注意:您中比较敏锐的人可能会认为,与其拥有一个大型工作流,我们可能会有多个端点,一个用于 DispatchOrder,另一个用于 EmailConfirmationToClient。不同之处在于,如果您选择这条路线,您需要向一个新的端点发送消息,而不是在本地发送

NServiceBus 还支持发布事件的理念,这通常用于当您希望多个端点对某个事件采取行动时。事实上,NServiceBus 将这种类型的消息区分为“事件”,并通过 Bus.Publish(...) 广播给所有感兴趣的端点,这不足为奇。演示应用程序中没有这样做,但请放心,这完全可行,并且通过使用 NServiceBus 可以轻松实现。重要的是,这最终是一个架构决策,只有您自己才能做出。

有关通用消息类型区别的更多信息,请参阅此处此处。尽管它并未完全涵盖NServiceBus消息概念,但这些链接仍然值得一读

托管总线 

NServiceBus 提供了一些托管解决方案,您可以选择: 

  • 运行独立的 NServiceBus.Host.exe 可执行文件 
  • 在 Windows 服务中托管 NServiceBus,该服务仅运行独立的 NServiceBus.Host.exe 可执行文件
  • 或完全自主托管。

本文中的代码使用了最后一个选项,即我们自主托管总线。其方法如下:

演示文章的托管代码根据您所说的端点略有不同。

命令发送者

private static IBus CreateBusFactory()
{
    Configure.Transactions.Enable();
    Configure.Serialization.Xml();

    //Disable Sagas to stop NSB from trying to use RavenDB in this app, which we don't need since
    //we are using MSMQ subscription storage in this app
    //See : http://support.nservicebus.com/customer/portal/articles/860436-publish-subscribe-configuration
    Configure.Features.Disable<Sagas>();

    var bus = Configure.With()
        .DefaultBuilder()
        .Log4Net()
        .MsmqSubscriptionStorage()
        .DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands"))
        .UseTransport<Msmq>()
        .UnicastBus()
        //See : http://stackoverflow.com/questions/18344962/nservicebus-4-0-3-how-to-disable-ravendb
        .DisableTimeoutManager()
        .SendOnly();

    return bus;
}
    Private Shared Function CreateBusFactory() As IBus
        Configure.Transactions.Enable()
        Configure.Serialization.Xml()

        'Disable Sagas to stop NSB from trying to use RavenDB in this app, which we don't need since
        'we are using MSMQ subscription storage in this app
        'See : http://support.nservicebus.com/customer/portal/articles/860436-publish-subscribe-configuration
        Configure.Features.Disable(Of Sagas)()

        'See : http://stackoverflow.com/questions/18344962/nservicebus-4-0-3-how-to-disable-ravendb
        Dim bus = Configure.[With]() _
                  .DefaultBuilder() _
                  .Log4Net() _
                  .MsmqSubscriptionStorage() _
                  .DefiningCommandsAs(Function(t) t.[Namespace] IsNot Nothing _
                                          AndAlso t.[Namespace].StartsWith("Messages.Commands")) _
                  .UseTransport(Of NServiceBus.Msmq)() _
                  .UnicastBus() _
                  .DisableTimeoutManager() _
                  .SendOnly()

        Return bus
    End Function

我们来分解一下,好吗(注意:本节只展示 C# 代码,抱歉)

流畅 API 部分 描述
Configure.Transactions.Enable(); 启用事务
Configure.Serialization.Xml(); 使用 Xml 序列化
Configure.Features.Disable<Sagas>(); 禁用 Saga 功能(发送方不需要运行长时间运行的工作流)
Configure.With() 启动标准配置器流式 API
.DefaultBuilder() 使用默认的 IOC 容器(撰写本文时为 AutoFac)
.Log4Net() 使用 Log4Net 日志记录
.MsmqSubscriptionStorage() 使用 MSMQ 订阅存储
.DefiningCommandsAs(t => t.Namespace != null &&
&    t.Namespace.StartsWith("Messages.Commands"))
将命令定义为位于所示命名空间中的命令(您也可以使用特殊的 NSB 属性,但我认为这种流畅的 API 更美观)
.UseTransport<Msmq>() 使用 MSMQ 传输
.UnicastBus() 创建单播总线
.DisableTimeoutManager() 禁用超时管理器。NSB 默认使用 RavenDB 作为超时管理器,我们不想为命令发送者设置它。因此,通过禁用超时管理器,我们实际上也停止了 Raven DB
.SendOnly(); 将此端点设置为“只发送”端点。这很好,因为此端点只“发送”一个命令

订阅者

namespace SQLSubscriber.IOC
{
    public class NServiceBusInstaller : IWindsorInstaller
    {
        public void Install(IWindsorContainer container, IConfigurationStore store)
        {

            Configure.Transactions.Enable();
            Configure.Serialization.Xml();
            Configure.Features.Enable<Sagas>();
          

            Configure.With()
                     .Log4Net()
                     .CastleWindsorBuilder(container)
                     .DefiningCommandsAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Commands"))
                     .DefiningMessagesAs(t => t.Namespace != null && t.Namespace.StartsWith("Messages.Mess"))
                     .UseTransport<Msmq>()
                        .PurgeOnStartup(false)
                     .UnicastBus()
                     .LoadMessageHandlers()
                     .UseNHibernateSubscriptionPersister() 
                     .UseNHibernateTimeoutPersister() 
                     .UseNHibernateSagaPersister() 
                     .UseNHibernateGatewayPersister()
                     .CreateBus()
                     .Start(() => Configure.Instance.ForInstallationOn<Windows>().Install());
        }
    }
}
Namespace IOC
    Public Class NServiceBusInstaller
        Implements IWindsorInstaller

        Public Sub IWindsorInstaller_Install(ByVal container As IWindsorContainer, ByVal store As IConfigurationStore) Implements IWindsorInstaller.Install
            Configure.Transactions.Enable()
            Configure.Serialization.Xml()
            Configure.Features.Enable(Of Sagas)()


            Configure.[With]() _
                .Log4Net() _
                .CastleWindsorBuilder(container) _
                .DefiningCommandsAs(Function(t) t.[Namespace] _
                                        IsNot Nothing AndAlso t.[Namespace].StartsWith("Messages.Commands")) _
                .DefiningMessagesAs(Function(t) t.[Namespace] _
                                        IsNot Nothing AndAlso t.[Namespace].StartsWith("Messages.Mess")) _
                .UseTransport(Of NServiceBus.Msmq)() _
                .PurgeOnStartup(False) _
                .UnicastBus() _
                .LoadMessageHandlers() _
                .UseNHibernateSubscriptionPersister() _
                .UseNHibernateTimeoutPersister() _
                .UseNHibernateSagaPersister() _
                .UseNHibernateGatewayPersister() _
                .CreateBus() _
                .Start(Function() InstallForWindows())
        End Sub

        Private Function InstallForWindows()
            Configure.Instance.ForInstallationOn(Of Windows)().Install()
            ' return dummy value to get around VB.NET (expression does not return a value) in lambda
            ' see : http://stackoverflow.com/questions/2786753/vb-net-action-delegate-problem
            Return True
        End Function


    End Class
End Namespace

我们来分解一下,好吗(注意:本节只展示 C# 代码,抱歉)

流畅 API 部分 描述
CConfigure.Transactions.Enable(); 启用事务
Configure.Serialization.Xml(); 使用 Xml 序列化
Configure.Features.Enable<Sagas>();/td>启用 saga 功能,因为此端点希望能够运行长时间运行的工作流
Configure.With() 启动标准配置器流式 API
.Log4Net() 使用 Log4Net 日志记录
 .CastleWindsorBuilder(container) 使用特定的 Castle 容器
.DefiningCommandsAs(t => t.Namespace != null && 
     t.Namespace.StartsWith("Messages.Commands"))
将命令定义为所示命名空间中的命令
(您也可以使用特殊的 NSB 属性,但我认为这种流畅的 API
更美观)
.DefiningMessagesAs(t => t.Namespace != null &&
     t.Namespace.StartsWith("Messages.Mess"))
将命令定义为所示命名空间中的命令
(您也可以使用特殊的 NSB 属性,但我认为这种流畅的 API
更美观)
.UseTransport<Msmq>()
.PurgeOnStartup(false)
使用 MSMQ 传输
.UnicastBus() 创建单播总线
 .LoadMessageHandlers() 加载所有消息处理程序
.UseNHibernateSubscriptionPersister() 使用 SQL 订阅存储
.UseNHibernateTimeoutPersister() 使用 SQL 超时存储
.UseNHibernateSagaPersister() 使用 SQL saga 存储
.UseNHibernateGatewayPersister() 使用 SQL 网关持久化器
 .CreateBus() 创建总线
.Start(() => Configure.Instance.ForInstallationOn<Windows>().Install()); 在 Windows 上安装它

如您所见,NServiceBus 的设置是使用流畅的 API 完成的,这两种情况下的设置都非常相似。

 

 

 

配置 NServiceBus

本节将讨论如何根据上述演示应用程序要求配置 NServiceBus。如果您的要求与演示应用程序不同(100% 会不同),您肯定需要自己进行一些工作(因此请准备好自学)

演示应用程序命令发送者

这是命令发送者的配置 (App.Config):

<?xml version="1.0" encoding="utf-8"?>
<configuration>

  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" 
	type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" 
	type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
    <section name="Logging" 
	type="NServiceBus.Config.Logging, NServiceBus.Core" />
    <section name="log4net" 
	type="log4net.Config.Log4NetConfigurationSectionHandler, log4net" />
  </configSections>


  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />

  <UnicastBusConfig ForwardReceivedMessagesTo="audit">
    <MessageEndpointMappings>
      <!-- If you want a remote machine endpoint for messages, 
	use syntax like Endpoint="C1951@Messages.Commands.CreatePurchaseOrderCommand.SQLSubscriber" -->
      <!--<add Assembly="Messages" 
	Type="Messages.Commands.CreatePurchaseOrderCommand" 
	Endpoint="RavenSubscriber" />-->
      <add Assembly="Messages" 
	Type="Messages.Commands.CreatePurchaseOrderCommand" 
	Endpoint="SQLSubscriber" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

  <Logging Threshold="ERROR" />


</configuration>

根据我们迄今为止所经历的一切,一件显而易见的事情是,命令发送器是一种 1..1 类型的消息。因此,当我们配置此 NSB 端点时,我们需要告知它消息将最终发送到何处。这通过上面看到的 `` 元素完成,其中我们指定了“`SQLSubscriber`”的 Endpoint。

这告诉 NSB,当前端点(`Commander`)将能够向配置的端点发送命令,在本例中,只有 `SQLSubscriber`。

还有一些其他的配置部分,描述如下:

MessageForwardingInCaseOfFaultConfig

本节只有一个值,即错误队列的名称。在我的例子中,这是一个非常通用的“`error`”值(你喜欢这个,我也喜欢,很酷吧)。你可以随心所欲地命名它。

日志记录

本节设置 NSB 的日志阈值。`Threshold` 是唯一的值(据我所知)。

 

 

演示应用程序订阅者

这是命令发送者的配置 (App.Config):

<?xml version="1.0"?>
<configuration>

  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" 
	type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core"/>
    <section name="UnicastBusConfig" 
	type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
    <section name="Logging" 
	type="NServiceBus.Config.Logging, NServiceBus.Core"/>
    <section name="log4net" 
	type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
  </configSections>

  <connectionStrings>
    <add name="NServiceBus/Persistence" connectionString="......." providerName="System.Data.SqlClient"/>
  </connectionStrings>

  <!-- specify the other needed NHibernate settings like below in appSettings:-->
  <appSettings>
    <!-- dialect is defaulted to MsSql2008Dialect, if needed change accordingly -->
    <add key="NServiceBus/Persistence/NHibernate/dialect" 
	value="NHibernate.Dialect.MsSql2008Dialect" />
    <!-- other optional settings examples -->
    <add key="NServiceBus/Persistence/NHibernate/connection.provider" 
	value="NHibernate.Connection.DriverConnectionProvider" />
    <add key="NServiceBus/Persistence/NHibernate/connection.driver_class" 
	value="NHibernate.Driver.Sql2008ClientDriver" />
  </appSettings>
  

  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error"/>

  <UnicastBusConfig ForwardReceivedMessagesTo="audit">
    <MessageEndpointMappings>
    </MessageEndpointMappings>
  </UnicastBusConfig>


  <Logging Threshold="ERROR"/>

 
</configuration>

我们可以看到 `SQLSubscriber` 的 App.Config 与 `Commander` 的有很大不同。这是为什么呢?

嗯,首先,本文主要围绕如何在 NSB 中创建长时间运行的工作流 (Sagas),我选择在同一个端点中执行工作流(如前所述,这是一个设计决策)。因此,我不需要将 NSB 消息/命令/事件发送到当前端点之外,这就解释了为什么您看不到 `` 的任何值

这里另一个有趣的地方是,由于此端点将处理长时间运行的工作流 (NSB saga),我们需要配置几个**强制性**(在使用 NHibernate SQL 存储时)应用程序设置,以告知 NSB 它可以用于存储长时间运行的工作流 (NSB saga) 的 SQL Server 实例。

然而,一旦您克服了这两个差异,`Command` 和 `SQLSubscriber` 项目之间就没有其他差异了。

 

运行演示应用程序

为了运行所附的演示应用程序,您需要执行以下操作:

  1. 确保您已完成先决条件设置
  2. 在调试模式下启动 SQLSubscriber 项目(或 VB.SQLSubscriber)
  3. 在调试模式下启动 Commander 项目(或 VB.Commander)
  4. 在 Commander 文本框中输入一些文本
  5. 点击按钮,这应该会启动 Saga。您可以通过在 Saga (SQLSubscriber) Handle(..) 方法中设置断点来确认。

 

NServiceBus 基础设施

如果你已经到了这里,并且开始思考我们到目前为止所讨论的一切,你可能会是那些会说:

“嗯,很棒,但是每个环境(我工作的地方有 6 个环境)需要设置的所有队列怎么办?你知道有人必须创建所有这些队列,对吧?我不想做!”

是的,你说得对,肯定有人需要做。幸运的是,NSB 会为您代劳,当您运行演示应用程序时,您应该会看到这样的内容:

我当然没有设置这个。实际上发生的是,NSB 读取您的配置并为您完成繁重的工作,并很贴心地为您创建队列。太棒了。感谢 NSB。

 

Saga?这是什么

好的,现在我们终于到了本文的重点。我想向您介绍如何使用 NServiceBus 创建可能长时间运行的工作流。那么什么是“Saga”呢?在 NServiceBus 中,“Saga”是一个长时间运行的工作流,它具有以下基本特征:

  • 它可能是(但并非总是)长时间运行的
  • 由许多不同的步骤组成(这些步骤在 NServiceBus 中编码为消息处理程序)
  • 可能会持久化到某个后端存储(RavenDB / MSMQ / SQL Server)。这确保了 Sagas 的持久性
  • 由于底层 MSMQ 传输的使用,Sagas 是可靠的
  • 停止后可以重新启动
  • 可以完成

所以,这就是 Saga 的本质

 

简单 Saga 逐步讲解

对于演示应用程序,我虚构了一个简单的(潜在的)长时间运行的工作流。我之所以说“潜在的”,是因为演示应用程序显然做得不多,因为我希望它保持简单,但它确实向您展示了思想/概念,这始终是最重要的学习内容。

这是演示应用程序创建的 Saga 的图表:

这是 Saga 的完整代码:

using System.Threading;
using Messages.Commands;
using Messages.Mess;
using NServiceBus;
using NServiceBus.Saga;
using Services;

namespace SQLSubscriber.Handlers
{
    public class MySaga : Saga<CreatePurchaseOrderSagaData>,
       IAmStartedByMessages<CreatePurchaseOrderCommand>,
       IHandleMessages<CheckPurchaseOrderStatusMessage>
    {
        public IPurchaseService PurchaseService { get; set; }

        public override void ConfigureHowToFindSaga()
        {
            ConfigureMapping<CreatePurchaseOrderCommand>
                (message => message.PurchaseOrderId)
                    .ToSaga(saga => saga.PurchaseOrderId);
            ConfigureMapping<CheckPurchaseOrderStatusMessage>
                (message => message.PurchaseOrderId)
                    .ToSaga(saga => saga.PurchaseOrderId);
        }

        public void Handle(CreatePurchaseOrderCommand message)
        {
            this.Data.TransactionId = PurchaseService.Initialise();
            this.Data.PurchaseOrderId = message.PurchaseOrderId;
            PurchaseService.Purchase(this.Data.TransactionId, 
                string.Format("1 * {0}", message.PurchaseOrderId));
            Bus.SendLocal(new CheckPurchaseOrderStatusMessage()
                          {
                              PurchaseOrderId = this.Data.PurchaseOrderId
                          });
        }


        public void Handle(CheckPurchaseOrderStatusMessage message)
        {
            if (this.Data.Retries < 3)
            {
                bool isCompleted = PurchaseService.IsCompleted(this.Data.TransactionId);
                if (isCompleted)
                {
                    base.MarkAsComplete();
                }
                else
                {
                    Thread.Sleep(2000);
                    this.Data.Retries++;
                    Bus.SendLocal(new CheckPurchaseOrderStatusMessage()
                                  {
                                      PurchaseOrderId = this.Data.PurchaseOrderId
                                  });
                }
            }
            else
            {
                base.MarkAsComplete();
            }
        }
    }
}
Imports System.Threading
Imports VB.Services.VB.Services
Imports VB.Messages.Mess
Imports VB.Messages.Commands
Imports NServiceBus
Imports NServiceBus.Saga

Namespace Handlers

    Public Class MySaga
        Inherits Saga(Of CreatePurchaseOrderSagaData)
        Implements IAmStartedByMessages(Of CreatePurchaseOrderCommand)
        Implements IHandleMessages(Of CheckPurchaseOrderStatusMessage)

        Public Overrides Sub ConfigureHowToFindSaga()

            ConfigureMapping(Of CreatePurchaseOrderCommand) _
                (Function(message) message.PurchaseOrderId) _
                .ToSaga(Function(saga) saga.PurchaseOrderId)
            ConfigureMapping(Of CheckPurchaseOrderStatusMessage) _
                (Function(message) message.PurchaseOrderId) _
                .ToSaga(Function(saga) saga.PurchaseOrderId)

        End Sub


        Public Property PurchaseService() As IPurchaseService

        Public Sub Handle(ByVal message As CreatePurchaseOrderCommand) _
            Implements IHandleMessages(Of CreatePurchaseOrderCommand).Handle

            Me.Data.TransactionId = PurchaseService.Initialise()
            Me.Data.PurchaseOrderId = message.PurchaseOrderId
            PurchaseService.Purchase(Me.Data.TransactionId, String.Format("1 * {0}", message.PurchaseOrderId))
            Dim checkPurchaseOrderStatusMessage = New CheckPurchaseOrderStatusMessage()
            checkPurchaseOrderStatusMessage.PurchaseOrderId = Me.Data.PurchaseOrderId
            Bus.SendLocal(checkPurchaseOrderStatusMessage)

        End Sub

        Public Sub Handle(ByVal message As CheckPurchaseOrderStatusMessage) _
            Implements IHandleMessages(Of CheckPurchaseOrderStatusMessage).Handle

            If Me.Data.Retries < 3 Then
                Dim isCompleted As Boolean = PurchaseService.IsCompleted(Me.Data.TransactionId)
                If isCompleted Then
                    MyBase.MarkAsComplete()
                Else
                    Thread.Sleep(2000)
                    Me.Data.Retries += 1
                    Dim checkPurchaseOrderStatusMessage = New CheckPurchaseOrderStatusMessage()
                    checkPurchaseOrderStatusMessage.PurchaseOrderId = Me.Data.PurchaseOrderId
                    Bus.SendLocal(checkPurchaseOrderStatusMessage)
                End If
            Else
                MyBase.MarkAsComplete()
            End If

        End Sub


    End Class
End Namespace

令人惊讶的是,这里只有几个概念需要学习。

 

概念 1:我们如何创建 Saga

这非常简单,我们只需继承 NServiceBus 中的 `Saga` 类(或 VB.NET 用户的 `Saga(Of CreatePurchaseOrderSagaData)`)。其中泛型是与 Saga 关联的状态数据类型(稍后详述)。

 

概念 2:Saga 如何启动

要启动 Saga,我们只需实现 NServiceBus 接口 `IAmStartedByMessages`(或 VB.NET 用户的 `IAmStartedByMessages(Of CreatePurchaseOrderCommand)`)。这意味着每当 NServiceBus 在总线上看到类型为 `T` 的消息时,它将启动与类型 `T` 相关联的 Saga。

当您实现 `IAmStartedByMessages` 接口(或 `IAmStartedByMessages(Of CreatePurchaseOrderCommand)`)时,您还将获得一个必须实现的 `Handle(..)` 方法。这是启动 Saga 的消息类型的消息处理程序代码。

 

概念 3:Saga 如何继续

要启动 Saga,我们只需实现 NServiceBus 接口 `IHandleMessages`(或 VB.NET 用户的 `IHandleMessages(Of CreatePurchaseOrderCommand)`)。这意味着每当 NServiceBus 在总线上看到类型为 `T` 的消息时,它将启动与类型 `T` 相关联的 Saga。

当您实现 `IHandleMessages` 接口(或 `IAmStarteIHandleMessagesdByMessages(Of CreatePurchaseOrderCommand)`)时,您还将获得一个必须实现的 `Handle(..)` 方法。这是继续 Saga 的消息类型的消息处理程序代码。

 

概念 4:我们如何为 Saga 存储状态

我们稍后会更详细地介绍这一点。但正如我已经说过的那样,Sagas 和 NServiceBus 提供持久性消息传递。因此,我们希望我们的 Saga 状态也能存储起来,这并非不合理。这已经考虑到了,并且像写入与当前 Saga 关联的状态对象一样简单。

 this.Data.PurchaseOrderId = message.PurchaseOrderId;
Me.Data.PurchaseOrderId = message.PurchaseOrderId

 

概念 5:Saga 如何完成

最终你会想要完成 Saga,那么我们该如何做呢?

幸运的是,这也很容易操作,我们只需这样做:

base.MarkAsComplete();
MyBase.MarkAsComplete()

 

Saga 状态存储

您可能会问自己的一件事是,NServiceBus 如何存储与 Saga 相关的状态。嗯,其中的秘密在于使用一个简单的属性包类型类。对于演示应用程序,它大致如下:

using System;
using NServiceBus.Saga;

namespace SQLSubscriber.Handlers
{
    public class CreatePurchaseOrderSagaData : IContainSagaData
    {
        // the following properties are mandatory
        public virtual Guid Id { get; set; }
        public virtual string Originator { get; set; }
        public virtual string OriginalMessageId { get; set; }

        // all other properties you want persisted - remember to make them virtual

        public virtual int PurchaseOrderId { get; set; }
        public virtual string Description { get; set; }
        public virtual string ConfirmationDescription { get; set; }
        public virtual int Retries { get; set; }
        public virtual Guid TransactionId { get; set; }
    }
}

Imports NServiceBus.Saga

Namespace Handlers
    Public Class CreatePurchaseOrderSagaData
        Implements IContainSagaData

        ' the following properties are mandatory
        Public Overridable Property Id() As Guid Implements IContainSagaData.Id
        Public Overridable Property Originator() As String Implements IContainSagaData.Originator
        Public Overridable Property OriginalMessageId() As String Implements IContainSagaData.OriginalMessageId

        ' all other properties you want persisted - remember to make them virtual
        Public Overridable Property PurchaseOrderId() As Int32
        Public Overridable Property Description() As String
        Public Overridable Property ConfirmationDescription() As String
        Public Overridable Property Retries() As Int32
        Public Overridable Property TransactionId() As Guid

    End Class
End Namespace

可以看出,您必须提供一些强制属性。

  • public virtual Guid Id { get; set; }
  • public virtual string Originator { get; set; }
  • public virtual string OriginalMessageId { get; set; }

这些对于 NServiceBus 处理 Saga 数据的方式是不可或缺的。其他属性是您需要满足 Saga 要求的任何内容。对我来说,这些属性是:

  • public virtual int PurchaseOrderId { get; set; }
  • public virtual string Description { get; set; }
  • public virtual string ConfirmationDescription { get; set; }
  • public virtual int Retries { get; set; }
  • public virtual Guid TransactionId { get; set; }

所以,一旦你有了这个状态对象,你就可以在你的 saga 代码中像这样写入它:

 this.Data.TransactionId = PurchaseService.Initialise();
Me.Data.TransactionId = PurchaseService.Initialise()

所以这很酷,又好又简单。但是幕后到底发生了什么呢?NServiceBus 是如何使用这个状态的呢?

很简单,当 saga 需要持久化数据时,NServiceBus 会使用这些状态信息,并自动将其持久化到您选择的后端持久化存储(无论是 RavenDB、SQL Server 等)。我重复一遍,这是由 NServiceBus 自动为您完成的,您无需自己管理。

 

SQL Server Saga 状态存储

话虽如此,如果您选择走 SQL Server 路线,在深入了解 Saga 代码的细节之前,您确实需要完成两件事。

步骤 1:为 NServiceBus 持久化创建数据库

创建一个新的 SQL Server 数据库。对于演示应用程序,它被命名为“NServiceBusPersistence”。您需要创建自己的数据库,但如果您决定将其命名为其他名称,请记住更改下面所示的步骤 2 中的 App.Config 设置。

 

步骤 2:App.Config

确保您已经在 App.Config 中指定了 SQL Server 相关设置。操作如下:

<?xml version="1.0"?>
<configuration>

  <connectionStrings>
    <add name="NServiceBus/Persistence" 
	connectionString="Data Source=YOUR_DATABASE_NAME;
		Initial Catalog=NServiceBusPersistence;Integrated Security=True;
		Timeout=180;MultipleActiveResultSets=true;" 
	providerName="System.Data.SqlClient"/>
  </connectionStrings>

  <!-- specify the other needed NHibernate settings like below in appSettings:-->
  <appSettings>
    <!-- dialect is defaulted to MsSql2008Dialect, if needed change accordingly -->
    <add key="NServiceBus/Persistence/NHibernate/dialect" 
	value="NHibernate.Dialect.MsSql2008Dialect" />
    <!-- other optional settings examples -->
    <add key="NServiceBus/Persistence/NHibernate/connection.provider" 
	value="NHibernate.Connection.DriverConnectionProvider" />
    <add key="NServiceBus/Persistence/NHibernate/connection.driver_class" 
	value="NHibernate.Driver.Sql2008ClientDriver" />
  </appSettings>
  ......
  ......
  ......
  ......
  ......
</configuration>

一旦您完成了这两件事,NServiceBus 将自动创建管理 Saga 状态存储所需的数据库表,它还会根据需要使用相关数据填充表。

例如,这是演示应用程序运行时的情况。请看有一个“CreatePurchaseOrderSagaData”表。NServiceBus 为我们创建了它。

我们还可以看到,在 Saga 运行时,此表会填充数据。同样,这全部由 NServiceBus 处理。

 

为存储的数据查找正确的 Saga

好的,我们已经成功保存了一些 Saga 数据。但是 NServiceBus 究竟如何知道这些数据属于所有正在运行的 Saga 中的哪一个呢?嗯,NServiceBus 自动创建的表让它知道 Saga 的类型,这很好。但是我们可能同时运行许多许多相同 Saga 的实例,那么我们如何知道存储的数据是针对哪一个的呢?嗯,这听起来像个问题,但如果您还记得,我们必须在 Saga 状态对象中存储一些强制性数据和一些自定义数据。我们可以使用其中任何一个来告诉 NServiceBus 如何找到正确的 Saga。这在代码中如下所示:

public override void ConfigureHowToFindSaga()
{
    ConfigureMapping<CreatePurchaseOrderCommand>
	(message => message.PurchaseOrderId)
            .ToSaga(saga => saga.PurchaseOrderId);
    ConfigureMapping<CheckPurchaseOrderStatusMessage>
	(message => message.PurchaseOrderId)
            .ToSaga(saga => saga.PurchaseOrderId);
}
Public Overrides Sub ConfigureHowToFindSaga()

    ConfigureMapping(Of CreatePurchaseOrderCommand) _
        (Function(message) message.PurchaseOrderId) _
            .ToSaga(Function(saga) saga.PurchaseOrderId)
    ConfigureMapping(Of CheckPurchaseOrderStatusMessage) _
        (Function(message) message.PurchaseOrderId) _
            .ToSaga(Function(saga) saga.PurchaseOrderId)

End Sub

可以看出,我使用了我存储的一个自定义状态值,即“PurchaseOrderId”。这对于我的演示应用程序来说是一个唯一值。因此,这将很容易找到正确的 Saga。

工作完成得很好!!!!

 

更复杂的存储怎么办?

如果您想存储更复杂的 Saga 数据,其中可能有很多相互关联的表,而您真的懒得去详细阐述,您可能希望切换到 RavenDB Saga 存储,因为它允许存储任意对象。话虽如此,即使您使用关系数据库(如 SQL Server),您也有选择。XML 将是一个不错的选择。事实上,有人已经这样做了,并写了一篇关于这个主题的非常好的文章,您可以在这里找到:

http://www.make-awesome.com/2010/09/implementing-an-nservicebus-saga-persister/

我很喜欢那篇链接的文章,觉得它写得很好,如果您需要使用关系数据库,它非常有用。

 

就这些

无论如何,这就是我目前想说的全部。我确实计划写一篇关于使用 Durandal 的小文章(只是为了比较它与 Knockout.js 和 Angular.js 的体验),除此之外,我将主要尝试学习 F# 并从初学者的角度撰写博客。所以如果您喜欢这篇文章,并且觉得想投一票/评论,那将非常受欢迎。感谢您的阅读。再见。

© . All rights reserved.