NetMQ+ RX (流式数据演示应用程序 2/2)






4.98/5 (21投票s)
一个小型演示应用,展示如何使用NetMQ + RX创建流式发布者/弹性客户端
代码在哪里?
您可以从我的github仓库获取代码
https://github.com/sachabarber/NetMQRxDemo
引言
这是本系列文章的第二部分。如果您错过了第一部分,以下是上一篇文章引言部分的主要内容。
我在金融行业工作,目前在外汇(FX)领域,我们有很多关于各种事物流式传输的需求,例如
- 汇率
- 交易
- 其他金融工具
过去,为了满足这些需求,我们通常会使用套接字并将我们自己的数据(通常是JSON或XML)推送到网络上,然后在其上设置一些相当复杂的监听器,每个监听器都可能需要对原始流数据进行稍微不同的视图。
问题是,软件开发领域的时间从不停止,新事物层出不穷,几乎每隔一天就有一个新的框架出现,以某种方式改进了之前的技术。
有一个特别之处,我认为很多人实际上误解了它,那就是响应式扩展RX,公平地说,它已经存在一段时间了。我认为很多人将RX视为LINQ to events,它确实有这个功能。然而,事实是RX是一个用于创建流应用程序的卓越框架,你越深入RX,就越倾向于将所有事物都视为一个流,从事件到选择文件,甚至到ViewModel中的ICommand.Execute
。
RX在其功能库中拥有众多工具,例如
- 2个核心接口
IObservable
/IObserver
- 许多类似于LINQ的
IObservable
扩展方法 - 基于时间/窗口的操作(LINQ没有这些)
- 并发调度器
- 流错误处理控制(Catch等等)
- 能够从许多其他事物创建
IObservable
,例如IEnumerable
任务
- 事件
IObservable
工厂
我个人觉得,如果很多人花时间多学习一点RX,他们可以从中获得巨大的收益。
上次我们讨论了SignalR与RX的配合。问题是,有时人们会出于这样或那样的原因想要使用套接字。我喜欢套接字,但我不喜欢使用标准.NET套接字类型时需要编写的大量样板代码。幸运的是,你不必这么做,因为有一个非常棒的项目叫做ZeroMQ,它有一个通过NetMQ实现的.NET端口,我非常高兴地说我对此做出了非常小的(微不足道的)贡献。我编写了NetMQ的Actor部分。当我第一次开始分析SignalR演示(如果你读过第一篇文章,你会知道它是基于Lee Campbell(Intro To RX作者和全能顶尖人物,以及他在Adaptive的同事)的伟大工作),我联系了NetMQ的作者,并询问他是否愿意帮助我将该代码移植到NetMQ,他欣然同意了,你可以在这里看到他是我的合著者,谢谢Doron,你真棒。
因此,在本文中,Doron和我概述了您需要做些什么来使用NetMQ和RX创建一个类似的应用程序。
文章系列
- 使用SignalR和RX的流API
- 使用NetMQ和RX的流API(本文)
视频展示我们将学到什么
我认为了解这个演示的最佳方式是观看视频,我已经上传到YouTube,链接在此:
https://www.youtube.com/watch?v=uTqCA1cN16k
RX入门
我在第一篇文章中已经介绍过这一点,但万一有些人错过了,我认为再次强调这些重要内容是很重要的,所以我们开始吧。
您可以利用许多RX扩展方法
http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable_methods(v=vs.103).aspx
但也许开始使用RX的最佳方式是理解您将反复使用的两个底层接口。它们是IObservable
和IObserver
。我们接下来讨论这两个。
IObservable
namespace System
{
// Summary:
// Defines a provider for push-based notification.
//
// Type parameters:
// T:
// The object that provides notification information.This type parameter is
// covariant. That is, you can use either the type you specified or any type
// that is more derived. For more information about covariance and contravariance,
// see Covariance and Contravariance in Generics.
public interface IObservable<out T>
{
// Summary:
// Notifies the provider that an observer is to receive notifications.
//
// Parameters:
// observer:
// The object that is to receive notifications.
//
// Returns:
// A reference to an interface that allows observers to stop receiving notifications
// before the provider has finished sending them.
IDisposable Subscribe(IObserver<T> observer);
}
}
定义一个基于推送通知的提供者。
此类型参数是协变的。也就是说,您可以使用您指定的类型或任何更派生的类型。有关协变和逆变的更多信息,请参阅泛型中的协变和逆变。
Subscribe
通知提供者观察者将接收通知。
进一步阅读: http://msdn.microsoft.com/en-us/library/dd990377%28v=vs.110%29.aspx
IObserver
namespace System
{
// Summary:
// Provides a mechanism for receiving push-based notifications.
//
// Type parameters:
// T:
// The object that provides notification information.This type parameter is
// contravariant. That is, you can use either the type you specified or any
// type that is less derived. For more information about covariance and contravariance,
// see Covariance and Contravariance in Generics.
public interface IObserver<in T>
{
// Summary:
// Notifies the observer that the provider has finished sending push-based notifications.
void OnCompleted();
//
// Summary:
// Notifies the observer that the provider has experienced an error condition.
//
// Parameters:
// error:
// An object that provides additional information about the error.
void OnError(Exception error);
//
// Summary:
// Provides the observer with new data.
//
// Parameters:
// value:
// The current notification information.
void OnNext(T value);
}
}
IObserver<T>
和IObservable<T>
接口提供了一种通用的基于推送通知的机制,也称为观察者设计模式。IObservable<T>
接口表示发送通知的类(提供者);IObserver<T>
接口表示接收通知的类(观察者)。T
表示提供通知信息的类。
IObserver<T>
实现通过将自身的一个实例传递给提供者的IObservable<T>.Subscribe
方法来安排从提供者(IObservable<T>
实现)接收通知。此方法返回一个IDisposable
对象,可在提供者完成发送通知之前取消订阅观察者。IObserver<T>
接口定义了观察者必须实现的以下三个方法
OnNext
方法,通常由提供者调用,向观察者提供新数据或状态信息。OnError
方法,通常由提供者调用,表示数据不可用、无法访问或损坏,或提供者遇到了其他错误情况。OnCompleted
方法,通常由提供者调用,表示已完成向观察者发送通知。
进一步阅读: http://msdn.microsoft.com/en-us/library/dd783449%28v=vs.110%29.aspx
Observable.Create
任何使用过RX的人都会发现他们可能想创建自己的操作符。在LINQ中,这可以通过创建一个新方法(可能是一个扩展方法)来完成,该方法简单地返回一个新的IEnumerable<T>
。在RX中,事情稍微复杂一些,但并没有复杂太多。在RX中,您可以使用Observable.Create
来创建自己的操作符,或者您也可以使用Observable.Create
作为创建新的Observable<T>
的工厂。这里有一个简单的例子:
private IObservable<string> CreateObservable()
{
return Observable.Create<string>(
(IObserver<string> observer) =>
{
observer.OnNext("a");
observer.OnNext("b");
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});
}
Observable.Create
从订阅方法实现创建可观察序列。您通常会使用对现有IObservable<T>
的扩展方法,然后将Observable.Create(..)
作为扩展方法的主体。
Observable.Create
的方法签名如下:
public static IObservable<TSource> Create<TSource>(
Func<IObserver<TSource>, IDisposable> subscribe
)
这个方法签名中有趣的部分是,你可以看到它返回一个Func
委托,该委托接受一个IObserver<TSource>
,允许你使用其OnNext/OnError
和OnCompleted
方法将值推送到观察者。Func
委托返回一个IDisposable
,它将在销毁发生时被调用。现在RX提供了许多不同的IDisposable
类型,如果你在Observable.Create
的实现中看到一些相当奇特的IDisposable
类型,请不要感到惊讶。
关于使用Observable.Create
的最佳描述之一来自Lee Campbell的网站
http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html
总体思路
此图说明了大致思路
发布者
发布者由几部分组成,下面将进行描述。需要注意的是,客户端和服务器都使用了NetMQ,因此在开始之前阅读一些关于NetMQ的资料可能会有所帮助。Somdoron(NetMQ作者)在他的博客上有一些关于NetMQ的帖子,还有一些零散的帖子,我自己也写了一些,如下所示。在撰写本文的过程中,Somdoron问我是否想更深入地参与NetMQ,我愿意,所以圣诞节之后,我同意更多地帮助NetMQ,第一步将是妥善地编写文档,这正是它(公平地说)需要更多推动的一个领域。
- http://somdoron.com/category/netmq/
- https://github.com/zeromq/netmq
- #1:你好,世界
- #2:套接字类型
- #3:套接字选项/身份和SendMore
- #4:多套接字轮询
- #5:从多个套接字发送
- #6:分而治之
- #7:简单Actor模型
IOC (控制反转)
使用了一些IOC,Autofac是使用的容器。由于IOC不是本文的重点,我不会在这上面浪费更多时间。如果您想了解更多关于IOC/Autofac的信息,请阅读代码,查看BootStrapper
类。
MainWindow
关于这个窗口没什么好说的,除了它有以下控件
- 启动NetMQ:这会启动NetMQ
NetMQPublisher Actor PublisherSocket
(用于模拟NetMQ服务器崩溃/重启) - 停止NetMQ:这会处置NetMQ
NetMQPublisher Actor
(用于模拟NetMQ服务器崩溃/重启) - 启动自动行情数据:将以固定间隔从NetMQ
NetMQPublisher Actor
推出随机的TickerDto
对象 - 停止自动行情数据:将暂停从NetMQ
NetMQPublisher Actor
发送TickerDto
对象 - 发送一个行情:将从NetMQ
NetMQPublisher Actor
推出一个**单个**随机的TickerDto
对象
如果“Actor”这个术语对你来说还没有意义,请不要太担心,我们很快就会讲到它。
MainWindowViewModel
MainWindowViewModel
主要用于促进向NetMQ NetMQPublisher Actor
发送停止/创建命令,或者指示NetMQ NetMQPublisher Actor
执行操作。这是MainWindowViewModel
的完整代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using NetMQServer.Ticker;
namespace NetMQServer
{
public class MainWindowViewModel
{
private readonly ITickerPublisher tickerPublisher;
private readonly ITickerRepository tickerRepository;
private Random rand;
private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
private CancellationTokenSource autoRunningCancellationToken;
private Task autoRunningTask;
private bool serverStarted;
private bool autoTickerStarted;
public MainWindowViewModel(ITickerPublisher tickerPublisher, ITickerRepository tickerRepository)
{
this.tickerPublisher = tickerPublisher;
this.tickerRepository = tickerRepository;
this.rand = new Random();
serverStarted = false;
autoTickerStarted = false;
AutoTickerStartCommand = new DelegateCommand(
AutoRunning,
() => serverStarted && !autoTickerStarted);
AutoTickerStopCommand = new DelegateCommand(
() =>
{
if (autoRunningCancellationToken != null)
{
autoRunningCancellationToken.Cancel();
autoRunningTask.Wait();
autoTickerStarted = false;
}
},
() => serverStarted && autoTickerStarted);
SendOneTickerCommand = new DelegateCommand(
SendOneManualFakeTicker,
() => serverStarted && !autoTickerStarted);
StartCommand = new DelegateCommand(
StartServer,
() => !serverStarted);
StopCommand = new DelegateCommand(
StopServer,
() => serverStarted);
}
public DelegateCommand AutoTickerStartCommand { get; set; }
public DelegateCommand AutoTickerStopCommand { get; set; }
public DelegateCommand SendOneTickerCommand { get; set; }
public DelegateCommand StartCommand { get; set; }
public DelegateCommand StopCommand { get; set; }
public void Start()
{
StartServer();
}
private void AutoRunning()
{
autoTickerStarted = true;
autoRunningCancellationToken = new CancellationTokenSource();
autoRunningTask = Task.Run(async () =>
{
//Publisher is not thread safe, so while the auto ticker is
//running only the autoticker is allowed to access the publisher
while (!autoRunningCancellationToken.IsCancellationRequested)
{
SendOneManualFakeTicker();
await Task.Delay(20);
}
});
}
private void SendOneManualFakeTicker()
{
var currentTicker = tickerRepository.GetNextTicker();
var flipPoint = rand.Next(0, 100);
if (flipPoint > 50)
{
currentTicker.Price += currentTicker.Price / 30;
}
else
{
currentTicker.Price -= currentTicker.Price / 30;
}
tickerRepository.StoreTicker(currentTicker);
tickerPublisher.PublishTrade(currentTicker);
}
private void StartServer()
{
serverStarted = true;
tickerPublisher.Start();
AutoRunning();
}
private void StopServer()
{
if (autoTickerStarted)
{
autoRunningCancellationToken.Cancel();
// Publisher is not thread safe, so while the auto ticker is
// running only the autoticker is allowed to access the publisher.
//Therefore before we can stop the publisher we have to
// wait for the autoticker task to complete
autoRunningTask.Wait();
autoTickerStarted = false;
autoRunningCancellationToken = null;
autoRunningTask = null;
}
tickerPublisher.Stop();
serverStarted = false;
}
}
}
正如你所看到的,MainWindowViewModel
中并没有太多实际操作,所有的动作都在其他地方发生。
NetMQ Actors
在我们深入了解NetMQPublisher
如何工作之前,值得注意一件事。那就是服务器和客户端都使用了NetMQ的Actor
模型,我很高兴看到它被使用,因为那部分是我写的,也是那部分把我介绍给了Somdoron。
您基本上可以将Actor
视为一个可以向其发送消息(命令)的套接字。在内部,Actor
使用一个特殊的PairSocket
,当您向Actor
发送消息(命令)时,您正在写入该对套接字的一半,而在内部,Actor
正在从PairSocket
的另一端接收消息,它将根据从PairSocket
接收到的命令执行一些工作。
你真正要做的是制定一个简单的协议,用于命令源和Actor
之间,并坚持该协议。该协议可能包括告诉Actor
执行操作的命令,或者是一个信号Actor
停止执行任何操作的命令(NetMQ为此类常见需求提供了一个特殊命令ActorKnownMessages.END_PIPE
)。
现在你可能会想,我们为什么要使用这个Actor
模型呢?很简单,使用Actor
模型可以确保永远不会出现锁的争用,因为根本没有任何共享数据,数据是通过套接字发送的,因此Actor
保证拥有自己的数据副本,因此不需要锁定,也就没有等待锁,这有助于保持一切快速和线程安全。
你可以在这里阅读我对此的深入博客文章:#7 : 简单Actor模型
启动发布者
正如我们上面所看到的,MainWindowViewModel
包含启动/停止NetMQPublisher
的命令。但它们做了什么,如何启动/停止NetMQPublisher
?
对于“启动”操作,实际发生的事情非常简单,在内部,NetMQPublisher
将创建一个新的Actor
,然后该Actor
就可以接收命令了。
public void Start()
{
actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
}
谁向客户端流式传输数据?
这一切都由我们刚才讨论过的NetMQPublisher
完成。这是NetMQPublisher
的代码:
NetMQPublisher
实际上做了几件事
- 响应客户端发起的请求(
RequestSocket
),发送一次性快照(ResponseSocket
)。 - 向所有已订阅(
SubscriberSocket
)“交易”的连接客户端发布(PublisherSocket
)“交易”。 - 向所有已订阅(
SubscriberSocket
)“心跳”的连接客户端发布(PublisherSocket
)心跳“HB”。
using System;
using System.Collections.Generic;using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Navigation;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;
namespace NetMQServer.Ticker
{
public class NetMQPublisher : ITickerPublisher
{
private const string PublishTicker = "P";
public class ShimHandler : IShimHandler<object>
{
private readonly NetMQContext context;
private PublisherSocket publisherSocket;
private ResponseSocket snapshotSocket;
private ITickerRepository tickerRepository;
private Poller poller;
private NetMQTimer heartbeatTimer;
public ShimHandler(NetMQContext context, ITickerRepository tickerRepository)
{
this.context = context;
this.tickerRepository = tickerRepository;
}
public void Initialise(object state)
{
}
public void RunPipeline(PairSocket shim)
{
publisherSocket = context.CreatePublisherSocket();
publisherSocket.Bind("tcp://*:" + StreamingProtocol.Port);
snapshotSocket = context.CreateResponseSocket();
snapshotSocket.Bind("tcp://*:" + SnapshotProtocol.Port);
snapshotSocket.ReceiveReady += OnSnapshotReady;
shim.ReceiveReady += OnShimReady;
heartbeatTimer = new NetMQTimer(StreamingProtocol.HeartbeatInterval);
heartbeatTimer.Elapsed += OnHeartbeatTimerElapsed;
shim.SignalOK();
poller = new Poller();
poller.AddSocket(shim);
poller.AddSocket(snapshotSocket);
poller.AddTimer(heartbeatTimer);
poller.Start();
publisherSocket.Dispose();
snapshotSocket.Dispose();
}
private void OnHeartbeatTimerElapsed(object sender, NetMQTimerEventArgs e)
{
publisherSocket.Send(StreamingProtocol.HeartbeatTopic);
}
private void OnSnapshotReady(object sender, NetMQSocketEventArgs e)
{
string command = snapshotSocket.ReceiveString();
// Currently we only have one type of events
if (command == SnapshotProtocol.GetTradessCommand)
{
var tickers = tickerRepository.GetAllTickers();
// we will send all the tickers in one message
foreach (var ticker in tickers)
{
snapshotSocket.SendMore(JsonConvert.SerializeObject(ticker));
}
snapshotSocket.Send(SnapshotProtocol.EndOfTickers);
}
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveString();
switch (command)
{
case ActorKnownMessages.END_PIPE:
poller.Stop(false);
break;
case PublishTicker:
string topic = e.Socket.ReceiveString();
string json = e.Socket.ReceiveString();
publisherSocket.
SendMore(topic).
Send(json);
break;
}
}
}
private Actor<object> actor;
private readonly NetMQContext context;
private readonly ITickerRepository tickerRepository;
public NetMQPublisher(NetMQContext context, ITickerRepository tickerRepository)
{
this.context = context;
this.tickerRepository = tickerRepository;
}
public void Start()
{
if (actor != null)
return;
actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
}
public void Stop()
{
if (actor != null)
{
actor.Dispose();
actor = null;
}
}
public void PublishTrade(TickerDto ticker)
{
if (actor == null)
return;
actor.
SendMore(PublishTicker).
SendMore(StreamingProtocol.TradesTopic).
Send(JsonConvert.SerializeObject(ticker));
}
}
}
}
在NetMQPublisher
的Actor
管道内部,会创建几个额外的套接字。
这些**不是**Actor
内部使用的套接字,那些是您不会真正看到的专用PairSocket
对,它们是NetMQ代码库的一部分。我们这里讨论的套接字是用于应用程序逻辑的套接字,在此演示应用程序示例中,它们如下:
- 一个
PublisherSocket
:此套接字用于向客户端发布。 NetMQ的工作方式是使用消息帧,其中第一个帧可以作为主题,下一个消息帧可以是实际的负载。这样,客户端(SubscriberSocket
s)就能够判断消息是否是它们感兴趣的,然后再处理消息负载。这一个PublisherSocket
应该足以服务许多不同的主题,您只需向发布者提供以下内容:- 消息主题
- 消息有效负载
NetMQPublisher
发送特定消息的示例如下:public void PublishTrade(TickerDto ticker) { actor. SendMore(PublishTicker). SendMore(StreamingProtocol.TradesTopic). Send(JsonConvert.SerializeObject(ticker)); }
演示应用程序中实际使用了两个主题,如下所示:-
TradesTopic ("Trades")
:这使用NetMQPublisher
持有的单个PublisherSocket
向连接的客户端流式传输TickerDto
对象。在客户端,它们使用一个SubscriberSocket
,其主题设置为“Trades
”,这样它们将只接收来自NetMQPublisher PublisherSocket
的与“Trades
”主题匹配的消息。 HeartbeatTopic ("HB")
:这也使用单个NetMQPublisher
持有的PublisherSocket
向所有连接的客户端流式传输包含主题名称“HB
”的单个消息帧(消息内容不重要,仅主题名称,以便客户端可以看到新的“HB
”主题消息)。在客户端,使用一个SubscriberSocket
,其主题设置为“HB
”,这样它们将只接收来自NetMQPublisher PublisherSocket
的与“HB
”主题匹配的消息。因此,发生的情况是服务器将发起消息发布,客户端包含心跳主题的订阅者,在客户端的订阅者代码中,这种安排的工作方式如下:- 如果服务器(
NetMQPublisher
)及时响应,则客户端和服务器之间的通信被认为是正常的。 - 如果服务器(
NetMQPublisher
)**没有**及时响应,则客户端和服务器之间的通信被认为是中断/断开/不良/错误/损坏/虚假。
- 如果服务器(
- 一个
ResponseSocket
:此套接字用于在客户端和服务器之间发送存储在内存发布者进程持有的TickerRespository
(实际上只是过去X个交易的队列)中的所有交易快照。客户端将包含一个RequestSocket
,而服务器(NetMQPublisher
)包含ResponseSocket
。因此,发生的情况是客户端将发起消息发送(消息内容本身不重要),并期望收到响应,该响应将是从服务器端TickerRespository
序列化为JSON的当前X个交易。这在客户端启动时作为一次性操作完成。
模拟发布者崩溃
这部分很简单,只需在主窗口中使用“停止NetMQ”按钮。这将简单地运行以下代码,它将处置NetMQPublisher
类中的Actor
。
public void Stop()
{
actor.Dispose();
}
如果存在任何连接的客户端,它们应该(短暂延迟后)将服务器视为不可用并显示“断开连接”瓦片。
重启发布者
这部分很简单,只需在MainWindow
中使用“启动NetMQ”按钮。这将简单地运行以下代码:
private void StartServer()
{
serverStarted = true;
tickerPublisher.Start();
AutoRunning();
}
应该发生的是,如果有任何连接的客户端,它们应该(经过短暂的延迟后)再次看到服务器可用,并且不再显示“已断开连接”的磁贴。
NetMQ客户端
客户端是一个标准的WPF应用(我使用WPF只是因为我熟悉它,它并不是那么重要,你也可以使用MVP模式的Winforms)。客户端也使用了NetMQ,因此一个好的起点是检查NetMQ提供了什么。
这是两个客户端同时运行的截图,这是我在第一篇文章中没有展示的。
这就是我们在服务器上点击“停止NetMQ”按钮后的样子。
IOC (控制反转)
使用了一些IOC,Autofac是使用的容器。由于IOC不是本文的重点,我不会在这上面浪费更多时间。如果您想了解更多关于IOC/Autofac的信息,请阅读代码,查看BootStrapper
类。
客户端
我们决定将事物结构化的方式是拥有一个Rx’y类型的客户端,它暴露一个IObservable
流,应用程序的其余部分可以使用它。在该IObservable
内部,我们将利用Observable.Create(..)
的强大功能来创建一个工厂,该工厂将通过调用/创建实现流需求所需的NetMQ类来为该场景创建IObservable
流。
所以这个演示的通用模式会是这样
XXXXClient
将是拥有应用程序其余部分使用的IObservable
流的东西,它将在内部使用XXXXNetMQClient
来完成与NetMQ服务器的繁琐通信。
TickerClient / NetMQTickerClient
我们决定,对于每一个非“心跳(HB)”主题,客户端和服务器之间都将有一个专门的客户端。这与我们在第一部分中每个Hub类型都有一个专门的客户端代理的情况有点类似。
NetMQTickerClient
NetMQTickerClient
是所有客户端NetMQ操作发生的地方。在NetMQTickerClient
中,客户端将使用NetMQ SubscriberSocket
订阅“Trades
”主题。和以前一样,我们将使用NetMQ中的Actor
框架。NetMQTickerClient
还使用RequestSocket
与服务器进行初始快照,其中NetMQ服务器将有一个ResponseSocket
。服务器将发送TickerDto
的初始快照,然后这些快照在行情流中提供给应用程序使用。
NetMQTickerClient
也进行了一些错误处理OnError
,但检测故障问题的主要方法是使用HeartBeatClient
。
以下是NetMQTickerClient
代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;
namespace Client.Comms
{
public class NetMQTickerClient : IDisposable
{
private Actor<object> actor;
private Subject<TickerDto> subject;
private CompositeDisposable disposables = new CompositeDisposable();
class ShimHandler : IShimHandler<object>
{
private NetMQContext context;
private SubscriberSocket subscriberSocket;
private Subject<TickerDto> subject;
private string address;
private Poller poller;
private NetMQTimer timeoutTimer;
public ShimHandler(NetMQContext context, Subject<TickerDto> subject, string address)
{
this.context = context;
this.address = address;
this.subject = subject;
}
public void Initialise(object state)
{
}
public void RunPipeline(PairSocket shim)
{
// we should signal before running the poller but this will block the application
shim.SignalOK();
this.poller = new Poller();
shim.ReceiveReady += OnShimReady;
poller.AddSocket(shim);
timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
timeoutTimer.Elapsed += TimeoutElapsed;
poller.AddTimer(timeoutTimer);
Connect();
poller.Start();
if (subscriberSocket != null)
{
subscriberSocket.Dispose();
}
}
private void Connect()
{
// getting the snapshot
using (RequestSocket requestSocket = context.CreateRequestSocket())
{
requestSocket.Connect(string.Format("tcp://{0}:{1}", address, SnapshotProtocol.Port));
requestSocket.Send(SnapshotProtocol.GetTradessCommand);
string json;
requestSocket.Options.ReceiveTimeout = SnapshotProtocol.RequestTimeout;
try
{
json = requestSocket.ReceiveString();
}
catch (AgainException ex)
{
// Fail to receive trades, we call on error and don't try to do anything with subscriber
// calling on error from poller thread block the application
Task.Run(() => subject.OnError(new Exception("No response from server")));
return;
}
while (json != SnapshotProtocol.EndOfTickers)
{
PublishTicker(json);
json = requestSocket.ReceiveString();
}
}
subscriberSocket = context.CreateSubscriberSocket();
subscriberSocket.Subscribe(StreamingProtocol.TradesTopic);
subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));
subscriberSocket.ReceiveReady += OnSubscriberReady;
poller.AddSocket(subscriberSocket);
// reset timeout timer
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
{
// no need to reconnect, the client would be recreated because of RX
// because of RX internal stuff invoking on the poller thread block the entire application, so calling on Thread Pool
Task.Run(() => subject.OnError(new Exception("Disconnected from server")));
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveString();
if (command == ActorKnownMessages.END_PIPE)
{
poller.Stop(false);
}
}
private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
{
string topic = subscriberSocket.ReceiveString();
if (topic == StreamingProtocol.TradesTopic)
{
string json = subscriberSocket.ReceiveString();
PublishTicker(json);
// reset timeout timer also when a quote is received
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
else if (topic == StreamingProtocol.HeartbeatTopic)
{
// reset timeout timer
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
}
private void PublishTicker(string json)
{
TickerDto tickerDto = JsonConvert.DeserializeObject<TickerDto>(json);
subject.OnNext(tickerDto);
}
}
public NetMQTickerClient(NetMQContext context, string address)
{
subject = new Subject<TickerDto>();
this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), null);
this.disposables.Add(this.actor);
this.disposables.Add(NetMQHeartBeatClient.Instance.GetConnectionStatusStream()
.Where(x => x.ConnectionStatus == ConnectionStatus.Closed)
.Subscribe(x =>
this.subject.OnError(new InvalidOperationException("Connection to server has been lost"))));
}
public IObservable<TickerDto> GetTickerStream()
{
return subject.AsObservable();
}
public void Dispose()
{
this.disposables.Dispose();
}
}
}
TickerClient
TickerClient
可以在整个应用程序中用于流式传输TickerDto
对象,它只是简单地封装了来自NetMQTickerClient
的另一个流。重要的是,当发生错误并且TickerRepository
内部的Repeat
启动时,TickerClient IObservable
订阅将重新创建NetMQHeartBeatClient
。这将确保NetMQHeartBeatClient
将再次尝试与服务器通信。和以前一样,一切都归结于良好的内务管理和生命周期管理。
这是TickerClient
的代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Common;
using NetMQ;
namespace Client.Comms
{
public class TickerClient : ITickerClient
{
private readonly NetMQContext context;
private readonly string address;
public TickerClient(NetMQContext context, string address)
{
this.context = context;
this.address = address;
}
public IObservable<TickerDto> GetTickerStream()
{
return Observable.Create<TickerDto>(observer =>
{
NetMQTickerClient client = new NetMQTickerClient(context, address);
var disposable = client.GetTickerStream().Subscribe(observer);
return new CompositeDisposable { client, disposable };
})
.Publish()
.RefCount();
}
public IObservable<ConnectionInfo> ConnectionStatusStream()
{
return Observable.Create<ConnectionInfo>(observer =>
{
NetMQHeartBeatClient.Instance.InitialiseComms();
var disposable = NetMQHeartBeatClient.Instance.
GetConnectionStatusStream().Subscribe(observer);
return new CompositeDisposable { disposable };
})
.Publish()
.RefCount();
}
}
}
HeartBeatClient / NetMQHeartBeatClient
我们决定客户端与服务器之间的心跳在客户端的上下文中是一个全局关注点。
因此,预计只会有一个HeartBeatClient
(通过IOC注册实现),并且只有一个NetMQHeartBeatClient
的实例(硬编码设计)单例。
NetMQHeartBeatClient
NetMQHeartBeatClient
是所有客户端NetMQ“花招”发生的地方。在NetMQHeartBeatClient
中,客户端将使用NetMQ SubscriberSocket
订阅“HeartBeat (HB)
”主题。和以前一样,我们将利用NetMQ中的Actor
框架。这也是我们期望在x时间内从服务器端PublisherSocket
获得响应的地方。如果未获得响应,则认为通信中断,我们将使用内部RX Subject<T>
来OnNext
相关的ConnectionInfo/ConnectionStatus
。
以下是NetMQHeartBeatClient
代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;
namespace Client.Comms
{
public class NetMQHeartBeatClient
{
private readonly NetMQContext context;
private readonly string address;
private Actor<object> actor;
private Subject<ConnectionInfo> subject;
private static NetMQHeartBeatClient instance = null;
private static object syncLock = new object();
protected int requiresInitialisation = 1;
class ShimHandler : IShimHandler<object>
{
private NetMQContext context;
private SubscriberSocket subscriberSocket;
private Subject<ConnectionInfo> subject;
private string address;
private Poller poller;
private NetMQTimer timeoutTimer;
private NetMQHeartBeatClient parent;
public ShimHandler(NetMQContext context, Subject<ConnectionInfo> subject, string address)
{
this.context = context;
this.address = address;
this.subject = subject;
}
public void Initialise(object state)
{
parent = (NetMQHeartBeatClient) state;
}
public void RunPipeline(PairSocket shim)
{
// we should signal before running the poller but this will block the application
shim.SignalOK();
this.poller = new Poller();
shim.ReceiveReady += OnShimReady;
poller.AddSocket(shim);
timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
timeoutTimer.Elapsed += TimeoutElapsed;
poller.AddTimer(timeoutTimer);
Connect();
poller.Start();
if (subscriberSocket != null)
{
subscriberSocket.Dispose();
}
}
private void Connect()
{
subscriberSocket = context.CreateSubscriberSocket();
subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));
subject.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, this.address));
subscriberSocket.ReceiveReady += OnSubscriberReady;
poller.AddSocket(subscriberSocket);
// reset timeout timer
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
{
// no need to reconnect, the client would be recreated because of RX
// because of RX internal stuff invoking on the poller thread block
// the entire application, so calling on Thread Pool
Task.Run(() =>
{
parent.requiresInitialisation = 1;
subject.OnNext(new ConnectionInfo(ConnectionStatus.Closed, this.address));
});
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveString();
if (command == ActorKnownMessages.END_PIPE)
{
poller.Stop(false);
}
}
private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
{
string topic = subscriberSocket.ReceiveString();
if (topic == StreamingProtocol.HeartbeatTopic)
{
subject.OnNext(new ConnectionInfo(ConnectionStatus.Connected, this.address));
// reset timeout timer
timeoutTimer.Enable = false;
timeoutTimer.Enable = true;
}
}
}
private NetMQHeartBeatClient(NetMQContext context, string address)
{
this.context = context;
this.address = address;
InitialiseComms();
}
public static NetMQHeartBeatClient CreateInstance(NetMQContext context, string address)
{
if (instance == null)
{
lock (syncLock)
{
if (instance == null)
{
instance = new NetMQHeartBeatClient(context,address);
}
}
}
return instance;
}
public void InitialiseComms()
{
if (Interlocked.CompareExchange(ref requiresInitialisation, 0, 1) == 1)
{
if (actor != null)
{
this.actor.Dispose();
}
subject = new Subject<ConnectionInfo>();
this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), this);
}
}
public IObservable<ConnectionInfo> GetConnectionStatusStream()
{
return subject.AsObservable();
}
public static NetMQHeartBeatClient Instance
{
get { return instance; }
}
}
}
HeartBeatClient
正是这个HeartBeatClient
在整个应用程序中暴露出来,它简单地封装了来自NetMQHeartBeatClient
的另一个流,可用于推断客户端(整体)与服务器之间通信的连接状态。重要的是,当发生错误并进行Repeat
时,HeartBeatClient
会被重新创建。这将确保NetMQHeartBeatClient
将再次尝试与服务器通信。和以前一样,一切都归结于良好的内务管理和生命周期管理。
这是HeartBeatClient
的代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms;
using Client.Comms.Transport;
using NetMQ;
namespace Client.Comms
{
public class HeartBeatClient : IHeartBeatClient
{
public IObservable<ConnectionInfo> ConnectionStatusStream()
{
return Observable.Create<ConnectionInfo>(observer =>
{
NetMQHeartBeatClient.Instance.InitialiseComms();
var disposable = NetMQHeartBeatClient.Instance
.GetConnectionStatusStream().Subscribe(observer);
return new CompositeDisposable { disposable };
})
.Publish()
.RefCount();
}
}
}
TickerRepository
TickerRepository
是Observable链的下一级。那么它看起来是什么样子呢?实际上,它出奇地简单,但不要被它蒙蔽,这里面发生了很多事情。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Comms;
namespace Client.Repositories
{
class TickerRepository : ITickerRepository
{
private readonly ITickerClient tickerClient;
private readonly ITickerFactory tickerFactory;
public TickerRepository(ITickerClient tickerClient, ITickerFactory tickerFactory)
{
this.tickerClient = tickerClient;
this.tickerFactory = tickerFactory;
}
public IObservable<Ticker> GetTickerStream()
{
return Observable.Defer(() => tickerClient.GetTickerStream())
.Select(tickerFactory.Create)
.Catch<Ticker>(Observable.Empty<Ticker>())
.Repeat()
.Publish()
.RefCount();
}
}
}
那么这里到底发生了什么?
- 我们使用
Observable.Defer
,这样我们直到有人订阅了使用Observable.Defer
创建的IObservable
才真正使用底层流。这是一种将热流变为冷流的方法。 - 我们使用select将流数据从
TickerDto
转换为Ticker
- 我们使用
Catch
来捕获流中的任何Exception
(OnError
)。在这种情况下,我们使用默认值。 - 我们使用
Repeat
。这一点**非常非常**重要。它允许我们重复整个流,包括再次连接到服务器端中心。正是这种机制使客户端能够从服务器端SignalR中心丢失中恢复。这与弹性流逻辑是应用程序中**最**重要的部分(至少在我看来) - 我们使用
Publish
来共享底层流 - 我们使用
RefCount
在没有订阅者时自动销毁
现在我们已经看到了这个存储库,我们只需要再跳一步,就可以继续TickerStream
的IObservable
之旅了。接下来让我们看看。
TickersViewModel
TickersViewModel
代表您在屏幕上看到的所有Ticker
。正是这个视图模型利用了TickerRepository
提供的惰性/可重复/弹性IObservable
。让我们看看代码,我认为它相当一目了然。
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;
namespace Client.ViewModels
{
public class TickersViewModel : INPCBase
{
private readonly ITickerRepository tickerRepository;
private readonly IConcurrencyService concurrencyService;
private bool stale = false;
private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));
public TickersViewModel(IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService,
TickerViewModelFactory tickerViewModelFactory)
{
Tickers = new ObservableCollection<TickerViewModel>();
Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
Tickers.Add(tickerViewModelFactory.Create("Google"));
Tickers.Add(tickerViewModelFactory.Create("Apple"));
Tickers.Add(tickerViewModelFactory.Create("Facebook"));
Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
Tickers.Add(tickerViewModelFactory.Create("Twitter"));
this.tickerRepository = reactiveTrader.TickerRepository;
this.concurrencyService = concurrencyService;
LoadTrades();
}
public ObservableCollection<TickerViewModel> Tickers { get; private set; }
private void LoadTrades()
{
tickerRepository.GetTickerStream()
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
AddTicker,
ex => log.Error("An error occurred within the trade stream", ex));
}
private void AddTicker(Ticker ticker)
{
Tickers.Single(x => x.Name == ticker.Name)
.AcceptNewPrice(ticker.Price);
}
}
}
每个Ticker
都由一个单独的TickerViewModel
表示,如下所示。可以看到,这个类也使用了我们前面讨论过的ConnectionStatusStream IObservable
。它用于改变TickerViewModel
的视图,以显示一个带有“DISCONNECTED”字样的红色方框。我们稍后会详细讨论。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;
namespace Client.ViewModels
{
public class TickerViewModel : INPCBase
{
private decimal price;
private bool isUp;
private bool stale;
private bool disconnected;
private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));
public TickerViewModel(
IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService,
string name)
{
this.Name = name;
reactiveTrader.ConnectionStatusStream
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
OnStatusChange,
ex => log.Error("An error occurred within the connection status stream.", ex));
}
public string Name { get; private set; }
public void AcceptNewPrice(decimal newPrice)
{
IsUp = newPrice > price;
Price = newPrice;
}
public decimal Price
{
get { return this.price; }
private set
{
this.price = value;
base.OnPropertyChanged("Price");
}
}
public bool IsUp
{
get { return this.isUp; }
private set
{
this.isUp = value;
base.OnPropertyChanged("IsUp");
}
}
public bool Stale
{
get { return this.stale; }
set
{
this.stale = value;
base.OnPropertyChanged("Stale");
}
}
public bool Disconnected
{
get { return this.disconnected; }
set
{
this.disconnected = value;
base.OnPropertyChanged("Disconnected");
}
}
private void OnStatusChange(ConnectionInfo connectionInfo)
{
switch (connectionInfo.ConnectionStatus)
{
case ConnectionStatus.Connecting:
Disconnected = true;
break;
case ConnectionStatus.Connected:
Disconnected = false;
break;
case ConnectionStatus.Closed:
Disconnected = true;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}
}
可以看出,这个ViewModel使用IReactiveTrader.ConnectionStatusStream
来监控与NetMQPublisher
的连接状态。正是这段代码负责将磁贴的外观从显示跳动价格的磁贴更改为大的红色“DISCONNECTED”磁贴。这是通过使用Disconnected
属性完成的。
ConnectivityStatusViewModel
我想向您展示的最后一件事是ConnectionStatusStream
是如何使用的。当客户端的HeartBeatClient
推出新值时,此流OnNext
。因此我们看到诸如Connecting, Connected, Closed
之类的事物。所有这些都是使用我们之前在NetMQHeartBeatClient
中查看的逻辑创建的,并使用标准的RX Subject<T>
转换为IObservable
流。
无论如何,这是我们用于在应用程序底部状态栏中显示信息的整体ConnectivityStatusViewModel
。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;
namespace Client.ViewModels
{
public class ConnectivityStatusViewModel : INPCBase
{
private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
private string server;
private string status;
private bool disconnected;
public ConnectivityStatusViewModel(
IReactiveTrader reactiveTrader,
IConcurrencyService concurrencyService)
{
reactiveTrader.ConnectionStatusStream
.ObserveOn(concurrencyService.Dispatcher)
.SubscribeOn(concurrencyService.TaskPool)
.Subscribe(
OnStatusChange,
ex => log.Error("An error occurred within the connection status stream.", ex));
}
private void OnStatusChange(ConnectionInfo connectionInfo)
{
Server = connectionInfo.Server;
switch (connectionInfo.ConnectionStatus)
{
case ConnectionStatus.Connecting:
Status = "Connecting...";
Disconnected = true;
break;
case ConnectionStatus.Connected:
Status = "Connected";
Disconnected = false;
break;
case ConnectionStatus.Closed:
Status = "Disconnected";
Disconnected = true;
break;
default:
throw new ArgumentOutOfRangeException();
}
}
public string Server
{
get { return this.server; }
set
{
this.server = value;
base.OnPropertyChanged("Server");
}
}
public string Status
{
get { return this.status; }
set
{
this.status = value;
base.OnPropertyChanged("Status");
}
}
public bool Disconnected
{
get { return this.disconnected; }
set
{
this.disconnected = value;
base.OnPropertyChanged("Disconnected");
}
}
}
}
暂时就这些
总之,这就是我目前想说的全部,希望您喜欢这个非常迷你的RX/SignalR/NetMQ系列,并且它可能让您想去尝试一下RX/SignalR/NetMQ。如果您喜欢,请随时留下您的投票或评论,Doron和我为这些演示付出了很多努力,试图解决任何错误并使其尽可能真实,因此我们总是很高兴收到评论/投票。