65.9K
CodeProject 正在变化。 阅读更多。
Home

响应式扩展 (Rx)

starIconstarIcon
emptyStarIcon
starIcon
emptyStarIconemptyStarIcon

2.19/5 (9投票s)

2015年10月22日

CPOL

7分钟阅读

viewsIcon

23646

downloadIcon

395

响应式扩展 (Rx) 简介。

引言

本文简要介绍了响应式扩展 (Rx)。Rx 是一个开源库,该项目由 Microsoft Open Technologies, Inc. 与开源社区合作发起。MSDN 将 Rx 定义为“一个使用可观察序列和类 LINQ 的查询运算符来组合异步和基于事件的程序的库。”,我认为这不够吸引人,也没有充分体现它强大的功能。我尝试给出一个更好的定义,但失败了,所以我将如何最好地定义 Rx 的挑战留给本文的读者。下面的部分将详细介绍 Rx,附带的示例程序展示了实现和使用 Rx 的最简单方法。

响应式扩展 (Rx)

Rx 很难用简单的语言解释,它涉及多个概念,这些概念结合在一起,使程序员的生活更轻松。<a href="http://www.reactivemanifesto.org/">响应式宣言</a>定义了响应式程序或应用程序的属性。我在下面的不同部分分解了每个概念,以便您可以根据自己的方式将其串联起来。

编程范式转变

在 PC 编程的早期,我们拥有快速的单核 CPU,作为程序员,我们需要确保其得到最佳利用。我们会在应用程序中创建较少的线程或进程,以确保 IO 和 CPU 密集型线程之间有良好的平衡。近年来,随着快速多核 CPU 的出现,PC 编程世界发生了一场范式转变。这催生了各种异步编程库,如 TPL、PLINQ,以及包括 Rx 在内的不同框架,以利用多核 CPU 的优势。

建模问题

传统上,进行异步编程很难,您必须创建自己的线程或进程,如果频繁需要执行异步操作,则必须创建线程或进程池并管理它们的生命周期。错误条件和异常的处理是一项繁琐的任务。

为了简化程序员的生活,不同的编程语言或框架提供了简化异步操作的方法。

.Net 1.0 提供了异步编程模型 (APM),使用可以异步调用方法的委托 BeginInvoke(),它提供了回调和等待句柄,解决了异步编程的各个方面,并且线程池由 .NET 管理。

.Net 2.0 提供了基于事件的异步编程模型 (EAP)。

.Net 4.0 提供了任务并行库 (TPL)。

它们有一个主要的缺点,那就是它们没有提供一个一致的模型来表示异步程序,这导致不同人编写的代码看起来不同,造成混乱,难以理解代码流程。<a href="https://msdn.microsoft.com/en-us/library/dd460717%28v=vs.110%29.aspx">TPL</a> 库在很大程度上解决了这个问题,并提供了比使用线程池处理线程更易读的代码,尤其是在应用程序中同步各种任务以及处理故障时的错误处理方面。Rx 则提出了一种通过 Subject 和 Observer 来建模问题的统一方法,这是所有开发人员都熟悉的经典 Observer 设计模式。

Observer 设计模式

Rx 将所有工作线程视为 Subject,将主线程视为 Observer,这为主线程和工作线程提供了一种同步工作的统一方式。

可观察序列

可观察序列是 Rx 在其他库中独一无二的特性,它允许 Observer 以有序的方式接收 Subject 的状态/事件变化通知,它还同步回调,因此 Observer 无需使用任何锁定机制来进行观察。Rx 提供了一个调度器来控制如何处理回调。它主要提供三种选项。

NewThread:此选项每次 Subject 通知 Observer 时都会创建一个新线程。

ThreadPool:此选项允许调度器使用线程池进行回调。

TaskPool:此选项允许调度器使用任务池线程进行回调。

无阻塞

