CQRS 事件源模式与 NServiceBus、ASP.NET MVC 和 SignalR






4.98/5 (21投票s)
本文档可能对尝试使用 NServiceBus 的用户有所帮助。它提供了创建发布者-订阅者对(通过 ServiceMatrix)的详细说明。解决方案架构师可能会发现这种技术在许多 CQRS 场景中很有用,是 WCF 的一个很好的替代方案。
目录
注释
本文档可能对试图使用 NServiceBus (NSB) 的初学者和经验丰富的开发人员都有帮助。它提供了使用 ServiceMatrix 实现分布式发布者-订阅者模式的详细说明。此外,本文档整合了来自不同来源的各种 SignalR 示例,并介绍了简单的服务器到客户端消息传输。解决方案架构师可能会发现这种技术在许多 CQRS 场景中很有用,是 WCF 的一个很好的替代方案。
背景
CQRS(命令查询职责分离)是一种基于命令、查询、域模型使用的模式,请参阅 https://codeproject.org.cn/Articles/555855/Introduction-to-CQRS。 通常,在具体实现时,会创建类集以通过某种命令总线发送消息来建立发布者-订阅者交互。本文提出了一种实现 CQRS 的架构原型。有许多文章描述了各种 CQRS 方法,但它们通常都以大量代码结束,这些代码通常不够灵活,有时甚至过度设计。此外,每篇文章都试图解决不同的问题,引入新的范例,因此缺乏通用方法。
在现实世界中,发布者和订阅者通常位于不同的机器上,需要进程间通信来进行消息传输。让我们简要讨论一些进程间通信的替代方案及其优缺点。
WCF - 它提供了清晰的进程间通信 API,在 Visual Studio IDE 中有良好的支持,并与其他 Microsoft 技术进行了顺畅的集成。非 HTTP 绑定可能会引起部署问题(防火墙问题)。也没有通用的客户端支持多个浏览器和平台。
NServiceBus,请参阅 http://particular.net, 是一种基于 MSMQ(异步消息处理)的服务器端技术,与 WCF 不同,它实现了流行的架构范例,如分布式发布者-订阅者和 CQRS。NServiceBus 作为一种服务器端技术,缺乏对客户端脚本的支持,因此不能直接从浏览器使用。
在实际场景中,订阅者可能使用不同的技术,如 JavaFX、HTML、JavaScript、WPF,它们也可能位于不同的平台。在大多数情况下,发布者和订阅者之间的通信会涉及到防火墙,这使得服务器向客户端推送通知变得非常困难。本文解决了这个问题,并提出了一些允许通过实现某些轮询技术来向位于防火墙后的订阅者推送通知的客户端技术。
REST HTTP vs WebSockets. REST(表述性状态转移)是一种可用于构建软件的架构风格,在这种软件中,客户端(用户代理)可以向服务(终结点)发出请求。用户代理与资源进行交互。REST 使用 HTTP,一种基于 TCP/IP 的应用层协议,它利用 GET、POST、PUT 和 DELETE HTTP 方法来构建应用程序逻辑。
WebSocket 是一种利用 TCP/IP 的客户端技术。它表示一个字节流,分为具有未定义内容的多个消息。如果没有对消息内容的假设,框架能做的非常少。此外,与大多数今天的 Web 应用程序直接编程到套接字一样,在 WebSocket 级别工作对于大多数应用程序来说可能过于底层。WebSocket 客户端 API 是 HTML 5 的一部分,它是一种处理低级握手的线协议,框架只是在 2012 年发布。存在许多具有更高级消息传递 API 的现有框架,它们在底层使用 WebSocket,并且可能还依赖于其他回退选项(例如,HTTP 流式传输、长轮询、keep-alive 等)。一些框架,如 Socket.io 和 Vert.x,提供了轻量级的应用程序功能,用于响应事件并将消息路由到特定的处理程序。另一些,如 CometD , 提供用于订阅和接收消息的通道以及内置的轻量级消息代理。 另一个名为 Laharsub 的开源框架提供了用于 AJAX/Silverlight/WCF 应用程序的 HTTP REST API。Laharsub 为任何应用程序提供了组织使用发布者/订阅者模式的互联网规模消息交换的简单方法。然而目前, ASP.NET SignalR 是开发最密集的功能库之一,允许将消息从服务器组件近乎实时地推送到 Web 客户端。
在本文中,我将尝试介绍一种简单但灵活且可扩展的实现,该实现使用以下技术栈将消息从任何基于云的服务器组件推送到 Web 客户端。
- NServiceBus。
- ServiceMatrix。
- SignalR
- ASP.NET MVC。
高层设计

