65.9K
CodeProject 正在变化。 阅读更多。
Home

开箱即用的简单 SignalR 数据流基础设施。

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.94/5 (9投票s)

2019 年 1 月 22 日

CPOL

7分钟阅读

viewsIcon

21320

downloadIcon

839

本文介绍了用于从服务器到客户端的数据流和使用 SignalR .NET Core 进行双向通信的基础设施组件。

引言

SignalR 已成为 .NET Core 领域快速增长的通信流行手段。因此,我认为为 SignalR 服务器和客户端添加数据流功能会很有用。这里的数据流指的是服务器持续生成数据并将其推送到其订阅者(客户端)的场景。对于不熟悉 SignalR 的读者,我建议从 ASP.NET Core SignalR 简介 开始。本文为 SignalR .NET Core 服务器和客户端以及 JavaScript 客户端提供了这样的基础设施。

与 Hub 相关基础设施

服务器

SignalRSvc 解决方案包含一个名为 Infrastructure 的文件夹,其中有三个项目(组件),分别是 AsyncAutoResetEventLibSignalRBaseHubServerLibSignalRBaseHubClientLibAsyncAutoResetEventLib 组件取自这里,实现了 AsyncAutoResetEvent 类,该类是自动重置事件的异步版本。SignalRBaseHubServerLib 组件中的 StreamingHub<T> 类继承了基框架类 Microsoft.AspNetCore.SignalR.Hub,并实现了 ISetEvent 接口及其方法 void SetEvent() 和属性 bool IsValid { get; }StreamingHub<T> 的代码如下。

public class StreamingHub<T> : Hub, ISetEvent
{
    protected readonly IStreamingDataProvider<T> _streamingDataProvider;
    private readonly AsyncAutoResetEvent _aev = new AsyncAutoResetEvent();

    private int _isValid = 0;

    protected StreamingHub(StreamingDataProvider<T> streamingDataProvider)
    {
        IsValid = true;
        streamingDataProvider.Add(this);
        _streamingDataProvider = streamingDataProvider;          
    }

    public ChannelReader<T> StartStreaming()
    {
        return Observable.Create<T>(async observer =>
        {
            while (!Context.ConnectionAborted.IsCancellationRequested)
            {               
                await _aev.WaitAsync();
                observer.OnNext(_streamingDataProvider.Current);
            }
        }).AsChannelReader();
    }

    public bool IsValid
    {
        get => Interlocked.Exchange(ref _isValid, _isValid) == 1;
        private set => Interlocked.Exchange(ref _isValid, value ? 1 : 0);
    }

    public void SetEvent()
    {
        _aev.Set();
    }

    protected override void Dispose(bool disposing)
    {
        IsValid = false;
        base.Dispose(disposing);
    }
}

此类主要目的是为 Hub 添加流式传输功能。其构造函数接收一个基类为 StreamingDataProvider<T> 的对象的引用作为参数,其中 T 是要传输的数据类型。StreamingDataProvider<T> 类也在 SignalRBaseHubServerLib 组件中实现。StreamingDataProvider<T> 支持 ISetEvent 接口列表,这些接口实际上是流式传输 Hub。其 setter Current 调用所有有效的流式传输 Hub 的 SetEvent() 方法,并“忘记”已处理的 Hub。调用 SetEvent() 方法会释放 StreamingHub<T> 类型的方法 StartStreaming() 中实现的流式传输机制,并将 T 类型的新实例推送到所有流式传输订阅者(为了简单起见,未实现订阅者过滤)。

上述 Hub 为派生的 Hub 类提供了开箱即用的流式传输能力。派生自基类 StreamingDataProvider<T> 的数据提供者类应定期调用 setter Current,以确保将 T 实例流式传输到订阅者(客户端)。

.NET Core 客户端

SignalR Hub 广泛的客户端是 .NET Core 应用程序,甚至更常见的是使用 JavaScript 的 Web 应用程序。我们先从前者开始。SignalRBaseHubClientLib 组件的 HubClient 类提供了一个方便的流式传输订阅机制。它也可以作为 .NET Core Hub 客户端类型的基类。其构造函数以服务器 URL 作为参数,并创建 Microsoft.AspNetCore.SignalR.Client.HubConnection 类的实例。然后调用方法 async Task<bool> SubscribeAsync<T>(Action<T> callback) 以订阅来自服务器的 T 对象流。该方法的正文如下。

