65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Rx 实现观察者模式。

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.71/5 (7投票s)

2014年5月30日

CPOL

5分钟阅读

viewsIcon

33624

downloadIcon

292

观察者模式的替代方案 (C#)。

引言。

这是对 Observer Pattern (C#) 中详细介绍的观察者模式实现的一种替代方案。这种替代方案使用了一个基于 Linq 的事件处理系统,称为 Reactive Extension,简称 Rx。它有非常好的文档记录,在这里,Rx Wiki 在这里

对观察者模式的简要解释。

观察者模式 定义了对象之间的一对多关系。它也称为发布-订阅模式。当发布者(“一对”)发布数据时,所有订阅者(“多”)同时接收到数据。该模式有助于保持相关对象之间的一致性,并展示了良好的关注点分离,因为发布者只需要知道订阅者实现了一个接收数据的方法,而订阅者只需要知道发布者实现了一个允许它们接收数据的方法。

为什么使用 Rx?

使用 Rx 实现观察者模式的优点之一是 Rx 中的发布者是异步运行的,因此程序执行不会因为发布者忙于检索数据而被阻塞。取而代之的是,使用回调来在数据可用时通知订阅者。回调也用于在发生错误以及序列结束时通知订阅者。这避免了订阅者继续观察已经结束或出错的事件的情况。Rx 还促进了 Linq 在过滤、分组和组合数据方面的使用,并且有可用于测试以及执行时间相关处理(如缓冲和节流)的方法。

Rx 实现基础。

在 Rx 中,数据流的订阅者称为 Observers,数据流的发布者称为 Observable。Observable 实现 IObservable<T> 接口,它有一个名为 Subscribe 的方法,顾名思义,该方法允许 Observers 订阅。Subscribe 方法返回一个实现 IDisposable 的对象。取消订阅只需调用该对象的 Dispose() 方法即可。

Observers 实现 IObserver<T> 接口。IObserver<T> 有三个回调方法,允许 Observable 向其 Observers 发送信息。接口如下。

public interface IObserver<in T>
{
//Provides the observer with new data.
void OnNext(T value);

//Notifies the observer that the provider has experienced an error 
void OnError(Exception error);

//Notifies the observer that the provider has finished sending 
void OnCompleted();
}

使用 Rx 实现观察者模式。

Rx 有一个名为 Subject 的辅助类。Subject 同时实现了 IObservable<T>IObserver<T>,因此它可以同时作为 ObserverObservable。它的 OnNext(T value) 方法将 value 输出给所有已订阅的 Observers。这是一个简单的 IObserver<int> 类,可用于订阅 Subject 并将接收到的数据输出到控制台。

public class ObserverOfInts : IObserver<int>
    {
        public ObserverOfInts(string observerName)
        {
            this.ObserverName = observerName;
        }

        public string ObserverName { get; private set; }

        public void OnCompleted()
        {
            Console.WriteLine("Sequence completed");
        }

        public void OnError(Exception error)
        {
            Console.WriteLine("Exception raised. {0}", error.Message);
        }

        public void OnNext(int value)
        {
            Console.WriteLine("{0}: Observed {1}", this.ObserverName, value);
        }

     }

在此示例中,Subject 订阅了一个 Observable 并将其数据流输出给其订阅者。

        private static void SimpleDemoUsingSubject()
        {
            //declare an IEnumerable of ints with some data
            IEnumerable<int> intList = new List<int> { 2, 4, 8, 10, 14 };
            //Convert the enumerable to an Observable
            IObservable<int> observableOfInts = intList.ToObservable();
            var subject = new Subject<int>();
            var firstObserver = new ObserverOfInts("First");
            IDisposable firstSubscription = subject.Subscribe(firstObserver);
            //You can subscribe by providing only the OnNext(T value) callback handler
            IDisposable anonymousSubscription = subject.Subscribe(
                v => Console.WriteLine("  Anonymous observed " + v),
                //In this example, the OnCompleted callback is also provided
                () => Console.WriteLine("  Anonymous observed the sequence completed"));
            Console.WriteLine("Press Enter to End");
            //Connect the subject to the observable
            //This starts the sequence
            IDisposable subjectSubscription = observableOfInts.Subscribe(subject);
         
            Console.ReadLine();
            //Unsubscribe the subscribers
            firstSubscription.Dispose();
            anonymousSubscription.Dispose();
            subjectSubscription.Dispose();

            Console.ReadKey();
        }  

Simple Demo

使用 Rx 观察事件

事件不容易观察。相关数据可能深埋在事件参数中,并且事件通常牢固地绑定在其定义的类中。这使得它们难以传递以进行处理。使用 Rx,事件可以转换为等于相关数据类型的可观察对象,并且可以轻松地传递给方法和进行测试。以下示例使用 MouseMove Event,并将数据处理为 Points 的可观察对象。进行了一些过滤,以便仅当 10 是 point.X 和 point.Y 的因数时才将一个点发送给观察者。重要的方法是 Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove"),其中 MoveEventArgs 是事件参数类型,this 是发送者,MouseMoveEvent 名称。调用 ObserveOn 扩展方法的原因为稍后详述。

public void EventDemo()
        {
            this.OutputTextBox.AppendText("\r\n Event demo");
            this.OutputTextBox.AppendText("\r\n Move the mouse around this box to observe when 10 is a factor of both mouse Point.X and Point.Y ");
            IObservable<EventPattern<mouseeventargs>> observableOfArgs =
                Observable.FromEventPattern<mouseeventargs>(this.OutputTextBox, "MouseMove");
          
            IObservable<point> points = observableOfArgs.Select(args => args.EventArgs.Location);
         var multiplesOf10= points.Where(point => point.X % 10 == 0 && point.Y % 10 == 0);
            IDisposable subscription =
                multiplesOf10.ObserveOn(new ControlScheduler(this))
                    .Subscribe(point => this.OutputTextBox.AppendText("\r\n"+point.ToString()));
            this.disposables.Add(subscription);
        }</point>

测试观察者模式

Rx 提供了许多对测试有用的方法。输出可以带时间戳、延迟、节流和采样。一个用于构造事件代理的有用方法是 Observable.Interval(TimeSpan.span)。它以间隔 span 生成一个 Longs 流。输出是 0,1,2,3....可以这样使用。

  public static void PointsExample()
        {
            var points = new List<Point> { new Point(1, 2), new Point(3, 3), new Point(5, 6), new Point(7, 7) };

            IObservable<point> onePointPerSecondsObservable =
                Observable.Interval(TimeSpan.FromSeconds(1))
                    .Take(points.Count)
                    .Where(i => points[(int)i].X == points[(int)i].Y)
                    .Select(i => points[(int)i]);
            onePointPerSecondsObservable.Subscribe(
                point => Console.WriteLine(point.ToString()),
                () => Console.WriteLine("Completed"));
            Console.ReadKey();
        }
     //An alternative approach
    public static void PointsExampleA()
     {
       var points = new List<Point> { new Point(1, 2), new Point(3, 3), new Point(5, 6), new Point(7, 7) };
       IObservable<point> onePointPerSecondsObservable =
         Observable.Generate(0, i => i < points.Count, i => i + 1,  //this is the for loop
         i => points[i], i => TimeSpan.FromSeconds(1))//select points[i] after 1 second delay
         .Where(point => point.X == point.Y);
       onePointPerSecondsObservable.Subscribe(
         point => Console.WriteLine(point.ToString()),
          () => Console.WriteLine("Completed"));
          Console.ReadKey();
        }
        

        

输出是。

MoveMove Test

需要注意的几个陷阱

Rx 有几个容易掉进去的“大象陷阱”。

在需要热的时候是冷的。

这样实现观察者模式行不通。

  private static void DemoColdA()
        {
            var oneNumberPerSecondObservable = Observable.Interval(TimeSpan.FromSeconds(1));
            var firstSubscriber = oneNumberPerSecondObservable.Subscribe(x=>Console.WriteLine("First observed "+x));
            //wait before subscribing the second observer
            Thread.Sleep(TimeSpan.FromSeconds(2));
            var secondSubscriber = oneNumberPerSecondObservable.Subscribe(x=>Console.WriteLine("Second observed "+x));
            Console.WriteLine("Press any key to stop the demo");
            Console.ReadKey();
            firstSubscriber.Dispose();
            secondSubscriber.Dispose();
            Console.WriteLine("Press any key to end the demo");
            Console.ReadKey();
        }

Cold Observable

很明显,两个观察者没有在同一时间观察相同的值。原因在于 observable 是“Cold”(冷的)。Cold observables 为每个 Observer 重新启动数据流,每个 Observer 都会得到自己的流。解决方案是通过使用 Publish 扩展方法将“Cold”Observable 转换为“Hot”Observable。

 var oneNumberPerSecondObservableHot = oneNumberPerSecondObservableCold.Publish();

在错误的线程上观察

Windows Forms、WPF 和 Silverlight 都有线程定义的运算。也就是说,必须在特定线程上执行的操作。如果您尝试在 Windows Forms 中执行以下操作,将会收到错误。

var subscription = obsLongs.Subscribe(x => form.Text = "Event generated " + x);

窗体将抛出“跨线程操作无效”异常,因为它正在从 Observer 的线程访问。此问题以前也适用于 WPF 和 Silverlight 项目,但似乎在最新版本的 Rx 中已得到解决。然而,有时仍希望定义 Observer 运行的线程。这可以通过将调度程序传递给 ObserveOn 扩展方法来完成。以下是 Windows Forms 的解决方案。

//The Control Scheduler is in System.Reactive.Windows.Forms.dll 
//The Nuget package id is Rx-WinForms
//It takes a Control as an argument
var subscription = obsLongs.ObserveOn(new ControlScheduler(form)).Subscribe(x => form.Text = "Event generated " + x);

对于 WPF 和 Silverlight,您可以使用静态类 Scheduler

  //schedule on the current thread
 observable.ObserveOn(Scheduler.CurrentThread).Subscribe(x => label1.Content = x.X.ToString());
 //schedule on the plarform's default thread
  observable.ObserveOn(Scheduler.Default).Subscribe(x => label1.Content = x.X.ToString());
 //schedule to execute immediately on the current thread
 observable.ObserveOn(Scheduler.Immediate).Subscribe(x => label1.Content = x.X.ToString());
       

带缓冲和 Linq 的观察者模式

Rx 的真正强大之处在于它能够过滤数据,以控制观察者看到什么以及何时看到它们。在下面的示例中,数据流代表温度控制器的水温输出。如果连续两次读数高于 40C,则会通知观察者。数据流使用缓冲分割成段。然后使用 Linq 处理段,然后将数据传递给观察者。

  private static void DemoBuffered()
        {
            //Set up an observable of random numbers between 1 and 100 at a rate of 1 per second
            IObservable<long> oneNumberPerSecondObservable = Observable.Interval(TimeSpan.FromSeconds(1));
            var random = new Random();
            IConnectableObservable<long> randomNumberObservableHot =
                oneNumberPerSecondObservable.Select(n => (long)random.Next(1, 100)).Publish();

            //Subscribe an observer to output the number generated to the Console
            IDisposable randomNumberSubscription =
                randomNumberObservableHot.Subscribe(x => Console.WriteLine("Generated " + x));

            //Set up a rolling buffer of size 2 that advances the stream index by 1 each time
            //So that a stream of 0,1,2,3 would result in buffers of {0,1} {1,2} etc
            var buffered = from firstLast in randomNumberObservableHot.Buffer(2, 1)
                //filter the buffers by using Linq
                where firstLast[0] > 40 && firstLast[1] > 40
                select new { First = firstLast[0], Last = firstLast[1] };

            //Introduce a time out period of 8 seconds  
            //If nothing is observed for 8 seconds an exception will be thrown
            var observableWithTimeout = buffered.Timeout(TimeSpan.FromSeconds(8));

            //Subscribe two observers
            IDisposable firstSubscription =
                observableWithTimeout.Subscribe(
                    anon => Console.WriteLine("First observed {0} and {1} ", anon.First, anon.Last),
                    ex => Console.WriteLine("First observed an Exception. " + ex.Message));
            IDisposable secondSubscription =
                observableWithTimeout.Subscribe(
                    anon => Console.WriteLine("Second observed {0} and {1} ", anon.First, anon.Last),
                    ex => Console.WriteLine("Second observed an Exception. " + ex.Message));

            //start the ball rolling
            randomNumberObservableHot.Connect();
            Console.WriteLine("Press Enter to unsubscribe the first observer");
            Console.ReadLine();

            //Unsubscribe an observer
            firstSubscription.Dispose();
            Console.WriteLine("***first subscriber has unsubscribed ***");
            Console.WriteLine("Press Enter to end");
            Console.ReadLine();
            secondSubscription.Dispose();
            randomNumberSubscription.Dispose();
            Console.ReadKey();
        }

Buffered output

结论

这篇简短的文章只是触及了 Rx 所能实现的冰山一角。如果您想深入研究,有一些很棒的 视频,其中有两位热情的 Rx 开发者 Wes Dyer 和 Bart De Smet。

© . All rights reserved.