服务器端位于云中,并通过部署在某个 Web 应用程序中的事件处理程序与客户端进行交互。然后,收到的消息通过 SignalR 分发给不同类型的客户端应用程序:客户端发起通信;服务器使用打开的套接字将数据返回给客户端;客户端使用 keep-alive 来保持连接打开。这样,就实现了 CQRS 事件处理。
系统要求
给出的示例是在 Visual Studio 2012 Ultimate 中开发的,并安装了ASP.NET 和 Web 工具 2012.2 更新。同时,NServiceBus 版本(≥ 4.0)和ServiceMatrix 的最新版本是必需的。
使用代码
在程序包管理器控制台(工具 | 库程序包管理器)中按“还原”按钮,让 NuGet 获取缺少的程序包。之后,重新生成应用程序。然后,按照启动顺序进行操作。
关于 ServiceMatrix
ServiceMatrix 是一个新颖的高级开发工具,旨在简化与 Visual Studio 2012 集成的 NServiceBus 交互的建立。它可以在其创建者的官方网站上找到,请参阅http://particular.net,或者通过“工具”|“扩展和更新”选择 ServiceMatrix。必须安装 NServiceBus 版本(≥ 4.0)。有关安装说明,请访问上面提到的链接。安装 ServiceMatrix 后,NServiceBus 系统的新项目模板将出现在“新建项目”对话框中。见下图。

注意:我们可以注意到,我们默认选择了 .Net Framework 4.5 作为我们的项目,但创建的项目在其属性中是 .Net Framework 4.0。当然,它也可以与 .Net Framework 4.5 一起工作。请注意,在 ServiceMatrix UI 中的每次用户活动之后,都会生成代码片段并添加各种程序集。不幸的是,还不可能撤销操作,因此建议进行中间备份,以避免从头开始。尽管如此,与纯手动变体相比,ServiceMatrix 是一个了不起的工具,绝对值得学习。
步骤 A.
添加一个新的终结点(或者换句话说,添加新的 NServiceBus 主机)。目前它可以是三种类型之一:C# 控制台类库、MVC 项目或早期类型的 Web 应用程序。

打开 PubSub__NSB_SignalR.MyPublisher\EndpointConfig.cs
文件。将 `AsA_Client` 替换为 `AsA_Publisher`,如下所示:
using System; using NServiceBus; namespace PubSub__NSB_SignalR.MyPublisher { public partial class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher { } }
注意:为简化理解和验证所做的操作,建议注意下图所示的图表。
步骤 B.
让我们添加一个名为 `Notifications` 的新服务(注意解决方案资源管理器中的更新)。
步骤 C.
添加一个名为 `EventMessage` 的新事件。

步骤 D.
添加一个订阅者(`Notifications` 服务)。
步骤 E.
发布事件。早期创建的服务和事件名称会出现在相应的组合框中。
图表应如下所示:
步骤 G.
添加一个新的终结点:在此示例中,它是 MVC NServiceBus 主机。

步骤 H.
部署组件。

