65.9K
CodeProject 正在变化。 阅读更多。
Home

SignalR + RX (流式数据演示应用程序 1/2)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.95/5 (81投票s)

2014年12月9日

CPOL

20分钟阅读

viewsIcon

301550

一个小型演示应用,展示如何使用 SignalR + RX 创建流发布者/弹性客户端

代码在哪里?

您可以从我的 GitHub 仓库获取代码,就在这里
https://github.com/sachabarber/SignalrRxDemo

引言

我在金融领域工作,目前在外汇 (FX) 领域,我们有许多涉及各种事物流式传输的需求,例如

  • 汇率
  • 交易
  • 其他金融工具

过去,为了满足这些需求,我们会使用套接字并将我们自己的数据(通常是 JSON 或 XML)通过网络推送,然后在此之上设置一些相当复杂的监听器,每个监听器都可能需要对原始流数据有稍微不同的视图。

但事实是,软件开发世界的时间从未停止,新的事物层出不穷,几乎每隔一天就会出现一个新框架,以某种方式改进了之前的产品。

我特别认为很多人误解的一点是响应式扩展 RX,公平地说,它已经存在一段时间了。我认为很多人将 RX 视为事件的 LINQ,它确实有这方面。然而,事实是 RX 是一个用于创建流应用程序的绝佳框架,你越深入 RX,你越倾向于将一切都视为流,从事件到选择文件或 ViewModel 中的 `ICommand.Execute` 等。

RX 拥有许多工具,例如

  • 2 个核心接口 `IObservable` / `IObserver`
  • 许多类似 LINQ 的 `IObservable` 扩展方法
  • 基于时间/窗口的操作(LINQ 并不真正具备这些)
  • 并发调度器
  • 流错误处理控制(Catch 等)
  • 能够从许多其他事物创建 `IObservable`,例如
    • IEnumerable
    • 任务
    • 事件
    • `IObservable` 工厂

我个人认为,如果人们花时间更多地学习 RX,他们可以从中获得巨大的收益。

微软还有一个很棒的库,它促进了基于推送的技术,即 SignalR。关于 SignalR 已经有很多文章,但你很少从桌面角度看到它。

大多数人可能会将 SignalR Hub 与网站联系起来,但你可以通过多种方式使用 SignalR,这也使其成为桌面(即非 Web)开发的好选择。这要归功于 SignalR OWIN 托管 API,因此你可以自托管 SignalR Hub。还有一个 .NET 客户端,我们可以用它与 Hub 进行通信。SignalR 在 .NET 客户端中也有很多事件,你猜我们可以用这些事件做什么?没错,我们可以从中创建 `IObservable`(s) 流,这为使用 RX 打开了大门。

本文将展示 RX 在流式 API 领域的一个优势,我们将使用 RX/SignalR

文章系列

  1. 使用 SignalR 和 RX 的流式 API(本文)
  2. 使用 NetMQ 和 RX 的流式 API

视频向您展示我们将要学习的内容

我认为了解这个演示的最佳方式是观看我上传到 YouTube 的视频:

https://www.youtube.com/watch?v=TDIpPvbw6ek

RX 小入门

您可以使用许多 RX 扩展方法

http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable_methods(v=vs.103).aspx

但也许开始学习 RX 的最佳之处在于理解您将反复使用的两个底层接口。它们是 `IObservable` 和 `IObserver`。我们将在下面讨论这两个接口。

IObservable

namespace System
{
    // Summary:
    //     Defines a provider for push-based notification.
    //
    // Type parameters:
    //   T:
    //     The object that provides notification information.This type parameter is
    //     covariant. That is, you can use either the type you specified or any type
    //     that is more derived. For more information about covariance and contravariance,
    //     see Covariance and Contravariance in Generics.
    public interface IObservable<out T>
    {
        // Summary:
        //     Notifies the provider that an observer is to receive notifications.
        //
        // Parameters:
        //   observer:
        //     The object that is to receive notifications.
        //
        // Returns:
        //     A reference to an interface that allows observers to stop receiving notifications
        //     before the provider has finished sending them.
        IDisposable Subscribe(IObserver<T> observer);
    }
}

定义了基于推送通知的提供者。

此类型参数是协变的。也就是说,您可以使用您指定的类型或任何更派生的类型。有关协变和逆变的更多信息,请参阅泛型中的协变和逆变。

  • `Subscribe` 通知提供者观察者将接收通知。

延伸阅读:http://msdn.microsoft.com/en-us/library/dd990377(v=vs.110).aspx

IObserver

namespace System
{
    // Summary:
    //     Provides a mechanism for receiving push-based notifications.
    //
    // Type parameters:
    //   T:
    //     The object that provides notification information.This type parameter is
    //     contravariant. That is, you can use either the type you specified or any
    //     type that is less derived. For more information about covariance and contravariance,
    //     see Covariance and Contravariance in Generics.
    public interface IObserver<in T>
    {
        // Summary:
        //     Notifies the observer that the provider has finished sending push-based notifications.
        void OnCompleted();
        //
        // Summary:
        //     Notifies the observer that the provider has experienced an error condition.
        //
        // Parameters:
        //   error:
        //     An object that provides additional information about the error.
        void OnError(Exception error);
        //
        // Summary:
        //     Provides the observer with new data.
        //
        // Parameters:
        //   value:
        //     The current notification information.
        void OnNext(T value);
    }
}

`IObserver` 和 `IObservable` 接口提供了一种通用的基于推送通知的机制,也称为观察者设计模式。 `IObservable` 接口表示发送通知的类(提供者); `IObserver` 接口表示接收通知的类(观察者)。 `T` 表示提供通知信息的类。

`IObserver` 实现通过将自身的实例传递给提供者的 `IObservable.Subscribe` 方法来安排从提供者(`IObservable` 实现)接收通知。此方法返回一个 `IDisposable` 对象,可用于在提供者完成发送通知之前取消订阅观察者。
`IObserver` 接口定义了观察者必须实现的以下三个方法

  • `OnNext` 方法,通常由提供者调用,为观察者提供新数据或状态信息。
  • `OnError` 方法,通常由提供者调用,指示数据不可用、无法访问或损坏,或提供者遇到了其他错误情况。
  • `OnCompleted` 方法,通常由提供者调用,指示它已完成向观察者发送通知。

