使用响应式扩展 - 冷观察者





5.00/5 (7投票s)
创建冷观察者、拦截观察者并将观察者拆分为异步任务。
引言
这是一系列文章,描述了如何在应用程序中实现响应式扩展,上一篇文章讨论了热观察者。
在本文中,我将重点介绍冷观察者,即您在程序中通过订阅它们产生的事件来创建和启动的观察者。它们在您介入之前不会运行,就像 MouseMove、KeyDown 等事件的连接一样。乍一看这似乎是一个不必要的区别,但请相信我,事实并非如此,我将向您展示原因。
热观察者已经在给定的线程上启动了,例如与鼠标相关的事件,它们已经在 UI 线程上运行。由于它是热观察者,您无法决定事件应该在哪里创建或何时启动,您能做的只是决定是否要订阅它。
与上一篇文章一样,这里的代码在 WPF 应用程序中运行,使用了 rx-main 和 rx-xaml 的 NuGet 包。这两个都是微软发布的。
UI 线程
您可能已经知道,UI 线程是唯一可以更新 WinForm 或 WPF 应用程序中任何控件的线程。这使得尽快将任务从 UI 线程移开变得可取,以确保 UI 不会变得无响应。
您对此无能为力,因为只有一个线程负责更新 UI,如果您尝试从不同的线程直接修改 WPF 或 WinForm 控件,无论其优先级如何,它都会抛出一个错误,说您试图从非指定的 UI 线程修改控件。
这个问题通常的解决方法是为每个您想通过按钮点击事件执行的耗时任务启动一个后台工作程序(或后台线程)。然后您将通过 BackgroundWorker 的回调事件更新 UI 线程。如果您使用普通线程,有一个名为 Progress 的新类,您可以与 IProgress<T> 接口一起使用,以回调 UI 线程。
您可能已经预料到,即使是最简单的后台任务,为了正常工作,也需要做很多工作,包括错误处理、取消和更新进度条等。Rx 提供了一种统一的方式来在业务应用程序中实现这一点,只需几行代码即可确保 UI 始终响应。
我们将从使用一个简单的 MouseMove 事件钩子来创建一个热观察者流开始。
var RxMouseMoveEvent = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
.Select(arg => arg.EventArgs.GetPosition(this));
我将根据鼠标位置进行一些繁重的计算,所以我希望尽快将其移出 UI 线程。您可能已经注意到我在 Select 中放入了一些代码来获取相对于 UI 元素(在本例中是 Window)的鼠标位置。由于 Window 是一个 UI 控件,这必须在 UI 线程上完成。Select 语句还使用它将流从 Observable<EventPattern<MouseEventArgs>>
更改为 Observable<Point>
。
我现在想将观察者流从 UI 线程移到后台线程。实际上,使用 Rx 来实现这一目标非常容易。
RxMouseMoveEvent //Take it off the UI thread .ObserveOn(Scheduler.Default)
我现在希望在 MouseMove 事件没有触发 0.5 秒时执行计算,即鼠标是空闲的。然后,我想调用一个需要一些时间来完成的函数,正如您可能猜到的,这个函数调用最方便在 Select 调用中完成。由于我将把结果发布到 UI 控件上,所以我选择使用 ObserveOnDispatcher() 调用将流路由回 UI 线程。
RxMouseMoveEvent
//Take it off the UI thread
.ObserveOn(Scheduler.Default)
// Wait for a 0.5 second pause
.Throttle(new TimeSpan(0, 0, 0, 0, 500))
//Do calcualtions
.Select(xy => CalculatePointRelateThing(xy))
//Change it back to the UI thread
.ObserveOnDispatcher()
//Subscribe to the change
.Subscribe(evt =>
{
txtText.Text = evt.ToString();
});
evt
只是流 Observable<Point>
的一个单独点值。诚然,我也可以直接从后台线程更新 TextBox,如下所示:
RxMouseMoveEvent
//Take it off the UI thread
.ObserveOn(Scheduler.Default)
// Wait for a 0.5 second pause
.Throttle(new TimeSpan(0, 0, 0, 0, 500))
//Do calcualtions
.Select(xy => CalculatePointRelateThing(xy))
.Subscribe(evt =>
{
this.txtText.Dispatcher.Invoke(DispatcherPriority.Normal,
new Action(() => { this.txtText.Text = evt.ToString(); })); ;
}
);
只要您不需要与多个控件在错误报告和其他 UI 线程上完成的操作进行交互,这就可以了(您现在可以看到为什么将流切换回 UI 线程的函数称为 ObserveOnDispatcher
)。总之,Rx 版本比任何旧的(或新的)回调方法都更易读、更短、更灵活。
冷观察者
这是一个热观察者示例,让我们看看如何开始和在冷观察者之间切换线程。有 3 种主要方式可以启动冷观察者流,即创建单个实例值或流(如 Observable.Create
),在流中创建多个实例(如 Observable.Range
),最后将通用的 IEnumerable 对象转换为 Observable 对象流(如 Task<T>.ToObservable
),您可以查看 Lee Campbell 的免费在线书籍 以获取更多示例。还有第四种方式,就是直接使用 Subject<T>
,但不推荐这样做,主要是出于 Observable 流创建和处理一致性的考虑。
创建一个整数的冷观察者流非常简单。
var MyIntStream = Observable.Range(0, 5);
但这里要强调的是,这段代码实际上并没有运行流。它只有在您订阅流时才开始运行,也就是说,这就是冷观察者的定义方式。现在假设我想让这个流从一开始就在后台线程上初始化。我可以通过使用 ObserveOn()
函数调用来更改线程,但这将意味着流最初在 UI 线程上启动。冷观察者实际上有一个不同的技巧,通过使用 SubscribeOn(Scheduler.Default)
来更改初始化线程。然而,SubscribeOn 方法的工作原理值得更彻底的 walkthrough。记住,冷观察者实际上是由 Subscribe 调用启动的。如果您参考 StackOverflow 中的示例,您就能更清楚地了解发生了什么。
Thread.CurrentThread.Name = "Main";
IScheduler thread1 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread1" });
IScheduler thread2 = new NewThreadScheduler(x => new Thread(x) { Name = "Thread2" });
Observable.Create<int>(o =>
{
Console.WriteLine("Created on " + Thread.CurrentThread.Name);
o.OnNext(1);
o.OnCompleted();
return Disposable.Create(() => { });
})
.SubscribeOn(thread1)
.ObserveOn(thread2)
.Subscribe(
x => Console.WriteLine("Observing '" + x + "' on " + Thread.CurrentThread.Name)
, ex => Console.WriteLine(ex.Message)
, () => { Console.WriteLine("Completed on " + Thread.CurrentThread.Name); }
);
这将向您展示 Observable 是在 SubscribeOn
包含的线程上创建的。有趣的是,如果您移除 ObserveOn,Subscribe 函数中的所有内容都将在 SubscribeOn 中提供的线程上执行。您还可以添加一个 Select 来检查 SubscribeOn 是否在所有点上都改变了线程。
.SubscribeOn(thread1)
.Select(x =>
{
Console.WriteLine("Current thread check: " + Thread.CurrentThread.Name);
return x;
})
所有操作都将在同一线程上进行。但是,如果您将 Observable 改为热的,这里的情况会有点不同。
var KeyDown = Observable.FromEventPattern<KeyEventArgs>(this, "PreviewKeyDown")
.Select(x =>{Console.WriteLine("Current thread check: " + Thread.CurrentThread.Name);return x;})
.SubscribeOn(thread1)
.Select(x =>{Console.WriteLine("Current thread check: " + Thread.CurrentThread.Name);return x;})
.Subscribe(
x => Console.WriteLine("Observing '" + x + "' on " + Thread.CurrentThread.Name)
, ex => Console.WriteLine(ex.Message)
, () => { Console.WriteLine("Completed on " + Thread.CurrentThread.Name); }
);
SubscribeOn 在这里将不起任何作用,在上面的示例中,所有操作都将在主线程上进行。事实上,我很难看出它对热观察者有什么影响,而且无论如何,我实在看不出在热观察者上使用它的意义,因为它们运行的线程已经确定,您所能做的只是改变线程。如果我错了,请提供一个代码示例并指出我的错误。
创建中间层
在之前的例子中,我在 Select 函数中调用了函数,以改变或对流式值进行计算。然而,还有一种不同的方法,那就是创建一个中间层,该层订阅传入的流,并返回一个新的流作为结果。这种技术可以用于,正如 Nicolas Dorier 所做的那样,创建弱订阅和其他一些东西。他不是唯一使用它的人,它实际上在 Rx .NET 源代码中用得相当多,您可以 在这里 下载并自行查看。
public static IObservable<TItem> ObserveWeakly<TItem>(this IObservable<TItem> collection)
{
return Observable.Create<TItem>(obs =>
{
var weakSubscription = new WeakSubscription<TItem>(collection, obs);
return () =>
{
weakSubscription.Dispose();
};
});
}
obs
实际上是一个 Observer<T>
对象,并且是 Observable.Create
方法将 OnNext、OnCompleted 和 OnError 传递到流中的对象。所以我们创建了一个类,它订阅传入的 Observable,并将适当的值传递给观察者,前提是它仍然处于活动状态。如果值不是活动的,订阅将被终止。
public class WeakSubscription<T> : IDisposable, IObserver<T>
{
private readonly WeakReference reference;
private readonly IDisposable subscription;
private bool disposed;
public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
{
this.reference = new WeakReference(observer);
this.subscription = observable.Subscribe(this);
}
void IObserver<T>.OnCompleted()
{
var observer = (IObserver<T>)this.reference.Target;
if(observer != null)
observer.OnCompleted();
else
this.Dispose();
}
void IObserver<T>.OnError(Exception error)
{
var observer = (IObserver<T>)this.reference.Target;
if(observer != null)
observer.OnError(error);
else
this.Dispose();
}
void IObserver<T>.OnNext(T value)
{
var observer = (IObserver<T>)this.reference.Target;
if(observer != null)
observer.OnNext(value);
else
this.Dispose();
}
public void Dispose()
{
if(!this.disposed)
{
this.disposed = true;
this.subscription.Dispose();
}
}
}
这种方法实际上非常强大,正如我之前提到的。它实际上是对流的中间订阅,您可以在类中拦截任何流值并执行复杂的算术运算,同时仍然利用 Rx 的全部功能。
并行和异步进程
现在假设我想基于整数流输入并行运行一些计算。最简单的方法是使用 Task.Run
方法。您应该意识到 Task.Run
主要是一个异步过程,可能不会并行运行所有内容。然而,在我的例子中,Task.Run
更适合,因为它会根据您启动的任务数量调整其线程池,基本上使它们更易于使用。要在一项操作中启动它们,您可以简单地写:
.Select(x=>Task.Run(()=>DoTaskWork(x)))
但这会导致您的一些麻烦,因为此函数调用 Task.Run
的返回值实际上是 Task<T>
,更糟糕的是,它会在启动后立即返回 Task<T>
。您实际上不想要这个,您通常希望任务在完成后返回 T
值。幸运的是,有一个简单的技巧可以使用,那就是调用 SelectMany
。它用于将多个流展平成一个流,这正是我想要的,即让任务在完成后尽快返回一个值。如果我想让它们按照原始顺序排列,我需要在所有都完成后对它们进行排序。
代码确实非常简单整洁,最重要的是,我无需进行任何复杂的 Callbacks 即可更新 UI。
var MyIntStream = Observable.Range(0, 5)
// Take the Observable off the UI thread
.SubscribeOn(Scheduler.Default)
//Create parallel tasks and report results when completed
.SelectMany(x => Task.Run<FreqValues>(() => DoTaskWork(x), ctr.Token))
// Switch back to the UI thread
.ObserveOnDispatcher()
.Subscribe(args => {
//Update the progressbar when a thread is completed
pgbProgress.Value += 1;
//Tell me what thread is completed
txtText.Text += Environment.NewLine + args.Frequency.ToString();
},
() => {
//All done, here I could do sorting etc.
txtText.Text += Environment.NewLine + "All done";
}
);
摘要
以上是冷观察者的基础知识。诚然,还需要更多文章来解释您可以创建的所有冷热观察者变体。希望这能帮助您创建自己的 Rx 项目。
参考文献