65.9K
CodeProject 正在变化。 阅读更多。
Home

NParallel,一个小型的并行执行库

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.06/5 (30投票s)

2007 年 11 月 29 日

CPOL

9分钟阅读

viewsIcon

71721

downloadIcon

653

一个简单的库,可以让你轻松地编写异步代码,几乎以同步的模式进行。

重要更新

NParallel0.2 发布,支持循环并行和多任务处理(For, ForEach, Do),以及其他许多增强功能(例如参数化的异步调用)。代码几乎重写,并为有经验的用户添加了大量注释。VS2005 版本将不再支持,因为后续版本将高度依赖 C#3.0 lambda 的函数式编程概念。

本文主要介绍 NParallel 的主要构造,关于循环并行,我发布了 一篇新文章,讨论了 NParallel 循环并行的设计和用法,您可以参考随附的测试用例。

引言

.NET 中有很多编写多线程代码的方法,例如使用异步方法调用、创建线程等。但这些方法大多数都会改变你编写代码的方式,并引入像 IAsyncResult 和回调委托这样的新概念,这些概念与应用程序逻辑有些不相关。它们通常使代码难以阅读和维护,并要求应用程序开发人员了解底层线程的工作原理。

我之前的项目严重依赖 Begin/EndInvoke 模型。在定义了许多委托和函数对之后,我厌倦了 Begin/End 代码,于是决定创建一个包装库来隐藏复杂性,使代码易于阅读和编写,同时提供灵活性以便以后更改底层线程机制。

于是我提出了 NParallel,我将在本文中向您介绍它。

问题:耗时任务

让我们先看看执行并行代码的旧方法。假设我们有一个查询,需要很长时间才能完成。

// Sample code 1: Time Consuming Operation

int LongRunOperation(int queryID)
{
    Thread.Sleep(10000);
    // Do some query

    return queryID * 2;
}

// synchronous call to the method     

int result = LongRunOperation(10);
我们定义了 LongRunOperation 方法并传入参数进行调用。一切看起来都很直接。但是执行线程将阻塞 10 秒!对于无法承受这种阻塞的应用程序,我们必须使其并行运行。

旧方法:异步方法调用

有很多方法可以使代码并行运行,在 .NET 中,线程和委托调用是最常用的。在异步方法调用 (AMI) 模式下,方法的调用将变成一个方法分派器和一个回调。它比显式线程更容易编写。以下代码显示了上述函数调用在 AMI 模式下的情况:
// asynchronous call to the method     

void BeginLongRunOperation(int queryID)
{
    Func<int, int> operation = LongRunOperation;
    IAsyncResult ar = operation.BeginInvoke(10 , EndBeginLongRunOperation, null);
}

void EndLongRunOperation(IAsyncResult ar)
{
    Func<int, int> operation2 = ((AsyncResult)ar).AsyncDelegate;
    int result = operation2.EndInvoke(ar);
    // process with the return value

}

在上面的代码中,我使用了预定义的 System.Linq.Func 而不是旧式的用户定义委托。尽管如此,代码仍然看起来冗长且容易出错,因为:

  • 方法调用变成了 operation.BeginInvoke(10 , EndBeginLongRunOperation, null);。这会将参数与方法本身分离开来。
  • 在编写回调方法时,会引入一个参数 IAsyncResult ar,这与应用程序逻辑本身无关。
  • 在回调方法中,有几次类型转换。这在编译时无法保证正确性。如果使用错误的委托类型,直到运行时出现错误才会被发现。

NParallel 的方式

当我开始定义 NParallel 时,目标是:

  • 使代码易于阅读和编写,使其与同步调用相似。
  • 不要破坏方法调用的方式,即保持函数及其参数在一起。
  • 让任何东西都可以异步调用,这意味着允许任何代码被异步调用。

我想到一个主意,如下所示:

// NParallel Psudo code

NResult pResult = NParallel.Execute(parellel_code_block);
if(pResult.IsDone())
{
    var result = pResult.Value;
    // work with the result

}

我期望 parallel_code_block 是一个方法调用、一个委托、一个 lambda 表达式,甚至是 LINQ 查询。在尝试了几种方法后,我发现匿名委托是 parallel_code_block 的理想实现。实际代码如下所示:

// Current NParallel Gramma, simple asynchronous call with callback:

NResult<int> pResult = NParallel.Execute<int>(()=>
    {return LongRunOperation(10); },     /*Asynchronous code block*/
    EndBeginLongRunOperation             /*Callback operation*/
);
void EndBeginLongRunOperation(int theResult)
{
    // process with the return value of LongRunOperation;

}

深入了解 NParallel

NParallel 的工作方式是将要异步执行的代码包装成一个匿名委托。由于匿名委托可以访问当前堆栈上的所有局部变量,因此我无需根据参数定义不同版本的 Execute 方法。我只定义了一个泛型版本和一个非泛型版本 Execute 来处理有返回值或无返回值的方法。以下是这两个方法的签名:

// Generic method signature for the invoker.

NResult<T> Execute<T>(Func<T> asyncRoutine, NResultDel<T> callbackRoutine, NExceptionDel exceptionRoutine, CallbackMode callbackMode);
// Method for executing non-result code blocks

