65.9K
CodeProject 正在变化。 阅读更多。
Home

在 Azure Service Fabric 中使用 NServiceBus

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2018 年 6 月 6 日

CPOL

9分钟阅读

viewsIcon

17696

downloadIcon

118

关于 NServiceBus、WebAPI 和 Azure Service Fabric 系列文章的第三篇

引言

了解 Service Fabric (SF) 的一个很好的起点是这里

本文是系列文章中的第三篇,介绍了如何使用 NServiceBus(NSB) 构建松散耦合的应用程序,并将其托管在 Azure Service Fabric 中。前两篇文章是:

文章 1 描述了 NSB 如何(但也许不应该)用作客户端和服务器之间的通信链接。这个解决方案的问题在于它非常单一,即客户端和服务器之间存在强耦合。正如你可能知道的,在构建软件应用程序时应该努力实现松散耦合。

文章 2 描述了如何将 Web API 应用程序移植到 SF

Using the Code

下载解决方案后,您需要使用您的 Azure SQL Server 和 Azure 存储账户更新连接字符串。在 Azure 存储账户初始化之前,应用程序可能需要重新启动几次。关于 SQL Server,EntityFramework 将为您创建数据库并填充表。为了允许应用程序访问您的 Azure SQL Server,您必须使用您使用的 IP 地址更新 Azure SQL Server 防火墙。将会有一个异常,您可以从中读取您正在使用的 IP 地址。使用它来更新 Azure SQL Server 防火墙设置。NSB 许可证文件已包含在应用程序中,有效期至 2018-08-17。

下面是需要自定义的代码。它位于项目 Shared 文件 Helpers.cs 中。

public static string GetSqlConnection()
{
    return "Server=tcp:sireusdbserver.database.windows.net,1433;
    Initial Catalog=companyvehicles;Persist Security Info=False;User ID=xxx;Password=yyy;
    MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;";
}

public static string GetAspNetDbConnection()
{
    return "Server=tcp:sireusdbserver.database.windows.net,1433;
    Initial Catalog=aspnetdb;Persist Security Info=False;User ID=xxx;Password=yyy;
    MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;";
}

public static string GetStorageConnection()
{
    return "DefaultEndpointsProtocol=https;AccountName=xxx;AccountKey=yyy;
            EndpointSuffix=core.windows.net";
}

如果数据库 aspnetdbcompanyvehicles 不存在,EntityFramework 将自动创建它们。

您也可以将应用程序部署到 Azure。然后,在项目 Client 文件 appsettings.json 中,URL 需要相应更改。最初,appsettings.json 如下所示,WebAPI 位于 localhost:83

{
  "ConnectionStrings": {
    //"ApiAddress": "http://companyvehiclesfabric.northeurope.cloudapp.azure.com:83"
    "ApiAddress": "https://:83"
  },
  "Logging": {
    "IncludeScopes": false,
    "LogLevel": {
      "Default": "Warning"
    }
  }
}

背景

这个应用程序 CompanyVehiclesFabric 在第一篇文章和第二篇文章中不断发展,从一个带有 SQLite 数据库的 .NET Core Web API 应用程序发展到基于 Web API、NServiceBus、SQL Server、垂直切片架构、Azure 存储队列和 Azure 存储存在的 Azure Service Fabric 托管应用程序。

学习 SF 和 NSB 时需要克服许多障碍。有些来自误导或缺失的文档,有些来自示例或框架中的 bug,还有一些是由于 .NET Core、NSB 和 SF 之间的不兼容性造成的。

使用 Azure Service Fabric

SF 定价

SF 价格昂贵。三个虚拟机组成的最小配置每月费用超过 500 美元。为了尽量降低成本,我建议程序员从 Visual Studio 创建一个 SF 集群,用于测试和开发,完成后,从 Azure 中删除它。在下一个工作日,您可以在需要时重新创建集群。这可能是一种困难的工作方式,但它将使 SF 虚拟机的成本得到控制。

SF 派对集群

控制成本的另一个选择是使用 SF Party Cluster。使用派对集群的说明至少可以说还可以改进,但这里有一些理解说明的技巧。首先,您需要一个 github 帐户。然后转到

通过 github 登录后,您会到达一个页面,该页面告诉您下载证书。在那里,您会读到“使用 PFX 链接下载证书”。但是 PFX 链接在哪里呢?它在上一页!下载证书后,您必须将其安装到您的计算机上。说明如下:

在 Windows 计算机上,将 PFX 安装在 CurrentUser\My 证书存储中。

但是,一种更简单的方法是右键单击证书并选择“安装 PFX”。现在您可以使用 Party Cluster 一个小时,然后您必须再次执行相同的过程。

