65.9K
CodeProject 正在变化。 阅读更多。
Home

带进度跟踪的简单计算 API

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.83/5 (19投票s)

2011 年 4 月 6 日

CPOL

8分钟阅读

viewsIcon

44784

downloadIcon

1609

一个计算包装器,内置进度跟踪,使用 .NET 的响应式扩展(和 AsyncCTP)。

目录

  1. 引言
  2. 背景
  3. API
  4. 源代码、演示项目和二进制文件
  5. 可能的陷阱
  6. 未来工作
  7. 结论

1 引言

我一直很讨厌应用程序在进行某些计算时,既不通知用户其进度,甚至更糟的是,不异步运行计算。因此,我决定在我自己的应用程序中,计算将告诉用户发生了什么,同时 UI 保持响应。为了实现这个目标,我实现了 Computation<T> 类型,当完成时,最终产品感觉“对了”,所以我决定把它分享出来,作为万能的简单计算流畅 API™ 附带进度跟踪(多么花哨的名字 :))。无论如何,流畅部分的想法实际上很大程度上受到了 Sacha Barber 的这篇 文章 的启发。

该库使用 .NET 的响应式扩展 (Rx) 和可选的 AsyncCTP (SP1),并允许以下代码结构

var computation = Computation
  .Create(progress => ComputeSomeStuff(progress))
  .WhenProgressChanged(p => BusyIndicator.Status = p.StatusText)
  .WhenCompleted(result => DisplayResult(result));
computation.RunAsync();

或者,使用 AsyncCTP

var result = await Computation
  .Create(progress => DoSomeWork(progress))
  .WhenProgressChanged(p => BusyIndicator.Status = p.StatusText);
DisplayResult(result);

得益于 Rx,相同的代码在 .NET 和 Silverlight 上都可以工作。

2 背景

代码的基本思想是让 Rx 处理所有异步和调度工作。因此,代码的异步执行仅通过一行代码完成

new Func<TResult>(RunSynchronously).ToAsync(executedOn)().Subscribe();

其中 executedOn 是用于运行计算的 Scheduler(决定代码在哪个线程上执行)。AsyncCTP 支持是独立的(因为它仍然是“仅 CTP”),并在本 中进行描述。

尽管代码使用了响应式扩展,但基本使用该代码不需要了解 Rx(让我们面对现实吧,Rx 并不容易掌握)。但实际上,正是得益于 Rx,代码才能如此出色地工作。尽管从代码上看 Rx 用得很少,但 Rx 的 (I)Scheduler 做了很多工作。尤其是在优雅地防止无效的跨线程访问问题方面。

3 API

3.1 结构

计算类型的基础是接口 IComputationBaseIComputation<TResult, TProgress>

interface IComputationBase
{
  ComputationProgress Progress { get; }
  void Cancel();
}

interface IComputation<TResult, TProgress> : IComputationBase
{
  void RunAsync(Action<TResult> onCompleted, Action onCancelled, 
                Action<Exception> onError);
  TResult RunSynchronously();
}

之所以存在 IComputationBase 接口,是为了方便进行“匿名”取消和进度跟踪。

现在,有一个名为 Computation<TResult>IComputation 接口的抽象类实现,它具有以下“可见”字段和方法(为清晰起见省略了一些重载)

abstract class Computation<TResult> : IComputation<TResult, ComputationProgress>
{
  ComputationProgress Progress { get; }
  bool IsCancelled { get; }

  // The actual computation happens here.
  protected abstract TResult OnRun();           

  // This is to satisfy the IComputation interface.
  // Internally only calls When* functions and then RunAsync()
  void RunAsync(Action<TResult> onCompleted, 
       Action onCancelled, Action<Exception> onError);
  
  // Runs the computation on the scheduler specified by ExecutedOn
  void RunAsync();                
  
  // Runs the computation on the current thread and immediately returns a result.
  // Returns default(TResult) if something goes wrong.
  TResult RunSynchronously();

  // Cancels the computation if Progress.CanCancel is set to true.
  void Cancel();

  // Sets the scheduler the computation is observed on.
  // The "latest" one is used for all the calls.
  // By default, the observer scheduler is assigned
  // to Scheduler.Dispatcher when the Computation object is created.
  // Therefore, in most cases the observer functions
  // are dispatched to the UI thread.
  Computation<TResult> ObservedOn(IScheduler scheduler);
  
