65.9K
CodeProject 正在变化。 阅读更多。
Home

RX:如何使用流畅的扩展来扩展 RX。演示一个批处理、一个可暂停和一个同步的可观察对象

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2014年10月21日

CPOL

4分钟阅读

viewsIcon

27905

downloadIcon

89

演示了如何使用同步、可暂停和批处理的可观察对象,通过流畅的扩展来扩展 RX。

 

引言

如果您大量使用 **RX**,您无疑会遇到其功能提供的不足之处。  本文将指导您如何使用流畅的扩展来扩展 **RX**。

背景

扩展 RX 比您想象的要容易得多,只需几行代码即可实现。您真正需要的是两个元素:一个 **IObservable** 实现和一个扩展方法。

IObservable 实现

**IObservable** 接口是一个非常简单的契约,只有一个 Subscribe 方法,该方法接受一个 **IObserver** 参数。在我继续之前,如果您不知道 **热(hot)** 和 **冷(cold)** 可观察对象之间的区别,我将快速解释一下; **热** 可观察对象是一个即使没有观察者订阅也会发布数据的可观察对象,而 **冷** 可观察对象只有在有观察者订阅时才会发布数据。大多数提供的可观察对象的典型行为是冷的,它们只在观察者订阅时生成序列。

这对我们有什么重要性?  在创建您的 **IObservable** 实现时,**热** 和 **冷** 可观察对象将需要截然不同的实现。

热实现

在创建热实现时,每个可观察对象的订阅都将共享相同的生成序列。这意味着类需要在观察者订阅之前或在第一个观察者订阅时开始生成序列。它还意味着当观察者订阅时,它需要被捕获并保存在列表中,直到被释放,此时需要将其从列表中移除,请参见下文;

private readonly HashSet<IObserver<T>> _observers = new HashSet<IObserver<T>>();

public void Subscribe(IObserver<T> observer)
{
    // If not already add, add the observer
   lock(_observers) 
   {
      if(_observers.Contains(observer)) return Disposabled.Empty();
       _observers.Add(observer);
   }

   // Create a disposable delegate that removes the observable from the list when 
   // dispose is called
   return Disposable.Create(() =>
    {
       lock(_observers)
       {
           _observers.Remove(observer);
       }
    }
}

在生成序列时,每个观察者都需要被通知每个生成的元素。

public void Generate()
{
    int i = 0;
    while(true)
    {
        lock(_observers)
        {
            foreach(var o in _observers)
                o.OnNext(i);
        }
        Thread.Sleep(50);
    } 
}

冷实现

虽然冷实现不需要维护观察者列表,但它有不同的要求。由于序列仅在观察者订阅时才生成,并且仅为该观察者生成,因此需要在订阅时生成序列。  创建序列所需的所有功能和对象都必须在此时声明,并且不应共享,请参见下面的示例;

public void Subscribe(IObserver<int> observer)
{
    var thread = new Thread( () =>
     {
        int i = 0;
        while(true)
        {
            o.OnNext(i);
            Thread.Sleep(50);
        } 
     });
     thread.Start();

    return Disposable.Create(() => thread.Abort());
}

函数式方法

一种替代的、更函数式的方法是使用 **Observable.Create** 方法,该方法不需要您构建类,对于更简单的实现来说,这当然更有意义。  该方法有几个不同的重载,但可能最容易使用的是接受一个带有 **IObserver** 参数的函数并返回 **IDisposable** 的那个。  同样的原则仍然适用,即创建序列所需的所有功能和对象都必须在此时声明,并且不应共享。  请参见下面的示例;

Observable.Create<string>( observer =>
{
   return a => observer.OnNext(a => a.ToString()),  
                ex => observer.OnError(ex),  
                () => observer.OnCompleted());
}

 

与现有序列交互

如果您要创建流畅的扩展,最有可能的情况是您希望与现有序列交互,并修改其输出或添加某种形式的行为。  做到这一点实际上非常简单,您只需要一个现有的可观察对象。  可观察对象应作为参数传递给您的类构造函数,并且取决于您是实现冷可观察对象还是热可观察对象,将在您的类的 subscribe 方法中订阅,或者在您开始生成序列时订阅,请参见下面的热和冷示例;

热示例

  //
  // A constructor that takes an observable, that will be the source for the records
  //
  public ToStringObservable(IObservable<T> observable)
  {
    _observable = observable;
  }
  
  public void Generate()
  {
      _dispose = _observable.Subscribe(
        a => 
        { 
           lock(_observers)
            {
                foreach(var o in _observers)
                    o.OnNext(a.ToString());
            }
         }, 
        ex => 
        { 
           lock(_observers)
            {
                foreach(var o in _observers)
                    o.OnError(ex)
            }
        }, 
        () =>
        {
           lock(_observers)
            {
                foreach(var o in _observers)
                    o.OnCompleted();
            }
        });
   }

冷示例

  public ToStringObservable(IObservable<T> observable)
  {
    _observable = observable;
  }
  
  public IDisposable Subscribe(IObserver<String> observer)
  {
     return _observable.Subscribe( a => 
         { 
             observer.OnNext(a.ToString());
         }, e => observer.OnError(e), () => observer.OnCompleted());
  }

}