延伸阅读:http://msdn.microsoft.com/en-us/library/dd783449(v=vs.110).aspx

Observable.Create

任何使用过 RX 的人都会发现他们可能希望创建自己的操作符。在 LINQ 中,这可以通过创建一个新方法(可能是扩展方法)来完成,该方法简单地返回一个新的 `IEnumerable`。在 RX 中,事情稍微复杂一些,但并没有复杂太多。在 RX 中,您将使用 `Observable.Create` 来创建自己的操作符,或者作为创建新 `Observable` 的工厂。这里有一个简单的例子

private IObservable<string> CreateObservable()
{
  return Observable.Create<string>(
    (IObserver<string> observer) =>
    {
      observer.OnNext("a");
      observer.OnNext("b");
      observer.OnCompleted();
      return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
    });
}

`Observable.Create` 从订阅方法实现创建一个可观察序列。您通常会对现有 `IObservable` 使用扩展方法,然后将 `Observable.Create(..)` 作为扩展方法的主体。

`Observable.Create` 具有以下方法签名

public static IObservable<TSource> Create<TSource>(
	Func<IObserver<TSource>, IDisposable> subscribe
)

此方法签名中有趣的部分是,您可以看到它返回一个 `Func` 委托,该委托接受一个 `IObserver`,允许您使用其 `OnNext/OnError` 和 `OnCompleted` 方法将值推送到观察者。 `Func` 委托返回一个 `IDisposable`,它将在处置发生时被调用。现在 RX 提供了许多不同的 `IDisposable` 类型,如果您在 `Observable.Create` 实现中看到一些相当奇特的 `IDisposable` 类型,请不要感到惊讶。

关于使用 `Observable.Create` 的最佳描述之一来自 Lee Campbell 的网站

http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html

站在巨人的肩膀上

我曾有幸参加了为期两天的 Rx 课程,由 Lee Campbell 授课,他是全球唯一一本 Rx 书籍的作者,Lee 在这里有一个专门的博客:http://www.introtorx.com/

Lee 现在在一家名为“Adaptive”的咨询公司工作,他们就是我参加的培训课程的组织者。他们还发布了一个真正很棒的演示应用程序,展示了如何创建一个完全连接的容错流式 API + 客户端应用程序。他们提供了几种不同版本的演示

  • WPF
  • Windows Store (WinRT)
  • 使用 Monotouch 的 iOS
  • HTML5(使用 TypeScript)

你可以在这里获取演示应用 https://github.com/AdaptiveConsulting/ReactiveTrader

问题是这个解决方案相当庞大,如果你是 Rx 的新手,或者只是想弄清楚如何创建一个可靠的流式 API,这个解决方案有点过于繁重,至少在我看来是这样。

公平地说,我认为 Adaptive 团队制作的演示应用更多地是作为 Adaptive 团队集体智慧的营销工具,人们会看到它然后说,嗯,这些人似乎很懂行,让我们请他们来。但是如果你只是一个普通的开发人员,试图进入那个领域,你可能会寻找一些更小的东西来开始,你知道,有点像婴儿学步。

我希望这个演示文章能满足需求,因为我经历过尝试理解 https://github.com/AdaptiveConsulting/ReactiveTrader 演示应用的 WPF 版本的痛苦,并且将在本文中为您呈现一个非常精简的版本。

精简的演示应用

本节概述了一个精简的演示应用程序,正如我所说,它主要基于 Adaptive 公司的出色工作及其 https://github.com/AdaptiveConsulting/ReactiveTrader 演示应用程序。我想要实现的目标非常简单:

  1. 一个单一的 SignalR Hub(Adaptive 示例有几个)将推送 `TickerDto` 对象。
  2. 客户端将订阅某个主题以接收来自 SignalR 服务器端 Hub 的推送通知,并通过 RX 将其推送出去,以便任何感兴趣的人都可以使用标准的 RX 操作符。
  3. 能够模拟服务器宕机并恢复正常。
  4. 连接的客户端能够知道服务器何时启动/关闭。
  5. 客户端和服务器之间的连接具有弹性,即当服务器重新启动时,客户端应该能够从中恢复。

Adaptive 演示程序实现了所有这些(以及更多),但我不是 Lee Campbell,也没有他对 RX 那么丰富的经验,尽管我不得不说我现在觉得自己正在掌握它。所以我才写了这个小型演示程序。

总体思路

这张图说明了一般想法

发布者

发布者由几部分组成,如下所述,需要注意的一点是,客户端和服务器都使用 SignalR,因此阅读 SignalR 自托管可能是一个好主意,请参阅此处

http://www.asp.net/signalr/overview/guide-to-the-api/hubs-api-guide-server

IOC (控制反转)

这里使用了一些 IOC,AutoFac 是使用的容器。由于 IOC 不是本文的重点,我不会再浪费时间讨论它。如果您想了解有关 IOC/AutoFac 的更多信息,请阅读代码,查看 `BootStrapper` 类

MainWindow

关于这个窗口没什么好说的,除了它有以下控件

  • 启动 SignalR:启动 SignalR Hub(用于模拟 Hub 的崩溃/重启)
  • 停止 SignalR:停止 SignalR Hub(用于模拟 Hub 的崩溃/重启)
  • 启动自动行情数据:将以固定间隔从 Hub 上下文推送随机的 `TickerDto` 对象
  • 停止自动行情数据:将暂停从 Hub 发送 `TickerDto` 对象
  • 发送一个行情:将从 Hub 上下文推送 1 个单独的随机 `TickerDto` 对象

MainWindowViewModel

`MainWindowViewModel` 实际上用于促进向 SignalR Hub 发送停止/创建命令,或告诉 `TickerHubPublisher` 执行操作。这是 `MainWindowViewModel` 的完整代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using Microsoft.Owin.BuilderProperties;
using Microsoft.Owin.Hosting;
using SignalRSelfHost.Hubs.Ticker;