NResult Execute <T>(T state, NStateVoidFunc<T> parallelRoutine, NVoidFunc callback, NStateVoidFunc<Exception> exceptionRoutine,
CallbackMode callbackMode)

Execute 的两个版本都有许多重载,有关更多信息请参考代码,第一个参数定义了要异步调用的代码块,第二个参数定义了回调,第三个参数定义了如何处理异常,最后一个参数定义了回调方法将如何被调用。除第一个参数外,所有参数都是可选的。我们将逐一介绍每个参数。

第一个参数:要调用的代码块

让我们先看看第一个参数的两个委托的签名:

// Generic code block delegate

public delegate TResult Func<TResult>();
// Usage in NParallel

delegate() // or just use ()=>
{
    T result;
    //your code block here

    return result;
}
// Void call code block delegate

public delegate void DelegateVoid();
// Usage in NParallel

delegate()
{
    //your code block here

}
您几乎可以将任何内容放入委托代码块中,如果代码块更改共享变量,则由您负责管理锁定。以下是一些您可以放入委托中的更复杂的代码块。看起来很酷,不是吗?
// Current NParallel Gramma, calling to asynchronous code blocks:

NResult<int> pResult = NParallel.Execute<int>(delegate()    
    {
        int op1 = PrepareParam(localVar1);
        int op2 = PrepareParam(localVar2);
        return LongRunOperation(op1 + op2); 
    }     /*Asynchronous code block*/
);
// calling a linq query in NParallel

NParallel.Execute<IList<int>>
(
     delegate()
     {
         return (from i in Enumerable.Range(10, 100)
                        where i % 5 == 0
                        select i).ToList();
     }/*Asynchronous code block in LINQ*/
);
NParallel 最好的地方在于它易于阅读,并且看起来与同步调用几乎相同,您无需打破任何东西,只需标记您希望并行调用的调用。

重要说明

当使用上述场景中的委托时,很容易误用闭包变量(将被复制到委托执行堆栈的局部变量)。例如,假设有一个名为 curCount 的变量,您在匿名委托中使用它,在调用 Execute 后,该变量将被更改,NParallel 引擎无法得知这一点,并可能使用更改后的值。如果您想将变量发送到执行线程,可以使用 state 版本。

int localVariable = 6;
NParallel.Execute<int ,IList<int>>
(
     localVariable,	// the state variable to send in the execution thread.
     delegate(criteria) // using state variable
     {
         return (from i in Enumerable.Range(10, 100)
                        where i % criteria== 0
                        select i).ToList();
     }
     );

第二个参数:提供您自己的回调

回调可用于将方法传递给异步代码块,它将在代码块本身完成后被调用。您可以在 NParallel 中提供一个委托作为回调,与异步委托调用不同,您的回调不需要处理 IAsyncResults,您只需处理您期望从方法获得的结果。回调的签名定义如下:
// Generic callback delegate

public delegate void TResultDel<T>(T result);
// void callback delegate

public delegate void DelegateVoid();
// Sample: Provide a callback when invoke the method:

void PrintResult(int value);
NParallel.Execute(delegate(){return temp1 + temp2;},
    PrintResult);

如果没有提供回调方法,将调用默认回调。并且 EndInvoke() 无论如何都会被调用。您可以直接触发并忽略它。

第三个参数:异常处理例程

如果在执行异步代码块时引发了异常,NParallel 将捕获该异常并调用异常处理例程。您可以通过此参数指定例程。如果没有指定例程,当调用 Value 时将抛出异常。

public delegate void NExceptionDel(Exception exc);
您可以通过设置 NParallel.ExceptionRoutine 来提供全局异常处理例程。

第四个参数:如何完成调用

当使用 BeginInvoke/EndInvoke APM 模型时,回调将在方法完成后自动调用。它将在执行方法的线程池的同一线程上执行。在某些情况下,我们可能不希望它这样工作,我们可能希望将回调编组到一个集中的队列中,或者在运行时决定何时回调。NParallel.Execute 的第三个参数使您能够做到这一点。CallbackMode 是一个定义如下的枚举:

public enum CallbackMode 
{
    Manual,
    Queue,
    Auto
}
  • CallbackMode 设置为 Auto 时,EndInvoke 和回调方法会自动调用,就像您使用 Begin/EndInvoke 一样。
  • 如果您将 CallbackMode 设置为 Queue,所有异步调用都将在队列中进行编组(参见 Queue)。
  • 如果您将 CallbackMode 设置为 Manual,您必须手动调用 NResult.Wait() 来执行 EndInvoke 和回调(参见 NResult)。

CallbackMode 的默认值可以通过 NParallel.DefaultMarshal 设置,它默认为 Marshal.Auto。您可以在应用程序中混合使用这三种回调模式。

NResult 类

NResult 有两个版本:NResultNResult<T>。以下是它们最重要的几个方法:
// For NResult <T> only
public T Value{get;};

//Block current thread, wait for the method to finish
public void Wait();