组件部署后,
。双击 `EventMessageProcessor` 组件,如下图所示:在此,订阅者处理发送的事件。这在图表中表示如下:
步骤 J.
现在需要进行一些手动代码更改。
双击 ServiceMatrixDetails 选项卡中的 `EventMessage` 事件,或通过解决方案资源管理器打开 PubSub__NSB_SignalR.Contract\Notifications\EventMessage.cs
。将以下代码添加到 `EventMessage` 类中:
using System;
using NServiceBus;
namespace PubSub__NSB_SignalR.Contract.Notifications
{
public class EventMessage
{
public Guid EventId { get; set; }
public DateTime? Time { get; set; }
public TimeSpan Duration { get; set; }
}
}
步骤 K.
在 `PubSub__NSB_SignalR.MyPublisher` 项目中添加一个名为 `ServerEndpoint.cs` 的新类,并添加以下代码。该类的目的是发布事件,并且拥有该事件类型的处理程序的每个订阅者都将能够接收通知。
using System;
using NServiceBus;
using PubSub__NSB_SignalR.Contract.Notifications;
namespace PubSub__NSB_SignalR.MyPublisher
{
class ServerEndpoint : IWantToRunWhenBusStartsAndStops
{
public IBus Bus { get; set; }
public void Start()
{
Console.WriteLine("This will publish IEvent, EventMessage, and AnotherEventMessage alternately.");
Console.WriteLine("Press 'Enter' to publish a timer message.To exit, Ctrl + C");
while (Console.ReadLine() != null)
{
EventMessage eventMessage = new EventMessage();
eventMessage.EventId = Guid.NewGuid();
eventMessage.Time = DateTime.Now;
eventMessage.Duration = TimeSpan.FromSeconds(99999D);
Bus.Publish(eventMessage);
Console.WriteLine("Published event with Id {0} {1}", eventMessage.EventId, eventMessage.Time.ToString());
Console.WriteLine("==========================================================================");
}
}
public void Stop()
{
}
}
}
注意:目前 NServiceBus 的最新版本是 4.3.0。要实现它,需要更改此解决方案中包含的所有 `packages.config` 文件;打开程序包管理器控制台并使用 NuGet 获取缺少的程序包。更新对 NServiceBus.dll、NServiceBus.Core.dll 和 NServiceBus.Host.exe 的引用。为了避免因 `PubSub__NSB_SignalR.MyPublisher\App.config` 中缺少 `AuditConfig` 部分而产生的警告消息,请按照“使用 NServiceBus 进行审计”页面上的说明进行操作,参见红色框选部分。运行应用程序。Service Matrix 可以很好地处理 4.3.0 版本。
ASP.NET SignalR 实现
如上所述,一旦 NServiceBus 发送的消息到达客户端,它就会遇到防火墙。SignalR 通过客户端必须初始化的持久连接将通知“推”到客户端。在本节中,验证 ASP.NET 和 Web 工具 2012.2 更新 已安装。
现有的 NServiceBus (ServiceMatrix) 项目中。通过“工具”|“库程序包管理器”打开程序包管理器控制台,并通过 NuGet 获取 SignalR 开发程序包:
PM> Install-Package Microsoft.AspNet.SignalR.Core -Version 1.0.0-rc2 –Pre –ProjectName PubSub__NSB_SignalR.SignalRSubscriber1
注意:目前,最新版本是 SignalR 2.0。但是,它面向 Visual Studio 2013。本示例使用之前的 SignalR 版本 1.0.0-rc2,因为它与目前在 Visual Studio 2012 中实现的最新 NServiceBus (ServiceMatrix) 项目类型兼容。使用 Framework 4.5 不是强制性的,但绝对是正常的。
Hub 类
SignalR Hubs API 允许处理从服务器到已连接客户端以及从客户端到服务器端的远程过程调用(RPC)。向现有的 PubSub__NSB_SignalR.SignalRSubscriber1
项目添加一个新类型的类,即 SignalR Hub 类。如果你的 VS2012 安装中没有此类模板,只需添加一个新类即可。将以下代码添加到 Chat.cs
:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using Microsoft.AspNet.SignalR;
namespace PubSub__NSB_SignalR.SignalRSubscriber1
{
public class Chat : Hub
{
public void Send(string message)
{
Clients.All.send(message);
}
}
}
在 PubSub__NSB_SignalR.SignalRSubscriber1\App_Start
文件夹中添加另一个新类 RegisterHubs.cs
,并添加以下代码:
using System.Web;
using System.Web.Routing;
using Microsoft.AspNet.SignalR;
[assembly: PreApplicationStartMethod(typeof(PubSub__NSB_SignalR.SignalRSubscriber1.RegisterHubs), "Start")]
namespace PubSub__NSB_SignalR.SignalRSubscriber1
{
public class RegisterHubs
{
public static void Start()
{
var config = new HubConfiguration
{
EnableCrossDomain = true
};
RouteTable.Routes.MapHubs(config);
}
}
}
更改 NuGet 添加的 `Microsoft.AspNet.Signal.Core.dll` ver. 1.0.0 的现有引用为 ver.1.0.1。这是必需的,因为在 NuGet 版本中,
接下来,将以下脚本直接添加到 PubSub__NSB_SignalR.SignalRSubscriber1\Views\Home\Index.cshtml
文件的末尾:
@section scripts {
<script src="Scripts/jquery-1.8.3.min.js"></script>
<script src="Scripts/jquery.signalR-1.0.0.min.js"></script>
<script src="https://:19897/signalR/hubs"></script>
<script>
$(function () {
var chat = $.connection.chat;
chat.client.send = function (message) {
$('#message').append('<li>' + message + '</li>');
};
// Turn logging on so we can see the calls in the browser console
$.connection.hub.logging = true;
// Change the hub url to point to the remote server
$.connection.hub.url = 'https://:19897/signalr';
$.connection.hub.start().done(function () {
$('#send').click(function () {
chat.server.send($('#msg').val());
});
});
});
</script>
}
<input type="text" id="msg" value=" " />
<input type="button" id="send" value="send" />
<ul id="message">
</ul>
验证项目是否引用了 Scripts/jquery-1.8.3.min.js
和 Scripts/jquery.signalR-1.0.0.min.js.
编译并单独启动 PubSub__NSB_SignalR.SignalRSubscriber1
项目(右键单击 Debug | Start new instance)。在本例中,它位于 localhost:19897
。请记住这个端口,因为它将被引用。
验证是否收到了以下屏幕,如果收到了,则表示操作成功。该屏幕表明 Chat.cs
类中的 Send
方法 。
现在,让我们从外部源 https://github.com/CasperWollesen/SignalR.Samples 向 PubSub__NSB_SignalR
解决方案添加两个新项目:BasicChat.Mvc
和 BasicChat.Wpf
。需要进行一些小的 更改:
更改 #1:
在 BasicChat.Mvc\Global.asax.cs
中,删除 RoubteTable.Routes.MapHubs();
,因为它已经在文件:PubSub__NSB_SignalR.SignalRSubscriber1\App_Start\RegisterHubs.cs
中被调用了。
using System.Web;
using System.Web.Http;
using System.Web.Mvc;
using System.Web.Optimization;
using System.Web.Routing;
using Microsoft.AspNet.SignalR;
namespace BasicChat.Mvc
{
// Note: For instructions on enabling IIS6 or IIS7 classic mode,
// visit http://go.microsoft.com/?LinkId=9394801
public class MvcApplication : System.Web.HttpApplication
{
protected void Application_Start()
{
AreaRegistration.RegisterAllAreas();
// The order of this is important
// RouteTable.Routes.MapHubs(); close it (!)
WebApiConfig.Register(GlobalConfiguration.Configuration);
FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
RouteConfig.RegisterRoutes(RouteTable.Routes);
BundleConfig.RegisterBundles(BundleTable.Bundles);
AuthConfig.RegisterAuth();
}
}
}
更改 #2:
在 BasicChat.Mvc\Views\Home\Index.cshtml
中,用以下内容替换脚本:
@section scripts {
<script src="Scripts/jquery-1.8.3.min.js"></script>
<script src="Scripts/jquery.signalR-1.0.0.min.js"></script>
<script src="https://:19897/signalR/hubs"></script>
<script>
$(function () {
var chat = $.connection.chat;
chat.client.send = function (message) {
$('#message').append('<li>' + message + '</li>');
};
// Turn logging on so we can see the calls in the browser console
$.connection.hub.logging = true;
// Change the hub url to point to the remote server
$.connection.hub.url = 'https://:19897/signalr';
$.connection.hub.start().done(function () {
$('#send').click(function () {
chat.server.send($('#msg').val());
});
});
});
</script>
}
在 BasicChat.Wpf
项目中,打开以下文件:
SignalrClientSeparatedConnectionProxyWindow.xaml.cs
,-
SignalrClientSharedConnectionProxyWindow.xaml.cs
.
不要忘记更改端口(在本例中为 19897
)。为了在它们之间切换,请在 BasicChat.Wpf\App.xaml
的 Application.StartupUri
属性中进行相应的更改。
SignalrClientSeparatedConnectionProxyWindow.cs
using System;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Input;
using System.Windows.Threading;
using Microsoft.AspNet.SignalR.Client.Hubs;
namespace BasicChat.Wpf
{
public partial class <code>SignalrClientSeparatedConnectionProxyWindow</code> : Window
{
public System.Threading.Thread Thread { get; set; }
public string Host = "https://:19897/";
public IHubProxy ProxyInvoke { get; set; }
public HubConnection ConnectionInvoke { get; set; }
public bool Active { get; set; }
public SignalrClientSeparatedConnectionProxyWindow()
{
InitializeComponent();
}
private async void ActionSendButtonClick(object sender, RoutedEventArgs e)
{
await SendMessage();
}
private async Task SendMessage()
{
await ProxyInvoke.Invoke("send", ClientNameTextBox.Text + ": " + MessageTextBox.Text);
}
private async void ActionWindowLoaded(object sender, RoutedEventArgs e)
{
Active = true;
Thread = new System.Threading.Thread(() =>
{
var connectionOn = new HubConnection(Host);
IHubProxy proxyOn = connectionOn.CreateHubProxy("Chat");
Proxy.On<string>("send", OnSendData);
connectionOn.Start();
while (Active)
{
System.Threading.Thread.Sleep(10);
}
}) { IsBackground = true };
Thread.Start();
ConnectionInvoke = new HubConnection(Host);
ProxyInvoke = ConnectionInvoke.CreateHubProxy("Chat");
await ConnectionInvoke.Start();
}
private void OnSendData(string message)
{
Dispatcher.Invoke(DispatcherPriority.Normal, (Action)(() => MessagesListBox.Items.Insert(0, message)));
}
private async void ActionMessageTextBoxOnKeyDown(object sender, KeyEventArgs e)
{
if (e.Key == Key.Enter || e.Key == Key.Return)
{
await SendMessage();
MessageTextBox.Text = "";
}
}
}
}
SignalrClientSharedConnectionProxyWindow.cs
using System;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Input;
using System.Windows.Threading;
using Microsoft.AspNet.SignalR.Client.Hubs;
namespace BasicChat.Wpf
{
public partial class <code>SignalrClientSharedConnectionProxyWindow</code> : Window
{
public System.Threading.Thread Thread { get; set; }
public string Host = "https://:19897/";
public IHubProxy Proxy { get; set; }
public HubConnection Connection { get; set; }
public bool Active { get; set; }
public SignalrClientSharedConnectionProxyWindow()
{
InitializeComponent();
}
private async void ActionSendButtonClick(object sender, RoutedEventArgs e)
{
await SendMessage();
}
private async Task SendMessage()
{
await Proxy.Invoke("send", ClientNameTextBox.Text + ": " + MessageTextBox.Text);
}
private void ActionWindowLoaded(object sender, RoutedEventArgs e)
{
Active = true;
Thread = new System.Threading.Thread(() =>
{
Connection = new HubConnection(Host);
Proxy = Connection.CreateHubProxy("Chat");
Proxy.On<string>("send", OnSendData);
Connection.Start();
while (Active)
{
System.Threading.Thread.Sleep(10);
}
}) { IsBackground = true };
Thread.Start();
}
private void OnSendData(string message)
{
Dispatcher.Invoke(DispatcherPriority.Normal, (Action)(() => MessagesListBox.Items.Insert(0, message)));
}
private async void ActionMessageTextBoxOnKeyDown(object sender, KeyEventArgs e)
{
if (e.Key == Key.Enter || e.Key == Key.Return)
{
await SendMessage();
MessageTextBox.Text = "";
}
}
}
}
为了定义 Hub 可以从服务器调用的客户端上的方法,通过调用连接对象上的 CreateHubProxy
来创建一个 Hub 的代理。
注意:为了创建一个控制台应用程序,主函数将与上面显示的 WPF 变体非常相似。
横向扩展
现在,有了前面测试过的两个部分:NSB 和 SignalR,是时候将它们组装起来了。在 NSB 部分的PubSub__NSB_SignalR\PubSub__NSB_SignalR.Code\Notifications\EventMessageProcessor.cs
中,添加如下所示的更改:using System;
using NServiceBus;
using PubSub__NSB_SignalR.Contract.Notifications;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
using PubSub__NSB_SignalR.SignalRSubscriber1;
namespace PubSub__NSB_SignalR.Notifications
{
public partial class EventMessageProcessor
{
partial void HandleImplementation(EventMessage message)
{
// Implement your handler logic here.
System.Diagnostics.Trace.TraceInformation("Notifications received: {0}", message.GetType().Name);
IHubContext hubContext = GlobalHost.ConnectionManager.GetHubContext<Chat>();
hubContext.Clients.All.send(message.Time.ToString());
}
}
}
using System;
using NServiceBus;
using PubSub__NSB_SignalR.Contract.Notifications;
namespace PubSub__NSB_SignalR.MyPublisher
{
class ServerEndpoint : IWantToRunWhenBusStartsAndStops
{
public IBus Bus { get; set; }
public void Start()
{
Console.WriteLine("This will publish IEvent, EventMessage, and AnotherEventMessage alternately.");
Console.WriteLine("Press 'Enter' to publish a timer message.To exit, Ctrl + C");
while (Console.ReadLine() != null)
{
var timer = new System.Timers.Timer(1000);
timer.Elapsed += timer_Elapsed;
timer.Interval = 3000;
timer.Enabled = true;
EventMessage eventMessage = new EventMessage();
eventMessage.EventId = Guid.NewGuid();
eventMessage.Time = DateTime.Now;
eventMessage.Duration = TimeSpan.FromSeconds(99999D);
Bus.Publish(eventMessage);
Console.WriteLine("Published event with Id {0} {1}", eventMessage.EventId, eventMessage.Time.ToString());
Console.WriteLine("==========================================================================");
}
}
public void Stop()
{
}
void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
EventMessage eventMessage = new EventMessage();
eventMessage.EventId = Guid.NewGuid();
eventMessage.Time = DateTime.Now;
eventMessage.Duration = TimeSpan.FromSeconds(99999D);
Bus.Publish(eventMessage);
Console.WriteLine("Published event with Id {0} {1}", eventMessage.EventId, eventMessage.Time.ToString());
Console.WriteLine("==========================================================================");
}
}
}
通过“工具”|“库程序包管理器”打开程序包管理器控制台,并通过 NuGet 获取 SignalR 开发程序包:
PM> Install-Package Microsoft.AspNet.SignalR.Client -Version 1.0.1 –ProjectName BasicChat.Wpf
生成 BasicChat.Wpf
项目。
这样,每当处理订阅的事件(在本例中为 EventMessage
)时,消息就会通过 SignalR 使用我们的 Chat-hub 类重新发送到我们的客户端应用程序:BasicChat.Mvc
和 BasicChat.Wpf
。通过在 PubSub__NSB_SignalR.SignalRSubscriber1\Controllers\HomeController.cs
中添加如下所示的更改,这将允许从 PubSub__NSB_SignalR.SignalRSubscriber1
向 BasicChat.Mvc
和 BasicChat.Wpf
发送消息。
public ActionResult About()
{
ViewBag.Message = "Your app description page.";
IHubContext hubContext = GlobalHost.ConnectionManager.GetHubContext<chat>();
hubContext.Clients.All.send("XXXX");
return View();
}
启动顺序:
注意:箭头表示可能的通信通道。