namespace SignalRSelfHost
{
    public class MainWindowViewModel : IMainWindowViewModel
    {
        private const string Address = "https://:5263";
        
        private readonly ITickerHubPublisher tickerHubPublisher;
        private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
        private IDisposable signalr;

        public MainWindowViewModel(ITickerHubPublisher tickerHubPublisher)
        {
            this.tickerHubPublisher = tickerHubPublisher;

            AutoTickerStartCommand = new DelegateCommand(tickerHubPublisher.Start);
            AutoTickerStopCommand = new DelegateCommand(tickerHubPublisher.Stop);
            SendOneTickerCommand = new DelegateCommand(async () =>
            {
                await tickerHubPublisher.SendOneManualFakeTicker();
            });
            StartCommand = new DelegateCommand(StartServer);
            StopCommand = new DelegateCommand(StopServer);
        }

        public ICommand AutoTickerStartCommand { get; set; }
        public ICommand AutoTickerStopCommand { get; set; }
        public ICommand SendOneTickerCommand { get; set; }
        public ICommand StartCommand { get; private set; }
        public ICommand StopCommand { get; private set; }

        public void Start()
        {
            StartServer();
        }
       
        private void StartServer()
        {
            try
            {
                signalr = WebApp.Start(Address);
            }
            catch (Exception exception)
            {
                Log.Error("An error occurred while starting SignalR", exception);
            }
        }


        private void StopServer()
        {
            if (signalr != null)
            {
                signalr.Dispose();
                signalr = null;
            }

        }

    }
}

如你所见,`MainWindowViewModel` 中并没有发生太多事情,所有的动作都发生在其他地方。

启动发布者

为了启动发布者,我们首先需要考虑的是如何托管 SignalR hub。这由应用程序范围的 `OwinStartupAttribute` 处理,您可以在演示应用程序的 `App` 类中找到它

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using Autofac;
using Microsoft.Owin;
using SignalRSelfHost.Hubs;
using SignalRSelfHost.Hubs.Ticker;
using SignalRSelfHost.IOC;
using log4net;


[assembly: OwinStartup(typeof(Startup))]
namespace SignalRSelfHost
{
    public partial class App : Application
    {
		....
		....
		....
		....
    }
}

`OwinStartupAttribute` 指向一个类(本例中为 `Startup`),该类配置 Owin SignalR 自托管。我们来看看吧。

using Microsoft.AspNet.SignalR;
using Microsoft.Owin.Cors;
using Owin;
using SignalRSelfHost.IOC;

namespace SignalRSelfHost.Hubs
{
    public class Startup
    {
        public void Configuration(IAppBuilder app)
        {
            // Branch the pipeline here for requests that start with "/signalr"
            app.Map("/signalr", map =>
            {
                // Setup the CORS middleware to run before SignalR.
                // By default this will allow all origins. You can 
                // configure the set of origins and/or http verbs by
                // providing a cors options with a different policy.
                map.UseCors(CorsOptions.AllowAll);
                var hubConfiguration = new HubConfiguration
                {
                    Resolver = new AutofacSignalRDependencyResolver(App.Container),

                };
                // Run the SignalR pipeline. We're not using MapSignalR
                // since this branch already runs under the "/signalr"
                // path.
                map.RunSignalR(hubConfiguration);
            });
        }
    }
}

所以 Hub 就是这样启动的。但 Hub 如何知道客户端,以及如何向它们推送?我们接下来会看到。

TickerHub

`TickerHub` 是应用程序中唯一的 Hub,它有两个主要方法。一个允许客户端订阅,一个用于移除客户端的订阅。这是 Hub 的代码,别担心,我们稍后会更详细地探讨这两个方法。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;
using log4net;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;

namespace SignalRSelfHost.Hubs.Ticker
{
    [HubName(ServiceConstants.Server.TickerHub)]
    public class TickerHub : Hub
    {
        private readonly ITickerRepository tickerRepository;
        private readonly IContextHolder contextHolder;

        public const string TickerGroupName = "AllTickers";
        private static readonly ILog Log = LogManager.GetLogger(typeof(TickerHub));

        public TickerHub(ITickerRepository tickerRepository, IContextHolder contextHolder)
        {
            this.tickerRepository = tickerRepository;
            this.contextHolder = contextHolder;
        }

        [HubMethodName(ServiceConstants.Server.SubscribeTickers)]
        public async Task SubscribeTrades()
        {

            contextHolder.TickerHubClients = Clients;

            var user = ContextUtil.GetUserName(Context);
            Log.InfoFormat("Received trade subscription from user {0}", user);

            // add client to the trade notification group
            await Groups.Add(Context.ConnectionId, TickerGroupName);
            Log.InfoFormat("Connection {0} of user {1} added to group '{2}'", Context.ConnectionId, user, TickerGroupName);

            var tickers = tickerRepository.GetAllTickers();
            await Clients.Caller.SendTickers(tickers);
            Log.InfoFormat("Snapshot published to {0}", Context.ConnectionId);
        }

        [HubMethodName(ServiceConstants.Server.UnsubscribeTickers)]
        public async Task UnsubscribeTrades()
        {
            Log.InfoFormat("Received unsubscription request for trades from connection {0}", Context.ConnectionId);

            // remove client from the blotter group
            await Groups.Remove(Context.ConnectionId, TickerGroupName);
            Log.InfoFormat("Connection {0} removed from group '{1}'", Context.ConnectionId, TickerGroupName);
        }
    }
}

订阅交易

这个 `TickerHub` 方法通过 SignalR 客户端调用,将用于注册对某个主题的兴趣,SignalR 称之为“组”。本质上,当客户端启动时,它会使用自己的 SignalR 代理与服务器端自托管的 Hub 通信,并调用服务器端 Hub 的 `SubscribeTrades()` 方法。当这种情况发生时,客户端的连接将被添加到服务器端 Hub 的组中,并且客户端将收到一次性推送的所有当前持有的行情。这种所有当前行情的一次性推送是通过推送 `TickerRepository` 中当前存储的**所有** `TickerDto` 对象的全部内容来完成的,`TickerRepository` 是一个简单的内存队列仓库,用于存储最新生成的 `TickerDto`。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;

