65.9K
CodeProject 正在变化。 阅读更多。
Home

通用类,用于封装异步 Begin/End 操作,使用 .NET 的响应式扩展 (Rx)

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.65/5 (12投票s)

2010年2月3日

CPOL

4分钟阅读

viewsIcon

69977

downloadIcon

80

本文介绍了一种可重用的类和技术,用于使用 Begin/End 模式和新的 .NET 响应式扩展 (Rx) 库轻松进行异步编程。

引言

作为 .NET 程序员,您会遇到一种模式,即用于进行异步函数调用的 Begin/End 模式。该模式的目的是允许长时间运行的操作在与调用线程不同的线程上执行,从而使调用线程能够自由(非阻塞)地继续执行。这是构建响应式 GUI 和有效进行远程调用的重要技术(无论您是调用 WCF 服务、使用 .NET Remoting、访问某些基于 REST 的 Web 服务等)。如果您以前从未见过,它的外观如下:

IAsyncResult BeginOperation(...some number of parameters as input, 
             AsyncCallback callback, object state);
SomeResult EndOperation(IAsyncResult);

虽然 BeginInvoke/EndInvoke 模式功能强大,但它很笨拙且不直观,尤其是当 EndOperation 部分返回一个值时。如果您查看上面的模式,您会发现 Begin 部分接受一个回调委托和一个“状态”对象,并返回一个 IAsyncResult。并不容易看出您应该将什么作为回调或状态传递,也不清楚您应该如何处理调用函数后获得的 IAsyncResult!最重要的是,它并不清楚如何获取 EndOperation 的结果。

为了更轻松地使用 Begin/End 模式,Microsoft 推荐一种称为事件驱动异步模式的方法。虽然这种特定模式有所改进,但我们可以使用新的 .NET 响应式扩展 (Rx) 或 Reactive LINQ 库再进一步。

背景

使用代码

要使用包装器,只需创建一个通用类型 AsyncPatternWrapper 的新实例。您必须至少使用一个泛型类型参数——该参数将表示 EndOperation 中将返回的内容。如果 BeginOperation 除了回调和状态参数外还接受其他参数,那么您必须传入其他泛型参数。

例如,如果 BeginOperation 接受一个 string,而 EndOperation 返回一个 int,那么您将声明 AsyncPatternWrapper<string, int>(BeginOperation, EndOperation) 的新实例。

任何 AsyncPatternWrapper 类的构造函数都接受两个参数——BeginOperation 及其对应的 EndOperation。

AsyncPatternWrapper 类实现了 IObservable<TResult> 接口,其中 TResult 是 EndOperation 调用的结果。IObservable 是新的 .NET 响应式扩展库的一部分。

除了实现 IObservable 之外,AsyncPatternWrapper 只有一个函数——Invoke()。它所做的就是调用 BeginOperation。完成后,另一个结果将在 IObservable 流中到来。只要您订阅了 IObservable 流(通过“Subscribe”函数),您就会在 EndOperation 完成时自动处理其结果。

示例 1(消费 REST Web 服务)

WebRequest request = HttpWebRequest.Create(
  @"http://services.digg.com/containers?appkey=http%3A%2F%2Fapidoc.digg.com");
var wrapper = new AsyncPatternWrapper<WebResponse>(
  request.BeginGetResponse, request.EndGetResponse);

wrapper.Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));
wrapper.Invoke();

新的响应式扩展库(以前称为 Reactive LINQ)的酷之处在于,您可以使用标准的 LINQ 查询运算符来处理事件流。所以在上面的示例 1 中,您不必打印每个 WebResponseContentLength,而是可以只打印 ContentLength 大于 3000 的 WebResponse,例如。

wrapper
    .Where(webResponse => webResponse.ContentLength > 3000)
    .Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));

示例 2(消费 WCF 服务)

// Some WCF service interface
[ServiceContract]
public interface IService
{
    [OperationContract(AsyncPattern = true)]
    IAsyncResult BeginGetCustomers(AsyncCallback callback, object state);

    List<Customer> EndGetCustomers(IAsyncResult result);
}

// Code snippet - illustrates consumption of the WCF service on the client-side
            
var cf = new ChannelFactory<IService>(new BasicHttpBinding(), 
         new EndpointAddress(@"https://:8085"));