可观察序列的接收端 Observer 无需使用任何线程同步机制来保护其正在处理的数据,如果 Subject 生成序列的速度非常快,以至于 Observer 无法处理,那么传统上大多数 Subject 和 Observer 都必须进行同步。Rx 为您处理了这个问题,这极大地简化了异步编程。非阻塞的特性还加快了执行时间。Rx 可以轻松利用多核处理器环境,因为程序员无需管理并行执行。

响应式扩展 (Rx) 架构

.NET 中的 Rx 架构围绕着 mscorlib.dll 中的两个主要接口,如下所述:

  1. IObservable<T> (由 Subject 实现)。
  2. IObserver<T> (由 Observers 实现)。

正如 Extension 这个词本身所暗示的,Rx 的大部分功能都作为扩展提供,因此您可以轻松地将现有代码适配到使用 Rx。下图显示了使用 Rx 需要导入到项目中的 Rx 程序集。

System.Reactive.Core: Rx 的核心,具有订阅 Subject 的扩展方法。它有一个静态类 Observer 用于创建 Observer。它还提供同步和调度服务。

System.Reactive.Interfaces: 它提供了 Subject 可以实现的事件模式接口,以便在数据可用时引发事件。它还具有调度器接口以及用于 Rx 调度器和 LINQ 支持的可查询接口。

System.Reactive.Linq: 它提供了著名的静态类 Observable 来创建内存中的可观察序列。它扩展了基本的 LINQ 功能以用于 Rx。

System.Reactive.PlatformServices: 它扩展了 System.Reactive.Core 中提供的基本调度服务,并且将来将取代所有调度服务。

使用代码

为了展示 Rx 的用法,我提供了两个示例程序,它们都是 .Net 控制台应用程序,并使用 Rx 2.2.5 版本。BasicReactiveExtension 程序展示了一个库如何实现 Subject,以便应用程序可以利用它。AdvancedReactiveExtension 程序展示了我们如何轻松地适配现有方法并将其转换为响应式程序。

下图展示了 BasicReactiveExtension 的工作原理。

NumberGenerator 类充当一个 Subject,可以被 Rx 通过实现 IObservable<int> 来观察,这要求它实现 IDisposable Subscribe(IObserver<T> observer),它将返回一个 IDisposabe 对象,Observer 可以使用该对象取消订阅。当调用 GenerateNumbers()  方法时,此类生成从零开始的顺序数字作为可观察序列。

public class NumberGenerator : IObservable<int>
{        
    private List<IObserver<int>> observers;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (!observers.Contains(observer))
        {
            observers.Add(observer);

        }//End-if (!observers.Contains(observer))

        return new Unsubscriber(observers, observer);
    }

    public void GenerateNumbers()
    {
        for (int i = 0; ; i++)
        {
            Thread.Sleep(250);

            foreach (var observer in observers.ToArray())
            {
                if (i == 10)
                {
                    observer.OnError(new NumberNotGeneratedException());
                }
                else
                {
                    observer.OnNext(i);

                }//End-if-else (i == 10)

            }//End-foreach (var observer in observers.ToArray())

        }//End-for (int i = 0; ; i++)
    }
}

NumberObserver 类是一个 Observer,实现了 IObserver<int> 接口,并能够将数字分类为偶数和奇数。

public class NumberObserver : IObserver<int>
{
    private IDisposable unsubscriber;
    private string instName;

    private bool isEvenObserver;

    public NumberObserver(string name, bool isEvenObserver)
    {
        this.instName = name;

        this.isEvenObserver = isEvenObserver;
    }

    public string Name
    { get { return this.instName; } }


    public virtual void Subscribe(IObservable<int> provider)
    {
        if (provider != null)
        {
            unsubscriber = provider.Subscribe(this);

        }//End-if (provider != null)
    }

    public virtual void OnCompleted()
    {
        Console.WriteLine("The Number Generator has completed generation {0}.", this.Name);
        this.Unsubscribe();
    }