namespace SignalRSelfHost.Hubs.Ticker
{
   public class TickerRepository : ITickerRepository
    {
       private readonly Queue<TickerDto> tickers = new Queue<TickerDto>();
       private object syncLock = new object();
       private const int MaxTrades = 50;


       public TickerRepository()
       {
           tickers.Enqueue(new TickerDto() {Name="Yahoo", Price=1.2m});
           tickers.Enqueue(new TickerDto() {Name="Google", Price=1022m});
           tickers.Enqueue(new TickerDto() {Name="Apple", Price=523m});
           tickers.Enqueue(new TickerDto() {Name="Facebook", Price=49m});
           tickers.Enqueue(new TickerDto() {Name="Microsoft", Price=37m});
           tickers.Enqueue(new TickerDto() {Name="Twitter", Price=120m});
       }



       public TickerDto GetNextTicker()
       {
           return tickers.Dequeue();
       }


        public void StoreTicker(TickerDto tickerInfo)
        {
            lock (syncLock)
            {
                tickers.Enqueue(tickerInfo);

                if (tickers.Count > MaxTrades)
                {
                    tickers.Dequeue();
                }
            }
        }

        public IList<TickerDto> GetAllTickers()
        {
            IList<TickerDto> newTickers;

            lock (syncLock)
            {
                newTickers = tickers.ToList();
            }

            return newTickers;
        } 
    }
}

任何后续行情被创建时(我们很快会看到这是如何发生的),客户端将实时收到推送。这种批量推送是一次性的,在客户端首次与服务器建立通信时发生。此方法开始时发生的另一件事是捕获上下文,以便来自 `TickerHub` 的外部类将能够访问 `TickerHub` 上下文及其关联的值。

contextHolder.TickerHubClients = Clients;

取消订阅交易

这个 `TickerHub` `UnsubscribeTrades()` 方法通过 SignalR 客户端调用,将用于取消注册客户端对某个主题的兴趣(SignalR 称之为“组”)。本质上,当客户端关闭时,它会使用自己的 SignalR 代理与服务器端自托管的 Hub 通信,并调用服务器端 Hub 的 `UnsubscribeTrades()` 方法。当这种情况发生时,客户端连接将从服务器端 Hub 的组中移除。

谁向客户端进行流式传输?

好的,我们已经了解了 SignalR 客户端如何注册/取消注册对某个主题的兴趣(至少从服务器端的角度来看),但客户端如何从 `TickerHub` 接收推送通知呢?谁来做这件事?嗯,这项工作落在了另一个组件上。即 `TickerHubPublisher`。在本演示应用程序中,`TickerHubPublisher` 的工作非常简单,就是生成一个随机的 `TickerDto` 并将其推送到任何对特定主题感兴趣的已连接客户端。 `TickerHubPublisher` 的全部内容如下所示。我认为代码非常不言自明。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using Common;
using log4net;
using Microsoft.AspNet.SignalR;

namespace SignalRSelfHost.Hubs.Ticker
{
    public class TickerHubPublisher : ITickerHubPublisher
    {
        private readonly IContextHolder contextHolder;
        private readonly ITickerRepository tickerRepository;
        Random rand = new Random();
        private static readonly ILog Log = LogManager.GetLogger(typeof(TickerHubPublisher));
        private CancellationTokenSource autoRunningCancellationToken;
        private Task autoRunningTask;


        public TickerHubPublisher(IContextHolder contextHolder,
            ITickerRepository tickerRepository)
        {
            this.contextHolder = contextHolder;
            this.tickerRepository = tickerRepository;
        }


        public void Stop()
        {
            if (autoRunningCancellationToken != null)
            {
                autoRunningCancellationToken.Cancel();

                // Publisher is not thread safe, so while the auto ticker is running only the autoticker is 
                // allowed to access the publisher. Therefore before we can stop the publisher we have to 
                // wait for the autoticker task to complete
                autoRunningTask.Wait();
                autoRunningCancellationToken = null;
            }
        }

        public async void Start()
        {
            autoRunningCancellationToken = new CancellationTokenSource();
            autoRunningTask = Task.Run(async () =>
            {
                while (!autoRunningCancellationToken.IsCancellationRequested)
                {
                    await SendOneManualFakeTicker();
                    await Task.Delay(20);
                }
            });
        }


        public async Task SendOneManualFakeTicker()
        {
            var currentTicker = tickerRepository.GetNextTicker();
            var flipPoint = rand.Next(0, 100);
            if (flipPoint > 50)
            {
                currentTicker.Price += currentTicker.Price/ 30;
            }
            else
            {
                currentTicker.Price -= currentTicker.Price / 30;
            }
            tickerRepository.StoreTicker(currentTicker);
            await SendRandomTicker(currentTicker);
        }


        private Task SendRandomTicker(TickerDto tickerInfo)
        {
            if (contextHolder.TickerHubClients == null) return Task.FromResult(false);

            Log.InfoFormat("Broadcast new trade to blotters: {0}", tickerInfo);
            return contextHolder.TickerHubClients.Group(TickerHub.TickerGroupName).SendTickers(new[] { tickerInfo });
        }
    }
}

这段代码中唯一可能需要解释的部分是 `SendRandomTicker(TickerDto tickerInfo)` 方法。

private Task SendRandomTicker(TickerDto tickerInfo)
{
    if (contextHolder.TickerHubClients == null) return Task.FromResult(false);

    Log.InfoFormat("Broadcast new trade to blotters: {0}", tickerInfo);
    return contextHolder.TickerHubClients.Group(TickerHub.TickerGroupName).SendTickers(new[] { tickerInfo });
}

