NParallel,一个小型的并行执行库
一个简单的库,可以让你轻松地编写异步代码,几乎以同步的模式进行。
- 下载 NParallel2005.zip - 59.4 KB
- 下载 NParallel2008.zip - 46.8 KB
- 下载 NParallel.0.2.vs2008.12.19.zip - 209 KB
重要更新
NParallel0.2 发布,支持循环并行和多任务处理(For, ForEach, Do),以及其他许多增强功能(例如参数化的异步调用)。代码几乎重写,并为有经验的用户添加了大量注释。VS2005 版本将不再支持,因为后续版本将高度依赖 C#3.0 lambda 的函数式编程概念。本文主要介绍 NParallel 的主要构造,关于循环并行,我发布了 一篇新文章,讨论了 NParallel 循环并行的设计和用法,您可以参考随附的测试用例。
引言
.NET 中有很多编写多线程代码的方法,例如使用异步方法调用、创建线程等。但这些方法大多数都会改变你编写代码的方式,并引入像 IAsyncResult
和回调委托这样的新概念,这些概念与应用程序逻辑有些不相关。它们通常使代码难以阅读和维护,并要求应用程序开发人员了解底层线程的工作原理。
我之前的项目严重依赖 Begin/EndInvoke
模型。在定义了许多委托和函数对之后,我厌倦了 Begin/End
代码,于是决定创建一个包装库来隐藏复杂性,使代码易于阅读和编写,同时提供灵活性以便以后更改底层线程机制。
于是我提出了 NParallel
,我将在本文中向您介绍它。
问题:耗时任务
让我们先看看执行并行代码的旧方法。假设我们有一个查询,需要很长时间才能完成。
// Sample code 1: Time Consuming Operation
int LongRunOperation(int queryID)
{
Thread.Sleep(10000);
// Do some query
return queryID * 2;
}
// synchronous call to the method
int result = LongRunOperation(10);
我们定义了 LongRunOperation
方法并传入参数进行调用。一切看起来都很直接。但是执行线程将阻塞 10 秒!对于无法承受这种阻塞的应用程序,我们必须使其并行运行。旧方法:异步方法调用
有很多方法可以使代码并行运行,在 .NET 中,线程和委托调用是最常用的。在异步方法调用 (AMI) 模式下,方法的调用将变成一个方法分派器和一个回调。它比显式线程更容易编写。以下代码显示了上述函数调用在 AMI 模式下的情况:// asynchronous call to the method
void BeginLongRunOperation(int queryID)
{
Func<int, int> operation = LongRunOperation;
IAsyncResult ar = operation.BeginInvoke(10 , EndBeginLongRunOperation, null);
}
void EndLongRunOperation(IAsyncResult ar)
{
Func<int, int> operation2 = ((AsyncResult)ar).AsyncDelegate;
int result = operation2.EndInvoke(ar);
// process with the return value
}
在上面的代码中,我使用了预定义的 System.Linq.Func
而不是旧式的用户定义委托。尽管如此,代码仍然看起来冗长且容易出错,因为:
- 方法调用变成了
operation.BeginInvoke(10 , EndBeginLongRunOperation, null);
。这会将参数与方法本身分离开来。 - 在编写回调方法时,会引入一个参数
IAsyncResult ar
,这与应用程序逻辑本身无关。 - 在回调方法中,有几次类型转换。这在编译时无法保证正确性。如果使用错误的委托类型,直到运行时出现错误才会被发现。
NParallel 的方式
当我开始定义 NParallel
时,目标是:
- 使代码易于阅读和编写,使其与同步调用相似。
- 不要破坏方法调用的方式,即保持函数及其参数在一起。
- 让任何东西都可以异步调用,这意味着允许任何代码被异步调用。
我想到一个主意,如下所示:
// NParallel Psudo code
NResult pResult = NParallel.Execute(parellel_code_block);
if(pResult.IsDone())
{
var result = pResult.Value;
// work with the result
}
我期望 parallel_code_block
是一个方法调用、一个委托、一个 lambda 表达式,甚至是 LINQ 查询。在尝试了几种方法后,我发现匿名委托是 parallel_code_block
的理想实现。实际代码如下所示:
// Current NParallel Gramma, simple asynchronous call with callback:
NResult<int> pResult = NParallel.Execute<int>(()=>
{return LongRunOperation(10); }, /*Asynchronous code block*/
EndBeginLongRunOperation /*Callback operation*/
);
void EndBeginLongRunOperation(int theResult)
{
// process with the return value of LongRunOperation;
}
深入了解 NParallel
NParallel
的工作方式是将要异步执行的代码包装成一个匿名委托。由于匿名委托可以访问当前堆栈上的所有局部变量,因此我无需根据参数定义不同版本的 Execute
方法。我只定义了一个泛型版本和一个非泛型版本 Execute
来处理有返回值或无返回值的方法。以下是这两个方法的签名:
// Generic method signature for the invoker.
NResult<T> Execute<T>(Func<T> asyncRoutine, NResultDel<T> callbackRoutine, NExceptionDel exceptionRoutine, CallbackMode callbackMode);
// Method for executing non-result code blocks
NResult Execute <T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback, NStateVoidFunc<Exception> exceptionRoutine,
CallbackMode callbackMode)
Execute 的两个版本都有许多重载,有关更多信息请参考代码,第一个参数定义了要异步调用的代码块,第二个参数定义了回调,第三个参数定义了如何处理异常,最后一个参数定义了回调方法将如何被调用。除第一个参数外,所有参数都是可选的。我们将逐一介绍每个参数。
第一个参数:要调用的代码块
让我们先看看第一个参数的两个委托的签名:
// Generic code block delegate
public delegate TResult Func<TResult>();
// Usage in NParallel
delegate() // or just use ()=>
{
T result;
//your code block here
return result;
}
// Void call code block delegate
public delegate void DelegateVoid();
// Usage in NParallel
delegate()
{
//your code block here
}
您几乎可以将任何内容放入委托代码块中,如果代码块更改共享变量,则由您负责管理锁定。以下是一些您可以放入委托中的更复杂的代码块。看起来很酷,不是吗?// Current NParallel Gramma, calling to asynchronous code blocks:
NResult<int> pResult = NParallel.Execute<int>(delegate()
{
int op1 = PrepareParam(localVar1);
int op2 = PrepareParam(localVar2);
return LongRunOperation(op1 + op2);
} /*Asynchronous code block*/
);
// calling a linq query in NParallel
NParallel.Execute<IList<int>>
(
delegate()
{
return (from i in Enumerable.Range(10, 100)
where i % 5 == 0
select i).ToList();
}/*Asynchronous code block in LINQ*/
);
NParallel
最好的地方在于它易于阅读,并且看起来与同步调用几乎相同,您无需打破任何东西,只需标记您希望并行调用的调用。重要说明
当使用上述场景中的委托时,很容易误用闭包变量(将被复制到委托执行堆栈的局部变量)。例如,假设有一个名为 curCount 的变量,您在匿名委托中使用它,在调用 Execute 后,该变量将被更改,NParallel 引擎无法得知这一点,并可能使用更改后的值。如果您想将变量发送到执行线程,可以使用 state 版本。
int localVariable = 6;
NParallel.Execute<int ,IList<int>>
(
localVariable, // the state variable to send in the execution thread.
delegate(criteria) // using state variable
{
return (from i in Enumerable.Range(10, 100)
where i % criteria== 0
select i).ToList();
}
);
第二个参数:提供您自己的回调
回调可用于将方法传递给异步代码块,它将在代码块本身完成后被调用。您可以在NParallel
中提供一个委托作为回调,与异步委托调用不同,您的回调不需要处理 IAsyncResults
,您只需处理您期望从方法获得的结果。回调的签名定义如下:// Generic callback delegate
public delegate void TResultDel<T>(T result);
// void callback delegate
public delegate void DelegateVoid();
// Sample: Provide a callback when invoke the method:
void PrintResult(int value);
NParallel.Execute(delegate(){return temp1 + temp2;},
PrintResult);
如果没有提供回调方法,将调用默认回调。并且 EndInvoke()
无论如何都会被调用。您可以直接触发并忽略它。
第三个参数:异常处理例程
如果在执行异步代码块时引发了异常,NParallel
将捕获该异常并调用异常处理例程。您可以通过此参数指定例程。如果没有指定例程,当调用 Value
时将抛出异常。
public delegate void NExceptionDel(Exception exc);
您可以通过设置 NParallel.ExceptionRoutine
来提供全局异常处理例程。第四个参数:如何完成调用
当使用 BeginInvoke/EndInvoke
APM 模型时,回调将在方法完成后自动调用。它将在执行方法的线程池的同一线程上执行。在某些情况下,我们可能不希望它这样工作,我们可能希望将回调编组到一个集中的队列中,或者在运行时决定何时回调。NParallel.Execute
的第三个参数使您能够做到这一点。CallbackMode
是一个定义如下的枚举:
public enum CallbackMode
{
Manual,
Queue,
Auto
}
- 当
CallbackMode
设置为Auto
时,EndInvoke
和回调方法会自动调用,就像您使用Begin/EndInvoke
一样。 - 如果您将
CallbackMode
设置为Queue
,所有异步调用都将在队列中进行编组(参见 Queue)。 - 如果您将
CallbackMode
设置为Manual
,您必须手动调用NResult.Wait()
来执行EndInvoke
和回调(参见 NResult)。
CallbackMode
的默认值可以通过 NParallel.DefaultMarshal
设置,它默认为 Marshal.Auto
。您可以在应用程序中混合使用这三种回调模式。
NResult 类
NResult
有两个版本:NResult
和 NResult<T>
。以下是它们最重要的几个方法:// For NResult <T> only
public T Value{get;};
//Block current thread, wait for the method to finish
public void Wait();
// Get the current status of the task.
public bool IsDone();
Wait
会阻塞当前线程,并调用 EndInvoke
和提供给它的回调方法以获取代码块的结果。调用 Value
将导致调用 Wait()
。
使用队列
NQueue
可用于队列所有使用 CallbackMode.Queue
调用的任务。它需要被放入应用程序的常规循环中,您可以使用应用程序的消息循环或定时器,只需调用 NParallel.Queue.Update();
。回调将在定时器运行的线程中调用。//Call code block with CallbackMode.Queue
NParallel.Execute(()=>{
return 0; // your method here
},
CallbackMode.Queue);
//Update NQueue within a timer proc
void TimerProc()
{
NParallel.Queue.Update();
}
回调将在与 TimerProc
相同的线程中调用。取消任务
该库附带简单的取消功能。您可以对未完成的 NResult 调用 Cancel,这将阻止回调被调用。NResult<T> result; result.Cancel();请注意,如果任务已执行,Cancel 将返回 false。即使调用了 Cancel,仍然会为异步例程调用 EndInvoke。
幕后
该库目前只是简单地将委托的 Begin/EndInvoke 调用包装为 NDefaultStrategy/NDefaultResult。这只是您可以采用的异步模型之一,您可能想使用显式线程或自己的线程池,您可以通过替换策略层来更改线程策略。您可以使用 IParallelStrategy
接口来做到这一点。
public interface IParallelStrategy
{
/// <summary>
/// Execute a code block asynchronously, with return value
/// </summary>
/// <typeparam name="TReturnValue">Result type of the code block</typeparam>
/// <param name="callerState">CallerState variable</param>
/// <param name="asyncRoutine">The code block to execute</param>
/// <param name="callbackRoutine">The callback routine</param>
/// <param name="callbackMode"></param>
/// <param name="globalExceptionRoutine">The Exception Routine</param>
/// <returns>Holder of the result for the code block, canbe used to wait</returns>
NResult<TReturnValue> Execute<TReturnValue, TState>(TState state, Func<TState, TReturnValue> asyncRoutine, NStateVoidFunc<TReturnValue> callbackRoutine,
NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode);
/// <summary>
/// Execute a code block asynchronously, without return value
/// </summary>
/// <typeparam name="TReturnValue">type of the callerState variable</typeparam>
/// <param name="callerState">CallerState variable</param>
/// <param name="asyncRoutine">The code block to execute</param>
/// <param name="callbackRoutine">The callback routine</param>
/// <param name="callbackMode"></param>
/// <param name="globalExceptionRoutine">The Exception Routine</param>
/// <returns>Holder of the result for the code block, canbe used to wait</returns>
NResult Execute<TState>(TState state, NStateVoidFunc<TState> asyncRoutine, NVoidFunc callbackRoutine, NStateVoidFunc<Exception> exceptionRoutine,
CallbackMode callbackMode);
/// <summary>
/// The overall exception handling routine,
/// will be used when no exception routine is provided for the function
/// </summary>
NStateVoidFunc<Exception> ExceptionHandler { get; set; }
}
// NResult abstract class
public abstract class NResult
{
public abstract bool IsDone();
internal abstract object CallerRoutine{get;}
public abstract void Wait(/* int milli-sec-to-wait */);
public virtual bool Cancel();
}
public abstract class NResult<T> : NResult
{
public abstract T Value { get; }
}
您必须实现 IParallelStrategy
接口,然后调用 NParallel.Strategy
来替换默认的 NAMPStrategyImpl
,您还必须实现自己的 NResult
。我在许多小型测试和一个我正在开发的应用中使用了这些代码。它使得异步编程变得非常容易和有趣。我希望这些代码对您也有用。您可以随意修改代码并在您自己的项目中重复使用,但请保留署名。如果您在代码中发现缺陷和错误,请告诉我。
已知问题
关于代码
VS2005 版本包包含三个项目:NParallel、NParallelCore 和 Testcase。后两个是 NParallel 的最初版本,包含早期快速粗糙的代码和一些不完整的功能。如果您感兴趣,可以看看。如果您只想要 NParallel,可以忽略其他两个项目。
VS2005 版本 NParallel 将不再支持后续版本(从 0.2 开始)。
VS2008 版本包仅包含一个控制台项目,如果您想要一个 DLL,可以将其输出类型更改为库。
V0.2 包含三个测试项目,包括一个 GUI 图像处理库。
历史
- 2007-12-19 V0.2。支持循环并行。捆绑了其他增强功能。
- 2007-11-30 错误修复,回调调用错误。v0.1. 更新了源代码。感谢 radioman.lt@gmail.com
- 2007-11-30 添加了 VS2005 的源代码
- 2007-11-28 文档的初始版本和初始版本 0.1。
参考文献
我看到过许多关于异步编程的文章,它们都很棒。您可以在 MSDN 上搜索。
Functional Programming For The Rest of Us
我在 codeproject 上找到的一篇适合初学者的文章可以在这里找到:这里。
您现在可以从 MSDN 下载 ParallelFX CTP。这是 .NET3.5 的并行扩展。