    public virtual void OnError(Exception e)
    {
        Console.WriteLine("{0}: Error occured while generating number.", this.Name);
    }

    public virtual void OnNext(int value)
    {
        bool isEven = value % 2 == 0;

        if (this.isEvenObserver && isEven)
        {
            Console.WriteLine("{1}: The current number is Even. Value => {0}", value, this.Name);

        }
        else if (!this.isEvenObserver && !isEven)
        {
            Console.WriteLine("{1}: The current number is Odd. Value => {0}", value, this.Name);

        }//End-if (this.isEvenObserver && isEven)
    }

    public virtual void Unsubscribe()
    {
        Console.WriteLine("{0} unsubscribed.", this.Name);

        unsubscriber.Dispose();
    }
}

主程序创建相应的对象,并请求 NumberGenerate 对象使用单独的线程生成数字,直到用户按任意键退出。

static void Main(string[] args)
{
    //Define number provider
    NumberGenerator NumberProvider = new NumberGenerator();

    //Have two observers.
    NumberObserver reporter1 = new NumberObserver("EvenObserver", true);
    reporter1.Subscribe(NumberProvider);

    NumberObserver reporter2 = new NumberObserver("OddObserver", false);
    reporter2.Subscribe(NumberProvider);

    Console.WriteLine("Press any key to stop observering.");

    Task.Factory.StartNew(() => NumberProvider.GenerateNumbers());

    Console.ReadKey();

    NumberProvider.StopGeneration();

    Console.WriteLine("Press any key to exit.");

    Console.ReadKey();
}

为了展示错误处理能力,当生成数字 10 时,Subject 会抛出错误,Observer 可以通过 OnError() 方法捕获。这个基本程序没有利用 Rx 生成可观察序列的能力,并且不会提供线程安全的*/回调,因为用户线程驱动着 Observer 的通知逻辑。

下图展示了 AdvancedReactiveExtension 的工作原理。

高级程序借助静态类 System.Reactive.Linq.Observable 从一个非可观察的 Subject(在本例中为 NumberGenerator 类)创建可观察序列。主程序使用 Observable 静态类创建一个可观察对象并进行观察。

 static void Main(string[] args)
{
    NumberGenerator ng = new NumberGenerator();

    var observable = Observable.ToObservable(ng.GenerateNumbers(), Scheduler.Default);

    Console.WriteLine("Press any key to exit.");

    observable.Subscribe(x => Console.WriteLine("Number generated is => {0}", x));

    Console.ReadKey();

}

使用 Rx 创建的可观察序列,我们可以利用下面提到的所有特性,并提供非阻塞的线程安全回调。这使我们无需处理编写以管理应用程序异步模型的复杂同步逻辑。NumberGenerator 类有一个 GenerateNumbers() 方法来生成数字序列。

public class NumberGenerator
{
    int sequencer;

    public NumberGenerator()
    {
        sequencer = 0;
    }

    public IEnumerable<int> GenerateNumbers()
    {
        for (int i = 0; ; i++)
        {
            if (i % 2 == 0)
            {
                Thread.Sleep(250);

            }//End-if (i % 2 == 0)

            yield return sequencer++;

        }//End-for (int i = 0; ; i++)

    }
}

Rx 特性

Rx 允许您以不同的方式控制可观察序列,从而允许应用程序根据目标域需求进行利用。

Skip

此功能允许 Observer 跳过可观察序列中的一些通知。

Concat

此功能允许将两个可观察序列连接成一个序列,允许 Observer 将其作为单个序列进行消费。

Zip

此功能允许以交替的顺序观察两个可观察序列。

时间戳

这允许对可观察序列进行采样,指定时间间隔并通知 Observer。

节流

这允许将可观察序列缓冲到特定数量,然后再通知 Observer。

Rx 语言和框架支持

Rx 由以下框架支持:

  • .Net

  • Java

  • JavaScript

  • Ruby

历史

  • 在文章中添加了代码示例,解释了示例程序。
© . All rights reserved.