此方法利用了在 `TickerHub` 本身内部捕获的 `TickerHub` 上下文。通过捕获上下文,它允许来自 `TickerHub` 的外部类使用其属性/值。这正是 `TickerHubPublisher` 所做的。它只是使用捕获的 `TickerHub` 上下文将新值推送到针对特定主题订阅的客户端。在这种情况下,是 `TickerHub.TickerGroupName`(“AllTickers”)字符串的值。因此,这就是 SignalR 服务器能够向客户端推送通知的方式。

模拟发布者崩溃

这部分很简单,只需使用 MainWindow 中的“Stop SignalR”按钮即可。它将简单地运行以下代码,该代码会处置从 SignalR `WebApp.Start(..)` 方法返回的 SignalR `IDisposable`。

private void StopServer()
{
    if (signalr != null)
    {
        tickerHubPublisher.Stop();
        signalr.Dispose();
        signalr = null;
    }

}

应该发生的情况是,如果您有任何连接的客户端,它们(短时间后)应该会看到服务器不可用,并显示“DISCONNECTED”瓷砖。

重启发布者

这部分很简单,只需使用 MainWindow 中的“Start SignalR”按钮。这将简单地运行以下代码

private void StartServer()
{
    try
    {
        signalr = WebApp.Start(Address);
        tickerHubPublisher.Start();
    }
    catch (Exception exception)
    {
        Log.Error("An error occurred while starting SignalR", exception);
    }
}

应该发生的情况是,如果您有任何连接的客户端,它们(短时间后)应该会再次看到服务器可用,并且不再显示“DISCONNECTED”磁贴。

SignalR .NET 客户端

客户端是一个标准的 WPF 应用程序(我使用 WPF 只是因为我熟悉它,它并不是那么重要,你也可以很好地使用带有 MVP 模式的 Winforms)。客户端还使用 SignalR 代理类,因此一个好的起点是检查 SignalR .NET 客户端 API,你可以在这里阅读更多内容

http://www.asp.net/signalr/overview/guide-to-the-api/hubs-api-guide-net-client

IOC (控制反转)

这里使用了一些 IOC,AutoFac 是使用的容器。由于 IOC 不是本文的重点,我不会再浪费时间讨论它。如果您想了解有关 IOC/AutoFac 的更多信息,请阅读代码,查看 `BootStrapper` 类

弹性连接

客户端的核心是一个名为 `ServiceClientBase` 的类。这个类是任何客户端 Hub 代理连接的基类。那么这个类到底提供了什么呢?

那么我们先来看看它的代码吧

using System;
using System.Reactive;
using System.Reactive.Linq;
using Client.Hub.Transport;
using Common.Extensions;


namespace Client.Hub
{
    internal class ServiceClientBase
    {
        private readonly IConnectionProvider _connectionProvider;

        protected ServiceClientBase(IConnectionProvider connectionProvider)
        {
            _connectionProvider = connectionProvider;
        }

        protected IObservable<T> GetResilientStream<T>(Func<IConnection, IObservable<T>> streamFactory, 
            TimeSpan connectionTimeout)
        {
            var activeConnections = (from connection in _connectionProvider.GetActiveConnection()
                                     from status in connection.StatusStream
                                     where status.ConnectionStatus == ConnectionStatus.Connected || 
                                        status.ConnectionStatus == ConnectionStatus.Reconnected
                                     select connection)
                .Publish()
                .RefCount();

            // get the first connection
            var firstConnection = activeConnections.Take(1).Timeout(connectionTimeout);

            // 1 - notifies when the first connection gets disconnected
            var firstDisconnection = from connection in firstConnection
                                     from status in connection.StatusStream
                                     where status.ConnectionStatus == ConnectionStatus.Reconnecting || 
                                        status.ConnectionStatus == ConnectionStatus.Closed
                                     select Unit.Default;

            // 2- connection provider created a new connection it means the active one has droped
            var subsequentConnection = activeConnections.Skip(1).Select(_ => Unit.Default).Take(1);

            // OnError when we get 1 or 2
            var disconnected = firstDisconnection.Merge(subsequentConnection)
                .Select(_ => Notification.CreateOnError<T>(new Exception("Connection was closed.")))
                .Dematerialize();

            // create a stream which will OnError as soon as the connection drops
            return (from connection in firstConnection
                    from t in streamFactory(connection)
                    select t)
                .Merge(disconnected)
                .Publish()
                .RefCount();
        }
    }

}

这个类中发生了非常多的事情。正如我所说,**这个类是核心类**。那么它到底做了什么呢?

嗯,它做了几件事

  1. 首先,它提供了一个基于工厂的 `IObservable`,该工厂将生成最终的 `IObservable`。这个工厂将是需要具有弹性的唯一源流 (`IObservable`)。
  2. 它使用连接状态流(内部利用 SignalR 事件(如已连接/已断开)来创建状态流)合并到整个流中。当看到“`Reconnection`”或“`Closed`”状态流结果(`OnNext` 值)时,我们认为这是一个断开连接,我们合成一个 `OnError` 通知(通过使用标准的 Rx `Dematerialize` 操作符),该通知与实际工厂流值合并,以给出整体流结果,该结果现在可能向其消费者产生 `OnNext` / `OnError`。
  3. 我们还使用了标准的 RX `Publish` 操作符,这样做是为了共享相同的底层 `IObservable` 序列
  4. 我们还使用了标准的 RX `RefCount` 操作符,这样当没有订阅者时,序列将被处置

这个类使用了几个额外的实用类,我们接下来将讨论这些类。

ConnectionProvider

`ConnectionProvider` 类只有一个任务,就是返回一个 `IObservable`,其中 `IConnection` 看起来像这样。

internal interface IConnection
{
    IObservable<ConnectionInfo> StatusStream { get; }
    IObservable<Unit> Initialize();
    string Address { get; }
    void SetAuthToken(string authToken);
    IHubProxy TickerHubProxy { get; }

}

其中更敏锐的人会注意到 `Unit` 的使用,这是一种表示我们不关心结果是什么,只关心它存在的方式。

这是 `ConnectionProvider` 类的相关代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;
using Common.Extensions;


