Rx 框架示例






4.98/5 (53投票s)
通过一些实际示例介绍响应式框架
引言
Rx 框架是一个非常有趣且有用的库,最近通过 DevLabs 发布。该框架的目的是提供一个大型框架,用于处理观察者模式。这是一个很好的学术描述,但不足以立即开始使用它。因此,让我们深入探讨一下。
本文旨在通过示例而非描述来介绍 Rx 框架。我不会深入探讨内部行为的任何细节,但我希望在即将发表的文章中涵盖这一点。我发现当学习全新的东西时,实际的工作示例为我提供了足够的信息来开始,并使我能够学习更多。
背景
此处提供的所有信息都总结或来源于 Channel 9 上出色的视频和文本博客文章。我强烈推荐它作为所有开发人员的学习资源。
对于本文,假定读者对观察者模式有基本的了解。
本文的代码是针对 NuGet 上的 Reactive Framework 版本 1.0.10621.0 编写的。
观看 Channel 9 上的 Rx 框架介绍视频也会有所帮助。
IObservable<T>
Rx 框架的核心是可观察模式,由 IObservable<T> 接口定义。它只提供一个方法,用于使用实现 IObserver<T>
接口的对象实例来订阅可观察对象。
Rx 框架的开发者做出的有趣联系是 IObservable<T>
是 IEnumerable<T>
接口的数学对偶。这在此视频中得到了很好的解释,但总结是 IEnumerable<T>
从序列中拉取元素,而 IObservable<T>
从序列中推送元素。
我能用它做什么?
我发现最具挑战性的部分是如何用它解决问题。它似乎是一个寻找问题的大锤子。虽然它不能解决无法解决的问题,但我认为它确实提供了解决现有问题的新颖有趣的方法,有时还能以更易于编写和维护的方式解决问题。
该框架提供内置的线程支持和控制,允许在后台线程中观察集合。
获取目录树
在处理磁盘时,一个众所周知的任务是在 UI 应用程序中获取树中的目录列表。

