开箱即用的简单 SignalR 数据流基础设施。






4.94/5 (9投票s)
本文介绍了用于从服务器到客户端的数据流和使用 SignalR .NET Core 进行双向通信的基础设施组件。
引言
SignalR 已成为 .NET Core 领域快速增长的通信流行手段。因此,我认为为 SignalR 服务器和客户端添加数据流功能会很有用。这里的数据流指的是服务器持续生成数据并将其推送到其订阅者(客户端)的场景。对于不熟悉 SignalR 的读者,我建议从 ASP.NET Core SignalR 简介 开始。本文为 SignalR .NET Core 服务器和客户端以及 JavaScript 客户端提供了这样的基础设施。
与 Hub 相关基础设施
服务器
SignalRSvc
解决方案包含一个名为 Infrastructure 的文件夹,其中有三个项目(组件),分别是 AsyncAutoResetEventLib
、SignalRBaseHubServerLib
和 SignalRBaseHubClientLib
。AsyncAutoResetEventLib
组件取自这里,实现了 AsyncAutoResetEvent
类,该类是自动重置事件的异步版本。SignalRBaseHubServerLib
组件中的 StreamingHub<T>
类继承了基框架类 Microsoft.AspNetCore.SignalR.Hub
,并实现了 ISetEvent
接口及其方法 void SetEvent()
和属性 bool IsValid { get; }
。StreamingHub<T>
的代码如下。
public class StreamingHub<T> : Hub, ISetEvent
{
protected readonly IStreamingDataProvider<T> _streamingDataProvider;
private readonly AsyncAutoResetEvent _aev = new AsyncAutoResetEvent();
private int _isValid = 0;
protected StreamingHub(StreamingDataProvider<T> streamingDataProvider)
{
IsValid = true;
streamingDataProvider.Add(this);
_streamingDataProvider = streamingDataProvider;
}
public ChannelReader<T> StartStreaming()
{
return Observable.Create<T>(async observer =>
{
while (!Context.ConnectionAborted.IsCancellationRequested)
{
await _aev.WaitAsync();
observer.OnNext(_streamingDataProvider.Current);
}
}).AsChannelReader();
}
public bool IsValid
{
get => Interlocked.Exchange(ref _isValid, _isValid) == 1;
private set => Interlocked.Exchange(ref _isValid, value ? 1 : 0);
}
public void SetEvent()
{
_aev.Set();
}
protected override void Dispose(bool disposing)
{
IsValid = false;
base.Dispose(disposing);
}
}
此类主要目的是为 Hub 添加流式传输功能。其构造函数接收一个基类为 StreamingDataProvider<T>
的对象的引用作为参数,其中 T
是要传输的数据类型。StreamingDataProvider<T>
类也在 SignalRBaseHubServerLib
组件中实现。StreamingDataProvider<T>
支持 ISetEvent
接口列表,这些接口实际上是流式传输 Hub。其 setter Current
调用所有有效的流式传输 Hub 的 SetEvent()
方法,并“忘记”已处理的 Hub。调用 SetEvent()
方法会释放 StreamingHub<T>
类型的方法 StartStreaming()
中实现的流式传输机制,并将 T
类型的新实例推送到所有流式传输订阅者(为了简单起见,未实现订阅者过滤)。
上述 Hub 为派生的 Hub 类提供了开箱即用的流式传输能力。派生自基类 StreamingDataProvider<T>
的数据提供者类应定期调用 setter Current
,以确保将 T
实例流式传输到订阅者(客户端)。
.NET Core 客户端
SignalR Hub 广泛的客户端是 .NET Core 应用程序,甚至更常见的是使用 JavaScript 的 Web 应用程序。我们先从前者开始。SignalRBaseHubClientLib
组件的 HubClient
类提供了一个方便的流式传输订阅机制。它也可以作为 .NET Core Hub 客户端类型的基类。其构造函数以服务器 URL 作为参数,并创建 Microsoft.AspNetCore.SignalR.Client.HubConnection
类的实例。然后调用方法 async Task<bool> SubscribeAsync<T>(Action<T> callback)
以订阅来自服务器的 T
对象流。该方法的正文如下。
public async Task<bool> SubscribeAsync<T>(Action<T> callback)
{
if (Connection == null || _cts.Token.IsCancellationRequested || callback == null)
return false;
try
{
var channel = await Connection.StreamAsChannelAsync<T>("StartStreaming", _cts.Token);
while (await channel.WaitToReadAsync())
while (channel.TryRead(out var t))
{
try
{
callback(t);
}
catch (Exception e)
{
throw new Exception($"Hub \"{Url}\" Subscribe(): callback had failed. ", e);
}
}
return true;
}
catch (OperationCanceledException)
{
return false;
}
}
方法 SubscribeAsync<T>()
接收用户提供的回调作为参数,以处理新获得的 T
对象。
JavaScript 客户端
Web 应用程序 SignalR 客户端组件代码位于 SignalRSvc\wwwroot 文件夹的 comm.js 文件中。它基于这里发布的代码片段。要使用该代码,需要安装 JavaScript SignalR 客户端包。要使用 npm
包管理器进行安装,我们在控制台中运行以下命令:
npm init -y
npm install @aspnet/signalr
结果,我们将得到 node_modules 目录。然后为了方便起见,我们将文件 node_modules\@aspnet\signalr\dist\browser\signalr.js 复制到一个新目录 SignalRSvc\wwwroot\lib\signalr。
SignalRSvc\wwwroot\index.js 文件中的代码展示了如何使用 comm.js 中的通信函数。创建 hubConnection
对象,然后场景与 .NET Core 客户端类似。文件 comm.js 中的通信相关代码如下。
function createHubConnection(url) {
return new signalR.HubConnectionBuilder()
.withUrl(url)
.configureLogging(signalR.LogLevel.Information)
.build();
}
async function startHubConnection(hubConnection) {
try {
await hubConnection.start();
console.log('Comm: Successfully connected to hub ' + url + ' .');
return true;
}
catch (err) {
console.error('Comm: Error in hub startHubConnection().
Unable to establish connection with hub ' + url + ' . ' + err);
return false;
}
}
async function startStreaming(hubConnection, serverFuncName, callback) {
isOK = false;
try {
await hubConnection.stream(serverFuncName)
.subscribe({
next: item => {
try {
callback(item);
}
catch (err) {
console.error('Comm: Error in hub streaming callback. ' + err);
}
},
complete: () => console.log('Comm: Hub streaming completed.'),
error: err => console.error('Comm: Error in hub streaming subscription. ' + err)
});
console.log('Comm: Hub streaming started.');
isOK = true;
}
catch (err) {
console.error('Comm: Error in hub startStreaming(). ' + err);
}
return isOK;
}
请注意,此代码是用标准的 ES8(又名 ECMAScript 2017)JavaScript 编写的,并非所有浏览器目前都支持它。我用 Google Chrome 浏览器进行了测试。
代码示例
上述流式传输功能通过代码示例进行了说明。该解决方案由 Infrastructure 文件夹及其三个项目组成,包括上面描述的 AsyncAutoResetEventLib
、SignalRBaseHubServerLib
和 SignalRBaseHubClientLib
,提供数据传输对象 Dto
的 ModelLib
项目,提供类 DtoEventProvider : StreamingDataProvider<Dto>
的 DtoProviderLib
,包含 Hub 和 Controller 的 Web 服务 SignalRSvc
,以及其 .NET Core 客户端 SignalRClientTest
。
SignalRSvc
提供了一个 Hub 类 TheFirstHub : StreamingHub<Dto>
,其中包含一个客户端可以调用的方法 ProcessDto()
。该服务还具有 AboutController
,其方法可以像普通的 RESTful 服务一样调用。可以通过 HTTP 和 HTTPS 调用访问该服务。HTTPS 模式是通过使用 HTTPS 条件编译符号构建服务来实现的,但如果需要,可以通过对代码进行少量修改来使其可配置。SignalRSvc
拥有跨源资源共享 (CORS) 功能,这对 Web 应用程序客户端很重要。可以通过以下 URL 访问该服务:
HTTP | HTTPS | |
Hub | http://0.0.0.0:15000/hub/the1st | https://0.0.0.0:15001/hub/the1st |
Controller 方法 | http://0.0.0.0:15000/api/about | https://0.0.0.0:15001/api/about |
.NET Core 客户端应用程序 SignalRClientTest
调用方法 static async void MainAsync()
并进入一个 while
循环等待键盘输入。异步方法 MainAsync()
实例化 SignalRBaseHubClientLib
基础设施项目中的 HubClient
类,启动与服务的连接,提供 ReceiveMessage()
服务器 Hub 推送调用的处理程序,远程调用服务 Hub 的 ProcessDto()
方法,最后调用 HubClient
的 SubscribeAsync<Dto>()
方法以订阅来自服务的流式传输。接收到的 Dto
对象的处理程序作为参数传递给 SubscribeAsync<Dto>()
方法。与此同时,客户端在循环中等待键盘输入,如前所述。如果用户输入的字符不是 'q
' 或 'Q
',则会调用 Hub 方法 ProcessDto()
,服务会将 ReceiveMessage()
通知推送到所有客户端。
处理源代码时,您可以运行位于 solution 目录中的命令文件 Run_SignalRSvc.cmd 和 Run_SignalRTestClient.cmd。这些命令文件将构建默认配置(当前为 Debug),并分别启动 Web 服务及其 .NET Core 客户端。您将在客户端控制台中看到出现的日志消息。有两种类型的消息显示出来。大多数消息是作为流式传输的结果生成的。它们是具有 Guid ClientId
和随机整数 Data
的 Dto
对象。DtoEventProvider
类中的计时器处理程序会生成这些对象。第二种类型的消息出现在新客户端启动时,或者在客户端控制台中按下除 'q
' 之外的任何键时。此消息显示源客户端的名称和接收到的数据。根据我们的 Web 服务策略,所有消息都发送给所有客户端。要退出客户端,请按 'q
' 键。
现在我们来讨论 Web 应用程序客户端。在测试示例时,我仅在 Windows 上使用 IIS Express Web 服务器运行它。首先,我们需要在位于 %user%\Documents\IISExpress\config 文件夹的 IIS Express 配置文件 applicationhost.config 中设置一个合适的站点。名为 SignalRClientSite 的站点的定义在代码示例的 Read_Me.txt 文件中提供。此定义应放置在 applicationhost.config 文件的 <sites>
标签中(请注意,站点 id 的值应该是唯一的)。然后我们运行命令文件 Run_IISExpressTestSite.cmd,在 IIS Express 中启动客户端 Web 站点 SignalRClientSite。在 Web 站点启动后,您可以通过浏览器(我使用了 Google Chrome)使用 URL https://:15015 测试 Web 应用程序。激活的 Web 页面连接到我们位于 https://:15000/hub/the1st 的 Hub,然后将打印与 .NET Core 客户端相同的消息。
要测试 HTTPS 情况,您需要进行以下更改:
- 在
SignalRSvc
和SignalRClientTest
项目的 **属性** -> **生成** 选项卡中,插入条件编译符号 HTTPS。 - 在文件 SignalRSvc\wwwroot\index.js 的第一行,将
const HTTPS
的值更改为true
。
我们将 IIS Express 配置为能够使用 URL https://:44399 启动支持 HTTPS 的 Web 应用程序(端口 443?? 被选中,因为很可能它已经被绑定到一个 SSL 证书 - 这可以通过以下命令行调用进行检查。
netsh http show sslcert
由于这是同一个应用程序,它将以与上面描述相同的方式调用我们的 Web 服务。
要运行演示,您需要从演示根目录运行两个 cmd 文件。它们将启动包含 Hub 及其 .NET Core 客户端的 Web 服务。为了简单起见,演示中未展示 Web 客户端的使用。
该示例也在 Linux 环境(Ubuntu 18.04)中进行了测试。为了将示例移到 Linux,请在 SignalRSvc 和 SignalRClientTest 目录中分别运行文件 _publish.cmd。这两个位置都会生成 publish 文件夹。然后,应将 SignalRSvc 和 SignalRClientTest 的 publish 文件夹中的内容复制到已安装 .NET Core 的 Linux 机器上。对于 Linux 环境,我使用了 Oracle VM VirtualBox 和 MobaXterm 应用程序将文件从 Windows 复制到 Linux 并运行它们。
结论
这项工作为 SignalR 通信在 .NET Core 中提供了便捷的基础设施组件,特别是用于从服务器到客户端的数据流。可以无缝地将适当的组件集成到 .NET Core 服务器和客户端应用程序中。还提供了一个 JavaScript 客户端组件,供 Web 应用程序客户端使用。