// Get the current status of the task.
public bool IsDone();
Wait 会阻塞当前线程,并调用 EndInvoke 和提供给它的回调方法以获取代码块的结果。

调用 Value 将导致调用 Wait()

使用队列

NQueue 可用于队列所有使用 CallbackMode.Queue 调用的任务。它需要被放入应用程序的常规循环中,您可以使用应用程序的消息循环或定时器,只需调用 NParallel.Queue.Update();。回调将在定时器运行的线程中调用。
//Call code block with CallbackMode.Queue

NParallel.Execute(()=>{
    return 0; // your method here

},
CallbackMode.Queue);

//Update NQueue within a timer proc

void TimerProc()
{
    NParallel.Queue.Update();
}
回调将在与 TimerProc 相同的线程中调用。

取消任务

该库附带简单的取消功能。您可以对未完成的 NResult 调用 Cancel,这将阻止回调被调用。
NResult<T> result;
result.Cancel();
请注意,如果任务已执行,Cancel 将返回 false。即使调用了 Cancel,仍然会为异步例程调用 EndInvoke。

幕后

该库目前只是简单地将委托的 Begin/EndInvoke 调用包装为 NDefaultStrategy/NDefaultResult。这只是您可以采用的异步模型之一,您可能想使用显式线程或自己的线程池,您可以通过替换策略层来更改线程策略。您可以使用 IParallelStrategy 接口来做到这一点。

    public interface IParallelStrategy
    {
        /// <summary>
        /// Execute a code block asynchronously, with return value
        /// </summary>
        /// <typeparam name="TReturnValue">Result type of the code block</typeparam>
        /// <param name="callerState">CallerState variable</param>
        /// <param name="asyncRoutine">The code block to execute</param>
        /// <param name="callbackRoutine">The callback routine</param>
        /// <param name="callbackMode"></param>
        /// <param name="globalExceptionRoutine">The Exception Routine</param>
        /// <returns>Holder of the result for the code block, canbe used to wait</returns>
        NResult<TReturnValue> Execute<TReturnValue, TState>(TState state, Func<TState, TReturnValue> asyncRoutine, NStateVoidFunc<TReturnValue> callbackRoutine, 
NStateVoidFunc<Exception> exceptionRoutine, CallbackMode callbackMode); /// <summary> /// Execute a code block asynchronously, without return value /// </summary> /// <typeparam name="TReturnValue">type of the callerState variable</typeparam> /// <param name="callerState">CallerState variable</param> /// <param name="asyncRoutine">The code block to execute</param> /// <param name="callbackRoutine">The callback routine</param> /// <param name="callbackMode"></param> /// <param name="globalExceptionRoutine">The Exception Routine</param> /// <returns>Holder of the result for the code block, canbe used to wait</returns> NResult Execute<TState>(TState state, NStateVoidFunc<TState> asyncRoutine, NVoidFunc callbackRoutine, NStateVoidFunc<Exception> exceptionRoutine,
CallbackMode callbackMode); /// <summary> /// The overall exception handling routine, /// will be used when no exception routine is provided for the function /// </summary> NStateVoidFunc<Exception> ExceptionHandler { get; set; } } // NResult abstract class public abstract class NResult { public abstract bool IsDone(); internal abstract object CallerRoutine{get;} public abstract void Wait(/* int milli-sec-to-wait */); public virtual bool Cancel(); } public abstract class NResult<T> : NResult { public abstract T Value { get; } }
您必须实现 IParallelStrategy 接口,然后调用 NParallel.Strategy 来替换默认的 NAMPStrategyImpl,您还必须实现自己的 NResult

我在许多小型测试和一个我正在开发的应用中使用了这些代码。它使得异步编程变得非常容易和有趣。我希望这些代码对您也有用。您可以随意修改代码并在您自己的项目中重复使用,但请保留署名。如果您在代码中发现缺陷和错误,请告诉我。

已知问题

关于代码

VS2005 版本包包含三个项目:NParallel、NParallelCore 和 Testcase。后两个是 NParallel 的最初版本,包含早期快速粗糙的代码和一些不完整的功能。如果您感兴趣,可以看看。如果您只想要 NParallel,可以忽略其他两个项目。

VS2005 版本 NParallel 将不再支持后续版本(从 0.2 开始)。

VS2008 版本包仅包含一个控制台项目,如果您想要一个 DLL,可以将其输出类型更改为库。

V0.2 包含三个测试项目,包括一个 GUI 图像处理库。

历史

  • 2007-12-19 V0.2。支持循环并行。捆绑了其他增强功能。
  • 2007-11-30 错误修复,回调调用错误。v0.1. 更新了源代码。感谢 radioman.lt@gmail.com
  • 2007-11-30 添加了 VS2005 的源代码
  • 2007-11-28 文档的初始版本和初始版本 0.1。

参考文献

我看到过许多关于异步编程的文章,它们都很棒。您可以在 MSDN 上搜索。

Functional Programming For The Rest of Us

我在 codeproject 上找到的一篇适合初学者的文章可以在这里找到:这里

您现在可以从 MSDN 下载 ParallelFX CTP。这是 .NET3.5 的并行扩展。

© . All rights reserved.