65.9K
CodeProject 正在变化。 阅读更多。
Home

一种新的 .NET APM 方法

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.73/5 (9投票s)

2009年6月15日

CPOL

4分钟阅读

viewsIcon

28355

downloadIcon

118

一个用于 .NET 异步编程模型的包装器

引言

线程是应用程序开发的重要组成部分。 .NET 提供了几种处理线程的绝佳方法,其中一种是异步编程模型 (APM)。然而,在使用 APM 时,您往往会编写大量样板代码和重复代码,尤其是在 WinForms 环境中。我介绍的代码包装了这些样板代码,并抽象了开发人员在将回调封送到适当线程时可能承担的一些职责。

背景

APM 暴露了多种不同的模型用于线程处理(回调、轮询、阻塞)。我的 APM 助手主要设计用于处理回调模型。通常,您会缓存来自 BeginInvoke() 操作的 IAsyncResult 并注册一个回调。如果您在第一个 BeginInvoke() 完成之前提供了再次调用的能力,通常您会想要丢弃第一个 BeginInvoke() 的结果。这可以通过检查缓存的 IAsyncResult 并在回调中与它进行引用检查来轻松完成。如果它们指向相同的内存位置,则表示您具有正确的回调,因此也具有正确的结果。如果您处于 WinForms 环境中,您通常会在此数据上执行一些操作。当然,您的回调是在后台工作线程上执行的,因此如果您不将数据封送到 UI 线程,您将收到一个漂亮的 InvalidOperationException“跨线程操作无效:控件 'blahblahblah' 访问的线程不是创建它的线程。”如果您在回调中捕获到异常并且不小心将其正确封送到 UI 线程,情况也是如此。

推荐的 APM 用法

using System;
using System.Runtime.Remoting.Messaging;
using System.Windows.Forms;

namespace ApmHelperWinformsExample
{
    /// <summary>
    /// The ordained way of programming apm
    /// </summary>
    public partial class OrdainedApm : Form
    {
        private delegate double Add3(int a, int b, int c);
        private readonly object _lock = new object();
        private IAsyncResult _current;

        public OrdainedApm()
        {
            InitializeComponent();
        }

        private IAsyncResult Current
        {
            get { lock (this._lock) return this._current; }
            set { lock (this._lock) this._current = value; }
        }
        
        private void button1_Click(object sender, EventArgs e)
        {
            int a = int.Parse(this.textBox1.Text);
            int b = int.Parse(this.textBox2.Text);
            int c = int.Parse(this.textBox3.Text);
            Add3 add3 = SomeWebServiceMaybe.SomeLongRunningAddFunction;
            this.Current = add3.BeginInvoke(a, b, c, this.Callback, null);
        }

        private void Callback(IAsyncResult result)
        {
            try
            {
                var asyncResult = (AsyncResult) result;
                //ReturnMessage msg = (ReturnMessage)asyncResult.GetReplyMessage();
                var add3Del = (Add3) asyncResult.AsyncDelegate;
                double d = add3Del.EndInvoke(result);
                if(result == this.Current)
                {
                    this.Display(d);
                }
            }
            catch(Exception ex)
            {
                if(result == this.Current)
                {
                    this.Display(ex);
                }
            }
        }

        private void Display(double d)
        {
            if (this.InvokeRequired)
            {
                this.Invoke((MethodInvoker) (() => this.Display(d)));
                return;
            }
            this.label1.Text = d.ToString();
        }

        private void Display(Exception ex)
        {
            if (this.InvokeRequired)
            {
                this.Invoke((MethodInvoker)(() => this.Display(ex)));
                return;
            }
            this.label1.Text = ex.ToString();
            
        }
    }
}

如您所见,这里有很多样板代码。声明一个委托,缓存并提供对 IAsyncResult 的线程安全访问,try/catch EndInvoke(),将 IAsyncResult 强制转换为 AsyncResult 以获取委托以进行 EndInvoke() 调用(当然,这可以作为状态传递,但当使用 .NET 的委托时,它已经提供了),以及在我们获取数据后将在 UI 线程上调用 Invoke()。

我宁愿写这样的代码

