带进度跟踪的简单计算 API






4.83/5 (19投票s)
一个计算包装器,内置进度跟踪,使用 .NET 的响应式扩展(和 AsyncCTP)。
目录
1 引言
我一直很讨厌应用程序在进行某些计算时,既不通知用户其进度,甚至更糟的是,不异步运行计算。因此,我决定在我自己的应用程序中,计算将告诉用户发生了什么,同时 UI 保持响应。为了实现这个目标,我实现了 Computation<T>
类型,当完成时,最终产品感觉“对了”,所以我决定把它分享出来,作为万能的简单计算流畅 API™ 附带进度跟踪(多么花哨的名字 :))。无论如何,流畅部分的想法实际上很大程度上受到了 Sacha Barber 的这篇 文章 的启发。
该库使用 .NET 的响应式扩展 (Rx) 和可选的 AsyncCTP (SP1),并允许以下代码结构
var computation = Computation
.Create(progress => ComputeSomeStuff(progress))
.WhenProgressChanged(p => BusyIndicator.Status = p.StatusText)
.WhenCompleted(result => DisplayResult(result));
computation.RunAsync();
或者,使用 AsyncCTP
var result = await Computation
.Create(progress => DoSomeWork(progress))
.WhenProgressChanged(p => BusyIndicator.Status = p.StatusText);
DisplayResult(result);
得益于 Rx,相同的代码在 .NET 和 Silverlight 上都可以工作。
2 背景
代码的基本思想是让 Rx 处理所有异步和调度工作。因此,代码的异步执行仅通过一行代码完成
new Func<TResult>(RunSynchronously).ToAsync(executedOn)().Subscribe();
其中 executedOn
是用于运行计算的 Scheduler
(决定代码在哪个线程上执行)。AsyncCTP 支持是独立的(因为它仍然是“仅 CTP”),并在本 节 中进行描述。
尽管代码使用了响应式扩展,但基本使用该代码不需要了解 Rx(让我们面对现实吧,Rx 并不容易掌握)。但实际上,正是得益于 Rx,代码才能如此出色地工作。尽管从代码上看 Rx 用得很少,但 Rx 的 (I)Scheduler
做了很多工作。尤其是在优雅地防止无效的跨线程访问问题方面。
3 API
3.1 结构
计算类型的基础是接口 IComputationBase
和 IComputation<TResult, TProgress>
interface IComputationBase
{
ComputationProgress Progress { get; }
void Cancel();
}
interface IComputation<TResult, TProgress> : IComputationBase
{
void RunAsync(Action<TResult> onCompleted, Action onCancelled,
Action<Exception> onError);
TResult RunSynchronously();
}
之所以存在 IComputationBase
接口,是为了方便进行“匿名”取消和进度跟踪。
现在,有一个名为 Computation<TResult>
的 IComputation
接口的抽象类实现,它具有以下“可见”字段和方法(为清晰起见省略了一些重载)
abstract class Computation<TResult> : IComputation<TResult, ComputationProgress>
{
ComputationProgress Progress { get; }
bool IsCancelled { get; }
// The actual computation happens here.
protected abstract TResult OnRun();
// This is to satisfy the IComputation interface.
// Internally only calls When* functions and then RunAsync()
void RunAsync(Action<TResult> onCompleted,
Action onCancelled, Action<Exception> onError);
// Runs the computation on the scheduler specified by ExecutedOn
void RunAsync();
// Runs the computation on the current thread and immediately returns a result.
// Returns default(TResult) if something goes wrong.
TResult RunSynchronously();
// Cancels the computation if Progress.CanCancel is set to true.
void Cancel();
// Sets the scheduler the computation is observed on.
// The "latest" one is used for all the calls.
// By default, the observer scheduler is assigned
// to Scheduler.Dispatcher when the Computation object is created.
// Therefore, in most cases the observer functions
// are dispatched to the UI thread.
Computation<TResult> ObservedOn(IScheduler scheduler);
// Sets the scheduler the computation is executed on.
// The "latest" one is used for all the calls.
// ThreadPool scheduler is used by default.
Computation<TResult> ExecutedOn(IScheduler scheduler);
// Called right before the computation starts executing.
Computation<TResult> WhenStarted(Action action);
// Called if the computation provides a result.
Computation<TResult> WhenCompleted(Action<TResult> action);
// Called when progress of the computation is called.
Computation<TResult> WhenProgressUpdated(Action<ProgressTick> observer);
// Called when the computation is cancelled.
Computation<TResult> WhenCancelled(Action action);
// Called when an exception occurs duting the computation.
Computation<TResult> WhenError(Action<Exception> action);
// Always called when the computation stops
// regardless of cancellation or error.
Computation<TResult> WhenFinished(Action action);
}
为了方便起见,添加了一个派生类 Computation : Computation<Unit>
,用于仅具有副作用的函数(例如 Action
,Unit
类型实现为一个空的 struct
,表示 void
)。在内部,有两个类:RelayComputation : Computation
和 RelayComputation<TResult> : Computation<TResult>
,用于快速将 lambda 转换为带有进度跟踪的异步计算(进度跟踪对象作为参数传递)。
new RelayComputation<int>(p =>
{
p.UpdateStatus("...");
//...
return result;
});
要创建 RelayComputation
的实例,必须使用函数 Computation.Create(lambda)
。例如
var computation = Computation.Create(() => 1);
通过调用 Func
和 Action
类型的扩展方法 AsComputaion
也可以实现同样的效果
Func<ComputationProgress, int> f = p => DoSomething(p);
var computation = f.AsComputation();
为了跟踪和更新进度,使用了 ComputationProgress
类。它包含以下成员(同样,为清晰起见省略了一些)
class ComputationProgress : INotifyPropertyChanged
{
bool IsComputing { get; }
bool CanCancel { get; }
string StatusText { get; }
bool IsIndeterminate { get; }
int Current { get; }
int Length { get; }
// This throw a ComputationCancelledException that is caught by the Computation object.
// Reason for using a custom implementation instead of a CancellationToken is
// that Silverlight does not currently contain a CancellationToken without using AsyncCTP.
void ThrowIfCancellationRequested();
void UpdateStatus(string statusText);
void UpdateCanCancel(bool canCancel);
void UpdateIsIndeterminate(bool isIndeterminate);
void UpdateProgress(int current, int length);
void NotifyPropertyChanged(string propertyName)
{
PropertyChangedEventHandler handler = PropertyChanged;
if (handler != null)
{
// Make sure the handler is executed on the correct thread.
// ObserverScheduler is set by the Computation object.
ObserverScheduler.Schedule(() => handler(this,
new PropertyChangedEventArgs(propertyName)));
}
}
}
实现 INotifyPropertyChanged
的原因是为了允许无缝数据绑定。例如
<StackPanel DataContext="{Binding CurrentComputation.Progress, Mode=OneWay}" >
<TextBlock Text="{Binding StatusText, Mode=OneWay}" />
<ProgressBar IsIndeterminate="{Binding IsIndeterminate, Mode=OneWay}"
Minimum="0"
Maximum="{Binding Length, Mode=OneWay}"
Value="{Binding Current, Mode=OneWay}" />
</StackPanel>
最后,进度更新回调函数接收一个类型为 ProgressTick
的参数
class ProgressTick
{
string StatusText { get; }
bool IsIndeterminate { get; }
bool CanCancel { get; }
int Current { get; }
int Length { get; }
}
之所以有一个单独的对象用于进度刻度,而不是直接将 Update*
方法暴露给观察者。
3.2 自定义计算
可以通过派生自 Computation<TResult>
(或 Compuation
)类并通过重写 OnRun
方法来创建自定义计算。例如
class UltimateComputation : Computation<int>
{
int result;
public UltimateAnswer(int result)
{
this.result = result;
}
protected override int OnRun()
{
Progress.UpdateStatus("Computing the ultimate answer ...");
Progress.UpdateProgress(1, 100);
//...
Progress.UpdateStatus("Pretend doing some work ...");
//...
Progress.UpdateStatus("Almost there ...");
//...
Progress.UpdateProgess(100);
return result;
}
}
然后像这样使用
new UltimateComputation(42)
.WhenProgressChanged(p => ShowProgress(p))
.WhenCompleted(r => DisplayResult(r))
.RunAsync();
我使用该代码的一种方式是用于大型对象的重复转换
class SomeLargeObject
{
Graph aLargeGraph;
public Computation UpdateAsync(double parameter)
{
return Computation.Create(p =>
{
Phase1Modification(aLargeGraph, parameter);
p.UpdateStatus("I did something interesting");
Phase2Modification(aLargeGraph, parameter);
});
}
}
3.3 异常
如果在底层计算中发生异常,它会被 Computation
对象捕获,并通知 WhenError
观察者。当使用 AsyncCTP 时,此行为有所不同,异常会被重新抛出,并且可以被 try/catch
块捕获。
3.4 取消
Cancel
方法提供了一种取消计算的方式。调用 Cancel
时会发生以下情况
cancel
标志被设置为true
,以便下一次调用ThrowIfCancellationRequested
将抛出ComputationCancelledException
。如果计算不通过ThrowIfCancellationRequested
在内部支持取消,它将继续运行直到完成。- "Cancel message" 被发送给进度观察者。
WhenFinished
观察者将收到通知。- 所有观察者都被处置。因此,即使计算不内部支持取消,也不会再有通知。
3.5 AsyncCTP 支持
AsyncCTP 支持允许 await 计算对象。这是通过 ComputationAwaiter<TResult>
对象和扩展方法 GetAwaiter
实现的
public static ComputationAwaiter<TResult> GetAwaiter<TResult>(
this Computation<TResult> computation)
{
return new ComputationAwaiter<TResult>(computation);
}
ComputationAwaiter<TResult>
通过在 OnCompleted
函数中订阅 When*
函数来工作
computation
// if cancelled, set the cancelled flag to true
.WhenCancelled(() => this.cancelled = true)
// if an exception occured, store it
.WhenError(e => this.exception = e)
// if the computation yielded a result, store it
.WhenCompleted(r => this.result = r)
// after computation has finished (either completed, cancelled or by exception)
// invoke the continuation prepared by the async builder.
.WhenFinished(() => continuation())
// run the computation
.RunAsync();
EndAwait
只检查计算提供了什么类型的结果并转发它
if (this.cancelled) throw new ComputationCancelledException();
if (this.exception != null) throw this.exception;
return this.result;
使用 AsyncCTP 时的正确代码用法是
var computation = Computation.Create(...);
try
{
var result = await computation;
Display(result);
}
catch (ComputationCancelledException)
{
// ...
}
catch (Excetion e)
{
// ...
}
当然,如果你知道你的计算不支持取消/抛出异常,你可以省略 try/catch
。此外,即使在使用 AsyncCTP 时异常会被重新抛出,WhenError
订阅者仍然会被调用。
4 源代码、演示项目和二进制文件
源代码解决方案中有七个项目
- SimpleComputation - 包含 API 的 .NET 版本。
- SimpleComputation.AsyncCTP - 包含 AsyncCTP 扩展的 .NET 版本。要编译此项,需要在你的计算机上安装 AsyncCTP。如果你懒得安装 CTP,只需从解决方案中排除该项目。
- SimpleComputation.Silverlight - 包含 API 的 Silverlight 版本。这些只是指向 SimpleComputation 项目和不同引用程序集的文件的链接。
- SimpleComputation.Silverlight.AsyncCTP - 包含 AsyncCTP 扩展的 Silverlight 版本。同样,只是指向 .NET 项目的文件链接。要编译此项,需要在你的计算机上安装 AsyncCTP。
- SimpleComputation.Tests - 一些非常基本的单元测试。
- SimpleComputation.WpfDemo - WPF 演示应用程序。
- SimpleComputation.SilverlightDemo - Silverlight 演示应用程序。与 WPF 演示共享相同的视图模型。
二进制文件的使用(不要忘记“解锁”它们)
- .NET 4:添加 SimpleComputation.dll 以及来自 .NET 响应式扩展库(版本 1.0.2856.104 及更高版本,如果你的计算机上没有安装,它们已包含在内)的
System.CoreEx
、System.Interactive
和System.Reactive
的引用。要添加 AsyncCTP 支持,请链接 SimpleComputation.AsyncCTP.dll 和 AsyncCTP.dll(要编译项目,必须在你的 Visual Studio 中安装 AsyncCTP)。 - Silverlight 4:添加 SimpleComputation.Silverlight.dll 以及来自 Silverlight 响应式扩展库(版本 1.0.2856.104 及更高版本,如果你的计算机上没有安装,它们已包含在内)的
System.CoreEx
、System.Interactive
、System.Reactive
和System.Observable
的引用。要添加 AsyncCTP 支持,请链接 SimpleComputation.AsyncCTP.Silverlight.dll 和 AsyncCTP_Silverlight.dll(要编译项目,必须在你的 Visual Studio 中安装 AsyncCTP)。
5 可能的陷阱
RunAsync/Synchronously
函数只能调用一次。如果第二次调用,将抛出InvalidOperationException
。- 如果计算不通过
ThrowIfCancellationRequested
在内部支持取消,在调用Cancel
时,它将一直运行直到完成。但是,从外部来看,计算似乎已被终止。 - 该代码设计用于从 WPF/Silverlight 应用程序执行,因此计算进度、完成、错误和取消通知的默认调度程序设置为
Scheduler.DispatcherScheduler
。现在,我不完全确定这在 WinForms 中是如何工作的,因此在不调用ObservedOn
函数的情况下,代码可能无法正常工作。 - 在取消具有副作用的操作时,需要非常小心。
6 未来工作
我希望在未来实现一些功能
- 一个
ProgressiveComputation
类型,它将为具有中间结果的计算提供基础。例如,想象一下在计算点时生成函数图,或者显示某些迭代算法的中间结果,并允许用户在满意结果时说“停止”。API 将如下所示
Computation
.Create(p => IProduceIntermediateResults(p))
.ResultsNoMoreOftenThan(TimeSpan.FromSeconds(1))
.OnIntermediateResult(r => Display(r));
但是,这可能又是另一个项目/文章,因为我想保持它尽可能轻量级和简单。
When*
事件都使用相同的调度程序。这可能是实现下一项功能的干净方法。computationA.FollowedBy(computationB)
、computationB.ExecutedAtTheSameTimeAs(computationB)
或 computationA.ReplacedByIfFinishedSooner(computationB)
的操作。7 结论
尽管代码在“生产代码”中经过了一定的测试,但事实证明,有很多细微之处可能出错,因此它仍处于开发中。如果你使用该代码并发现任何问题,请告知我。
参考文献
历史
- 4 月 6 日日
- 初始发布。
- 4 月 8 日日
- 移除了“一站式”源代码,因为它难以维护。改而添加了 .NET 和 Silverlight 二进制文件。
- 移除了存储“异步订阅”的
IDisposable
计算。它的工作方式并非我所设想的那样 :) - 将
RelayComputation
类设为私有。 - 在项目中添加了可见类和方法的类图。
- 添加了一节关于取消的内容。
- 澄清了计算被调用两次时会发生什么(抛出异常)。
- 4 月 10 日日
- 大幅更改了文章的结构。
- 主要是由于我的懒惰/无知,AsyncCTP 支持有点“错误”(它能工作,但在发生异常/取消时不行)。我通过为计算对象编写自定义 awaiter 来纠正了这一点。
ComputationCancelledException
现在是公共的。- 4 月 25 日日
- 更新至 AsyncCTP SP1。
- 演示项目不再依赖于 AsyncCTP。