public async Task<bool> SubscribeAsync<T>(Action<T> callback)
{
    if (Connection == null || _cts.Token.IsCancellationRequested || callback == null)
        return false;

    try
    {
        var channel = await Connection.StreamAsChannelAsync<T>("StartStreaming", _cts.Token);
        while (await channel.WaitToReadAsync())
            while (channel.TryRead(out var t))
            {
                try
                {
                    callback(t);
                }
                catch (Exception e)
                {
                    throw new Exception($"Hub \"{Url}\" Subscribe(): callback had failed. ", e);
                }
        }

        return true;
    }
    catch (OperationCanceledException)
    {
        return false;
    }
}

方法 SubscribeAsync<T>() 接收用户提供的回调作为参数,以处理新获得的 T 对象。

JavaScript 客户端

Web 应用程序 SignalR 客户端组件代码位于 SignalRSvc\wwwroot 文件夹的 comm.js 文件中。它基于这里发布的代码片段。要使用该代码,需要安装 JavaScript SignalR 客户端包。要使用 npm 包管理器进行安装,我们在控制台中运行以下命令:

npm init -y
npm install @aspnet/signalr

结果,我们将得到 node_modules 目录。然后为了方便起见,我们将文件 node_modules\@aspnet\signalr\dist\browser\signalr.js 复制到一个新目录 SignalRSvc\wwwroot\lib\signalr

SignalRSvc\wwwroot\index.js 文件中的代码展示了如何使用 comm.js 中的通信函数。创建 hubConnection 对象,然后场景与 .NET Core 客户端类似。文件 comm.js 中的通信相关代码如下。

function createHubConnection(url) {
    return new signalR.HubConnectionBuilder()
        .withUrl(url)
        .configureLogging(signalR.LogLevel.Information)
        .build();
}

