SignalR + RX (流式数据演示应用程序 1/2)






4.95/5 (81投票s)
一个小型演示应用,展示如何使用 SignalR + RX 创建流发布者/弹性客户端
代码在哪里?
您可以从我的 GitHub 仓库获取代码,就在这里
https://github.com/sachabarber/SignalrRxDemo
引言
我在金融领域工作,目前在外汇 (FX) 领域,我们有许多涉及各种事物流式传输的需求,例如
- 汇率
- 交易
- 其他金融工具
过去,为了满足这些需求,我们会使用套接字并将我们自己的数据(通常是 JSON 或 XML)通过网络推送,然后在此之上设置一些相当复杂的监听器,每个监听器都可能需要对原始流数据有稍微不同的视图。
但事实是,软件开发世界的时间从未停止,新的事物层出不穷,几乎每隔一天就会出现一个新框架,以某种方式改进了之前的产品。
我特别认为很多人误解的一点是响应式扩展 RX,公平地说,它已经存在一段时间了。我认为很多人将 RX 视为事件的 LINQ,它确实有这方面。然而,事实是 RX 是一个用于创建流应用程序的绝佳框架,你越深入 RX,你越倾向于将一切都视为流,从事件到选择文件或 ViewModel 中的 `ICommand.Execute` 等。
RX 拥有许多工具,例如
- 2 个核心接口 `IObservable` / `IObserver`
- 许多类似 LINQ 的 `IObservable` 扩展方法
- 基于时间/窗口的操作(LINQ 并不真正具备这些)
- 并发调度器
- 流错误处理控制(Catch 等)
- 能够从许多其他事物创建 `IObservable`,例如
IEnumerable
任务
- 事件
- `IObservable` 工厂
我个人认为,如果人们花时间更多地学习 RX,他们可以从中获得巨大的收益。
微软还有一个很棒的库,它促进了基于推送的技术,即 SignalR。关于 SignalR 已经有很多文章,但你很少从桌面角度看到它。
大多数人可能会将 SignalR Hub 与网站联系起来,但你可以通过多种方式使用 SignalR,这也使其成为桌面(即非 Web)开发的好选择。这要归功于 SignalR OWIN 托管 API,因此你可以自托管 SignalR Hub。还有一个 .NET 客户端,我们可以用它与 Hub 进行通信。SignalR 在 .NET 客户端中也有很多事件,你猜我们可以用这些事件做什么?没错,我们可以从中创建 `IObservable`(s) 流,这为使用 RX 打开了大门。
本文将展示 RX 在流式 API 领域的一个优势,我们将使用 RX/SignalR
文章系列
- 使用 SignalR 和 RX 的流式 API(本文)
- 使用 NetMQ 和 RX 的流式 API
视频向您展示我们将要学习的内容
我认为了解这个演示的最佳方式是观看我上传到 YouTube 的视频:
https://www.youtube.com/watch?v=TDIpPvbw6ek
RX 小入门
您可以使用许多 RX 扩展方法
http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable_methods(v=vs.103).aspx
但也许开始学习 RX 的最佳之处在于理解您将反复使用的两个底层接口。它们是 `IObservable` 和 `IObserver`。我们将在下面讨论这两个接口。
IObservable
namespace System
{
// Summary:
// Defines a provider for push-based notification.
//
// Type parameters:
// T:
// The object that provides notification information.This type parameter is
// covariant. That is, you can use either the type you specified or any type
// that is more derived. For more information about covariance and contravariance,
// see Covariance and Contravariance in Generics.
public interface IObservable<out T>
{
// Summary:
// Notifies the provider that an observer is to receive notifications.
//
// Parameters:
// observer:
// The object that is to receive notifications.
//
// Returns:
// A reference to an interface that allows observers to stop receiving notifications
// before the provider has finished sending them.
IDisposable Subscribe(IObserver<T> observer);
}
}
定义了基于推送通知的提供者。
此类型参数是协变的。也就是说,您可以使用您指定的类型或任何更派生的类型。有关协变和逆变的更多信息,请参阅泛型中的协变和逆变。
- `Subscribe` 通知提供者观察者将接收通知。
延伸阅读:http://msdn.microsoft.com/en-us/library/dd990377(v=vs.110).aspx
IObserver
namespace System
{
// Summary:
// Provides a mechanism for receiving push-based notifications.
//
// Type parameters:
// T:
// The object that provides notification information.This type parameter is
// contravariant. That is, you can use either the type you specified or any
// type that is less derived. For more information about covariance and contravariance,
// see Covariance and Contravariance in Generics.
public interface IObserver<in T>
{
// Summary:
// Notifies the observer that the provider has finished sending push-based notifications.
void OnCompleted();
//
// Summary:
// Notifies the observer that the provider has experienced an error condition.
//
// Parameters:
// error:
// An object that provides additional information about the error.
void OnError(Exception error);
//
// Summary:
// Provides the observer with new data.
//
// Parameters:
// value:
// The current notification information.
void OnNext(T value);
}
}
`IObserver
`IObserver
`IObserver
- `OnNext` 方法,通常由提供者调用,为观察者提供新数据或状态信息。
- `OnError` 方法,通常由提供者调用,指示数据不可用、无法访问或损坏,或提供者遇到了其他错误情况。
- `OnCompleted` 方法,通常由提供者调用,指示它已完成向观察者发送通知。
延伸阅读:http://msdn.microsoft.com/en-us/library/dd783449(v=vs.110).aspx
Observable.Create
任何使用过 RX 的人都会发现他们可能希望创建自己的操作符。在 LINQ 中,这可以通过创建一个新方法(可能是扩展方法)来完成,该方法简单地返回一个新的 `IEnumerable
private IObservable<string> CreateObservable()
{
return Observable.Create<string>(
(IObserver<string> observer) =>
{
observer.OnNext("a");
observer.OnNext("b");
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});
}
`Observable.Create` 从订阅方法实现创建一个可观察序列。您通常会对现有 `IObservable
`Observable.Create` 具有以下方法签名
public static IObservable<TSource> Create<TSource>(
Func<IObserver<TSource>, IDisposable> subscribe
)
此方法签名中有趣的部分是,您可以看到它返回一个 `Func` 委托,该委托接受一个 `IObserver
关于使用 `Observable.Create` 的最佳描述之一来自 Lee Campbell 的网站
http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html
站在巨人的肩膀上
我曾有幸参加了为期两天的 Rx 课程,由 Lee Campbell 授课,他是全球唯一一本 Rx 书籍的作者,Lee 在这里有一个专门的博客:http://www.introtorx.com/
Lee 现在在一家名为“Adaptive”的咨询公司工作,他们就是我参加的培训课程的组织者。他们还发布了一个真正很棒的演示应用程序,展示了如何创建一个完全连接的容错流式 API + 客户端应用程序。他们提供了几种不同版本的演示
- WPF
- Windows Store (WinRT)
- 使用 Monotouch 的 iOS
- HTML5(使用 TypeScript)
你可以在这里获取演示应用 https://github.com/AdaptiveConsulting/ReactiveTrader
问题是这个解决方案相当庞大,如果你是 Rx 的新手,或者只是想弄清楚如何创建一个可靠的流式 API,这个解决方案有点过于繁重,至少在我看来是这样。
公平地说,我认为 Adaptive 团队制作的演示应用更多地是作为 Adaptive 团队集体智慧的营销工具,人们会看到它然后说,嗯,这些人似乎很懂行,让我们请他们来。但是如果你只是一个普通的开发人员,试图进入那个领域,你可能会寻找一些更小的东西来开始,你知道,有点像婴儿学步。
我希望这个演示文章能满足需求,因为我经历过尝试理解 https://github.com/AdaptiveConsulting/ReactiveTrader 演示应用的 WPF 版本的痛苦,并且将在本文中为您呈现一个非常精简的版本。
精简的演示应用
本节概述了一个精简的演示应用程序,正如我所说,它主要基于 Adaptive 公司的出色工作及其 https://github.com/AdaptiveConsulting/ReactiveTrader 演示应用程序。我想要实现的目标非常简单:
- 一个单一的 SignalR Hub(Adaptive 示例有几个)将推送 `TickerDto` 对象。
- 客户端将订阅某个主题以接收来自 SignalR 服务器端 Hub 的推送通知,并通过 RX 将其推送出去,以便任何感兴趣的人都可以使用标准的 RX 操作符。
- 能够模拟服务器宕机并恢复正常。
- 连接的客户端能够知道服务器何时启动/关闭。
- 客户端和服务器之间的连接具有弹性,即当服务器重新启动时,客户端应该能够从中恢复。
Adaptive 演示程序实现了所有这些(以及更多),但我不是 Lee Campbell,也没有他对 RX 那么丰富的经验,尽管我不得不说我现在觉得自己正在掌握它。所以我才写了这个小型演示程序。
总体思路
这张图说明了一般想法
发布者
发布者由几部分组成,如下所述,需要注意的一点是,客户端和服务器都使用 SignalR,因此阅读 SignalR 自托管可能是一个好主意,请参阅此处
http://www.asp.net/signalr/overview/guide-to-the-api/hubs-api-guide-server
IOC (控制反转)
这里使用了一些 IOC,AutoFac 是使用的容器。由于 IOC 不是本文的重点,我不会再浪费时间讨论它。如果您想了解有关 IOC/AutoFac 的更多信息,请阅读代码,查看 `BootStrapper` 类
MainWindow
关于这个窗口没什么好说的,除了它有以下控件
- 启动 SignalR:启动 SignalR Hub(用于模拟 Hub 的崩溃/重启)
- 停止 SignalR:停止 SignalR Hub(用于模拟 Hub 的崩溃/重启)
- 启动自动行情数据:将以固定间隔从 Hub 上下文推送随机的 `TickerDto` 对象
- 停止自动行情数据:将暂停从 Hub 发送 `TickerDto` 对象
- 发送一个行情:将从 Hub 上下文推送 1 个单独的随机 `TickerDto` 对象
MainWindowViewModel
`MainWindowViewModel` 实际上用于促进向 SignalR Hub 发送停止/创建命令,或告诉 `TickerHubPublisher` 执行操作。这是 `MainWindowViewModel` 的完整代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using Microsoft.Owin.BuilderProperties;
using Microsoft.Owin.Hosting;
using SignalRSelfHost.Hubs.Ticker;
namespace SignalRSelfHost
{
public class MainWindowViewModel : IMainWindowViewModel
{
private const string Address = "https://:5263";
private readonly ITickerHubPublisher tickerHubPublisher;
private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
private IDisposable signalr;
public MainWindowViewModel(ITickerHubPublisher tickerHubPublisher)
{
this.tickerHubPublisher = tickerHubPublisher;
AutoTickerStartCommand = new DelegateCommand(tickerHubPublisher.Start);
AutoTickerStopCommand = new DelegateCommand(tickerHubPublisher.Stop);
SendOneTickerCommand = new DelegateCommand(async () =>
{
await tickerHubPublisher.SendOneManualFakeTicker();
});
StartCommand = new DelegateCommand(StartServer);
StopCommand = new DelegateCommand(StopServer);
}
public ICommand AutoTickerStartCommand { get; set; }
public ICommand AutoTickerStopCommand { get; set; }
public ICommand SendOneTickerCommand { get; set; }
public ICommand StartCommand { get; private set; }
public ICommand StopCommand { get; private set; }
public void Start()
{
StartServer();
}
private void StartServer()
{
try
{
signalr = WebApp.Start(Address);
}
catch (Exception exception)
{
Log.Error("An error occurred while starting SignalR", exception);
}
}
private void StopServer()
{
if (signalr != null)
{
signalr.Dispose();
signalr = null;
}
}
}
}
如你所见,`MainWindowViewModel` 中并没有发生太多事情,所有的动作都发生在其他地方。
启动发布者
为了启动发布者,我们首先需要考虑的是如何托管 SignalR hub。这由应用程序范围的 `OwinStartupAttribute` 处理,您可以在演示应用程序的 `App` 类中找到它
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using Autofac;
using Microsoft.Owin;
using SignalRSelfHost.Hubs;
using SignalRSelfHost.Hubs.Ticker;
using SignalRSelfHost.IOC;
using log4net;
[assembly: OwinStartup(typeof(Startup))]
namespace SignalRSelfHost
{
public partial class App : Application
{
....
....
....
....
}
}
`OwinStartupAttribute` 指向一个类(本例中为 `Startup`),该类配置 Owin SignalR 自托管。我们来看看吧。
using Microsoft.AspNet.SignalR;
using Microsoft.Owin.Cors;
using Owin;
using SignalRSelfHost.IOC;
namespace SignalRSelfHost.Hubs
{
public class Startup
{
public void Configuration(IAppBuilder app)
{
// Branch the pipeline here for requests that start with "/signalr"
app.Map("/signalr", map =>
{
// Setup the CORS middleware to run before SignalR.
// By default this will allow all origins. You can
// configure the set of origins and/or http verbs by
// providing a cors options with a different policy.
map.UseCors(CorsOptions.AllowAll);
var hubConfiguration = new HubConfiguration
{
Resolver = new AutofacSignalRDependencyResolver(App.Container),
};
// Run the SignalR pipeline. We're not using MapSignalR
// since this branch already runs under the "/signalr"
// path.
map.RunSignalR(hubConfiguration);
});
}
}
}
所以 Hub 就是这样启动的。但 Hub 如何知道客户端,以及如何向它们推送?我们接下来会看到。
TickerHub
`TickerHub` 是应用程序中唯一的 Hub,它有两个主要方法。一个允许客户端订阅,一个用于移除客户端的订阅。这是 Hub 的代码,别担心,我们稍后会更详细地探讨这两个方法。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;
using log4net;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Hubs;
namespace SignalRSelfHost.Hubs.Ticker
{
[HubName(ServiceConstants.Server.TickerHub)]
public class TickerHub : Hub
{
private readonly ITickerRepository tickerRepository;
private readonly IContextHolder contextHolder;
public const string TickerGroupName = "AllTickers";
private static readonly ILog Log = LogManager.GetLogger(typeof(TickerHub));
public TickerHub(ITickerRepository tickerRepository, IContextHolder contextHolder)
{
this.tickerRepository = tickerRepository;
this.contextHolder = contextHolder;
}
[HubMethodName(ServiceConstants.Server.SubscribeTickers)]
public async Task SubscribeTrades()
{
contextHolder.TickerHubClients = Clients;
var user = ContextUtil.GetUserName(Context);
Log.InfoFormat("Received trade subscription from user {0}", user);
// add client to the trade notification group
await Groups.Add(Context.ConnectionId, TickerGroupName);
Log.InfoFormat("Connection {0} of user {1} added to group '{2}'", Context.ConnectionId, user, TickerGroupName);
var tickers = tickerRepository.GetAllTickers();
await Clients.Caller.SendTickers(tickers);
Log.InfoFormat("Snapshot published to {0}", Context.ConnectionId);
}
[HubMethodName(ServiceConstants.Server.UnsubscribeTickers)]
public async Task UnsubscribeTrades()
{
Log.InfoFormat("Received unsubscription request for trades from connection {0}", Context.ConnectionId);
// remove client from the blotter group
await Groups.Remove(Context.ConnectionId, TickerGroupName);
Log.InfoFormat("Connection {0} removed from group '{1}'", Context.ConnectionId, TickerGroupName);
}
}
}
订阅交易
这个 `TickerHub` 方法通过 SignalR 客户端调用,将用于注册对某个主题的兴趣,SignalR 称之为“组”。本质上,当客户端启动时,它会使用自己的 SignalR 代理与服务器端自托管的 Hub 通信,并调用服务器端 Hub 的 `SubscribeTrades()` 方法。当这种情况发生时,客户端的连接将被添加到服务器端 Hub 的组中,并且客户端将收到一次性推送的所有当前持有的行情。这种所有当前行情的一次性推送是通过推送 `TickerRepository` 中当前存储的**所有** `TickerDto` 对象的全部内容来完成的,`TickerRepository` 是一个简单的内存队列仓库,用于存储最新生成的 `TickerDto`。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Common;
namespace SignalRSelfHost.Hubs.Ticker
{
public class TickerRepository : ITickerRepository
{
private readonly Queue<TickerDto> tickers = new Queue<TickerDto>();
private object syncLock = new object();
private const int MaxTrades = 50;
public TickerRepository()
{
tickers.Enqueue(new TickerDto() {Name="Yahoo", Price=1.2m});
tickers.Enqueue(new TickerDto() {Name="Google", Price=1022m});
tickers.Enqueue(new TickerDto() {Name="Apple", Price=523m});
tickers.Enqueue(new TickerDto() {Name="Facebook", Price=49m});
tickers.Enqueue(new TickerDto() {Name="Microsoft", Price=37m});
tickers.Enqueue(new TickerDto() {Name="Twitter", Price=120m});
}
public TickerDto GetNextTicker()
{
return tickers.Dequeue();
}
public void StoreTicker(TickerDto tickerInfo)
{
lock (syncLock)
{
tickers.Enqueue(tickerInfo);
if (tickers.Count > MaxTrades)
{
tickers.Dequeue();
}
}
}
public IList<TickerDto> GetAllTickers()
{
IList<TickerDto> newTickers;
lock (syncLock)
{
newTickers = tickers.ToList();
}
return newTickers;
}
}
}
任何后续行情被创建时(我们很快会看到这是如何发生的),客户端将实时收到推送。这种批量推送是一次性的,在客户端首次与服务器建立通信时发生。此方法开始时发生的另一件事是捕获上下文,以便来自 `TickerHub` 的外部类将能够访问 `TickerHub` 上下文及其关联的值。
contextHolder.TickerHubClients = Clients;
取消订阅交易
这个 `TickerHub` `UnsubscribeTrades()` 方法通过 SignalR 客户端调用,将用于取消注册客户端对某个主题的兴趣(SignalR 称之为“组”)。本质上,当客户端关闭时,它会使用自己的 SignalR 代理与服务器端自托管的 Hub 通信,并调用服务器端 Hub 的 `UnsubscribeTrades()` 方法。当这种情况发生时,客户端连接将从服务器端 Hub 的组中移除。
谁向客户端进行流式传输?
好的,我们已经了解了 SignalR 客户端如何注册/取消注册对某个主题的兴趣(至少从服务器端的角度来看),但客户端如何从 `TickerHub` 接收推送通知呢?谁来做这件事?嗯,这项工作落在了另一个组件上。即 `TickerHubPublisher`。在本演示应用程序中,`TickerHubPublisher` 的工作非常简单,就是生成一个随机的 `TickerDto` 并将其推送到任何对特定主题感兴趣的已连接客户端。 `TickerHubPublisher` 的全部内容如下所示。我认为代码非常不言自明。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using Common;
using log4net;
using Microsoft.AspNet.SignalR;
namespace SignalRSelfHost.Hubs.Ticker
{
public class TickerHubPublisher : ITickerHubPublisher
{
private readonly IContextHolder contextHolder;
private readonly ITickerRepository tickerRepository;
Random rand = new Random();
private static readonly ILog Log = LogManager.GetLogger(typeof(TickerHubPublisher));
private CancellationTokenSource autoRunningCancellationToken;
private Task autoRunningTask;
public TickerHubPublisher(IContextHolder contextHolder,
ITickerRepository tickerRepository)
{
this.contextHolder = contextHolder;
this.tickerRepository = tickerRepository;
}
public void Stop()
{
if (autoRunningCancellationToken != null)
{
autoRunningCancellationToken.Cancel();
// Publisher is not thread safe, so while the auto ticker is running only the autoticker is
// allowed to access the publisher. Therefore before we can stop the publisher we have to
// wait for the autoticker task to complete
autoRunningTask.Wait();
autoRunningCancellationToken = null;
}
}
public async void Start()
{
autoRunningCancellationToken = new CancellationTokenSource();
autoRunningTask = Task.Run(async () =>
{
while (!autoRunningCancellationToken.IsCancellationRequested)
{
await SendOneManualFakeTicker();
await Task.Delay(20);
}
});
}
public async Task SendOneManualFakeTicker()
{
var currentTicker = tickerRepository.GetNextTicker();
var flipPoint = rand.Next(0, 100);
if (flipPoint > 50)
{
currentTicker.Price += currentTicker.Price/ 30;
}
else
{
currentTicker.Price -= currentTicker.Price / 30;
}
tickerRepository.StoreTicker(currentTicker);
await SendRandomTicker(currentTicker);
}
private Task SendRandomTicker(TickerDto tickerInfo)
{
if (contextHolder.TickerHubClients == null) return Task.FromResult(false);
Log.InfoFormat("Broadcast new trade to blotters: {0}", tickerInfo);
return contextHolder.TickerHubClients.Group(TickerHub.TickerGroupName).SendTickers(new[] { tickerInfo });
}
}
}
这段代码中唯一可能需要解释的部分是 `SendRandomTicker(TickerDto tickerInfo)` 方法。
private Task SendRandomTicker(TickerDto tickerInfo)
{
if (contextHolder.TickerHubClients == null) return Task.FromResult(false);
Log.InfoFormat("Broadcast new trade to blotters: {0}", tickerInfo);
return contextHolder.TickerHubClients.Group(TickerHub.TickerGroupName).SendTickers(new[] { tickerInfo });
}
此方法利用了在 `TickerHub` 本身内部捕获的 `TickerHub` 上下文。通过捕获上下文,它允许来自 `TickerHub` 的外部类使用其属性/值。这正是 `TickerHubPublisher` 所做的。它只是使用捕获的 `TickerHub` 上下文将新值推送到针对特定主题订阅的客户端。在这种情况下,是 `TickerHub.TickerGroupName`(“AllTickers”)字符串的值。因此,这就是 SignalR 服务器能够向客户端推送通知的方式。
模拟发布者崩溃
这部分很简单,只需使用 MainWindow 中的“Stop SignalR”按钮即可。它将简单地运行以下代码,该代码会处置从 SignalR `WebApp.Start(..)` 方法返回的 SignalR `IDisposable`。
private void StopServer()
{
if (signalr != null)
{
tickerHubPublisher.Stop();
signalr.Dispose();
signalr = null;
}
}
应该发生的情况是,如果您有任何连接的客户端,它们(短时间后)应该会看到服务器不可用,并显示“DISCONNECTED”瓷砖。
重启发布者
这部分很简单,只需使用 MainWindow 中的“Start SignalR”按钮。这将简单地运行以下代码
private void StartServer()
{
try
{
signalr = WebApp.Start(Address);
tickerHubPublisher.Start();
}
catch (Exception exception)
{
Log.Error("An error occurred while starting SignalR", exception);
}
}
应该发生的情况是,如果您有任何连接的客户端,它们(短时间后)应该会再次看到服务器可用,并且不再显示“DISCONNECTED”磁贴。
SignalR .NET 客户端
客户端是一个标准的 WPF 应用程序(我使用 WPF 只是因为我熟悉它,它并不是那么重要,你也可以很好地使用带有 MVP 模式的 Winforms)。客户端还使用 SignalR 代理类,因此一个好的起点是检查 SignalR .NET 客户端 API,你可以在这里阅读更多内容
http://www.asp.net/signalr/overview/guide-to-the-api/hubs-api-guide-net-client
IOC (控制反转)
这里使用了一些 IOC,AutoFac 是使用的容器。由于 IOC 不是本文的重点,我不会再浪费时间讨论它。如果您想了解有关 IOC/AutoFac 的更多信息,请阅读代码,查看 `BootStrapper` 类
弹性连接
客户端的核心是一个名为 `ServiceClientBase` 的类。这个类是任何客户端 Hub 代理连接的基类。那么这个类到底提供了什么呢?
那么我们先来看看它的代码吧
using System;
using System.Reactive;
using System.Reactive.Linq;
using Client.Hub.Transport;
using Common.Extensions;
namespace Client.Hub
{
internal class ServiceClientBase
{
private readonly IConnectionProvider _connectionProvider;
protected ServiceClientBase(IConnectionProvider connectionProvider)
{
_connectionProvider = connectionProvider;
}
protected IObservable<T> GetResilientStream<T>(Func<IConnection, IObservable<T>> streamFactory,
TimeSpan connectionTimeout)
{
var activeConnections = (from connection in _connectionProvider.GetActiveConnection()
from status in connection.StatusStream
where status.ConnectionStatus == ConnectionStatus.Connected ||
status.ConnectionStatus == ConnectionStatus.Reconnected
select connection)
.Publish()
.RefCount();
// get the first connection
var firstConnection = activeConnections.Take(1).Timeout(connectionTimeout);
// 1 - notifies when the first connection gets disconnected
var firstDisconnection = from connection in firstConnection
from status in connection.StatusStream
where status.ConnectionStatus == ConnectionStatus.Reconnecting ||
status.ConnectionStatus == ConnectionStatus.Closed
select Unit.Default;
// 2- connection provider created a new connection it means the active one has droped
var subsequentConnection = activeConnections.Skip(1).Select(_ => Unit.Default).Take(1);
// OnError when we get 1 or 2
var disconnected = firstDisconnection.Merge(subsequentConnection)
.Select(_ => Notification.CreateOnError<T>(new Exception("Connection was closed.")))
.Dematerialize();
// create a stream which will OnError as soon as the connection drops
return (from connection in firstConnection
from t in streamFactory(connection)
select t)
.Merge(disconnected)
.Publish()
.RefCount();
}
}
}
这个类中发生了非常多的事情。正如我所说,**这个类是核心类**。那么它到底做了什么呢?
嗯,它做了几件事
- 首先,它提供了一个基于工厂的 `IObservable
`,该工厂将生成最终的 `IObservable `。这个工厂将是需要具有弹性的唯一源流 (`IObservable `)。 - 它使用连接状态流(内部利用 SignalR 事件(如已连接/已断开)来创建状态流)合并到整个流中。当看到“`Reconnection`”或“`Closed`”状态流结果(`OnNext` 值)时,我们认为这是一个断开连接,我们合成一个 `OnError` 通知(通过使用标准的 Rx `Dematerialize` 操作符),该通知与实际工厂流值合并,以给出整体流结果,该结果现在可能向其消费者产生 `OnNext` / `OnError`。
- 我们还使用了标准的 RX `Publish` 操作符,这样做是为了共享相同的底层 `IObservable
` 序列 - 我们还使用了标准的 RX `RefCount` 操作符,这样当没有订阅者时,序列将被处置
这个类使用了几个额外的实用类,我们接下来将讨论这些类。
ConnectionProvider
`ConnectionProvider` 类只有一个任务,就是返回一个 `IObservable
internal interface IConnection
{
IObservable<ConnectionInfo> StatusStream { get; }
IObservable<Unit> Initialize();
string Address { get; }
void SetAuthToken(string authToken);
IHubProxy TickerHubProxy { get; }
}
其中更敏锐的人会注意到 `Unit` 的使用,这是一种表示我们不关心结果是什么,只关心它存在的方式。
这是 `ConnectionProvider` 类的相关代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;
using Common.Extensions;
namespace Client.Hub.Transport
{
/// <summary>
/// Connection provider provides always the same connection until it fails then
/// create a new one a yield it Connection provider randomizes the list of server
/// specified in configuration and then round robin through the list
/// </summary>
internal class ConnectionProvider : IConnectionProvider, IDisposable
{
private readonly SingleAssignmentDisposable disposable = new SingleAssignmentDisposable();
private readonly string username;
private readonly IObservable<IConnection> connectionSequence;
private readonly string server;
private int _currentIndex;
private static readonly ILog log = LogManager.GetLogger(typeof(ConnectionProvider));
public ConnectionProvider(string username, string server)
{
this.username = username;
this.server = server;
connectionSequence = CreateConnectionSequence();
}
public IObservable<IConnection> GetActiveConnection()
{
return connectionSequence;
}
public void Dispose()
{
disposable.Dispose();
}
private IObservable<IConnection> CreateConnectionSequence()
{
return Observable.Create<IConnection>(o =>
{
log.Info("Creating new connection...");
var connection = GetNextConnection();
var statusSubscription = connection.StatusStream.Subscribe(
_ => { },
ex => o.OnCompleted(),
() =>
{
log.Info("Status subscription completed");
o.OnCompleted();
});
var connectionSubscription =
connection.Initialize().Subscribe(
_ => o.OnNext(connection),
ex => o.OnCompleted(),
o.OnCompleted);
return new CompositeDisposable { statusSubscription, connectionSubscription };
})
.Repeat()
.Replay(1)
.LazilyConnect(disposable);
}
private IConnection GetNextConnection()
{
return new Client.Hub.Transport.Connection(server, username);
}
}
}
Connection
连接本身公开了一个 SignalR 代理和一个 `IObservable
这是这个类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Common;
using Common.Extensions;
using log4net;
using Microsoft.AspNet.SignalR.Client;
namespace Client.Hub.Transport
{
internal class Connection : IConnection
{
private readonly ISubject<ConnectionInfo> _statusStream;
private readonly HubConnection hubConnection;
private bool _initialized;
private static readonly ILog log = LogManager.GetLogger(typeof(Connection));
public Connection(string address, string username)
{
_statusStream = new BehaviorSubject<ConnectionInfo>(
new ConnectionInfo(ConnectionStatus.Uninitialized, address));
Address = address;
hubConnection = new HubConnection(address);
//hubConnection.Headers.Add(ServiceConstants.Server.UsernameHeader, username);
CreateStatus().Subscribe(
s => _statusStream.OnNext(new ConnectionInfo(s, address)),
_statusStream.OnError,
_statusStream.OnCompleted);
hubConnection.Error += exception => log.Error("There was a connection error with "
+ address, exception);
TickerHubProxy = hubConnection.CreateHubProxy(ServiceConstants.Server.TickerHub);
}
public IObservable<Unit> Initialize()
{
if (_initialized)
{
throw new InvalidOperationException("Connection has already been initialized");
}
_initialized = true;
return Observable.Create<Unit>(async observer =>
{
_statusStream.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, Address));
try
{
log.InfoFormat("Connecting to {0}", Address);
await hubConnection.Start();
_statusStream.OnNext(new ConnectionInfo(ConnectionStatus.Connected, Address));
observer.OnNext(Unit.Default);
}
catch (Exception e)
{
log.Error("An error occurred when starting SignalR connection", e);
observer.OnError(e);
}
return Disposable.Create(() =>
{
try
{
log.Info("Stoping connection...");
hubConnection.Stop();
log.Info("Connection stopped");
}
catch (Exception e)
{
// we must never throw in a disposable
log.Error("An error occurred while stoping connection", e);
}
});
})
.Publish()
.RefCount();
}
private IObservable<ConnectionStatus> CreateStatus()
{
var closed = Observable.FromEvent(h => hubConnection.Closed += h,
h => hubConnection.Closed -= h).Select(_ => ConnectionStatus.Closed);
var connectionSlow = Observable.FromEvent(h => hubConnection.ConnectionSlow += h,
h => hubConnection.ConnectionSlow -= h).Select(_ => ConnectionStatus.ConnectionSlow);
var reconnected = Observable.FromEvent(h => hubConnection.Reconnected += h,
h => hubConnection.Reconnected -= h).Select(_ => ConnectionStatus.Reconnected);
var reconnecting = Observable.FromEvent(h => hubConnection.Reconnecting += h,
h => hubConnection.Reconnecting -= h).Select(_ => ConnectionStatus.Reconnecting);
return Observable.Merge(closed, connectionSlow, reconnected, reconnecting)
.TakeUntilInclusive(status => status == ConnectionStatus.Closed);
// complete when the connection is closed (it's terminal, SignalR will not attempt to reconnect anymore)
}
public IObservable<ConnectionInfo> StatusStream
{
get { return _statusStream; }
}
public string Address { get; private set; }
public IHubProxy TickerHubProxy { get; private set; }
public void SetAuthToken(string authToken)
{
//hubConnection.Headers[AuthTokenProvider.AuthTokenKey] = authToken;
}
public override string ToString()
{
return string.Format("Address: {0}", Address);
}
}
}
SignalR 订阅/行情流
正如我们在发布者中看到的,我们知道特定服务器端 SignalR Hub 的客户端将调用两个方法,即 `SubscribeTickers` 和 `UnsubscribeTickers`。我们也知道有一个 `TickerDto` 的流式 API(至少对于这个只包含一个 Hub 的演示应用程序是这样)。其中以下条件成立:
- 客户端在连接时将调用服务器端 Hub 的 `SubscribeTickers`
- 客户端在处置时将调用服务器端 Hub 的 `UnsubscribeTickers`(这仅在处置时调用此方法来完成,请参见以下代码,我们结合 `Observable.Create(..)` 和 `CompositeDisposable` 的强大功能来处置一些东西,其中之一是使用 `Disposable.Create(..)` 创建的一个 `Action`,该 `Action` 将在处置时调用 `UnsubscribeTickers`)。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Common;
using log4net;
using Microsoft.AspNet.SignalR.Client;
namespace Client.Hub
{
internal class TickerHubClient : ServiceClientBase, ITickerHubClient
{
private static readonly ILog log = LogManager.GetLogger(typeof(TickerHubClient));
public TickerHubClient(IConnectionProvider connectionProvider)
: base(connectionProvider)
{
}
public IObservable<IEnumerable<TickerDto>> GetTickerStream()
{
return GetResilientStream(connection => GetTradesForConnection(connection.TickerHubProxy),
TimeSpan.FromSeconds(5));
}
private IObservable<IEnumerable<TickerDto>> GetTradesForConnection(IHubProxy tickerHubProxy)
{
return Observable.Create<IEnumerable<TickerDto>>(observer =>
{
// subscribe to trade feed first, otherwise there is a race condition
var spotTradeSubscription = tickerHubProxy.On<IEnumerable<TickerDto>>(
ServiceConstants.Client.SendTickers, observer.OnNext);
var spotTradeSubscriptionRaceDisposable =
tickerHubProxy.On<IEnumerable<TickerDto>>(
ServiceConstants.Client.SendTickers,
(x) =>
{
Console.WriteLine("Got a new trade" + x.First().Name);
});
log.Info("Sending ticker subscription...");
var sendSubscriptionDisposable = SendSubscription(tickerHubProxy)
.Subscribe(
_ => log.InfoFormat("Subscribed to ticker."),
observer.OnError);
var unsubscriptionDisposable = Disposable.Create(() =>
{
// send unsubscription when the observable gets disposed
log.Info("Sending ticker unsubscription...");
SendUnsubscription(tickerHubProxy)
.Subscribe(
_ => log.InfoFormat("Unsubscribed from ticker."),
ex => log.WarnFormat(
"An error occurred while unsubscribing from ticker: {0}",
ex.Message));
});
return new CompositeDisposable {
spotTradeSubscription, unsubscriptionDisposable,
sendSubscriptionDisposable,spotTradeSubscriptionRaceDisposable
};
})
.Publish()
.RefCount();
}
private static IObservable<Unit> SendSubscription(IHubProxy tickerHubProxy)
{
return Observable.FromAsync(() => tickerHubProxy.Invoke(
ServiceConstants.Server.SubscribeTickers));
}
private static IObservable<Unit> SendUnsubscription(IHubProxy tickerrHubProxy)
{
return Observable.FromAsync(() => tickerrHubProxy.Invoke(
ServiceConstants.Server.UnsubscribeTickers));
}
}
}
这段代码需要注意的另一点是,它继承自 `ServiceClientBase`,因此具有我们上面讨论过的弹性连接功能。我们向 `ServiceClientBase` 类传入 `GetTradesForConnection(IHubProxy tickerHubProxy)` 方法,作为在最终(现在 благодаря `ServiceClientBase` 基类中的逻辑而具有弹性)`GetTickerStream` 流中使用的 `IObservable
从此,`GetTickerStream` 流可以自由供任何人使用。那么让我们看看谁在使用它,下面将讨论这一点。
TickerRepository
TickerRepository 是 Observable 链上的下一个环节。那么它长什么样呢?它实际上出奇的简单,但不要被它迷惑,这里有很多事情正在发生。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Hub;
namespace Client.Repositories
{
class TickerRepository : ITickerRepository
{
private readonly ITickerHubClient tickerHubClient;
private readonly ITickerFactory tickerFactory;
public TickerRepository(ITickerHubClient tickerHubClient, ITickerFactory tickerFactory)
{
this.tickerHubClient = tickerHubClient;
this.tickerFactory = tickerFactory;
}
public IObservable<IEnumerable<Ticker>> GetTickerStream()
{
return Observable.Defer(() => tickerHubClient.GetTickerStream())
.Select(trades => trades.Select(tickerFactory.Create))
.Catch(Observable.Return(new Ticker[0]))
.Repeat()
.Publish()
.RefCount();
}
}
}
那么这里到底发生了什么?
- 我们使用 `Observable.Defer`,这样我们实际上不会使用底层流,直到有人订阅了通过 `Observable.Defer` 创建的 `IObservable`。这是一种将热流变冷的方法。
- 我们使用 select 将流数据从 `TickerDto` 转换为 `Ticker`
- 我们使用 `Catch` 来捕获流中的任何 `Exception`(`OnError`)。在这种情况下,我们使用一个默认值。
- 我们使用 `Repeat`。这一个**非常非常**重要。它允许我们重复整个流,包括重新连接到服务器端 Hub。正是这种机制允许客户端从服务器端 SignalR Hub 的丢失中恢复。这与弹性流逻辑是应用程序**最**重要的部分(至少在我看来是这样)。
- 我们使用 `Publish` 来共享底层流
- 我们使用 `RefCount` 以便在没有订阅者时自动进行处置
既然我们已经看到了这个仓库,我们只剩下一次跳转来继续 `TickerStream` 的 `IObservable` 之旅。我们接下来看看。
TickersViewModel
`TickersViewModel` 代表您在屏幕上看到的所有行情。正是这个视图模型利用了 `TickerRepository` 提供的惰性/可重复/弹性 `IObservable`。让我们看看代码,我认为它非常不言自明。
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;
namespace Client.ViewModels
{
public class TickersViewModel : INPCBase
{
private readonly ITickerRepository tickerRepository;
private readonly IConcurrencyService concurrencyService;
private bool stale = false;
private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));
public TickersViewModel(IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService,
TickerViewModelFactory tickerViewModelFactory)
{
Tickers = new ObservableCollection<TickerViewModel>();
Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
Tickers.Add(tickerViewModelFactory.Create("Google"));
Tickers.Add(tickerViewModelFactory.Create("Apple"));
Tickers.Add(tickerViewModelFactory.Create("Facebook"));
Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
Tickers.Add(tickerViewModelFactory.Create("Twitter"));
this.tickerRepository = reactiveTrader.TickerRepository;
this.concurrencyService = concurrencyService;
LoadTrades();
}
public ObservableCollection<TickerViewModel> Tickers { get; private set; }
private void LoadTrades()
{
tickerRepository.GetTickerStream()
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
AddTickers,
ex => log.Error("An error occurred within the trade stream", ex));
}
private void AddTickers(IEnumerable<Ticker> incomingTickers)
{
var allTickers = incomingTickers as IList<Ticker> ?? incomingTickers.ToList();
if (!allTickers.Any())
{
// empty list of trades means we are disconnected
stale = true;
}
else
{
if (stale)
{
stale = false;
}
}
foreach (var ticker in allTickers)
{
Tickers.Single(x => x.Name == ticker.Name)
.AcceptNewPrice(ticker.Price);
}
}
}
}}
其中每个 `Ticker` 都由一个 `TickerViewModel` 表示,如下所示。可以看出,该类也使用了我们之前讨论的 `ConnectionStatusStream IObservable`。这用于更改 `TickerViewModel` 的视图以显示一个带有“DISCONNECTED”字样的红色框。我们稍后将讨论这一点。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Client.Services;
using log4net;
namespace Client.ViewModels
{
public class TickerViewModel : INPCBase
{
private decimal price;
private bool isUp;
private bool stale;
private bool disconnected;
private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));
public TickerViewModel(
IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService,
string name)
{
this.Name = name;
reactiveTrader.ConnectionStatusStream
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
OnStatusChange,
ex => log.Error("An error occurred within the connection status stream.", ex));
}
public string Name { get; private set; }
public void AcceptNewPrice(decimal newPrice)
{
IsUp = newPrice > price;
Price = newPrice;
}
public decimal Price
{
get { return this.price; }
private set
{
this.price = value;
base.OnPropertyChanged("Price");
}
}
public bool IsUp
{
get { return this.isUp; }
private set
{
this.isUp = value;
base.OnPropertyChanged("IsUp");
}
}
public bool Stale
{
get { return this.stale; }
set
{
this.stale = value;
base.OnPropertyChanged("Stale");
}
}
public bool Disconnected
{
get { return this.disconnected; }
set
{
this.disconnected = value;
base.OnPropertyChanged("Disconnected");
}
}
private void OnStatusChange(ConnectionInfo connectionInfo)
{
switch (connectionInfo.ConnectionStatus)
{
case ConnectionStatus.Uninitialized:
case ConnectionStatus.Connecting:
Disconnected = true;
break;
case ConnectionStatus.Reconnected:
case ConnectionStatus.Connected:
Disconnected = false;
break;
case ConnectionStatus.ConnectionSlow:
Disconnected = false;
break;
case ConnectionStatus.Reconnecting:
Disconnected = true;
break;
case ConnectionStatus.Closed:
Disconnected = true;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}
}
连接状态流
我最后想向您展示的是 `ConnectionStatusStream` 是如何使用的。当客户端 SignalR Hub 代理事件被触发时,这个流会 `OnNext`。所以我们看到了像 `Connecting, Connected, ConnectionSlow, Reconnecting, Reconnected, Closed, Uninitialized` 这样的事件。所有这些最初都是 SignalR Hub 代理上的事件,并通过许多 RX 工厂之一(本例中是 `IObservable.FromEvent`)转换为 `IObservable` 流。
无论如何,这是我们用于在应用程序底部状态栏中显示信息的总体 `ConnectivityStatusViewModel`。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Hub.Transport;
using Client.Services;
using log4net;
namespace Client.ViewModels
{
public class ConnectivityStatusViewModel : INPCBase
{
private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
private string server;
private string status;
private bool disconnected;
public ConnectivityStatusViewModel(
IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService)
{
reactiveTrader.ConnectionStatusStream
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
OnStatusChange,
ex => log.Error("An error occurred within the connection status stream.", ex));
}
private void OnStatusChange(ConnectionInfo connectionInfo)
{
Server = connectionInfo.Server;
switch (connectionInfo.ConnectionStatus)
{
case ConnectionStatus.Uninitialized:
case ConnectionStatus.Connecting:
Status = "Connecting...";
Disconnected = true;
break;
case ConnectionStatus.Reconnected:
case ConnectionStatus.Connected:
Status = "Connected";
Disconnected = false;
break;
case ConnectionStatus.ConnectionSlow:
Status = "Slow connection detected";
Disconnected = false;
break;
case ConnectionStatus.Reconnecting:
Status = "Reconnecting...";
Disconnected = true;
break;
case ConnectionStatus.Closed:
Status = "Disconnected";
Disconnected = true;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
public string Server
{
get { return this.server; }
set
{
this.server = value;
base.OnPropertyChanged("Server");
}
}
public string Status
{
get { return this.status; }
set
{
this.status = value;
base.OnPropertyChanged("Status");
}
}
public bool Disconnected
{
get { return this.disconnected; }
set
{
this.disconnected = value;
base.OnPropertyChanged("Disconnected");
}
}
}
}
暂时就这些
好了,我希望我已经向您展示了 RX 实际上不仅仅是事件的 LINQ,它包含了许多用于处理并发、LINQ、事件、流、工厂、重试、错误处理、资源释放的类。
有些人可能会很高兴地知道这是一篇分两部分的文章,在下一部分中,我将与备受推崇的套接字库 ZeroMQ 的 .NET 移植作者合作。这个 .NET 移植名为 NetMQ,我们将与它的作者(好的,我也为那个库贡献了一点)一起向您展示如何使用 NetMQ 构建这类应用程序。所以在那之前,如果您喜欢这篇文章,请随意留下评论/投票,它们非常受欢迎。