创建流畅的扩展

这可能是整个过程中比较容易的部分,您只需构造并返回您的 IObservable 实现。  对于热的或共享的,这意义不大。

// Converts each element of the sequence to a string
public static IObservable<string> ToStringObservable<T>(this IObservable<T> observable)
{
    return new ToStringObservable<T>(observable);
}

冷函数式流畅扩展

遵循之前的函数式示例,下面是如何使用函数式方法创建流畅扩展的示例。  简而言之,您将案例中的功能移到了 **Observable.Create** 方法中。

// Converts a sequence into a pausable sequence
public static IObservable<string> ToPausable<T>(this IObservable<T> observable, 
                                                          PauseState pause)
{
    //Call the IObservable create method to create the sequence
    return Observable.Create<string>( observer =>
        {
           //queue to store the squence items while paused
           var queue = new Queue<T>(); 

           //as we have mutiple observable sequences we will have several observables
           //so a composite disposable collection becomes very useful
           var disposables = new CompositeDisposable();

           //watch for the pause state to be set to unpaused 
           //so we can replay collected items
           disposables.Add(_pause.StateChanged
                  .Where(p => !p)
                  .Subscribe( _ =>
                      {
                          lock (queue)
                          {
                              while (queue.Count > 0)
                              {
                                  observer.OnNext(queue.Dequeue());
                              }
                          }
                      }));

           //subscribe to the sequence and while pause collect the items
           disposables.Add( observable.Subscribe( a => 
               {
                    lock(queue)
                    {
                        if (_pause.Paused)
                            queue.Enqueue(a);
                        else
                            observer.OnNext(a);
                    }
                }, e => observer.OnError(e), () => observer.OnCompleted()));

       return disposables;
    });
} 

 

简单的批处理可观察对象

Rx 提供了 **SelectMany Linq** 扩展,该扩展将单个元素转换为一个枚举,但是如果您想将更新批量处理并在同一时间进行处理呢?(例如,如果您想限制对 **dispatcher** 的调用)。  那么,这个批处理可观察对象怎么样?

下面是一个 **IObservable** 实现,它观察序列的通知,并将它们批量处理成指定大小的批次,然后再将它们发送给观察者。  所有执行批处理所需的功能和对象都需要在 Subscribe 方法中执行和创建,因为该方法可能会被多次调用,并且每个观察者都希望不共享批处理数据。

//
// Batched observable class that batches records in a specified size
//
public class BatchObservable<T> : IObservable<IEnumerable<T>>
{
  private readonly IObservable<T> _observable;
  private readonly int _batchSize;
  
  //
  // A constructor that takes an observable, that will be the source for the records
  //
  public BatchObservable(IObservable<T> observable, int batchSize)
  {
    _observable = observable;
    _batchSize = batchSize;
  }
  
  //
  // Subscribes the batch observable
  // The records are held in a list until enough records are found 
  // at which point the observer is informed
  //
  public IDisposable Subscribe(IObserver<IEnumerable<T>> observer)
  {
     var batch = new List<T>();
     return _observable.Subscribe( a => 
         { 
             batch.Add(a);
             if( batch.Count == _batchSize)
             {
                observer.OnNext(batch);
                batch.Clear();
             }
         }, e => observer.OnError(e), () => observer.OnCompleted());
  }

}

接下来我们需要扩展方法。  很简单,它接受与批处理类相同的参数,我们将调用 ToBatch 方法。

//
// Extension method to create the batchObserver
//
public static IObservable<IEnumerable<T>> ToBatch<T>(this IObservable<T> observable, int batchSize)
{
    return new BatchObservable<T>(observable, batchSize);
}

要使用该方法,我们只需从观察者调用它。

//
// Call the ToBatch method
//
obseverable.ToBatch(100)

其他示例

zip 文件包含以下示例可观察对象

LatestByKeyObservable     基于键提供每个项目的最新元素。

PausableObservable           一个可以暂停的可观察对象。

SynchronisedObservable   根据指定的比较同步两个可观察对象。

© . All rights reserved.