namespace Client.Hub.Transport
{
    /// <summary>
    /// Connection provider provides always the same connection until it fails then 
    /// create a new one a yield it Connection provider randomizes the list of server 
    /// specified in configuration and then round robin through the list
    /// </summary>
    internal class ConnectionProvider : IConnectionProvider, IDisposable
    {
        private readonly SingleAssignmentDisposable disposable = new SingleAssignmentDisposable();
        private readonly string username;
        private readonly IObservable<IConnection> connectionSequence;
        private readonly string server;
        private int _currentIndex;
        private static readonly ILog log = LogManager.GetLogger(typeof(ConnectionProvider));

        public ConnectionProvider(string username, string server)
        {
            this.username = username;
            this.server = server;
            connectionSequence = CreateConnectionSequence();
        }

        public IObservable<IConnection> GetActiveConnection()
        {
            return connectionSequence;
        }

        public void Dispose()
        {
            disposable.Dispose();
        }

        private IObservable<IConnection> CreateConnectionSequence()
        {
            return Observable.Create<IConnection>(o =>
            {
                log.Info("Creating new connection...");
                var connection = GetNextConnection();

                var statusSubscription = connection.StatusStream.Subscribe(
                    _ => { },
                    ex => o.OnCompleted(),
                    () =>
                    {
                        log.Info("Status subscription completed");
                        o.OnCompleted();
                    });

                var connectionSubscription =
                    connection.Initialize().Subscribe(
                        _ => o.OnNext(connection),
                        ex => o.OnCompleted(),
                        o.OnCompleted);

                return new CompositeDisposable { statusSubscription, connectionSubscription };
            })
            .Repeat()
            .Replay(1)
            .LazilyConnect(disposable);
        }

        private IConnection GetNextConnection()
        {
            return new Client.Hub.Transport.Connection(server, username);
        }
    }

}

Connection

连接本身公开了一个 SignalR 代理和一个 `IObservable`,后者通过检查所有可用的 SignalR .NET 客户端事件创建。此类的消费者现在将能够访问连接流,以便他们能够了解连接何时丢失或重新连接。

这是这个类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Common;
using Common.Extensions;
using log4net;
using Microsoft.AspNet.SignalR.Client;

namespace Client.Hub.Transport
{
    internal class Connection : IConnection
    {
        private readonly ISubject<ConnectionInfo> _statusStream;
        private readonly HubConnection hubConnection;

        private bool _initialized;
        private static readonly ILog log = LogManager.GetLogger(typeof(Connection));

        public Connection(string address, string username)
        {
            _statusStream = new BehaviorSubject<ConnectionInfo>(
                new ConnectionInfo(ConnectionStatus.Uninitialized, address));
            Address = address;
            hubConnection = new HubConnection(address);
            //hubConnection.Headers.Add(ServiceConstants.Server.UsernameHeader, username);
            CreateStatus().Subscribe(
                s => _statusStream.OnNext(new ConnectionInfo(s, address)),
                _statusStream.OnError,
                _statusStream.OnCompleted);
            hubConnection.Error += exception => log.Error("There was a connection error with " 
                + address, exception);

            TickerHubProxy = hubConnection.CreateHubProxy(ServiceConstants.Server.TickerHub);

        }

        public IObservable<Unit> Initialize()
        {
            if (_initialized)
            {
                throw new InvalidOperationException("Connection has already been initialized");
            }
            _initialized = true;

            return Observable.Create<Unit>(async observer =>
            {
                _statusStream.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, Address));

                try
                {
                    log.InfoFormat("Connecting to {0}", Address);
                    await hubConnection.Start();
                    _statusStream.OnNext(new ConnectionInfo(ConnectionStatus.Connected, Address));
                    observer.OnNext(Unit.Default);
                }
                catch (Exception e)
                {
                    log.Error("An error occurred when starting SignalR connection", e);
                    observer.OnError(e);
                }

                return Disposable.Create(() =>
                {
                    try
                    {
                        log.Info("Stoping connection...");
                        hubConnection.Stop();
                        log.Info("Connection stopped");
                    }
                    catch (Exception e)
                    {
                        // we must never throw in a disposable
                        log.Error("An error occurred while stoping connection", e);
                    }
                });
            })
            .Publish()
            .RefCount();
        }

        private IObservable<ConnectionStatus> CreateStatus()
        {
            var closed = Observable.FromEvent(h => hubConnection.Closed += h, 
                h => hubConnection.Closed -= h).Select(_ => ConnectionStatus.Closed);
            var connectionSlow = Observable.FromEvent(h => hubConnection.ConnectionSlow += h, 
                h => hubConnection.ConnectionSlow -= h).Select(_ => ConnectionStatus.ConnectionSlow);
            var reconnected = Observable.FromEvent(h => hubConnection.Reconnected += h, 
                h => hubConnection.Reconnected -= h).Select(_ => ConnectionStatus.Reconnected);
            var reconnecting = Observable.FromEvent(h => hubConnection.Reconnecting += h, 
                h => hubConnection.Reconnecting -= h).Select(_ => ConnectionStatus.Reconnecting);
            return Observable.Merge(closed, connectionSlow, reconnected, reconnecting)
                .TakeUntilInclusive(status => status == ConnectionStatus.Closed); 
            // complete when the connection is closed (it's terminal, SignalR will not attempt to reconnect anymore)
        }

        public IObservable<ConnectionInfo> StatusStream
        {
            get { return _statusStream; }
        }

        public string Address { get; private set; }

        public IHubProxy TickerHubProxy { get; private set; }

        public void SetAuthToken(string authToken)
        {
            //hubConnection.Headers[AuthTokenProvider.AuthTokenKey] = authToken;
        }

        public override string ToString()
        {
            return string.Format("Address: {0}", Address);
        }
    }

}

SignalR 订阅/行情流

