65.9K
CodeProject 正在变化。 阅读更多。
Home

Rx 框架示例

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.98/5 (53投票s)

2010 年 1 月 14 日

CPOL

10分钟阅读

viewsIcon

160514

downloadIcon

3892

通过一些实际示例介绍响应式框架

引言

Rx 框架是一个非常有趣且有用的库,最近通过 DevLabs 发布。该框架的目的是提供一个大型框架,用于处理观察者模式。这是一个很好的学术描述,但不足以立即开始使用它。因此,让我们深入探讨一下。

本文旨在通过示例而非描述来介绍 Rx 框架。我不会深入探讨内部行为的任何细节,但我希望在即将发表的文章中涵盖这一点。我发现当学习全新的东西时,实际的工作示例为我提供了足够的信息来开始,并使我能够学习更多。

背景

此处提供的所有信息都总结或来源于 Channel 9 上出色的视频和文本博客文章。我强烈推荐它作为所有开发人员的学习资源。

对于本文,假定读者对观察者模式有基本的了解。

本文的代码是针对 NuGet 上的 Reactive Framework 版本 1.0.10621.0 编写的。

观看 Channel 9 上的 Rx 框架介绍视频也会有所帮助。

IObservable<T>

Rx 框架的核心是可观察模式,由 IObservable<T> 接口定义。它只提供一个方法,用于使用实现 IObserver<T> 接口的对象实例来订阅可观察对象。

Rx 框架的开发者做出的有趣联系是 IObservable<T>IEnumerable<T> 接口的数学对偶。这在此视频中得到了很好的解释,但总结是 IEnumerable<T> 从序列中拉取元素,而 IObservable<T> 从序列中推送元素。

我能用它做什么?

我发现最具挑战性的部分是如何用它解决问题。它似乎是一个寻找问题的大锤子。虽然它不能解决无法解决的问题,但我认为它确实提供了解决现有问题的新颖有趣的方法,有时还能以更易于编写和维护的方式解决问题。

该框架提供内置的线程支持和控制,允许在后台线程中观察集合。

获取目录树

在处理磁盘时,一个众所周知的任务是在 UI 应用程序中获取树中的目录列表。

此示例的代码位于 Directories 项目中。

考虑这个 IEnumerable<string> 结果

static IEnumerable<string> GetAllDirectories(string path)
{
  string[] subdirs = null;
  
  // Some directories may be inaccessible.
  try
  {
    subdirs = Directory.GetDirectories(path);
  }
  catch (IOException)
  {
  }
  
  if (subdirs != null)
  {
    foreach (var subdir in subdirs)
    {
      yield return subdir;
      
      foreach (var grandchild in GetAllDirectories(subdir))
      {
        yield return grandchild;
      }
    }
  }
}

您可以以异步方式订阅它,使用