  // Sets the scheduler the computation is executed on.
  // The "latest" one is used for all the calls.
  // ThreadPool scheduler is used by default.
  Computation<TResult> ExecutedOn(IScheduler scheduler);
  
  // Called right before the computation starts executing.
  Computation<TResult> WhenStarted(Action action);  
  // Called if the computation provides a result.
  Computation<TResult> WhenCompleted(Action<TResult> action);
  // Called when progress of the computation is called.
  Computation<TResult> WhenProgressUpdated(Action<ProgressTick> observer);  
  // Called when the computation is cancelled.
  Computation<TResult> WhenCancelled(Action action);
  // Called when an exception occurs duting the computation.
  Computation<TResult> WhenError(Action<Exception> action);
  // Always called when the computation stops
  // regardless of cancellation or error.
  Computation<TResult> WhenFinished(Action action);
}

为了方便起见,添加了一个派生类 Computation : Computation<Unit>,用于仅具有副作用的函数(例如 ActionUnit 类型实现为一个空的 struct,表示 void)。在内部,有两个类:RelayComputation : ComputationRelayComputation<TResult> : Computation<TResult>,用于快速将 lambda 转换为带有进度跟踪的异步计算(进度跟踪对象作为参数传递)。

new RelayComputation<int>(p => 
  { 
    p.UpdateStatus("...");
    //...
    return result; 
  });

要创建 RelayComputation 的实例,必须使用函数 Computation.Create(lambda)。例如

var computation = Computation.Create(() => 1);

通过调用 FuncAction 类型的扩展方法 AsComputaion 也可以实现同样的效果

Func<ComputationProgress, int> f = p => DoSomething(p);
var computation = f.AsComputation();

为了跟踪和更新进度,使用了 ComputationProgress 类。它包含以下成员(同样,为清晰起见省略了一些)

class ComputationProgress : INotifyPropertyChanged
{
  bool IsComputing { get; }
  bool CanCancel { get; }
  string StatusText { get; }
  bool IsIndeterminate { get; }
  int Current { get; }
  int Length { get; }  

  // This throw a ComputationCancelledException that is caught by the Computation object.
  // Reason for using a custom implementation instead of a CancellationToken is
  // that Silverlight does not currently contain a CancellationToken without using AsyncCTP.
  void ThrowIfCancellationRequested();
  
  void UpdateStatus(string statusText);
  void UpdateCanCancel(bool canCancel);
  void UpdateIsIndeterminate(bool isIndeterminate);
  void UpdateProgress(int current, int length);
  
  void NotifyPropertyChanged(string propertyName)
  {
    PropertyChangedEventHandler handler = PropertyChanged;
    if (handler != null)
    {
      // Make sure the handler is executed on the correct thread.
      // ObserverScheduler is set by the Computation object.
      ObserverScheduler.Schedule(() => handler(this, 
                        new PropertyChangedEventArgs(propertyName)));
    }
  }
}

实现 INotifyPropertyChanged 的原因是为了允许无缝数据绑定。例如

<StackPanel DataContext="{Binding CurrentComputation.Progress, Mode=OneWay}" >
  <TextBlock Text="{Binding StatusText, Mode=OneWay}" />
  <ProgressBar IsIndeterminate="{Binding IsIndeterminate, Mode=OneWay}"
               Minimum="0"
               Maximum="{Binding Length, Mode=OneWay}"
               Value="{Binding Current, Mode=OneWay}" />
</StackPanel>

最后,进度更新回调函数接收一个类型为 ProgressTick 的参数

class ProgressTick
{
  string StatusText { get; }
  bool IsIndeterminate { get; }
  bool CanCancel { get; }
  int Current { get; }
  int Length { get; }
}

之所以有一个单独的对象用于进度刻度,而不是直接将 Update* 方法暴露给观察者。

3.2 自定义计算

可以通过派生自 Computation<TResult>(或 Compuation)类并通过重写 OnRun 方法来创建自定义计算。例如

class UltimateComputation : Computation<int>
{
  int result;
  
  public UltimateAnswer(int result) 
  {
    this.result = result;
  }
  