using System;
using System.Windows.Forms;

namespace ApmHelperWinformsExample
{
    /// <summary>
    /// the way I want to program apm
    /// </summary>
    public partial class ApmHelperExample : Form
    {
        private readonly ApmHelper<int, int, int, double> _helper;

        public ApmHelperExample()
        {
            InitializeComponent();
            this._helper = new ApmHelper<int, int, int, double>
			(SomeWebServiceMaybe.SomeLongRunningAddFunction);
        }

        private void Callback(double result)
        {
            this.label1.Text = result.ToString();
        }

        private void ExceptionHandler(Exception ex)
        {
            this.label1.Text = ex.ToString();
        }

        private void button1_Click(object sender, EventArgs e)
        {
            int a = int.Parse(this.textBox1.Text);
            int b = int.Parse(this.textBox2.Text);
            int c = int.Parse(this.textBox3.Text);
            this._helper.InvokeAsync(a, b, c, this.Callback, this.ExceptionHandler);
        }
    }
}

请注意,我们不需要缓存 InvokeAsync() 的任何结果,我们不需要担心将任何数据封送到 UI 线程,也不需要担心捕获任何异常并将它们封送到 UI 线程。我们只需要提供参数(请注意,这些参数也是强类型的)、回调,以及可选的异常处理程序。

APMHelper 的代码

public enum ThreadCallback
{
    /// <summary>
    /// This is the thread that InvokeAsync() is called on
    /// </summary>
    AsyncCallerThread,
    /// <summary>
    /// This is the thread the InvokeAsync() spawned
    /// </summary>
    WorkerThread
}

/// <summary>
/// This class is responsible for tracking the IAsyncResult from a BeginInvoke(),
/// Catching exceptions from EndInvoke(), and Marshalling the data or exception to the
/// correct thread
/// </summary>
/// <typeparam name="TResult">result of function call</typeparam>
public abstract class ApmHelperBase<TResult> : IDisposable
{
    /// <summary>
    /// lock to preserve thread safety on getting/setting IAsyncResult
    /// </summary>
    private readonly object _lock = new object();
    /// <summary>
    /// Pointer to method to execute when EndInvoke() throws exception
    /// </summary>
    private readonly Action<Exception> _defaultExceptionHandler;
    /// <summary>
    /// Method to get begin, end invoke functions from
    /// </summary>
    private readonly Delegate _function;
    /// <summary>
    /// cache of BeginInvoke method
    /// </summary>
    private readonly MethodInfo _beginInvokeMethod;
    /// <summary>
    /// cache of EndInvoke method
    /// </summary>
    private readonly MethodInfo _endInvokeMethod;
    /// <summary>
    /// cache of callback that all BeginInvoke()s wire to
    /// </summary>
    private readonly AsyncCallback _asyncCallback;
    /// <summary>
    /// Cache of the current call from BeginInvoke()
    /// </summary>
    private IAsyncResult _current;

    /// <summary>
    /// ctor
    /// </summary>
    protected ApmHelperBase(Delegate function, Action<Exception> exceptionHandler)
        : this(function, exceptionHandler, ThreadCallback.AsyncCallerThread) { }

    /// <summary>
    /// ctor
    /// </summary>
    /// <param name="function">function user is going to bind to.
    /// This will actually be some func but we don't care here</param>
    /// <param name="exceptionHandler">optional method user wants 
    /// to be notified if async call throws exception</param>
    /// <param name="threadCallback">Which thread should callbacks occur on</param>
    protected ApmHelperBase(Delegate function, 
		Action<Exception> exceptionHandler, ThreadCallback threadCallback)
    {
        if (null == function) throw new ArgumentNullException("function");
        this._function = function;
        // cache the methods
        Type type = function.GetType();
        this._beginInvokeMethod = type.GetMethod("BeginInvoke");
        this._endInvokeMethod = type.GetMethod("EndInvoke");
        // if no ex handler, we use our own
        this._defaultExceptionHandler = exceptionHandler ?? DefaultExceptionHandler;
        // all async calls will get pointed to this callback
        this._asyncCallback = this.Callback;
        // cache which thread user wants callbacks on..he can change later if he wants
        this.TheThreadCallback = threadCallback;
    }