此示例的代码位于 Directories
项目中。
考虑这个 IEnumerable<string>
结果
static IEnumerable<string> GetAllDirectories(string path)
{
string[] subdirs = null;
// Some directories may be inaccessible.
try
{
subdirs = Directory.GetDirectories(path);
}
catch (IOException)
{
}
if (subdirs != null)
{
foreach (var subdir in subdirs)
{
yield return subdir;
foreach (var grandchild in GetAllDirectories(subdir))
{
yield return grandchild;
}
}
}
}
您可以以异步方式订阅它,使用
var observable = Observable.ToObservable(GetAllDirectories(@"c:\"), Scheduler.ThreadPool);
observable.Subscribe(outputDirectory);
其中 outputDirectory
是一个 Action<string>
。此委托在 ThreadPool
中的线程上调用,这意味着在调用 Winforms 对象上的方法时它不会直接工作。您可以使用正常的 Control.Invoke()
方法来解决此问题,或者您可以执行此操作
var observable = Observable.ToObservable
(GetAllDirectories(@"c:\")).ObserveOn(SynchronizationContext.Current);
observable.Subscribe(outputDirectory);
这将确保对 outputAction
的所有调用都在创建 myWinformsControl
的同一线程上进行。
现在,这样做的缺点是,高频率的单独调用 Winforms 更新会导致用户界面卡顿。解决此问题的一个方法是缓冲观察者生成的 string
。Reactive Framework 可以通过数量、时间或两者兼有来缓冲。由于频率是问题,我们将使用时间进行缓冲。
var observable =
Observable.ToObservable(GetAllDirectories(@"c:\"))
.Buffer(TimeSpan.FromSeconds(1))
.ObserveOn(this);
observable.Subscribe(outputDirectories);
这将缓冲直到一秒钟过去,然后调用 getDirectories
,它是一个 Action<IEnumerable<string>>
。
现在这对我很有效,直到我有幸获得了一块 SSD 硬盘。在 1 秒的延迟中,我每秒现在可以检索大约 16000 个元素。这比 TreeView
控件能够添加并仍然保持流畅的用户界面要多得多。
乍一看,解决方案是向 Buffer()
提供 1000
的元素计数,但这只意味着缓冲将在每 1000
个元素完成,大约是 80 毫秒,这只会让事情变得更糟。
解决方案是组合两个序列,一个是有规律的间隔,另一个是 1000
个元素的缓冲区。
Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.ThreadPool)
.Zip(GetAllDirectories(@"c:\").ToObservable(Scheduler.ThreadPool)
.Buffer(1000), (a, b) => b)
Zip()
运算符依次组合来自每个序列的元素。这意味着每 1000 个条目与每 1 秒生成的一个元素配对。
最后一部分是停止读取目录。每次调用 Subscribe
方法时,它都会返回一个 IDisposable
,可用于取消订阅观察者。我们可以用它来停止读取,如下所示
IDisposable observer = observable.Subscribe(outputDirectories)
...
observer.Dispose()
用户界面交互

我处理的许多应用程序都具有交互性,例如绘制或修改对象的 2D 视图。此环境中最常见的任务是选择、绘制和修改对象。这些行为的核心部分是鼠标交互,特别是将鼠标按钮和移动组合成拖动操作。
这些方面的挑战在于跟踪鼠标按钮和移动状态,以便可以在鼠标事件之间计算位置增量。虽然做这项工作并不具有挑战性,但困难的部分是实现一个干净的设计。人们通常会使用许多布尔变量来跟踪是否正在拖动、是否刚刚拖动、是否已移动至少 3 像素以及其他任意数量的组合。
虽然我还没有在生产环境中使用响应式框架来解决这些问题,但它有一些前景,至少在学术上很有趣,并且确实展示了 Rx 框架的其他一些方面。
展示这些技术的示例位于 DrawingApplication
项目中。它是一个非常简单的线条绘制迷你应用程序。它允许您在屏幕上绘制和移动线条。
观察事件
第一步是将事件设置为 IObservable
。
var movingEvents = Observable.FromEventPattern<MouseEventHandler,
MouseEventArgs>(h => this.MouseMove += h, h => this.MouseMove -= h);
var upEvents = Observable.FromEvent<MouseEventHandler,
MouseEventArgs>(h => this.MouseUp += h, h => this.MouseUp -= h);
var downEvents = Observable.FromEvent<MouseEventHandler,
MouseEventArgs>(h => this.MouseDown += h, h => this.MouseDown -= h);
这设置了三个 IObservable<IEvent<MouseEventArgs>>
实例:movingEvents
、upEvents
和 downEvents
。每次发生鼠标交互时,任何观察者都将收到通知。
我们可以从这里做很多事情。
识别鼠标按下和鼠标抬起之间的鼠标移动
var draggingEvents = movingEvents.WaitUntil(downEvents).Until(upEvents);
现在,问题在于 draggingEvents
可观察对象是一次性的。这意味着一旦鼠标按钮释放,可观察对象将不再产生元素。解决方案是使用 Repeat()
扩展方法。
var draggingEvents =
movingEvents.WaitUntil(downEvents).Until(upEvents).Repeat();
挑选左键点击
var leftClicks = from e in downEvents
where e.EventArgs.Button == MouseButtons.Left
select e;
leftClicks.Subscribe(...);
这也表明您可以使用 LINQ 查询语法来处理可观察对象。
配对拖动的鼠标事件
这是与屏幕上的项目交互时的常见要求。您希望在鼠标被拖动时,鼠标坐标对之间或上一个位置和当前位置之间存在增量。这对于平移屏幕、移动选定项目或绘制项目很有用。或者,这可以表示为实际的点对而不是增量。
var deltas = from pair in movingEvents.Buffer(2)
let array = pair.ToArray()
let a = array[0].EventArgs.Location
let b = array[1].EventArgs.Location
select new Size(b.X - a.X, b.Y - a.Y);
var dragDeltas = moveDeltas.WaitUntil(downEvents).Until(upEvents).Repeat();
dragDeltas
是一个 IObservable
,它提供连续的鼠标事件,并计算出位置差异。对 Observable.Buffer()
的调用不会将元素拆分为单独的对,而是使每个单独的鼠标移动事件出现在两对中,当然,除了第一个和最后一个事件。
合并流
此示例应用程序是 ConsoleReader
和 ConsoleOutputter
。ConsoleReader
是父进程,ConsoleOutputter
代表子进程。

在使用外部进程时,stdout
和 stderr
消息通常需要重定向并输出到主窗口。这可能是一个挑战,因为从子进程的标准输出和错误流读取是一个阻塞操作。响应式框架使其变得容易。
第一部分是将 StreamReader
表示为 IEnumerable
,以便于使用 Reactive Framework。
private static IEnumerable<string> GetLineReader(StreamReader reader)
{
while (reader.BaseStream.CanRead)
{
var l = reader.ReadLine();
if (l == null)
{
break;
}
yield return l;
}
}
接下来是将标准流和输出流合并到单个可观察对象中
var process = Process.Start(info);
var childStdOut = GetLineReader(process.StandardOutput).ToObservable();
var childStdErr = GetLineReader(process.StandardError).ToObservable();
Observable.Merge(childStdOut, childStdErr).Subscribe(LineOutputter);
LineOutputter
是一个 Action<string>
委托,它为子进程的 StandardOutput
和 StandardError
流产生的每一行调用。
一个简单的图像查看应用程序
此示例的代码位于 ImageViewer
项目中。

想象一个简单的应用程序,您可以检查一个目录,显示其中的图像文件,然后能够单击文件以显示图像。
这将有以下主要工作场所
- 在目录中查找文件
- 为每个文件加载缩略图
- 单击选定图像时加载图像
为了实现响应式用户界面,所有这三个操作都需要在后台线程中进行。其中具有挑战性的部分是如何以简单且线程安全的方式将任务排队,并实现线程之间的通信。下面是一个显示需要发生的动作和信息流的图表。

有趣的是,当加载图像时,我们也有机会为图像设置缩略图。
主体
我们在这个例子中使用的新概念是 ISubject<T>
接口及其默认实现 Subject<T>
。Subject<T>
提供了一种简单的机制,可以提前设置一些观察者,然后为这些观察者提供我们尚不了解的其他序列。可以把它想象成一扇门,你可以观察进来的人,但你并不知道他们是从哪里来的。
我们将此用于缩略图设置,因为缩略图可以来自两个来源——根据文件列表加载缩略图,以及在查看完整图像时加载缩略图。
主体和 OnComplete 的链接
使用 Subject 观察可观察对象的一个意外结果是它没有重复。考虑以下两个代码片段
myObservable.Subscribe(mySubject)
和
myObservable.Subcribe(item => mySubject.OnNext(item))
它们相似之处在于 subject 会收到通知,但关键区别在于当 myObservable
调用 OnCompleted()
时,subject 的 OnCompleted()
方法也会被调用。这意味着订阅到 subject 的任何内容将不再发送通知。只需注意这一点。
发布还是不发布
在本文的第一个版本中,这个图像查看器有一些严重的内存问题。它会大幅波动内存使用量,这非常表明发生了大量的内存分配,然后被垃圾回收。根本原因最终是使用查询方法评估可观察对象的方式。
我需要一些帮助来解决这个问题,Rx 论坛上的好心人很好地解释了这些事情。这是相关帖子。
当在可观察对象上使用查询时,例如这样
var images = from filename in filenames
select Image.FromFile(filenames)
结果发现,对于附加到图像的每个观察者,select
语句每次都会被评估。当处理像 Images
这样的大型 Disposable
对象时,这是一个严重的问题。
解决方案在概念上与 IEnumerable
的 ToList()
扩展方法类似,在这种情况下是 Publish()
。这会强制 select
语句对于每个传入项只评估一次,而不是对于每个传出观察者评估一次。就像这样
var images = (from filename in filenames
select Image.FromFile(filenames)).Publish()
// ... subscribe more observables
images.Connect();
设置可观察对象链
下面的代码展示了图像查看器表单的核心。在这里,我们将各种线程上的所有可观察对象连接起来,以便在按下“查找图像”按钮时读取文件名。
// Create a subject to make it easy to listen for thumbnail images
// and update them on the winforms thread.
var thumbnailSubject = new Subject<KeyValuePair<ListViewItem, Image>>();
// Observes thumbnailSubject on the winforms thread and puts the thumbnail
// into the image list and the listview.
thumbnailSubject.ObserveOn(SynchronizationContext.Current).Subscribe(item =>
{
this.imageList.Images.Add(item.Key.Text, item.Value);
item.Key.ImageKey = item.Key.Text;
});
// We want to receive item notifications on a different thread, so we
// make use of the ObserveOn() method.
var thumbnailLoader = from listItem in itemObservable.ObserveOn(Scheduler.ThreadPool)
let filename = listItem.Text
let thumb = LoadThumbnail(filename)
select MakePair(listItem, (Image)thumb);
// We can't just call thumbnailLoader.Subscribe(thumbnailSubject)
// because when the thumbnailLoader
// finishes it will call the subjects OnComplete(),
// and that will stop our thumbnails loading on a different directory.
thumbnailLoader.Subscribe(a => thumbnailSubject.OnNext(a));
// Create an observable based on the SelectedIndexChanged event on the list
// view, and provide objects describing the selected item
// and the filename for that item.
var selectedFilename = (from index in Observable.FromEventPattern<EventArgs>
(this.listViewThumbnails, "SelectedIndexChanged")
where this.listViewThumbnails.SelectedItems.Count > 0
select new
{
Item = this.listViewThumbnails.SelectedItems[0],
Filename = this.listViewThumbnails.SelectedItems[0].Text
}).DistinctUntilChanged(i => i.Filename);
// Transform the selectedFilename observable into one that
// loads in images from the selectedFilename observable.
// We use ObserveOn() to make sure the image loading happens on a separate thread.
var selectedImage = (from item in selectedFilename.ObserveOn(Scheduler.ThreadPool)
let filename = item.Filename
let image = Image.FromFile(filename)
let thumb = new Bitmap(image, new Size(64, 64))
select new
{
Item = item.Item,
Filename = item.Filename,
Image = image,
Thumb = thumb
}).Publish();
// Observe the selectedImage and construct an object the thumbnailSubject understands
var selectedImageThumb = from item in selectedImage
select MakePair(item.Item, (Image)item.Thumb);
// Connect the selected thumb image to the thumbnail subject
// so when we load the selected image, the thumbnail updates.
selectedImageThumb.Subscribe(a => thumbnailSubject.OnNext(a));
// Observe selectedImage on the WinForms thread to show the selected image
// in the main window.
selectedImage.ObserveOn(SynchronizationContext.Current).Subscribe
(item => ShowImage(item.Image));
// now that everything is subscribed to the selected image,
// connect it up to start publishing.
selectedImage.Connect();
如果您遵循代码,您可以看到它遵循上面显示的流程图。
关注点
响应式框架并非万能药,也并非包罗万象,更不是解决所有问题的“大锤”。它只是开发人员工具箱中的又一个工具。
我还没有涉及框架的内部原理或其基本工作方式。那将是未来文章的内容。
Rx 框架为转换管道或数据流网络开启了一些有趣的可能性。虽然不是完美契合,但仍然是一个值得探讨的有趣话题。
谢谢
感谢 MSDN 论坛上的 fcharlon 协助解决了某些 Rx 问题。
感谢好心的 CodeProject 成员 leeloo999 识别并帮助修复了图像查看器中的一些内存使用问题。
感谢所有阅读并投票给这篇文章的人!
历史
- 2010 年 1 月 10 日 - 初稿
- 2010 年 1 月 15 日 - 首次发布
- 2010 年 2 月 5 日 - 修复了一些内存问题和其他 minor 文章问题
- 2011 年 11 月 4 日 - 更新为使用 NuGet 和 Visual Studio 2010 上的官方 Rx Release 1.0.10621.0