SF 无状态和有状态服务

公司车辆 Fabric 应用程序是一个无状态应用程序。在有状态应用程序中,数据存储在应用程序中分布在多个分区中的可靠集合中。我更喜欢使用传统的数据库(例如 SQL Server)来存储数据,而不是将数据存储在可靠集合中。我所见过的有状态应用程序示例在数据存储和检索方面对我来说似乎非常复杂。有状态应用程序也可以与外部数据库一起用作数据缓存。

垂直切片架构

垂直切片架构是一种实现命令查询职责分离 (CQRS) 的方法,从而使应用程序实体之间的耦合更加松散。公司车辆 Fabric 应用程序通过让 WebAPI 直接通过 GET 命令查询数据库,并通过 NSB 执行所有 PUT/POST/DELETE 命令来实现 CQRS。垂直切片架构在垂直切片架构页面中进行了描述。在文章 1 和 2 中,数据库中有两个表,CarsCompanies,其中一个 Company 可以拥有多辆 Cars。为了将此架构转换为垂直切片架构,所有可更新属性都将拥有读写表。这通过以下 Car 对象进行说明,其外观如下:

public class Car
    {
        public Guid CarId { get; set; }
        public Guid CompanyId { get; set; }
        public string CreationTime { get; set; }
        public string VIN { get; set; }
        public string RegNr { get; set; }
        public bool Online { get; set; }
        public bool Locked { get; set; }
        public long LockedTimeStamp { get; set; }
        public int Speed{ get; set; }
        public int QueueLength { get; set; }
   }

前 4 个属性是“只读”并在对象创建期间设置。属性“Online”和“Speed”可以由应用程序更新,并在 GUI 中可见。属性“Locked”是一个标志,当用户即将编辑或删除 Car 对象时设置。Locked 用于防止其他用户在第一个用户编辑对象时更改对象。为了实现垂直切片架构,可更新属性(OnlineSpeedLocked)拥有自己的读写表。Company 对象中的 AddressName 属性可以更新,因此在数据库中拥有读写表,如下图所示:

要真正理解其工作原理,请参阅代码。

公司车辆 Fabric 解决方案

该解决方案由公司车辆 Fabric 应用程序和客户端组成,请参考下图:

要将解决方案本地部署到您的 SF 开发集群,请右键单击 CompanyVehiclesFabric 并选择 Publish。部署后,右键单击项目 Client 并选择 Debug > Start new instance。客户端是一个 .NET Core MVC Web 应用程序,通过 http 与 Service Fabric 集群中的 WebAPI 进行通信。在集群内部,WebAPI 通过 NSB 与服务器通信,也直接与数据库通信。服务器通过连接字符串连接到 SQL Server 数据库,并通过 NSB 连接到 WebAPI,如下所示:

图形用户界面

 

这是主视图,显示了所有公司和所有汽车的列表。用户登录后,有三个按钮可用。这些按钮可用于:

  • 显示所有汽车的摘要以及 NSB 消息队列的长度
  • 随意更新所有汽车的速度和在线/离线状态
  • 重新初始化数据库

“自动化超速”按钮

打开一个新窗口并开始向汽车发送速度和在线/离线消息,每秒一条消息。使用此按钮可以查看随着每秒发送更多消息,每辆汽车队列中的消息数量是如何增加的。

“显示消息队列”按钮

此按钮打开一个新窗口,其中显示了每辆车队列中的消息数量。代码如下所示:

   static async Task<Dictionary<string, int>> GetQueueLenghtForEachCar()
   {
      Dictionary<string, int> queueLengthPerCar = new Dictionary<string, int>();
      CloudStorageAccount storageAccount = QueueStorage.Common.CreateStorageAccountFromConnectionString(Helpers.GetStorageConnection());
      CloudQueueClient cloudQueueClient = storageAccount.CreateCloudQueueClient();
      CloudQueue queue = cloudQueueClient.GetQueueReference(Helpers.ServerEndpoint);
      IEnumerable<CloudQueueMessage> peekedMessages = await queue.PeekMessagesAsync(32);
      if (peekedMessages != null)
      {
          foreach (var msg in peekedMessages.ToList())
          {
              var carId = GetCarIdFromMessage(msg);
              if (CarIdAlreadyInQueue(queueLengthPerCar, carId))
              {
                  queueLengthPerCar[carId]++;
              }
              else
              {
                  queueLengthPerCar.Add(carId, 1);
              }
          }
      }
      return queueLengthPerCar;
    }

要真正理解其工作原理,请参阅代码。

编辑汽车