async function startHubConnection(hubConnection) {
    try {
        await hubConnection.start();
        console.log('Comm: Successfully connected to hub ' + url + ' .');
        return true;
    }
    catch (err) {
        console.error('Comm: Error in hub startHubConnection(). 
                       Unable to establish connection with hub ' + url + ' .  ' + err);
        return false;
    }
}

async function startStreaming(hubConnection, serverFuncName, callback) {
    isOK = false;
    try {
        await hubConnection.stream(serverFuncName)
            .subscribe({
                next: item => {
                    try {
                        callback(item);
                    }
                    catch (err) {
                        console.error('Comm: Error in hub streaming callback. ' + err);
                    }
                },
                complete: () => console.log('Comm: Hub streaming completed.'),
                error: err => console.error('Comm: Error in hub streaming subscription. ' + err)
            });

        console.log('Comm: Hub streaming started.');
        isOK = true;
    }
    catch (err) {
        console.error('Comm: Error in hub startStreaming(). ' + err);
    }

    return isOK;
}

请注意,此代码是用标准的 ES8(又名 ECMAScript 2017)JavaScript 编写的,并非所有浏览器目前都支持它。我用 Google Chrome 浏览器进行了测试。

代码示例

上述流式传输功能通过代码示例进行了说明。该解决方案由 Infrastructure 文件夹及其三个项目组成,包括上面描述的 AsyncAutoResetEventLibSignalRBaseHubServerLibSignalRBaseHubClientLib,提供数据传输对象 DtoModelLib 项目,提供类 DtoEventProvider : StreamingDataProvider<Dto>DtoProviderLib,包含 Hub 和 Controller 的 Web 服务 SignalRSvc,以及其 .NET Core 客户端 SignalRClientTest

SignalRSvc 提供了一个 Hub 类 TheFirstHub : StreamingHub<Dto>,其中包含一个客户端可以调用的方法 ProcessDto()。该服务还具有 AboutController,其方法可以像普通的 RESTful 服务一样调用。可以通过 HTTP 和 HTTPS 调用访问该服务。HTTPS 模式是通过使用 HTTPS 条件编译符号构建服务来实现的,但如果需要,可以通过对代码进行少量修改来使其可配置。SignalRSvc 拥有跨源资源共享 (CORS) 功能,这对 Web 应用程序客户端很重要。可以通过以下 URL 访问该服务:

  HTTP HTTPS
Hub http://0.0.0.0:15000/hub/the1st https://0.0.0.0:15001/hub/the1st
Controller 方法 http://0.0.0.0:15000/api/about https://0.0.0.0:15001/api/about

.NET Core 客户端应用程序 SignalRClientTest 调用方法 static async void MainAsync() 并进入一个 while 循环等待键盘输入。异步方法 MainAsync() 实例化 SignalRBaseHubClientLib 基础设施项目中的 HubClient 类,启动与服务的连接,提供 ReceiveMessage() 服务器 Hub 推送调用的处理程序,远程调用服务 Hub 的 ProcessDto() 方法,最后调用 HubClientSubscribeAsync<Dto>() 方法以订阅来自服务的流式传输。接收到的 Dto 对象的处理程序作为参数传递给 SubscribeAsync<Dto>() 方法。与此同时,客户端在循环中等待键盘输入,如前所述。如果用户输入的字符不是 'q' 或 'Q',则会调用 Hub 方法 ProcessDto(),服务会将 ReceiveMessage() 通知推送到所有客户端。

处理源代码时,您可以运行位于 solution 目录中的命令文件 Run_SignalRSvc.cmdRun_SignalRTestClient.cmd。这些命令文件将构建默认配置(当前为 Debug),并分别启动 Web 服务及其 .NET Core 客户端。您将在客户端控制台中看到出现的日志消息。有两种类型的消息显示出来。大多数消息是作为流式传输的结果生成的。它们是具有 Guid ClientId 和随机整数 DataDto 对象。DtoEventProvider 类中的计时器处理程序会生成这些对象。第二种类型的消息出现在新客户端启动时,或者在客户端控制台中按下除 'q' 之外的任何键时。此消息显示源客户端的名称和接收到的数据。根据我们的 Web 服务策略,所有消息都发送给所有客户端。要退出客户端,请按 'q' 键。

现在我们来讨论 Web 应用程序客户端。在测试示例时,我仅在 Windows 上使用 IIS Express Web 服务器运行它。首先,我们需要在位于 %user%\Documents\IISExpress\config 文件夹的 IIS Express 配置文件 applicationhost.config 中设置一个合适的站点。名为 SignalRClientSite 的站点的定义在代码示例的 Read_Me.txt 文件中提供。此定义应放置在 applicationhost.config 文件的 <sites> 标签中(请注意,站点 id 的值应该是唯一的)。然后我们运行命令文件 Run_IISExpressTestSite.cmd,在 IIS Express 中启动客户端 Web 站点 SignalRClientSite。在 Web 站点启动后,您可以通过浏览器(我使用了 Google Chrome)使用 URL https://:15015 测试 Web 应用程序。激活的 Web 页面连接到我们位于 https://:15000/hub/the1st 的 Hub,然后将打印与 .NET Core 客户端相同的消息。

要测试 HTTPS 情况,您需要进行以下更改:

  • SignalRSvcSignalRClientTest 项目的 **属性** -> **生成** 选项卡中,插入条件编译符号 HTTPS
  • 在文件 SignalRSvc\wwwroot\index.js 的第一行,将 const HTTPS 的值更改为 true

我们将 IIS Express 配置为能够使用 URL https://:44399 启动支持 HTTPS 的 Web 应用程序(端口 443?? 被选中,因为很可能它已经被绑定到一个 SSL 证书 - 这可以通过以下命令行调用进行检查。

netsh http show sslcert

由于这是同一个应用程序,它将以与上面描述相同的方式调用我们的 Web 服务。

要运行演示,您需要从演示根目录运行两个 cmd 文件。它们将启动包含 Hub 及其 .NET Core 客户端的 Web 服务。为了简单起见,演示中未展示 Web 客户端的使用。

该示例也在 Linux 环境(Ubuntu 18.04)中进行了测试。为了将示例移到 Linux,请在 SignalRSvcSignalRClientTest 目录中分别运行文件 _publish.cmd。这两个位置都会生成 publish 文件夹。然后,应将 SignalRSvcSignalRClientTestpublish 文件夹中的内容复制到已安装 .NET Core 的 Linux 机器上。对于 Linux 环境,我使用了 Oracle VM VirtualBoxMobaXterm 应用程序将文件从 Windows 复制到 Linux 并运行它们。

结论

这项工作为 SignalR 通信在 .NET Core 中提供了便捷的基础设施组件,特别是用于从服务器到客户端的数据流。可以无缝地将适当的组件集成到 .NET Core 服务器和客户端应用程序中。还提供了一个 JavaScript 客户端组件,供 Web 应用程序客户端使用。

© . All rights reserved.