  protected override int OnRun()
  { 
    Progress.UpdateStatus("Computing the ultimate answer ...");
    Progress.UpdateProgress(1, 100);
    //...
    Progress.UpdateStatus("Pretend doing some work ...");
    //...
    Progress.UpdateStatus("Almost there ...");
    //...
    Progress.UpdateProgess(100);
    
    return result;
  }
}

然后像这样使用

new UltimateComputation(42)
  .WhenProgressChanged(p => ShowProgress(p))
  .WhenCompleted(r => DisplayResult(r))
  .RunAsync();

我使用该代码的一种方式是用于大型对象的重复转换

class SomeLargeObject
{
  Graph aLargeGraph;
  
  public Computation UpdateAsync(double parameter)
  {
    return Computation.Create(p => 
    { 
      Phase1Modification(aLargeGraph, parameter);
      p.UpdateStatus("I did something interesting");
      Phase2Modification(aLargeGraph, parameter);
    });
  }
}

3.3 异常

如果在底层计算中发生异常,它会被 Computation 对象捕获,并通知 WhenError 观察者。当使用 AsyncCTP 时,此行为有所不同,异常会被重新抛出,并且可以被 try/catch 块捕获。

3.4 取消

Cancel 方法提供了一种取消计算的方式。调用 Cancel 时会发生以下情况

  • cancel 标志被设置为 true,以便下一次调用 ThrowIfCancellationRequested 将抛出 ComputationCancelledException。如果计算不通过 ThrowIfCancellationRequested 在内部支持取消,它将继续运行直到完成。
  • "Cancel message" 被发送给进度观察者。
  • WhenFinished 观察者将收到通知。
  • 所有观察者都被处置。因此,即使计算不内部支持取消,也不会再有通知。

3.5 AsyncCTP 支持

AsyncCTP 支持允许 await 计算对象。这是通过 ComputationAwaiter<TResult> 对象和扩展方法 GetAwaiter 实现的

public static ComputationAwaiter<TResult> GetAwaiter<TResult>(
              this Computation<TResult> computation)
{
  return new ComputationAwaiter<TResult>(computation);
}

ComputationAwaiter<TResult> 通过在 OnCompleted 函数中订阅 When* 函数来工作

computation
  // if cancelled, set the cancelled flag to true
  .WhenCancelled(() => this.cancelled = true)
  // if an exception occured, store it
  .WhenError(e => this.exception = e)
  // if the computation yielded a result, store it
  .WhenCompleted(r => this.result = r)
  // after computation has finished (either completed, cancelled or by exception)
  // invoke the continuation prepared by the async builder.
  .WhenFinished(() => continuation())
  // run the computation
  .RunAsync();

EndAwait 只检查计算提供了什么类型的结果并转发它

if (this.cancelled) throw new ComputationCancelledException();
if (this.exception != null) throw this.exception;
return this.result;

使用 AsyncCTP 时的正确代码用法是

var computation = Computation.Create(...);
try
{
  var result = await computation;
  Display(result);
}
catch (ComputationCancelledException)
{
  // ...
}
catch (Excetion e)
{
  // ...
}

当然,如果你知道你的计算不支持取消/抛出异常,你可以省略 try/catch。此外,即使在使用 AsyncCTP 时异常会被重新抛出,WhenError 订阅者仍然会被调用。

4 源代码、演示项目和二进制文件

源代码解决方案中有七个项目

  • SimpleComputation - 包含 API 的 .NET 版本。
  • SimpleComputation.AsyncCTP - 包含 AsyncCTP 扩展的 .NET 版本。要编译此项,需要在你的计算机上安装 AsyncCTP。如果你懒得安装 CTP,只需从解决方案中排除该项目。
  • SimpleComputation.Silverlight - 包含 API 的 Silverlight 版本。这些只是指向 SimpleComputation 项目和不同引用程序集的文件的链接。
  • SimpleComputation.Silverlight.AsyncCTP - 包含 AsyncCTP 扩展的 Silverlight 版本。同样,只是指向 .NET 项目的文件链接。要编译此项,需要在你的计算机上安装 AsyncCTP。
  • SimpleComputation.Tests - 一些非常基本的单元测试。
  • SimpleComputation.WpfDemo - WPF 演示应用程序。
  • SimpleComputation.SilverlightDemo - Silverlight 演示应用程序。与 WPF 演示共享相同的视图模型。