当用户想要更新汽车的速度或在线状态时,汽车对象将在数据库中被锁定。锁定命令通过 NSB 发送,并以短暂的延迟到达数据库。当锁定存储在数据库中时,“保存”按钮将启用,并且可以保存更改。在锁定生效之前,“保存”按钮被禁用。保存生效后,数据库会立即更新,但这不会由于 NSB 中的延迟而直接显示在概述中。相反,在标题“待处理”下方会看到文本“更新”,如下图所示。如果编辑时间超过 40 秒,锁定将被移除,编辑将被禁用。届时,在标题“待处理”下方的概述中将显示文本“超时”。

 

NSB 传输和持久性

在 Company Vehicles Fabric 中,配置了两种传输队列。第一个传输队列用于发送命令消息,第二个是用于具有更高优先级的事件消息的“优先级”队列。在应用程序中,共有 15 条命令消息和 2 条事件消息。

用于实现 IEvent 的优先级消息传输队列

ClearDatabaseUpdateCarLockedStatus 消息是事件消息,即实现了 IEvent。发布/订阅模式用于优先级队列。因此,我们需要一个能够处理多播、与 Net Core 兼容并能在 Service Fabric 上运行的传输和持久性。这些要求由 AzureStoragePersistenceAzureStorageQueueTransport 满足。

事件消息使用发布/订阅模式。然而,在此应用程序中,只有一个订阅者侦听已发布的事件。

为 NSB 发布/订阅配置 AzureStoragePersistence 是一门独立的学问。在 Company Vehicles Fabric 应用程序中,优先级队列的持久性配置如下:

var endpointConfiguration.UsePersistence<AzureStoragePersistence, StorageType.Subscriptions>()
     .ConnectionString(Helpers.GetStorageConnection());

endpointConfiguration.UsePersistence<AzureStoragePersistence, StorageType.Timeouts>()
     .ConnectionString(Helpers.GetStorageConnection())
     .CreateSchema(true)
     .TimeoutManagerDataTableName("TimeoutManagerPriority")
     .TimeoutDataTableName("TimeoutDataPriority")
     .CatchUpInterval(3600)
     .PartitionKeyScope("2018052400");


var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>()
                        .ConnectionString(Helpers.GetStorageConnection());

var routing = transport.Routing();
routing.RegisterPublisher(typeof(UpdateCarLockedStatus), "api-publisher");
routing.RegisterPublisher(typeof(ClearDatabase), "api-publisher");


Endpoint.Start(endpointConfiguration).GetAwaiter().GetResult(); ;

用于发送实现 ICommand 的消息的传输队列

命令消息通过非优先级队列发送到指定端点。

var endpointConfiguration.UsePersistence<AzureStoragePersistence>()
                              .ConnectionString(Helpers.GetStorageConnection());

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>()
              .ConnectionString(Helpers.GetStorageConnection());

Endpoint.Start(endpointConfiguration).GetAwaiter().GetResult();

 

有两种方式可以说明每条消息的目的地

  • Startup.cs
transport.Routing().RouteToEndpoint(assembly: typeof(UpdateCarOnlineStatus).Assembly, 
@namespace: "Shared.Messages.Commands", destination: "server-endpoint");
  • 在控制器中,发送消息时
await endpointInstance.Send("server-endpoint”, updateCarOnlineStatus).ConfigureAwait(false);

要了解其工作原理,请参阅代码。

NSB 许可证文件和诊断日志文件夹

运行 NSB 需要许可证。许可证文件 License.xml 存储在 PackageRoot/Data 文件夹中,并在启动时由 NSB 读取。NSB 还需要一个用于写入诊断日志的文件夹,并选择了 PackageRoot 文件夹。有关详细信息,请参阅代码。

endpointConfiguration.SetDiagnosticsPath(Directory.GetParent(pathToData).ToString()); 
endpointConfiguration.LicensePath(pathToData + "\\License.xml");

NSB 7.0.0-rc0002 中的错误

如果您的 Visual Studio 默认区域性是“sv-SE”,那么在 Main() 中,您必须编写

new ArgumentException();
Thread.CurrentThread.CurrentUICulture = new CultureInfo("en-US");

SF 示例中的 Bug

文章 2 中的应用程序类似于此处找到的 SF Voting 示例:这里

但是,此示例在 Azure 中运行时存在一个错误。有状态的投票示例在本地集群中运行完美,但在 Azure 中,当集群由 Visual Studio 使用证书安全性设置时,投票示例将无法正常运行。这是由于文章 2 中已更正的错误。(投票使用反向代理 https://:19081,而它在 Azure 中应该使用 https://:19081。)

历史

  • 2018-06-06:第一版
© . All rights reserved.