var observable = Observable.ToObservable(GetAllDirectories(@"c:\"), Scheduler.ThreadPool);
observable.Subscribe(outputDirectory);	

其中 outputDirectory 是一个 Action<string>。此委托在 ThreadPool 中的线程上调用,这意味着在调用 Winforms 对象上的方法时它不会直接工作。您可以使用正常的 Control.Invoke() 方法来解决此问题,或者您可以执行此操作

var observable = Observable.ToObservable
	(GetAllDirectories(@"c:\")).ObserveOn(SynchronizationContext.Current);
observable.Subscribe(outputDirectory);

这将确保对 outputAction 的所有调用都在创建 myWinformsControl 的同一线程上进行。

现在,这样做的缺点是,高频率的单独调用 Winforms 更新会导致用户界面卡顿。解决此问题的一个方法是缓冲观察者生成的 string。Reactive Framework 可以通过数量、时间或两者兼有来缓冲。由于频率是问题,我们将使用时间进行缓冲。

var observable = 

Observable.ToObservable(GetAllDirectories(@"c:\"))
        .Buffer(TimeSpan.FromSeconds(1))
        .ObserveOn(this);

observable.Subscribe(outputDirectories);

这将缓冲直到一秒钟过去,然后调用 getDirectories,它是一个 Action<IEnumerable<string>>

现在这对我很有效,直到我有幸获得了一块 SSD 硬盘。在 1 秒的延迟中,我每秒现在可以检索大约 16000 个元素。这比 TreeView 控件能够添加并仍然保持流畅的用户界面要多得多。

乍一看,解决方案是向 Buffer() 提供 1000 的元素计数,但这只意味着缓冲将在每 1000 个元素完成,大约是 80 毫秒,这只会让事情变得更糟。

解决方案是组合两个序列,一个是有规律的间隔,另一个是 1000 个元素的缓冲区。

Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.ThreadPool)
.Zip(GetAllDirectories(@"c:\").ToObservable(Scheduler.ThreadPool)
.Buffer(1000), (a, b) => b)

Zip() 运算符依次组合来自每个序列的元素。这意味着每 1000 个条目与每 1 秒生成的一个元素配对。

最后一部分是停止读取目录。每次调用 Subscribe 方法时,它都会返回一个 IDisposable,可用于取消订阅观察者。我们可以用它来停止读取,如下所示

IDisposable observer = observable.Subscribe(outputDirectories)
      ...
observer.Dispose() 

用户界面交互

我处理的许多应用程序都具有交互性,例如绘制或修改对象的 2D 视图。此环境中最常见的任务是选择、绘制和修改对象。这些行为的核心部分是鼠标交互,特别是将鼠标按钮和移动组合成拖动操作。

这些方面的挑战在于跟踪鼠标按钮和移动状态,以便可以在鼠标事件之间计算位置增量。虽然做这项工作并不具有挑战性,但困难的部分是实现一个干净的设计。人们通常会使用许多布尔变量来跟踪是否正在拖动、是否刚刚拖动、是否已移动至少 3 像素以及其他任意数量的组合。

虽然我还没有在生产环境中使用响应式框架来解决这些问题,但它有一些前景,至少在学术上很有趣,并且确实展示了 Rx 框架的其他一些方面。

展示这些技术的示例位于 DrawingApplication 项目中。它是一个非常简单的线条绘制迷你应用程序。它允许您在屏幕上绘制和移动线条。

观察事件

第一步是将事件设置为 IObservable

var movingEvents = Observable.FromEventPattern<MouseEventHandler, 
	MouseEventArgs>(h => this.MouseMove += h, h => this.MouseMove -= h);

var upEvents = Observable.FromEvent<MouseEventHandler, 
	MouseEventArgs>(h => this.MouseUp += h, h => this.MouseUp -= h);

var downEvents = Observable.FromEvent<MouseEventHandler, 
	MouseEventArgs>(h => this.MouseDown += h, h => this.MouseDown -= h);

这设置了三个 IObservable<IEvent<MouseEventArgs>> 实例:movingEventsupEventsdownEvents。每次发生鼠标交互时,任何观察者都将收到通知。

我们可以从这里做很多事情。

识别鼠标按下和鼠标抬起之间的鼠标移动

var draggingEvents = movingEvents.WaitUntil(downEvents).Until(upEvents);

现在,问题在于 draggingEvents 可观察对象是一次性的。这意味着一旦鼠标按钮释放,可观察对象将不再产生元素。解决方案是使用 Repeat() 扩展方法。

var draggingEvents = 
		movingEvents.WaitUntil(downEvents).Until(upEvents).Repeat();

挑选左键点击

var leftClicks = from e in downEvents 
                 where e.EventArgs.Button == MouseButtons.Left 
                 select e;

leftClicks.Subscribe(...);

这也表明您可以使用 LINQ 查询语法来处理可观察对象。

配对拖动的鼠标事件

这是与屏幕上的项目交互时的常见要求。您希望在鼠标被拖动时,鼠标坐标对之间或上一个位置和当前位置之间存在增量。这对于平移屏幕、移动选定项目或绘制项目很有用。或者,这可以表示为实际的点对而不是增量。

var deltas = from pair in movingEvents.Buffer(2)
             let array = pair.ToArray()
             let a = array[0].EventArgs.Location
             let b = array[1].EventArgs.Location
             select new Size(b.X - a.X, b.Y - a.Y);

var dragDeltas = moveDeltas.WaitUntil(downEvents).Until(upEvents).Repeat();

dragDeltas 是一个 IObservable,它提供连续的鼠标事件,并计算出位置差异。对 Observable.Buffer() 的调用不会将元素拆分为单独的对,而是使每个单独的鼠标移动事件出现在两对中,当然,除了第一个和最后一个事件。

合并流

此示例应用程序是 ConsoleReaderConsoleOutputterConsoleReader 是父进程,ConsoleOutputter 代表子进程。

Console Reader Screenshot

在使用外部进程时,stdoutstderr 消息通常需要重定向并输出到主窗口。这可能是一个挑战,因为从子进程的标准输出和错误流读取是一个阻塞操作。响应式框架使其变得容易。

第一部分是将 StreamReader 表示为 IEnumerable,以便于使用 Reactive Framework。

private static IEnumerable<string> GetLineReader(StreamReader reader)
{
  while (reader.BaseStream.CanRead)
  {
    var l = reader.ReadLine();
    
    if (l == null)
    {
      break;
    }
    
    yield return l;
  }
}

接下来是将标准流和输出流合并到单个可观察对象中

var process = Process.Start(info);

var childStdOut = GetLineReader(process.StandardOutput).ToObservable();
var childStdErr = GetLineReader(process.StandardError).ToObservable();

Observable.Merge(childStdOut, childStdErr).Subscribe(LineOutputter);

LineOutputter 是一个 Action<string> 委托,它为子进程的 StandardOutputStandardError 流产生的每一行调用。

一个简单的图像查看应用程序

此示例的代码位于 ImageViewer 项目中。

想象一个简单的应用程序,您可以检查一个目录,显示其中的图像文件,然后能够单击文件以显示图像。

这将有以下主要工作场所

  • 在目录中查找文件
  • 为每个文件加载缩略图
  • 单击选定图像时加载图像

为了实现响应式用户界面,所有这三个操作都需要在后台线程中进行。其中具有挑战性的部分是如何以简单且线程安全的方式将任务排队,并实现线程之间的通信。下面是一个显示需要发生的动作和信息流的图表。

Chart showing the flow of data for the image viewer application

有趣的是,当加载图像时,我们也有机会为图像设置缩略图。

主体

我们在这个例子中使用的新概念是 ISubject<T> 接口及其默认实现 Subject<T>Subject<T> 提供了一种简单的机制,可以提前设置一些观察者,然后为这些观察者提供我们尚不了解的其他序列。可以把它想象成一扇门,你可以观察进来的人,但你并不知道他们是从哪里来的。

我们将此用于缩略图设置,因为缩略图可以来自两个来源——根据文件列表加载缩略图,以及在查看完整图像时加载缩略图。

主体和 OnComplete 的链接

使用 Subject 观察可观察对象的一个意外结果是它没有重复。考虑以下两个代码片段

myObservable.Subscribe(mySubject)

myObservable.Subcribe(item => mySubject.OnNext(item))

它们相似之处在于 subject 会收到通知,但关键区别在于当 myObservable 调用 OnCompleted() 时,subject 的 OnCompleted() 方法也会被调用。这意味着订阅到 subject 的任何内容将不再发送通知。只需注意这一点。

发布还是不发布

在本文的第一个版本中,这个图像查看器有一些严重的内存问题。它会大幅波动内存使用量,这非常表明发生了大量的内存分配,然后被垃圾回收。根本原因最终是使用查询方法评估可观察对象的方式。

我需要一些帮助来解决这个问题,Rx 论坛上的好心人很好地解释了这些事情。这是相关帖子

当在可观察对象上使用查询时,例如这样

var images = from filename in filenames 
            select Image.FromFile(filenames)

结果发现,对于附加到图像的每个观察者,select 语句每次都会被评估。当处理像 Images 这样的大型 Disposable 对象时,这是一个严重的问题。

解决方案在概念上与 IEnumerableToList() 扩展方法类似,在这种情况下是 Publish()。这会强制 select 语句对于每个传入项只评估一次,而不是对于每个传出观察者评估一次。就像这样

var images = (from filename in filenames 
             select Image.FromFile(filenames)).Publish()

// ... subscribe more observables

images.Connect();

设置可观察对象链

下面的代码展示了图像查看器表单的核心。在这里,我们将各种线程上的所有可观察对象连接起来,以便在按下“查找图像”按钮时读取文件名。

  // Create a subject to make it easy to listen for thumbnail images 
  // and update them on the winforms thread.
  var thumbnailSubject = new Subject<KeyValuePair<ListViewItem, Image>>();
  
  // Observes thumbnailSubject on the winforms thread and puts the thumbnail 
  // into the image list and the listview.
  thumbnailSubject.ObserveOn(SynchronizationContext.Current).Subscribe(item =>
  {
    this.imageList.Images.Add(item.Key.Text, item.Value);
    item.Key.ImageKey = item.Key.Text;
  });
  
  // We want to receive item notifications on a different thread, so we 
  // make use of the ObserveOn() method.
  var thumbnailLoader = from listItem in itemObservable.ObserveOn(Scheduler.ThreadPool)
                        let filename = listItem.Text
                        let thumb = LoadThumbnail(filename)
                        select MakePair(listItem, (Image)thumb);
                        
  // We can't just call thumbnailLoader.Subscribe(thumbnailSubject) 
  // because when the thumbnailLoader 
  // finishes it will call the subjects OnComplete(), 
  // and that will stop our thumbnails loading on a different directory.
  thumbnailLoader.Subscribe(a => thumbnailSubject.OnNext(a));
  
  // Create an observable based on the SelectedIndexChanged event on the list 
  // view, and provide objects describing the selected item 
  // and the filename for that item.
  var selectedFilename = (from index in Observable.FromEventPattern<EventArgs>
  		(this.listViewThumbnails, "SelectedIndexChanged")
                         where this.listViewThumbnails.SelectedItems.Count > 0
                         select new
                         {
                           Item = this.listViewThumbnails.SelectedItems[0],
                           Filename = this.listViewThumbnails.SelectedItems[0].Text
                         }).DistinctUntilChanged(i => i.Filename);
                         
  // Transform the selectedFilename observable into one that 
  // loads in images from the selectedFilename observable.
  // We use ObserveOn() to make sure the image loading happens on a separate thread.
  var selectedImage = (from item in selectedFilename.ObserveOn(Scheduler.ThreadPool)
                       let filename = item.Filename
                       let image = Image.FromFile(filename)
                       let thumb = new Bitmap(image, new Size(64, 64))
                       select new
                       {
                         Item = item.Item,
                         Filename = item.Filename,
                         Image = image,
                         Thumb = thumb
                       }).Publish();
                       
  // Observe the selectedImage and construct an object the thumbnailSubject understands
  var selectedImageThumb = from item in selectedImage
                           select MakePair(item.Item, (Image)item.Thumb);
                           
  // Connect the selected thumb image to the thumbnail subject 
  // so when we load the selected image, the thumbnail updates.
  selectedImageThumb.Subscribe(a => thumbnailSubject.OnNext(a));
  
  // Observe selectedImage on the WinForms thread to show the selected image 
  // in the main window.
  selectedImage.ObserveOn(SynchronizationContext.Current).Subscribe
			(item => ShowImage(item.Image));
  
  // now that everything is subscribed to the selected image, 
  // connect it up to start publishing.
  selectedImage.Connect();

如果您遵循代码,您可以看到它遵循上面显示的流程图。

关注点

响应式框架并非万能药,也并非包罗万象,更不是解决所有问题的“大锤”。它只是开发人员工具箱中的又一个工具。

我还没有涉及框架的内部原理或其基本工作方式。那将是未来文章的内容。

Rx 框架为转换管道或数据流网络开启了一些有趣的可能性。虽然不是完美契合,但仍然是一个值得探讨的有趣话题。

谢谢

感谢 MSDN 论坛上的 fcharlon 协助解决了某些 Rx 问题。

感谢好心的 CodeProject 成员 leeloo999 识别并帮助修复了图像查看器中的一些内存使用问题。

感谢所有阅读并投票给这篇文章的人!

历史

  • 2010 年 1 月 10 日 - 初稿
  • 2010 年 1 月 15 日 - 首次发布
  • 2010 年 2 月 5 日 - 修复了一些内存问题和其他 minor 文章问题
  • 2011 年 11 月 4 日 - 更新为使用 NuGet 和 Visual Studio 2010 上的官方 Rx Release 1.0.10621.0
© . All rights reserved.