    /// <summary>
    /// User can set if they want callbacks to occur on the 
    /// thread this object is called InvokeAsycn(), or on the thread
    /// spawned from the BeingInvoke() method.
    /// </summary>
    /// <remarks>This applies to the exception handler as well</remarks>
    public ThreadCallback TheThreadCallback { get; set; }

    /// <summary>
    /// Provides the asyncresult in a threadsafe manner
    /// </summary>
    public IAsyncResult CurrentIAsyncResult
    {
        get { lock (this._lock) return this._current; }
        private set { lock (this._lock) this._current = value; }
    }

    /// <summary>
    /// Sets IssueCallbacksOnInvokesAsync to false and
    /// wipes out the CurrentIAsyncResult so no callback fires
    /// </summary>
    /// <remarks>Subsequent calls to InvokeAsync() will
    /// set the CurrentIAsyncResult so the last InvokeAsync() 
    /// will get the callback</remarks>
    public void Ignore()
    {
        this.IssueCallbacksOnInvokesAsync = false;
        this.CurrentIAsyncResult = null;
    }

    /// <summary>
    /// If true, APMHelper will issue callbacks on ALL BeginInvokes(), otherwise
    /// only the last InvokeAsync() gets the callback
    /// </summary>
    public bool IssueCallbacksOnInvokesAsync { get; set; }

    /// <summary>
    /// Ignores all outstanding calls.
    /// </summary>
    /// <remarks>You could actually start using it again</remarks>
    public void Dispose()
    {
        this.Ignore();
    }

    /// <summary>
    /// User should convert his arguments in order into args parm
    /// </summary>
    /// <param name="args">T1, T2..etc</param>
    /// <param name="userCallback">method user wants results pumped to</param>
    /// <param name="exceptionHandler">method user wants exceptions pumped into</param>
    protected void InvokeAsync(List<object> args, 
	Action<TResult> userCallback, Action<Exception> exceptionHandler)
    {
        // if a sync context is available and user wants callback on AsyncCallerThread,
        // then callback will happen on the thread calling this method now.
        // Otherwise, the normal bg thread will call the callback
        args.Add(
            this.TheThreadCallback == ThreadCallback.AsyncCallerThread
                ? SyncContextAsyncCallback.Wrap(this._asyncCallback)
                : this._asyncCallback);
        // we need to pass in the pointer to the method the user wants his notification
        args.Add(new CallbackState(userCallback, exceptionHandler));
        // even though we call Invoke, 
        // we are actually calling the BeginInvoke() so this won't block
        this.CurrentIAsyncResult = 
	(IAsyncResult) this._beginInvokeMethod.Invoke(this._function, args.ToArray());
    }

    /// <summary>
    /// User should convert his arguments in order into args parm
    /// </summary>
    /// <param name="args">T1, T2..etc</param>
    /// <param name="userCallback">method user wants results pumped to</param>
    protected void InvokeAsync(List<object> args, Action<TResult> userCallback)
    {
        this.InvokeAsync(args, userCallback, this._defaultExceptionHandler);
    }

    /// <summary>
    /// all async calls come through here to make sure they are valid
    /// </summary>
    private void Callback(IAsyncResult result)
    {
        bool correctCall = result == this.CurrentIAsyncResult || 
					this.IssueCallbacksOnInvokesAsync;
        var tmp = (AsyncResult) result;
        var callbackState = (CallbackState)((AsyncResult)result).AsyncState;

        TResult output;
        try
        {
            // get our results
            output = (TResult)this._endInvokeMethod.Invoke
				(this._function, new[] { result });
        }
        catch (Exception ex)
        {
            if (correctCall)
            {
                // get our callback
                ExecuteExceptionHandler(ex, callbackState.ExceptionHandler);
            }
            return;
        }
        if (!correctCall) return;
        if (null == callbackState.UserCallback) return; // user might have just 
						// issued fire and forget
        // notify the user
        callbackState.UserCallback(output);
    }

    private static void ExecuteExceptionHandler
	(Exception exception, Action<Exception> exHandler)
    {
        if (null != exHandler) exHandler(exception);
    }

