面向 .NET 和 C# 开发者的响应式编程 - IEnumerable、IQueryable、IObservable 和 IQbservable 入门





5.00/5 (26投票s)
探索响应式编程,包括对 .NET 和 C# 开发者的交互式和响应式扩展的详细介绍。
前言
响应式扩展已经存在一段时间了,在这篇文章中我们将更详细地讨论响应式扩展。此外,在这篇文章中我们将介绍 IQbservable
s——世界上最神秘的命名事物/接口,也许仅次于希格斯玻色子。推拉序列无处不在——现在一端是设备,另一端是云,大多数数据事务都通过推拉序列进行。因此,掌握它们周围的编程模型的概念至关重要。
首先
让我们回顾一下,在进一步讨论响应式 IObservable
和 IQbservable
(Qbservables = Queryable Observables – 哦,没错,有趣的名字)之前,先讨论一下 IEnumerable
和 IQueryable
。
IEnumerable<T>
您可能知道,IEnumerable
模型可以看作是一个拉取操作。您会获得一个枚举器,然后通过使用 MoveNext
在一组项目上向前移动来迭代集合,直到到达最后一个项目。拉取模型在环境需要从外部源请求数据时很有用。为了覆盖一些基本知识——IEnumerable
有一个 GetEnumerator
方法,它返回一个带有 MoveNext()
方法和 Current
属性的枚举器。离线提示——C# 的 foreach
语句可以迭代任何可以返回 GetEnumerator
的“哑巴”对象。无论如何,非泛型版本的 IEnumerable
看起来是这样的
public interface IEnumerable
{
IEnumerator GetEnumerator();
}
public interface IEnumerator
{
Object Current {get;}
bool MoveNext();
void Reset();
}
现在,LINQ 在泛型版本的 IEnumerable
(即 IEnumerable<T>
)之上定义了一组扩展方法——因此,通过利用 泛型方法的类型推断支持,您可以对任何 IEnumerable
调用这些方法,而无需指定类型。也就是说,您可以说 someStringArray.Count()
而不是 someStringArray.Count<String>()
。您可以探索 Enumerable
类来查找这些静态扩展。
在这种情况下,实际的查询运算符(如 Where
、Count
等)及其相关表达式会被编译为 IL,并且它们的操作过程与 CLR 执行任何 IL 代码的过程非常相似。从实现的角度来看,LINQ 子句(如 Where
)的参数是一个 lambda 表达式(正如您可能已经知道的,from.. select
只是一个语法糖,会被展开为 IEnumerable<T>
的扩展方法),在大多数情况下,像 Func<T,..>
这样的委托可以从内存的角度表示一个表达式。但是,如果您想对位于其他地方的项目应用查询运算符呢?例如,如何在存储在数据库表中(可能在云端)的数据行集合之上应用 LINQ 运算符,而不是内存中的 IEnumerable<T>
集合?这正是 IQueryable<T>
的用途。
IQueryable<T>
IQueryable<T>
是一个 IEnumerable<T>
(它继承自 IEnumerable<T>
),它指向一个可以在远程世界中执行的查询表达式。用于查询 IQueryable<T>
类型对象的 LINQ 运算符定义在 Queryable
类中,并在您将它们应用于 IQueryable<T>
时返回 Expression<Func<T..>>
,这是一个 System.Linq.Expressions.Expression
(您可以在这里阅读有关表达式树的内容)。这将通过查询提供程序被翻译到远程世界(例如 SQL 系统)。所以,本质上,IQueryable
的具体实现指向一个查询表达式和一个查询提供程序——查询提供程序的任务是将查询表达式翻译成它执行的远程世界的查询语言。从实现的角度来看,您传递给应用于 IQueryable
的 LINQ 的参数被赋给了一个 Expression<T,..>
。 .NET 中的表达式树提供了一种将代码表示为数据或一种抽象语法树的方法。之后,查询提供程序将遍历它来构建远程世界中的等效查询。
public interface IQueryable : IEnumerable {
Type ElementType { get; }
Expression Expression { get; }
IQueryProvider Provider { get; }
}
public interface IQueryable<T> : IEnumerable<T>, IQueryable, IEnumerable {
..
}
例如,在 LINQ to Entity Framework 或 LINQ to SQL 中,查询提供程序会将表达式转换为 SQL 并将其交给数据库服务器。您甚至可以通过查看它们或简而言之,了解查询如何转换为目标查询语言(SQL);您应用于 IQueryable
的 LINQ 查询运算符将用于构建表达式树,然后由查询提供程序将其翻译以在远程世界中构建和执行查询。如果您不清楚如何使用 Lambda 构建 Expression<T>
来构建表达式树,请阅读这篇文章。
Reactive Extensions
现在,让我们深入了解可观察对象(observables)的解剖和哲学。
IObservable <T>
正如我们所讨论的,IEnumerable<T>
类型对象是拉取序列。但是,在现实世界中,有时我们也会推送东西——而不仅仅是拉取。(健康警报——当您同时进行这两者时,请确保安全地进行。)在许多场景中,推送模式非常有意义——例如,与其让您和您的邻居在当地邮局前日夜无限地排队等待领取蜗牛邮件,不如邮局代理在邮件到达时直接将邮件推送到您家。
现在,关于推拉序列的酷炫之处在于它们是偶对的。这也意味着,IObservable<T>
是 IEnumerable<T>
的偶对——请看下面的代码。所以,长话短说,通过 范畴偶对论 派生的 IEnumerable
的偶对接口是 IObservable
。故事是,Erik 团队的一些成员(他当时在微软)在发现这种偶对性时,经历了一次应得的、短暂的、宏大的、超级活跃的爆发。如果您对此更感兴趣,这里有一篇 Erik 关于此的精彩论文——下面是 Erik 论文的简要摘要。
//Generic version of IEnumerable, ignoring the non generic IEnumerable base
interface IEnumerable<out T>
{
IEnumerator<T> GetEnumerator();
}
interface IEnumerator<out T>: IDisposable
{
bool MoveNext(); // throws Exception
T Current { get; }
}
//Its dual IObservable
interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
interface IObserver<in T>
{
void OnCompleted(bool done);
void OnError(Exception exception);
T OnNext { set; }
}
令人惊讶的是,IObservable
的实现看起来像 观察者模式。
现在,LINQ 运算符很酷。它们非常富有表现力,并提供了查询事物的抽象。所以,响应式团队里的那些疯狂的家伙们想让他们将 LINQ 用于事件流。事件流实际上是推序列,而不是拉序列。所以,他们构建了 IObservable
。IObservable
框架允许您在推序列(如事件流)之上编写 LINQ 运算符,就像您查询 IEnumerable<T>
一样。用于 IObservable<T>
类型对象的 LINQ 运算符定义在 Observable
类中。那么,您将如何实现一个 LINQ 运算符,比如 Where
,在一个观察者上进行一些过滤呢?这里有一个用于 IEnumerable
和 IObservable
的 Where
过滤运算符的简单示例(为便于比较而简化)。在 IEnumerable
的情况下,完成后我们会处理枚举器。
//Where for IEnumerable
static IEnumerable<T> Where<T>(IEnumerable<T> source, Func<T, bool> predicate)
{
// foreach(var element in source)
// if (predicate(element))
// yield return element;
using (var enumerator = source.GetEnumerator())
{
while (enumerator.MoveNext())
{
var value= enumerator.Current;
if (predicate(value))
{
yield return value;
}
}
}
}
//Where for IObservable
static IObservable<T> Where<T>(this IObserver<T> source, Func<T, bool> predicate)
{
return Observable.Create<T>(observer =>
{
return source.Subscribe(Observer.Create<T>(value =>
{
try
{
if (predicate(value)) observer.OnNext(value);
}
catch (Exception e)
{
observer.OnError(e);
}
}));
});
}
现在,看看 IObservable
的 Where
实现。在这种情况下,我们返回 IDisposable
句柄给一个 Observable,以便我们可以处理它来停止订阅。对于过滤,我们只是创建一个内部 Observable,我们订阅到源以在其中应用我们的过滤逻辑——然后创建一个另一个顶层 Observable,它订阅我们创建的内部 Observable。现在,您可以为 IObservable<T>
提供任何具体的实现,它包装了一个事件源,然后您可以使用 Where
查询它!!太酷了。响应式扩展中的 Observable
类有一些帮助方法可以从事件创建 Observable,例如 FromEvent
。让我们创建一个 Observable,然后查询事件。幸运的是,Rx 团队已经拥有了 Observable 和相关查询运算符的完整实现,这样我们就不会编写自定义查询运算符了。
您可以安装 Rx,请使用 nuget install-package Rx-Main 来安装 Rx,然后尝试这个演示事件过滤的示例。
//Let us print all ticks between 5 seconds and 20 seconds
//Interval in milli seconds
var timer = new Timer() { Interval = 1000 };
timer.Start();
//Create our event stream which is an Observable
var eventStream = Observable.FromEventPattern<ElapsedEventArgs>(timer, "Elapsed");
var nowTime = DateTime.Now;
//Same as eventStream.Where(item => ...);
var filteredEvents = from e in eventStream
let time = e.EventArgs.SignalTime
where
time > nowTime.AddSeconds(5) &&
time < nowTime.AddSeconds(20)
select e;
//Subscribe to our observable
filteredEvents.Subscribe(t => Console.WriteLine(DateTime.Now));
Console.WriteLine("Let us wait..");
//Dispose filteredEvents explicitly if you want
Console.ReadKey();
显然,在上面的示例中,我们可以使用 Observable.Timer
——但我只是想展示如何用 Observables 包装外部事件源。同样,您可以包装您的鼠标事件或 WPF 事件。您可以进一步探索 Rx 和 Observables,以及这里的一些应用。现在让我们继续讨论 IQbservable
s。
IQbservable<T>
现在,让我们专注于 IQbservable<T>
。IQbservable<T>
是 IObserver<T>
的对应项,用于表示对推序列/事件源的查询作为一个表达式,就像 IQueryable<T>
是 IEnumerable<T>
的对应项一样。那么,这到底意味着什么呢?如果您检查 IQbservable
,您会发现
public interface IQbservable<out T> : IQbservable, IObservable<T>
{
}
public interface IQbservable
{
Type ElementType { get; }
Expression Expression { get; }
IQbservableProvider Provider { get; }
}
您可以看到它有一个 Expression
属性来表示 LINQ to Observable 查询,就像 IQueryable
有一个 Expression 来表示 LINQ 查询的 AST 一样。IQbservableProvider
负责将表达式翻译成远程事件源(可能是云中的流服务器)的语言。
交互式扩展
交互式扩展的核心是为 IEnumerable<T>
添加了许多新的扩展方法——也就是说,它添加了许多实用程序 LINQ to Object 查询运算符。您可能已经在自己的辅助类或实用程序类中手工编写了一些这些实用程序扩展方法,但现在 Rx 团队将它们聚合在一起了。此外,这篇文章假设您熟悉 C# 中的冷 IEnumerable
模型和迭代器。基本上,C# 编译器所做的是,它采用一个 yield return
语句,并为每个迭代器生成一个类。所以,从某种意义上说,每个 C# 迭代器内部都包含一个状态机。您可以使用 Reflector 或类似工具检查这一点,对于一个 yield return
的 IEnumerator<T>
方法。或者更好的是,我朋友 Abhishek Sur 有一篇 关于 C# 迭代器内部机制的精彩文章,或者这篇关于 C# 中迭代器实现的帖子。
更多关于交互式扩展
启动一个 C# 控制台应用程序,并使用 install-package Ix-Main 安装交互式扩展包。您可以在 System.Interactive.dll 中探索 System.Linq.EnumerationsEx
命名空间——现在,让我们探索添加到 IEnumerable
的一些有用的扩展方法。
检查交互式扩展中的几个实用方法
让我们快速检查几个有用的实用方法。
Do
Do
最简单的版本的作用很有趣。它会在序列中的每个元素上惰性地调用一个操作,当我们进行枚举时,它会利用迭代器。
//Let us create a set of numbers
var numbers = new int[] { 30, 40, 20, 40 };
var result=numbers.Do(n=>Console.WriteLine(n));
Console.WriteLine("Before Enumeration");
foreach(var item in result)
{
//The action will be invoked when we actually enumerate
}
Console.WriteLine("After Enumeration");
以及下面的结果。请注意,当我们在枚举时,该操作(在此例中是我们的 Console.WriteLine
来打印值)才会被应用。
现在,Do
方法最简单版本的实现如下:如果您在这里的代码 Plex 的交互式扩展源代码中快速浏览一下,您可以看到我们的 Do
方法是如何实现的。这是一个缩短的版本
public static class StolenLinqExtensions
{
public static IEnumerable<TSource> StolenDo<TSource>(
this IEnumerable<TSource> source, Action<TSource> onNext)
{
//Get the enumerator
using (var e = source.GetEnumerator())
{
while (true)
{
//Move next
if (!e.MoveNext())
break;
var current = e.Current;
//Call our action on top of the current item
onNext(current);
//Yield return
yield return current;
}
}
}
}
酷吧?
DoWhile
Ix 中的 DoWhile
非常有趣。它会生成一个可枚举序列,通过重复源序列直到给定条件为真。
IEnumerable<TResult> DoWhile<TResult>(IEnumerable<TResult> source, Func<bool> condition)
考虑以下代码:
var numbers = new int[] { 30, 40, 20, 40 };
var then = DateTime.Now.Add(new TimeSpan(0, 0, 10));
var results = numbers.DoWhile(() => DateTime.Now < then);
foreach (var r in results)
Console.WriteLine(r);
正如预期的那样,您会看到 foreach
循环重复枚举结果,直到我们满足 DateTime.Now
< then 条件——也就是说,直到我们到达 10 秒。
Scan
Scan
会获取一个序列,应用一个累加器函数来生成一个累加值序列。例如,让我们创建一个简单的求和累加器,它会获取一组数字来累加每个数字与前一个数字的总和。
var numbers = new int[] { 10, 20, 30, 40 };
//0 is just the starting seed value
var results = numbers.Scan(0,(sum, num) => sum+num);
//Print Results. Results will contain 10, 30, 60, 100
//0+10=10
//10+20 = 30
//30 + 30 = 60
//60 + 40 = 100
您也可以看一下实际的 Scan
实现,从 CodePlex 中的 Rx 仓库。这是一个缩写版本。
IEnumerable<TAccumulate> StolenScan<TSource, TAccumulate>
(this IEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate,
TSource, TAccumulate> accumulator)
{
var acc = seed;
foreach (var item in source)
{
acc = accumulator(acc, item);
yield return acc;
}
}
结论
我们只是触及了冰山一角,因为这篇文章的目的是向您介绍 Ix 和 Rx。Bart De Smet 有一个 非常激动人心的演讲,您不应该错过。Ix 特别有趣,因为它有函数式编程的根源。看看 CodePlex 中的响应式扩展仓库 以获取更多灵感,它应该会给您带来更多关于一些函数式模式的想法。您还可以尝试 Ix Providers 和 Ix Async 包。
我冒昧地嵌入 Charles 创建的图,这是 Bart 在白板上画的抽象图的具体表示。这是本文的总结。
我们将在未来讨论 Rx 和 Ix 非常有用的更多实际场景——主要是设备到云的交互场景、复杂事件处理、使用 ISheduler
进行任务分发等——以及其他人正在 Rx 之上创建的一些出色的附加库。但这一篇只是一个快速介绍。祝您编码愉快!!