二进制文件的使用(不要忘记“解锁”它们)

  • .NET 4:添加 SimpleComputation.dll 以及来自 .NET 响应式扩展库(版本 1.0.2856.104 及更高版本,如果你的计算机上没有安装,它们已包含在内)的 System.CoreExSystem.InteractiveSystem.Reactive 的引用。要添加 AsyncCTP 支持,请链接 SimpleComputation.AsyncCTP.dllAsyncCTP.dll(要编译项目,必须在你的 Visual Studio 中安装 AsyncCTP)。
  • Silverlight 4:添加 SimpleComputation.Silverlight.dll 以及来自 Silverlight 响应式扩展库(版本 1.0.2856.104 及更高版本,如果你的计算机上没有安装,它们已包含在内)的 System.CoreExSystem.InteractiveSystem.ReactiveSystem.Observable 的引用。要添加 AsyncCTP 支持,请链接 SimpleComputation.AsyncCTP.Silverlight.dllAsyncCTP_Silverlight.dll(要编译项目,必须在你的 Visual Studio 中安装 AsyncCTP)。

5 可能的陷阱

  • RunAsync/Synchronously 函数只能调用一次。如果第二次调用,将抛出 InvalidOperationException
  • 如果计算不通过 ThrowIfCancellationRequested 在内部支持取消,在调用 Cancel 时,它将一直运行直到完成。但是,从外部来看,计算似乎已被终止。
  • 该代码设计用于从 WPF/Silverlight 应用程序执行,因此计算进度、完成、错误和取消通知的默认调度程序设置为 Scheduler.DispatcherScheduler。现在,我不完全确定这在 WinForms 中是如何工作的,因此在不调用 ObservedOn 函数的情况下,代码可能无法正常工作。
  • 在取消具有副作用的操作时,需要非常小心。

6 未来工作

我希望在未来实现一些功能

  • 一个 ProgressiveComputation 类型,它将为具有中间结果的计算提供基础。例如,想象一下在计算点时生成函数图,或者显示某些迭代算法的中间结果,并允许用户在满意结果时说“停止”。API 将如下所示
  • Computation
      .Create(p => IProduceIntermediateResults(p))
      .ResultsNoMoreOftenThan(TimeSpan.FromSeconds(1))
      .OnIntermediateResult(r => Display(r));

    但是,这可能又是另一个项目/文章,因为我想保持它尽可能轻量级和简单。

  • 允许每个观察者的调度程序。目前,所有 When* 事件都使用相同的调度程序。这可能是实现下一项功能的干净方法。
  • 添加对带有进度跟踪的计算进行组合的“原生”支持。例如,能够执行类似 computationA.FollowedBy(computationB)computationB.ExecutedAtTheSameTimeAs(computationB)computationA.ReplacedByIfFinishedSooner(computationB) 的操作。
  • 添加已用时间/剩余估计时间支持。

7 结论

尽管代码在“生产代码”中经过了一定的测试,但事实证明,有很多细微之处可能出错,因此它仍处于开发中。如果你使用该代码并发现任何问题,请告知我。

参考文献

历史

  • 4 月 6 日
    • 初始发布。
  • 4 月 8 日
    • 移除了“一站式”源代码,因为它难以维护。改而添加了 .NET 和 Silverlight 二进制文件。
    • 移除了存储“异步订阅”的 IDisposable 计算。它的工作方式并非我所设想的那样 :)
    • RelayComputation 类设为私有。
    • 在项目中添加了可见类和方法的类图。
    • 添加了一节关于取消的内容。
    • 澄清了计算被调用两次时会发生什么(抛出异常)。
  • 4 月 10 日
    • 大幅更改了文章的结构。
    • 主要是由于我的懒惰/无知,AsyncCTP 支持有点“错误”(它能工作,但在发生异常/取消时不行)。我通过为计算对象编写自定义 awaiter 来纠正了这一点。
    • ComputationCancelledException 现在是公共的。
  • 4 月 25 日
    • 更新至 AsyncCTP SP1。
    • 演示项目不再依赖于 AsyncCTP。
一个带有进度跟踪的简单计算 API - CodeProject - 代码之家
© . All rights reserved.