正如我们在发布者中看到的,我们知道特定服务器端 SignalR Hub 的客户端将调用两个方法,即 `SubscribeTickers` 和 `UnsubscribeTickers`。我们也知道有一个 `TickerDto` 的流式 API(至少对于这个只包含一个 Hub 的演示应用程序是这样)。其中以下条件成立:

  • 客户端在连接时将调用服务器端 Hub 的 `SubscribeTickers`
  • 客户端在处置时将调用服务器端 Hub 的 `UnsubscribeTickers`(这仅在处置时调用此方法来完成,请参见以下代码,我们结合 `Observable.Create(..)` 和 `CompositeDisposable` 的强大功能来处置一些东西,其中之一是使用 `Disposable.Create(..)` 创建的一个 `Action`,该 `Action` 将在处置时调用 `UnsubscribeTickers`)。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Common;
using log4net;
using Microsoft.AspNet.SignalR.Client;

namespace Client.Hub
{
    internal class TickerHubClient : ServiceClientBase, ITickerHubClient
    {

        private static readonly ILog log = LogManager.GetLogger(typeof(TickerHubClient));

        public TickerHubClient(IConnectionProvider connectionProvider)
            : base(connectionProvider)
        {
        }

        public IObservable<IEnumerable<TickerDto>> GetTickerStream()
        {
            return GetResilientStream(connection => GetTradesForConnection(connection.TickerHubProxy), 
                TimeSpan.FromSeconds(5));
        }

        private IObservable<IEnumerable<TickerDto>> GetTradesForConnection(IHubProxy tickerHubProxy)
        {
            return Observable.Create<IEnumerable<TickerDto>>(observer =>
            {
                // subscribe to trade feed first, otherwise there is a race condition 
                var spotTradeSubscription = tickerHubProxy.On<IEnumerable<TickerDto>>(
                    ServiceConstants.Client.SendTickers, observer.OnNext);

                var spotTradeSubscriptionRaceDisposable = 
                    tickerHubProxy.On<IEnumerable<TickerDto>>(
                    ServiceConstants.Client.SendTickers, 
                    (x) => 
                    {
                            Console.WriteLine("Got a new trade" + x.First().Name);
                    });



                log.Info("Sending ticker subscription...");
                var sendSubscriptionDisposable = SendSubscription(tickerHubProxy)
                    .Subscribe(
                        _ => log.InfoFormat("Subscribed to ticker."),
                        observer.OnError);

                var unsubscriptionDisposable = Disposable.Create(() =>
                {
                    // send unsubscription when the observable gets disposed
                    log.Info("Sending ticker unsubscription...");
                    SendUnsubscription(tickerHubProxy)
                        .Subscribe(
                            _ => log.InfoFormat("Unsubscribed from ticker."),
                            ex => log.WarnFormat(
                            	"An error occurred while unsubscribing from ticker: {0}", 
                            	ex.Message));
                });
                return new CompositeDisposable { 
                    spotTradeSubscription, unsubscriptionDisposable, 
                    sendSubscriptionDisposable,spotTradeSubscriptionRaceDisposable 
                };
            })
            .Publish()
            .RefCount();
        }

        private static IObservable<Unit> SendSubscription(IHubProxy tickerHubProxy)
        {
            return Observable.FromAsync(() => tickerHubProxy.Invoke(
                ServiceConstants.Server.SubscribeTickers));
        }

        private static IObservable<Unit> SendUnsubscription(IHubProxy tickerrHubProxy)
        {
            return Observable.FromAsync(() => tickerrHubProxy.Invoke(
                ServiceConstants.Server.UnsubscribeTickers));

        }
    }
}

这段代码需要注意的另一点是,它继承自 `ServiceClientBase`,因此具有我们上面讨论过的弹性连接功能。我们向 `ServiceClientBase` 类传入 `GetTradesForConnection(IHubProxy tickerHubProxy)` 方法,作为在最终(现在 благодаря `ServiceClientBase` 基类中的逻辑而具有弹性)`GetTickerStream` 流中使用的 `IObservable` 工厂。

从此,`GetTickerStream` 流可以自由供任何人使用。那么让我们看看谁在使用它,下面将讨论这一点。

TickerRepository

TickerRepository 是 Observable 链上的下一个环节。那么它长什么样呢?它实际上出奇的简单,但不要被它迷惑,这里有很多事情正在发生。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Hub;

namespace Client.Repositories
{
    class TickerRepository : ITickerRepository
    {
        private readonly ITickerHubClient tickerHubClient;
        private readonly ITickerFactory tickerFactory;

        public TickerRepository(ITickerHubClient tickerHubClient, ITickerFactory tickerFactory)
        {
            this.tickerHubClient = tickerHubClient;
            this.tickerFactory = tickerFactory;
        }

        public IObservable<IEnumerable<Ticker>> GetTickerStream()
        {
            return Observable.Defer(() => tickerHubClient.GetTickerStream())
                .Select(trades => trades.Select(tickerFactory.Create))
                .Catch(Observable.Return(new Ticker[0]))
                .Repeat()
                .Publish()
                .RefCount();
        }
    }
}

那么这里到底发生了什么?

  • 我们使用 `Observable.Defer`,这样我们实际上不会使用底层流,直到有人订阅了通过 `Observable.Defer` 创建的 `IObservable`。这是一种将热流变冷的方法。
  • 我们使用 select 将流数据从 `TickerDto` 转换为 `Ticker`
  • 我们使用 `Catch` 来捕获流中的任何 `Exception`(`OnError`)。在这种情况下,我们使用一个默认值。
  • 我们使用 `Repeat`。这一个**非常非常**重要。它允许我们重复整个流,包括重新连接到服务器端 Hub。正是这种机制允许客户端从服务器端 SignalR Hub 的丢失中恢复。这与弹性流逻辑是应用程序**最**重要的部分(至少在我看来是这样)。
  • 我们使用 `Publish` 来共享底层流
  • 我们使用 `RefCount` 以便在没有订阅者时自动进行处置

既然我们已经看到了这个仓库,我们只剩下一次跳转来继续 `TickerStream` 的 `IObservable` 之旅。我们接下来看看。

TickersViewModel

`TickersViewModel` 代表您在屏幕上看到的所有行情。正是这个视图模型利用了 `TickerRepository` 提供的惰性/可重复/弹性 `IObservable`。让我们看看代码,我认为它非常不言自明。

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;


