使用 Rx 实现观察者模式。






4.71/5 (7投票s)
观察者模式的替代方案 (C#)。
引言。
这是对 Observer Pattern (C#) 中详细介绍的观察者模式实现的一种替代方案。这种替代方案使用了一个基于 Linq
的事件处理系统,称为 Reactive Extension,简称 Rx。它有非常好的文档记录,在这里,Rx Wiki 在这里。
对观察者模式的简要解释。
观察者模式 定义了对象之间的一对多关系。它也称为发布-订阅模式。当发布者(“一对”)发布数据时,所有订阅者(“多”)同时接收到数据。该模式有助于保持相关对象之间的一致性,并展示了良好的关注点分离,因为发布者只需要知道订阅者实现了一个接收数据的方法,而订阅者只需要知道发布者实现了一个允许它们接收数据的方法。
为什么使用 Rx?
使用 Rx 实现观察者模式的优点之一是 Rx 中的发布者是异步运行的,因此程序执行不会因为发布者忙于检索数据而被阻塞。取而代之的是,使用回调来在数据可用时通知订阅者。回调也用于在发生错误以及序列结束时通知订阅者。这避免了订阅者继续观察已经结束或出错的事件的情况。Rx 还促进了 Linq 在过滤、分组和组合数据方面的使用,并且有可用于测试以及执行时间相关处理(如缓冲和节流)的方法。
Rx 实现基础。
在 Rx 中,数据流的订阅者称为 Observers,数据流的发布者称为 Observable。Observable 实现 IObservable<T>
接口,它有一个名为 Subscribe
的方法,顾名思义,该方法允许 Observers 订阅。Subscribe
方法返回一个实现 IDisposable
的对象。取消订阅只需调用该对象的 Dispose()
方法即可。
Observers 实现 IObserver<T>
接口。IObserver<T>
有三个回调方法,允许 Observable
向其 Observers 发送信息。接口如下。
public interface IObserver<in T>
{
//Provides the observer with new data.
void OnNext(T value);
//Notifies the observer that the provider has experienced an error
void OnError(Exception error);
//Notifies the observer that the provider has finished sending
void OnCompleted();
}
使用 Rx 实现观察者模式。
Rx 有一个名为 Subject
的辅助类。Subject
同时实现了 IObservable<T>
和 IObserver<T>
,因此它可以同时作为 Observer
和 Observable
。它的 OnNext(T value)
方法将 value
输出给所有已订阅的 Observers。这是一个简单的 IObserver<int>
类,可用于订阅 Subject
并将接收到的数据输出到控制台。
public class ObserverOfInts : IObserver<int>
{
public ObserverOfInts(string observerName)
{
this.ObserverName = observerName;
}
public string ObserverName { get; private set; }
public void OnCompleted()
{
Console.WriteLine("Sequence completed");
}
public void OnError(Exception error)
{
Console.WriteLine("Exception raised. {0}", error.Message);
}
public void OnNext(int value)
{
Console.WriteLine("{0}: Observed {1}", this.ObserverName, value);
}
}
在此示例中,Subject
订阅了一个 Observable
并将其数据流输出给其订阅者。
private static void SimpleDemoUsingSubject()
{
//declare an IEnumerable of ints with some data
IEnumerable<int> intList = new List<int> { 2, 4, 8, 10, 14 };
//Convert the enumerable to an Observable
IObservable<int> observableOfInts = intList.ToObservable();
var subject = new Subject<int>();
var firstObserver = new ObserverOfInts("First");
IDisposable firstSubscription = subject.Subscribe(firstObserver);
//You can subscribe by providing only the OnNext(T value) callback handler
IDisposable anonymousSubscription = subject.Subscribe(
v => Console.WriteLine(" Anonymous observed " + v),
//In this example, the OnCompleted callback is also provided
() => Console.WriteLine(" Anonymous observed the sequence completed"));
Console.WriteLine("Press Enter to End");
//Connect the subject to the observable
//This starts the sequence
IDisposable subjectSubscription = observableOfInts.Subscribe(subject);
Console.ReadLine();
//Unsubscribe the subscribers
firstSubscription.Dispose();
anonymousSubscription.Dispose();
subjectSubscription.Dispose();
Console.ReadKey();
}
使用 Rx 观察事件
事件不容易观察。相关数据可能深埋在事件参数中,并且事件通常牢固地绑定在其定义的类中。这使得它们难以传递以进行处理。使用 Rx,事件可以转换为等于相关数据类型的可观察对象,并且可以轻松地传递给方法和进行测试。以下示例使用 MouseMove Event
,并将数据处理为 Points
的可观察对象。进行了一些过滤,以便仅当 10 是 point.X 和 point.Y 的因数时才将一个点发送给观察者。重要的方法是 Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
,其中 MoveEventArgs
是事件参数类型,this
是发送者,MouseMove
是 Event
名称。调用 ObserveOn
扩展方法的原因为稍后详述。
public void EventDemo()
{
this.OutputTextBox.AppendText("\r\n Event demo");
this.OutputTextBox.AppendText("\r\n Move the mouse around this box to observe when 10 is a factor of both mouse Point.X and Point.Y ");
IObservable<EventPattern<mouseeventargs>> observableOfArgs =
Observable.FromEventPattern<mouseeventargs>(this.OutputTextBox, "MouseMove");
IObservable<point> points = observableOfArgs.Select(args => args.EventArgs.Location);
var multiplesOf10= points.Where(point => point.X % 10 == 0 && point.Y % 10 == 0);
IDisposable subscription =
multiplesOf10.ObserveOn(new ControlScheduler(this))
.Subscribe(point => this.OutputTextBox.AppendText("\r\n"+point.ToString()));
this.disposables.Add(subscription);
}</point>
测试观察者模式
Rx 提供了许多对测试有用的方法。输出可以带时间戳、延迟、节流和采样。一个用于构造事件代理的有用方法是 Observable.Interval(TimeSpan.span)
。它以间隔 span
生成一个 Longs 流。输出是 0,1,2,3....可以这样使用。
public static void PointsExample()
{
var points = new List<Point> { new Point(1, 2), new Point(3, 3), new Point(5, 6), new Point(7, 7) };
IObservable<point> onePointPerSecondsObservable =
Observable.Interval(TimeSpan.FromSeconds(1))
.Take(points.Count)
.Where(i => points[(int)i].X == points[(int)i].Y)
.Select(i => points[(int)i]);
onePointPerSecondsObservable.Subscribe(
point => Console.WriteLine(point.ToString()),
() => Console.WriteLine("Completed"));
Console.ReadKey();
}
//An alternative approach
public static void PointsExampleA()
{
var points = new List<Point> { new Point(1, 2), new Point(3, 3), new Point(5, 6), new Point(7, 7) };
IObservable<point> onePointPerSecondsObservable =
Observable.Generate(0, i => i < points.Count, i => i + 1, //this is the for loop
i => points[i], i => TimeSpan.FromSeconds(1))//select points[i] after 1 second delay
.Where(point => point.X == point.Y);
onePointPerSecondsObservable.Subscribe(
point => Console.WriteLine(point.ToString()),
() => Console.WriteLine("Completed"));
Console.ReadKey();
}
输出是。
需要注意的几个陷阱
Rx 有几个容易掉进去的“大象陷阱”。
在需要热的时候是冷的。
这样实现观察者模式行不通。
private static void DemoColdA()
{
var oneNumberPerSecondObservable = Observable.Interval(TimeSpan.FromSeconds(1));
var firstSubscriber = oneNumberPerSecondObservable.Subscribe(x=>Console.WriteLine("First observed "+x));
//wait before subscribing the second observer
Thread.Sleep(TimeSpan.FromSeconds(2));
var secondSubscriber = oneNumberPerSecondObservable.Subscribe(x=>Console.WriteLine("Second observed "+x));
Console.WriteLine("Press any key to stop the demo");
Console.ReadKey();
firstSubscriber.Dispose();
secondSubscriber.Dispose();
Console.WriteLine("Press any key to end the demo");
Console.ReadKey();
}
很明显,两个观察者没有在同一时间观察相同的值。原因在于 observable 是“Cold”(冷的)。Cold observables 为每个 Observer 重新启动数据流,每个 Observer 都会得到自己的流。解决方案是通过使用 Publish
扩展方法将“Cold”Observable 转换为“Hot”Observable。
var oneNumberPerSecondObservableHot = oneNumberPerSecondObservableCold.Publish();
在错误的线程上观察
Windows Forms、WPF 和 Silverlight 都有线程定义的运算。也就是说,必须在特定线程上执行的操作。如果您尝试在 Windows Forms 中执行以下操作,将会收到错误。
var subscription = obsLongs.Subscribe(x => form.Text = "Event generated " + x);
窗体将抛出“跨线程操作无效”异常,因为它正在从 Observer 的线程访问。此问题以前也适用于 WPF 和 Silverlight 项目,但似乎在最新版本的 Rx 中已得到解决。然而,有时仍希望定义 Observer 运行的线程。这可以通过将调度程序传递给 ObserveOn
扩展方法来完成。以下是 Windows Forms 的解决方案。
//The Control Scheduler is in System.Reactive.Windows.Forms.dll
//The Nuget package id is Rx-WinForms
//It takes a Control as an argument
var subscription = obsLongs.ObserveOn(new ControlScheduler(form)).Subscribe(x => form.Text = "Event generated " + x);
对于 WPF 和 Silverlight,您可以使用静态类 Scheduler
。
//schedule on the current thread
observable.ObserveOn(Scheduler.CurrentThread).Subscribe(x => label1.Content = x.X.ToString());
//schedule on the plarform's default thread
observable.ObserveOn(Scheduler.Default).Subscribe(x => label1.Content = x.X.ToString());
//schedule to execute immediately on the current thread
observable.ObserveOn(Scheduler.Immediate).Subscribe(x => label1.Content = x.X.ToString());
带缓冲和 Linq 的观察者模式
Rx 的真正强大之处在于它能够过滤数据,以控制观察者看到什么以及何时看到它们。在下面的示例中,数据流代表温度控制器的水温输出。如果连续两次读数高于 40C,则会通知观察者。数据流使用缓冲分割成段。然后使用 Linq 处理段,然后将数据传递给观察者。
private static void DemoBuffered()
{
//Set up an observable of random numbers between 1 and 100 at a rate of 1 per second
IObservable<long> oneNumberPerSecondObservable = Observable.Interval(TimeSpan.FromSeconds(1));
var random = new Random();
IConnectableObservable<long> randomNumberObservableHot =
oneNumberPerSecondObservable.Select(n => (long)random.Next(1, 100)).Publish();
//Subscribe an observer to output the number generated to the Console
IDisposable randomNumberSubscription =
randomNumberObservableHot.Subscribe(x => Console.WriteLine("Generated " + x));
//Set up a rolling buffer of size 2 that advances the stream index by 1 each time
//So that a stream of 0,1,2,3 would result in buffers of {0,1} {1,2} etc
var buffered = from firstLast in randomNumberObservableHot.Buffer(2, 1)
//filter the buffers by using Linq
where firstLast[0] > 40 && firstLast[1] > 40
select new { First = firstLast[0], Last = firstLast[1] };
//Introduce a time out period of 8 seconds
//If nothing is observed for 8 seconds an exception will be thrown
var observableWithTimeout = buffered.Timeout(TimeSpan.FromSeconds(8));
//Subscribe two observers
IDisposable firstSubscription =
observableWithTimeout.Subscribe(
anon => Console.WriteLine("First observed {0} and {1} ", anon.First, anon.Last),
ex => Console.WriteLine("First observed an Exception. " + ex.Message));
IDisposable secondSubscription =
observableWithTimeout.Subscribe(
anon => Console.WriteLine("Second observed {0} and {1} ", anon.First, anon.Last),
ex => Console.WriteLine("Second observed an Exception. " + ex.Message));
//start the ball rolling
randomNumberObservableHot.Connect();
Console.WriteLine("Press Enter to unsubscribe the first observer");
Console.ReadLine();
//Unsubscribe an observer
firstSubscription.Dispose();
Console.WriteLine("***first subscriber has unsubscribed ***");
Console.WriteLine("Press Enter to end");
Console.ReadLine();
secondSubscription.Dispose();
randomNumberSubscription.Dispose();
Console.ReadKey();
}
结论
这篇简短的文章只是触及了 Rx 所能实现的冰山一角。如果您想深入研究,有一些很棒的 视频,其中有两位热情的 Rx 开发者 Wes Dyer 和 Bart De Smet。