    private static void DefaultExceptionHandler(Exception ex)
    {
        // log if you want
        throw ex;
    }

    private sealed class CallbackState
    {
        public readonly Action<TResult> UserCallback;
        public readonly Action<Exception> ExceptionHandler;
        public CallbackState(Action<TResult> userCallback, 
				Action<Exception> exceptionHandler)
        {
            this.UserCallback = userCallback;
            this.ExceptionHandler = exceptionHandler;
        }
    }
}

关注点

本质上,您可以看到子类(下面有一个示例)提供了传递给构造函数的委托。然后我们使用反射来获取 begin 和 endinvoke 方法。操作发生在 InvokeAsync() 中。此方法将使用参数调用 BeginInvoke(),并添加另外两个参数:回调和状态。状态通常是当 EndIvoke() 完成时应该调用的 Action 方法。请注意对 SyncContextAsyncCallback.Wrap(this._asyncCallback) 的调用。这段代码实际上是我从 Jeffrey Richter 那里获得的,他在他的 Power Threading Library 中提供了它。这段代码的作用基本上是缓存从 InvokeAsync() 调用使用的 SynchronizationContext。在上面的例子中,这将是 UI 线程。这个的好处是,它会将结果或异常封送到调用 InvokeAsync() 的线程。发生的情况是,我的回调被包装在 SyncContextAsyncCallback.Callback 实例中。然后,该回调负责将 APMHelperBase 中注册的回调发布到 InvokeAysnc() 调用的 SynchronizationContext。当然,如果没有 SynchronizationContext,或者用户通过 ThreadCallback 属性指定回调在后台工作线程上发出,那么我们就忽略此过程。

APMHelperBase 是抽象的,所以这里有一个子类的示例。这个子类的目的是(以及所有 Func<> 签名的子类)提供强类型。

/// <summary>
/// Provides async service for Func T1, T2, T3, TResult
/// </summary>
public class ApmHelper<T1, T2, T3, TResult> : ApmHelperBase<TResult>
{
    /// <summary>
    /// ctor
    /// </summary>
    public ApmHelper(Func<T1, T2, T3, TResult> func) : this(func, null) { }

    /// <summary>
    /// ctor
    /// </summary>
    public ApmHelper(Func<T1, T2, T3, TResult> func, Action<Exception> exceptionHandler)
        : this(func, exceptionHandler, ThreadCallback.AsyncCallerThread)
    {
    }

    /// <summary>
    /// ctor
    /// </summary>
    public ApmHelper(Func<T1, T2, T3, TResult> func, ThreadCallback threadCallback)
        : this(func, null, threadCallback)
    {
    }

    /// <summary>
    /// ctor
    /// </summary>
    public ApmHelper(Func<T1, T2, T3, TResult> func, 
		Action<Exception> exceptionHandler, ThreadCallback threadCallback)
        : base(func, exceptionHandler, threadCallback)
    {
    }

    /// <summary>
    /// starts the async call
    /// </summary>
    public void InvokeAsync(T1 t1, T2 t2, T3 t3, Action<TResult> callback)
    {
        this.InvokeAsync(new List<object> { t1, t2, t3 }, callback);
    }

    /// <summary>
    /// starts the async call
    /// </summary>
    public void InvokeAsync(T1 t1, T2 t2, T3 t3, 
	Action<TResult> callback, Action<Exception> exceptionHandler)
    {
        this.InvokeAsync(new List<object> { t1, t2, t3 }, callback, exceptionHandler);
    }
}

几点说明

您可以设置几个选项。例如,如果您想从 InvokeAsync() 获得每个回调,您可以将 IssueCallbacksOnInvokesAsync 设置为 true。但是,您将按调用完成的顺序获得它们,不一定是执行的顺序。Ignore() 将基本上将当前的 IAsyncResult 设置为 null,这样我们就不会发出我们的回调。如果您在关闭表单但仍有未完成的调用正在执行,这很有用。如果您想获取当前的 IAsyncResult,您可以这样做,然后当然您可以进行轮询、阻塞等,如果您愿意的话。

© . All rights reserved.