namespace Client.ViewModels
{
    public class TickersViewModel : INPCBase
    {
        private readonly ITickerRepository tickerRepository;
        private readonly IConcurrencyService concurrencyService;
        private bool stale = false;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));

        public TickersViewModel(IReactiveTrader reactiveTrader,
                                IConcurrencyService concurrencyService,
            TickerViewModelFactory tickerViewModelFactory)
        {
            Tickers = new ObservableCollection<TickerViewModel>();
            Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
            Tickers.Add(tickerViewModelFactory.Create("Google"));
            Tickers.Add(tickerViewModelFactory.Create("Apple"));
            Tickers.Add(tickerViewModelFactory.Create("Facebook"));
            Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
            Tickers.Add(tickerViewModelFactory.Create("Twitter"));
            this.tickerRepository = reactiveTrader.TickerRepository;
            this.concurrencyService = concurrencyService;
            LoadTrades();
        }


        public ObservableCollection<TickerViewModel> Tickers { get; private set; }

        private void LoadTrades()
        {
            tickerRepository.GetTickerStream()
                            .ObserveOn(concurrencyService.Dispatcher)
                            .SubscribeOn(concurrencyService.TaskPool)
                            .Subscribe(
                                AddTickers,
                                ex => log.Error("An error occurred within the trade stream", ex));
        }

        private void AddTickers(IEnumerable<Ticker> incomingTickers)
        {
            var allTickers = incomingTickers as IList<Ticker> ?? incomingTickers.ToList();
            if (!allTickers.Any())
            {
                // empty list of trades means we are disconnected
                stale = true;
            }
            else
            {
                if (stale)
                {
                    stale = false;
                }
            }

            foreach (var ticker in allTickers)
            {
                Tickers.Single(x => x.Name == ticker.Name)
                    .AcceptNewPrice(ticker.Price);
            }
        }
    }
}}

其中每个 `Ticker` 都由一个 `TickerViewModel` 表示,如下所示。可以看出,该类也使用了我们之前讨论的 `ConnectionStatusStream IObservable`。这用于更改 `TickerViewModel` 的视图以显示一个带有“DISCONNECTED”字样的红色框。我们稍后将讨论这一点。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class TickerViewModel : INPCBase
    {
        private decimal price;
        private bool isUp;
        private bool stale;
        private bool disconnected;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));


        public TickerViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService,
            string name)
        {

            this.Name = name;

            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                    OnStatusChange,
                    ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        public string Name { get; private set; }


        public void AcceptNewPrice(decimal newPrice)
        {
            IsUp = newPrice > price;
            Price = newPrice;
        }


        public decimal Price
        {
            get { return this.price; }
            private set
            {
                this.price = value;
                base.OnPropertyChanged("Price");
            }
        }

        public bool IsUp
        {
            get { return this.isUp; }
            private set
            {
                this.isUp = value;
                base.OnPropertyChanged("IsUp");
            }
        }

        public bool Stale
        {
            get { return this.stale; }
            set
            {
                this.stale = value;
                base.OnPropertyChanged("Stale");
            }
        } 
        
        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Uninitialized:
                case ConnectionStatus.Connecting:
                    Disconnected = true;
                    break;
                case ConnectionStatus.Reconnected:
                case ConnectionStatus.Connected:
                    Disconnected = false;
                    break;
                case ConnectionStatus.ConnectionSlow:
                    Disconnected = false;
                    break;
                case ConnectionStatus.Reconnecting:
                    Disconnected = true;
                    break;
                case ConnectionStatus.Closed:
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

    }
}

连接状态流

我最后想向您展示的是 `ConnectionStatusStream` 是如何使用的。当客户端 SignalR Hub 代理事件被触发时,这个流会 `OnNext`。所以我们看到了像 `Connecting, Connected, ConnectionSlow, Reconnecting, Reconnected, Closed, Uninitialized` 这样的事件。所有这些最初都是 SignalR Hub 代理上的事件,并通过许多 RX 工厂之一(本例中是 `IObservable.FromEvent`)转换为 `IObservable` 流。

无论如何,这是我们用于在应用程序底部状态栏中显示信息的总体 `ConnectivityStatusViewModel`。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class ConnectivityStatusViewModel : INPCBase
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
        private string server;
        private string status;
        private bool disconnected;

        public ConnectivityStatusViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService)
        {
            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                OnStatusChange,
                ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {
            Server = connectionInfo.Server;

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Uninitialized:
                case ConnectionStatus.Connecting:
                    Status = "Connecting...";
                    Disconnected = true;
                    break;
                case ConnectionStatus.Reconnected:
                case ConnectionStatus.Connected:
                    Status = "Connected";
                    Disconnected = false;
                    break;
                case ConnectionStatus.ConnectionSlow:
                    Status = "Slow connection detected";
                    Disconnected = false;
                    break;
                case ConnectionStatus.Reconnecting:
                    Status = "Reconnecting...";
                    Disconnected = true;
                    break;
                case ConnectionStatus.Closed:
                    Status = "Disconnected";
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        public string Server
        {
            get { return this.server; }
            set
            {
                this.server = value;
                base.OnPropertyChanged("Server");
            }
        }

        public string Status
        {
            get { return this.status; }
            set
            {
                this.status = value;
                base.OnPropertyChanged("Status");
            }
        }

        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }
    }

}

暂时就这些

好了,我希望我已经向您展示了 RX 实际上不仅仅是事件的 LINQ,它包含了许多用于处理并发、LINQ、事件、流、工厂、重试、错误处理、资源释放的类。

有些人可能会很高兴地知道这是一篇分两部分的文章,在下一部分中,我将与备受推崇的套接字库 ZeroMQ 的 .NET 移植作者合作。这个 .NET 移植名为 NetMQ,我们将与它的作者(好的,我也为那个库贡献了一点)一起向您展示如何使用 NetMQ 构建这类应用程序。所以在那之前,如果您喜欢这篇文章,请随意留下评论/投票,它们非常受欢迎。

© . All rights reserved.