响应式扩展 (Rx)






2.19/5 (9投票s)
响应式扩展 (Rx) 简介。
引言
本文简要介绍了响应式扩展 (Rx)。Rx 是一个开源库,该项目由 Microsoft Open Technologies, Inc. 与开源社区合作发起。MSDN 将 Rx 定义为“一个使用可观察序列和类 LINQ 的查询运算符来组合异步和基于事件的程序的库。”,我认为这不够吸引人,也没有充分体现它强大的功能。我尝试给出一个更好的定义,但失败了,所以我将如何最好地定义 Rx 的挑战留给本文的读者。下面的部分将详细介绍 Rx,附带的示例程序展示了实现和使用 Rx 的最简单方法。
响应式扩展 (Rx)
Rx 很难用简单的语言解释,它涉及多个概念,这些概念结合在一起,使程序员的生活更轻松。<a href="http://www.reactivemanifesto.org/">响应式宣言</a>定义了响应式程序或应用程序的属性。我在下面的不同部分分解了每个概念,以便您可以根据自己的方式将其串联起来。
编程范式转变
在 PC 编程的早期,我们拥有快速的单核 CPU,作为程序员,我们需要确保其得到最佳利用。我们会在应用程序中创建较少的线程或进程,以确保 IO 和 CPU 密集型线程之间有良好的平衡。近年来,随着快速多核 CPU 的出现,PC 编程世界发生了一场范式转变。这催生了各种异步编程库,如 TPL、PLINQ,以及包括 Rx 在内的不同框架,以利用多核 CPU 的优势。
建模问题
传统上,进行异步编程很难,您必须创建自己的线程或进程,如果频繁需要执行异步操作,则必须创建线程或进程池并管理它们的生命周期。错误条件和异常的处理是一项繁琐的任务。
为了简化程序员的生活,不同的编程语言或框架提供了简化异步操作的方法。
.Net 1.0 提供了异步编程模型 (APM),使用可以异步调用方法的委托 BeginInvoke(),它提供了回调和等待句柄,解决了异步编程的各个方面,并且线程池由 .NET 管理。
.Net 2.0 提供了基于事件的异步编程模型 (EAP)。
.Net 4.0 提供了任务并行库 (TPL)。
它们有一个主要的缺点,那就是它们没有提供一个一致的模型来表示异步程序,这导致不同人编写的代码看起来不同,造成混乱,难以理解代码流程。<a href="https://msdn.microsoft.com/en-us/library/dd460717%28v=vs.110%29.aspx">TPL</a> 库在很大程度上解决了这个问题,并提供了比使用线程池处理线程更易读的代码,尤其是在应用程序中同步各种任务以及处理故障时的错误处理方面。Rx 则提出了一种通过 Subject 和 Observer 来建模问题的统一方法,这是所有开发人员都熟悉的经典 Observer 设计模式。
Observer 设计模式
Rx 将所有工作线程视为 Subject,将主线程视为 Observer,这为主线程和工作线程提供了一种同步工作的统一方式。
可观察序列
可观察序列是 Rx 在其他库中独一无二的特性,它允许 Observer 以有序的方式接收 Subject 的状态/事件变化通知,它还同步回调,因此 Observer 无需使用任何锁定机制来进行观察。Rx 提供了一个调度器来控制如何处理回调。它主要提供三种选项。
NewThread:此选项每次 Subject 通知 Observer 时都会创建一个新线程。
ThreadPool:此选项允许调度器使用线程池进行回调。
TaskPool:此选项允许调度器使用任务池线程进行回调。
无阻塞
可观察序列的接收端 Observer 无需使用任何线程同步机制来保护其正在处理的数据,如果 Subject 生成序列的速度非常快,以至于 Observer 无法处理,那么传统上大多数 Subject 和 Observer 都必须进行同步。Rx 为您处理了这个问题,这极大地简化了异步编程。非阻塞的特性还加快了执行时间。Rx 可以轻松利用多核处理器环境,因为程序员无需管理并行执行。
响应式扩展 (Rx) 架构
.NET 中的 Rx 架构围绕着 mscorlib.dll 中的两个主要接口,如下所述:
- IObservable<T> (由 Subject 实现)。
- IObserver<T> (由 Observers 实现)。
正如 Extension 这个词本身所暗示的,Rx 的大部分功能都作为扩展提供,因此您可以轻松地将现有代码适配到使用 Rx。下图显示了使用 Rx 需要导入到项目中的 Rx 程序集。
System.Reactive.Core: Rx 的核心,具有订阅 Subject 的扩展方法。它有一个静态类 Observer 用于创建 Observer。它还提供同步和调度服务。
System.Reactive.Interfaces: 它提供了 Subject 可以实现的事件模式接口,以便在数据可用时引发事件。它还具有调度器接口以及用于 Rx 调度器和 LINQ 支持的可查询接口。
System.Reactive.Linq: 它提供了著名的静态类 Observable 来创建内存中的可观察序列。它扩展了基本的 LINQ 功能以用于 Rx。
System.Reactive.PlatformServices: 它扩展了 System.Reactive.Core 中提供的基本调度服务,并且将来将取代所有调度服务。
使用代码
为了展示 Rx 的用法,我提供了两个示例程序,它们都是 .Net 控制台应用程序,并使用 Rx 2.2.5 版本。BasicReactiveExtension 程序展示了一个库如何实现 Subject,以便应用程序可以利用它。AdvancedReactiveExtension 程序展示了我们如何轻松地适配现有方法并将其转换为响应式程序。
下图展示了 BasicReactiveExtension 的工作原理。
NumberGenerator 类充当一个 Subject,可以被 Rx 通过实现 IObservable<int> 来观察,这要求它实现 IDisposable Subscribe(IObserver<T> observer),它将返回一个 IDisposabe 对象,Observer 可以使用该对象取消订阅。当调用 GenerateNumbers() 方法时,此类生成从零开始的顺序数字作为可观察序列。
public class NumberGenerator : IObservable<int>
{
private List<IObserver<int>> observers;
public IDisposable Subscribe(IObserver<int> observer)
{
if (!observers.Contains(observer))
{
observers.Add(observer);
}//End-if (!observers.Contains(observer))
return new Unsubscriber(observers, observer);
}
public void GenerateNumbers()
{
for (int i = 0; ; i++)
{
Thread.Sleep(250);
foreach (var observer in observers.ToArray())
{
if (i == 10)
{
observer.OnError(new NumberNotGeneratedException());
}
else
{
observer.OnNext(i);
}//End-if-else (i == 10)
}//End-foreach (var observer in observers.ToArray())
}//End-for (int i = 0; ; i++)
}
}
NumberObserver 类是一个 Observer,实现了 IObserver<int> 接口,并能够将数字分类为偶数和奇数。
public class NumberObserver : IObserver<int>
{
private IDisposable unsubscriber;
private string instName;
private bool isEvenObserver;
public NumberObserver(string name, bool isEvenObserver)
{
this.instName = name;
this.isEvenObserver = isEvenObserver;
}
public string Name
{ get { return this.instName; } }
public virtual void Subscribe(IObservable<int> provider)
{
if (provider != null)
{
unsubscriber = provider.Subscribe(this);
}//End-if (provider != null)
}
public virtual void OnCompleted()
{
Console.WriteLine("The Number Generator has completed generation {0}.", this.Name);
this.Unsubscribe();
}
public virtual void OnError(Exception e)
{
Console.WriteLine("{0}: Error occured while generating number.", this.Name);
}
public virtual void OnNext(int value)
{
bool isEven = value % 2 == 0;
if (this.isEvenObserver && isEven)
{
Console.WriteLine("{1}: The current number is Even. Value => {0}", value, this.Name);
}
else if (!this.isEvenObserver && !isEven)
{
Console.WriteLine("{1}: The current number is Odd. Value => {0}", value, this.Name);
}//End-if (this.isEvenObserver && isEven)
}
public virtual void Unsubscribe()
{
Console.WriteLine("{0} unsubscribed.", this.Name);
unsubscriber.Dispose();
}
}
主程序创建相应的对象,并请求 NumberGenerate 对象使用单独的线程生成数字,直到用户按任意键退出。
static void Main(string[] args)
{
//Define number provider
NumberGenerator NumberProvider = new NumberGenerator();
//Have two observers.
NumberObserver reporter1 = new NumberObserver("EvenObserver", true);
reporter1.Subscribe(NumberProvider);
NumberObserver reporter2 = new NumberObserver("OddObserver", false);
reporter2.Subscribe(NumberProvider);
Console.WriteLine("Press any key to stop observering.");
Task.Factory.StartNew(() => NumberProvider.GenerateNumbers());
Console.ReadKey();
NumberProvider.StopGeneration();
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
}
为了展示错误处理能力,当生成数字 10 时,Subject 会抛出错误,Observer 可以通过 OnError() 方法捕获。这个基本程序没有利用 Rx 生成可观察序列的能力,并且不会提供线程安全的*/回调,因为用户线程驱动着 Observer 的通知逻辑。
下图展示了 AdvancedReactiveExtension 的工作原理。
高级程序借助静态类 System.Reactive.Linq.Observable 从一个非可观察的 Subject(在本例中为 NumberGenerator 类)创建可观察序列。主程序使用 Observable 静态类创建一个可观察对象并进行观察。
static void Main(string[] args)
{
NumberGenerator ng = new NumberGenerator();
var observable = Observable.ToObservable(ng.GenerateNumbers(), Scheduler.Default);
Console.WriteLine("Press any key to exit.");
observable.Subscribe(x => Console.WriteLine("Number generated is => {0}", x));
Console.ReadKey();
}
使用 Rx 创建的可观察序列,我们可以利用下面提到的所有特性,并提供非阻塞的线程安全回调。这使我们无需处理编写以管理应用程序异步模型的复杂同步逻辑。NumberGenerator 类有一个 GenerateNumbers() 方法来生成数字序列。
public class NumberGenerator
{
int sequencer;
public NumberGenerator()
{
sequencer = 0;
}
public IEnumerable<int> GenerateNumbers()
{
for (int i = 0; ; i++)
{
if (i % 2 == 0)
{
Thread.Sleep(250);
}//End-if (i % 2 == 0)
yield return sequencer++;
}//End-for (int i = 0; ; i++)
}
}
Rx 特性
Rx 允许您以不同的方式控制可观察序列,从而允许应用程序根据目标域需求进行利用。
Skip
此功能允许 Observer 跳过可观察序列中的一些通知。
Concat
此功能允许将两个可观察序列连接成一个序列,允许 Observer 将其作为单个序列进行消费。
Zip
此功能允许以交替的顺序观察两个可观察序列。
时间戳
这允许对可观察序列进行采样,指定时间间隔并通知 Observer。
节流
这允许将可观察序列缓冲到特定数量,然后再通知 Observer。
Rx 语言和框架支持
Rx 由以下框架支持:
-
.Net
-
Java
-
JavaScript
-
Ruby
历史
- 在文章中添加了代码示例,解释了示例程序。