var service = cf.CreateChannel();

var wrapper = new AsyncPatternWrapper<List<Customer>>(
                  service.BeginGetCustomers, service.EndGetCustomers);
wrapper.ObserveOnDispatcher().Subscribe(customers => UpdateUI(customers));

// refreshButton is some button on the UI - every time you
// click it, you will asynchronously refresh the list of customers:
refreshButton.Click += (s,e) => wrapper.Invoke();

请注意,在上面的示例 2 中,您不必通过 Dispatcher 切换回 UI 线程即可更新 UI!因此,您的代码最终会更简洁,可读性也更高。

另一个收获是,您不必依赖 WCF 代理生成器,而是可以使用 AsyncPatternWrapper 类轻松创建自己的代理。这对于 WCF 服务不公开 .svc 文件的情况,或者在不可能存在这种情况的情况下(例如 WCF REST)非常有价值。

AsyncPatternWrapper

这是 AsyncPatternWrapper 的源代码,用于 Begin 操作中没有参数,以及 Begin 操作中有一个参数的情况。有关 Begin 操作中最多四个参数的 AsyncPatternWrapper 类,请参阅随附的压缩文件。

using System;
using System.Linq;

namespace System
{
    #region No arguments

    public class AsyncPatternWrapper<TResult> : IObservable<TResult>
    {
        Func<AsyncCallback, object, IAsyncResult> beginOp;
        Func<IAsyncResult, TResult> endOp;
        IObservable<TResult> observable;
        event EventHandler<EventArgs<TResult>> done;

        public AsyncPatternWrapper(
            Func<AsyncCallback, object, IAsyncResult> beginOp,
            Func<IAsyncResult, TResult> endOp)
        {
            this.beginOp = beginOp;
            this.endOp = endOp;

            observable = Observable.FromEvent<EventArgs<TResult>>(
                e => this.done += e, e => this.done -= e)
                .Select(s => s.EventArgs.Item);
        }

        public void Invoke()
        {
            IAsyncResult result = null;
            result = beginOp(
                new AsyncCallback(notUsed =>
                {
                    try
                    {
                        var res = endOp(result);
                        if (done != null)
                            done(null, new EventArgs<TResult> { Item = res });
                    }
                    catch (Exception)
                    {
                        // Don't raise an event if there was an exception
                    }
                }),
                null);
        }

        #region IObservable<TResult> Members

        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            return observable.Subscribe(observer);
        }

        #endregion
    }

    #endregion


    #region One argument

    public class AsyncPatternWrapper<T, TResult> : IObservable<TResult>
    {
        Func<T, AsyncCallback, object, IAsyncResult> beginOp;
        Func<IAsyncResult, TResult> endOp;
        IObservable<TResult> observable;
        event EventHandler<EventArgs<TResult>> done;

        public AsyncPatternWrapper(
            Func<T, AsyncCallback, object, IAsyncResult> beginOp,
            Func<IAsyncResult, TResult> endOp)
        {
            this.beginOp = beginOp;
            this.endOp = endOp;

            observable = Observable.FromEvent<EventArgs<TResult>>(
                e => this.done += e, e => this.done -= e)
                .Select(s => s.EventArgs.Item);
        }

        public void Invoke(T param)
        {
            IAsyncResult result = null;
            result = beginOp(
                param,
                new AsyncCallback(notUsed =>
                {
                    try
                    {
                        var res = endOp(result);
                        if (done != null)
                            done(null, new EventArgs<TResult> { Item = res });
                    }
                    catch (Exception)
                    {
                        // Don't raise an event if there was an exception
                    }
                }),
                null);
        }

        public IDisposable Subscribe(IObserver<TResult> observer)
        {
            return observable.Subscribe(observer);
        }
    }

    #endregion

    internal class EventArgs<T> : EventArgs
    {
        public T Item { get; set; }
    }
}

请注意,上面的代码中异常被简单地丢弃了——如果您觉得重新抛出异常更有用,或者另外公开一个仅用于处理异常的 IObservable 流,那么更改起来也很容易。

希望您觉得本文有用,并且阅读愉快!

历史

  • 2010 年 2 月 2 日 - 版本 1.0。
© . All rights reserved.