事件驱动异步模式的自动实现






4.78/5 (30投票s)
使用此代码生成器自动实现事件驱动异步模式

目录
引言
AsyncGen 是一个实用程序,它根据以下带注解的类和接口生成实现基于事件的异步模式的客户端类,例如...
using System;
using System.Collections.Generic;
using System.Text;
using AsyncGen;
namespace TestAssembly
{
[GenerateAsyncClientClass("Client")]
public interface IServer
{
[GenerateAsyncOperation(
GenerateCancelMethod = true,
CompletedEventName = "CalculationComplete",
CallbackInterface = typeof(IServerCallbacks))]
double Calculate(double argument,
[TaskID] object userState, IServerCallbacks callbacks);
}
public interface IServerCallbacks
{
[GenerateProgressEvent("CalculationProgressChanged")]
[return:CancelFlag] bool ReportCalculationProgress
(double approximateResult, [ProgressPercentage] int percentage,
[TaskID] object userState);
}
}
...AsyncGen 将从中生成以下客户端类

术语说明:在本文的其余部分,我将互换使用“客户端类”和“代理”这两个术语。
背景
自其诞生以来,.NET 框架为异步调用操作提供了广泛的支持。许多 .NET BCL(基类库)中的方法都有异步版本,形式为一对名为 BeginOperation
和 EndOperation
的方法。例如,FileStream 类提供了 BeginRead 和 EndRead 方法,它们作为 Read 方法的异步版本。BeginRead 返回一个实现 IAsyncResult 接口的对象,该对象用于将每次 EndRead 调用与较早的 BeginRead 调用关联起来。这种模式在不同地方被称为异步编程模型 (APM)、IAsyncResult 模式,或简称为异步模式。(后一个术语尤其用于 .NET 2.0 之前的文献,因为这是 .NET 1.x 中唯一用于异步操作的模式。)
除了 BCL 提供的异步方法之外,异步委托机制允许您异步调用任何方法。编译器每次使用 delegate 关键字时自动生成的 BeginInvoke
和 EndInvoke
方法符合与 BCL 方法相同的模式。因此,可以说异步委托提供了 IAsyncResult
模式的通用实现。
IAsyncResult
模式非常适合 FileStream.Read 等“幕后”操作,但它不利于用户交互,具体来说:
- 它不支持取消。
- 它不支持进度报告。
- 它不支持增量结果。
- 它不支持完成回调的不同线程模型。
由于这些原因,.NET 2.0 中引入了一种新模式,称为基于事件的异步模式,它支持上述所有功能。(另请参阅 MSDN 上的异步编程设计模式。)到目前为止,最接近于事件驱动模式的通用实现是 BackgroundWorker 类,它允许您在后台运行任意操作而不会阻塞主线程,并且仍然能够在操作完成和有进度时在主线程上接收通知。然而,即使 BackgroundWorker
也未以其最通用形式实现事件驱动模式;例如,它不支持同一操作的多个并发调用。此外,它的事件不能在不同的应用程序域或进程之间进行封送,因此不适用于分布式场景。
AsyncGen 试图提供事件驱动模式的通用实现,包括 MSDN 文章中描述的所有功能。为了使其适用于分布式场景,AsyncGen 采用客户端/服务器方法,这意味着异步调用的细节不由实现操作逻辑的同一类处理,而是由使用原始类作为服务器的代理类处理。代理可以与服务器在同一进程中,也可以在单独的进程中。在后一种情况下,代理和服务器使用 .NET Remoting 进行通信。在内部,代理使用异步委托来实现操作。
使用 AsyncGen
概述
要使用 AsyncGen,您必须遵循以下步骤:
- 将项目引用添加到 AsyncGenLib.dll。
- 使用一些特殊属性注解您的
public
类或接口。此步骤将在本页后面详细解释。 - 构建您的程序集。
- 打开命令提示符,
cd
到目标目录,并在编译后的程序集上调用 AsyncGen.exe。 - 将生成的类添加到与原始类相同的项目,或添加到引用原始类的另一个项目中。
命令行语法
摘要
AsyncGen.exe [ options ] <assembly containing annotated classes>
[ <additional assembly needed to compile the generated classes> ... ]
选项
/lang:<output language>
<output language> 的有效值为:
- CS : C#(默认)
- VB : Visual Basic
- CPP : C++/CLI
代码在当前目录中生成,每个类在一个单独的源文件中,名为 class.generated.cs。
注意:运行 AsyncGen.exe 时,请确保 AsyncGenLib.dll 在当前目录中。通过在项目的 bin\Debug 或 bin\Release 目录中运行 AsyncGen.exe 可以很容易满足此要求。
注解您的类
在使用 AsyncGen.exe 之前,您必须使用 AsyncGenLib.dll 中的以下属性来注解您的源类和接口,风格类似于 WCF 服务:
GenerateAsyncClientClassAttribute
将此属性放置在您希望为其生成异步代理的每个类或接口上。如果您未指定任何构造函数参数或命名属性,则代理类的名称将与源类或接口的名称相同,并附加“Client”一词,并从接口名称中去除前导“I”。
GenerateAsyncOperationAttribute
将此属性放置在您希望为其实现模式的每个方法上。它有几个属性可用于自定义生成成员的名称,例如启动方法和完成事件。有一个属性值得特别关注:CallbackInterface
属性指定一个接口类型,您的服务器可以使用该接口类型在代理上引发进度事件。此问题将在下一节中解释。
GenerateProgressEventAttribute
此属性用于回调接口的方法上,以指示每个方法应引发的事件类型。
TaskIDAttribute、ProgressPercentageAttribute 和 CancelFlagAttribute
这些属性用于识别在事件驱动模式实现中具有特殊意义的方法参数。它们没有任何构造函数参数或命名属性。
回调接口
虽然完成事件由代理在服务器方法返回时(正常或异常)自动引发,但进度事件必须由服务器本身引发。这通过回调接口完成。与 WCF 隐式将回调接口传递给服务器不同,AsyncGen 要求回调接口明确定义为原始方法的参数之一,最好是最后一个参数。
回调接口的方法通过 GenerateProgressEventAttribute
与代理上的特定事件绑定。如果回调方法有多个参数,AsyncGen 会将它们全部打包到一个值类型中,该值类型又包含在一个通用 EventArgs
对象中。如果方法只有一个参数,它将直接包含在 EventArgs
中。
您不必实现回调接口。它由生成的代码自动实现,如以下示例所示:
// --- Source file: IServer.cs ---
[GenerateAsyncClientClass("Client")]
public interface IServer
{
[GenerateAsyncOperation(CallbackInterface = typeof(IServerCallbacks))]
double Calculate(double argument, [TaskID] object userState,
IServerCallbacks callbacks);
}
public interface IServerCallbacks
{
[GenerateProgressEvent("CalculationProgressChanged")]
[return:CancelFlag] bool ReportCalculationProgress
(double approximateResult, [ProgressPercentage] int percentage,
[TaskID] object userState);
}
// --- Source file: Client.generated.cs ---
public partial class Client : ClientBase<TestAssembly.IServer>
{
// ...
public void CalculateAsync(double argument, object userState)
{
this._calculateTracker.CreateOperation(userState);
CalculateDelegate d = new CalculateDelegate(this.server.Calculate);
d.BeginInvoke(argument, userState, this._calculateTracker,
new System.AsyncCallback(this._calculateTracker.PostOperationCompleted),
userState);
}
// ...
public event AsyncGen.ProgressChangedEventHandler<double> CalculationProgressChanged;
// ...
private class CalculateTracker : OperationTracker<IServer, Client,
CalculateDelegate, double>, TestAssembly.IServerCallbacks
{
// ...
bool IServerCallbacks.ReportCalculationProgress
(double approximateResult, int percentage, object userState)
{
this.PostProgress<double>(new System.Threading.SendOrPostCallback
(this.OnCalculationProgressChanged), percentage,
approximateResult, userState);
return this.IsOperationCancelled(userState);
}
}
// ...
}
当服务器调用 callbacks.ReportCalculationProgress
时,会发生以下情况:
- 如果客户端位于不同的应用程序域或不同的进程中,则通过 Remoting 通道将调用封送到其中。
- 回调方法使用与当前调用(由
userState
参数标识)关联的 AsyncOperation 对象进一步将事件封送到适当的线程。在 WinForms 应用程序中,此线程通常是主线程,或者更一般地说,是拥有用于启动操作的控件的线程。在控制台应用程序中,此线程是调用线程(如果使用SendProgress
同步发送消息),或者是线程池中的任意线程(如果使用PostProgress
异步发送消息)。 DoSomethingProgressChanged
事件在适当的线程上引发。传递给ReportProgress
的参数被打包到EventArgs
参数中。
请注意 CalculateAsync
签名中缺少回调参数。CalculateAsync
在调用服务器的 Calculate
方法时自行提供此参数。
取消
默认情况下,AsyncGen 不会生成 Cancel 方法。如果您的服务器支持取消,您应该将 GenerateAsyncOperationAttribute.GenerateCancelMethod
属性设置为 true
。您可以使用 GenerateAsyncOperationAttribute.CancelMethodName
属性自定义此方法的名称。
取消操作的能力要求服务器轮询一个布尔标志,客户端可以在操作生命周期的任何时候引发该标志。您有以下两种选择:
-
在服务器端存储标志。 AsyncGen 不为此选项提供任何特殊支持,因为它需要完全在服务器端实现。为了符合事件驱动模式,服务器应定义一个名为
DoSomethingAsyncCancel
的public
方法来设置取消标志。此方法不应进行注解。 -
在客户端存储标志。 此选项的主要优点是可以在将服务器进度报告给客户端的回调返回端进行轮询。AsyncGen 通过提供
CancelFlagAttribute
来支持此功能。将此属性应用于回调方法的返回值或其out
参数之一(两者都必须是Boolean
)会导致 AsyncGen 在回调方法中生成一个附加语句,将取消标志的值返回给服务器。例如,如果您按如下方式注解您方法的返回值...public interface IServerCallbacks { [GenerateProgressEvent("CalculationProgerssChanged")] [return:CancelFlag] bool ReportCalculationProgress(double approximateResult, [ProgressPercentage] int percentage, [TaskID] object userState); }
...AsyncGen 将生成以下代码:
bool IServerCallbacks.ReportCalculationProgress (double approximateResult, int percentage, object userState) { this.PostProgress<double>(new System.Threading.SendOrPostCallback (this.OnCalculationProgerssChanged), percentage, approximateResult, userState); return this.IsOperationCancelled(userState); }
如果您更喜欢使用
out
参数,如下所示:public interface IServerCallbacks { [GenerateProgressEvent("CalculationProgerssChanged")] void ReportCalculationProgress(double approximateResult, [ProgressPercentage] int percentage, [TaskID] object userState, [CancelFlag] out bool cancel); }
...AsyncGen 将生成以下代码:
void IServerCallbacks.ReportCalculationProgress (double approximateResult, int percentage, object userState, out bool cancel) { this.PostProgress<double>(new System.Threading.SendOrPostCallback (this.OnCalculationProgerssChanged), percentage, approximateResult, userState); cancel = this.IsOperationCancelled(userState); }
然后服务器可以使用回调接口将进度报告与轮询取消结合起来,如以下示例所示:
public int DoSomething(int n, object userState, IDoSomethingCallbacks callbacks) { bool cancelRequested; for (int i = 0; i < 20; i++) { if (callbacks != null) { callbacks.ReportProgress(5 * i, userState, out cancelRequested); if (cancelRequested) return -1; } Thread.Sleep(500); } if (callbacks != null) { callbacks.ReportProgress(100, userState, out cancelRequested); if (cancelRequested) return -1; } return n * n; }
重载方法
C# 中最简单的重载用法是为一项或多项参数提供默认值。通常在这种情况下,方法有一个主要的重载,它接受所有可能的参数并实现操作的逻辑,以及几个简单的重载,它们调用主要的重载并为某些参数提供默认值。例如,在以下代码中,DoSomething
的第一个重载为第二个重载的参数 n
提供了默认值 17
。
using System;
using System.Collections.Generic;
using System.Text;
using AsyncGen;
namespace TestAssembly
{
[GenerateAsyncClientClass("OverloadedClient")]
public class Server
{
[GenerateAsyncOperation]
public int DoSomething(int m)
{
return DoSomething(m, 17);
}
[GenerateAsyncOperation]
public int DoSomething(int m, int n)
{
return 42;
}
}
}
当您为这个类生成异步代理时,您可能希望它包含两个名为 DoSomethingAsync
的方法——一个带一个 int
参数,另一个带两个——以及一个名为 DoSomethingCompleted
的单个事件。然而,上述注解不会产生预期的结果。相反,生成的代码将包含两个名为 doSomethingDelegate
的委托,两个名为 DoSomethingTracker
的类,以及两个名为 _dosomethingTracker
的字段,因此它将无法编译。
有两种方法可以达到预期结果:
-
只注解最通用的重载,并在代理类中手动实现其他重载。这就是 partial 修饰符派上用场的地方,因为它允许您在单独的源文件中实现附加方法。这是更简单的方法,因此应尽可能优先选择它。例如:
// OverloadedServer.cs using System; using System.Collections.Generic; using System.Text; using AsyncGen; namespace TestAssembly { [GenerateAsyncClientClass("OverloadedClient")] public class OverloadedServer { public int DoSomething(int m) { return DoSomething(m, 17); } [GenerateAsyncOperation] public int DoSomething(int m, int n) { return 42; } } } // OverloadedClient.cs using System; using System.Collections.Generic; using System.Text; namespace TestAssembly { public partial class OverloadedClient { public int DoSomething(int m) { return DoSomething(m, 17); } public void DoSomethingAsync(in-t m) { DoSomethingAsync(m, 17); } } } // OverloadedClient.generated.cs namespace TestAssembly { using System; using System.ComponentModel; using System.Diagnostics; using AsyncGen; public partial class OverloadedClient : ClientBase<TestAssembly.OverloadedServer> { // ... public event AsyncCompletedEventHandler<System.Int32> DoSomethingCompleted // ... public int DoSomething(int m, int n) { // ... } public void DoSomethingAsync(int m, int n) { // ... } // ... delegate int DoSomethingDelegate(int m, int n); } }
-
注解所有重载;为每个重载指定不同的基本名称,但使用
GenerateAsyncOperationAttribute
的其他命名属性为启动方法、取消方法和完成事件分配相同的名称。例如:using System; using System.Collections.Generic; using System.Text; namespace TestAssembly { [AsyncGen.GenerateAsyncClientClass("OverloadedClient")] public interface IOverloadedServer { [AsyncGen.GenerateAsyncOperation("op1", StartMethodName = "DoSomethingAsync", CancelMethodName = "DoSomethingAsyncCancel", CompletedEventName = "DoSomethingCompleted")] void DoSomething(int m); [AsyncGen.GenerateAsyncOperation("op2", StartMethodName = "DoSomethingAsync", CancelMethodName = "DoSomethingAsyncCancel", CompletedEventName = "DoSomethingCompleted")] void DoSomething(int m, int n); } }
...AsyncGen 将从中生成以下客户端类:
请注意,DoSomethingAsync
被重载,但 DoSomethingAsyncCancel
没有,并且只有一个 DoSomethingCompleted
事件。
当同一个方法的多个重载实现复杂逻辑时,您必须采用第二种方法。
警告:如果重载具有不同的 ref
或 out
参数或不同的返回类型(尽管这似乎是不良实践),您必须将它们的完成事件命名不同以防止不同输出类型之间的类型冲突。
同步调用
生成的代理还包含服务器方法的同步版本,没有回调接口参数。如果您想同步调用操作,请使用此方法而不是直接调用服务器方法。请注意,您仍然可以引发进度事件,但如果您尝试在典型的 WinForms 应用程序中执行此操作,您将遇到以下问题之一,具体取决于 GenerateProgressEventAttribute.Async
属性的值:
- 如果您接受默认值(
true
),则事件将在操作已经完成后引发。 - 如果您将
Async
设置为false
,您将遇到死锁。
尽管如此,在几种情况下,同步调用的方法可以安全地(并且有用地)引发进度事件:
- 在使用默认的自由线程 SynchronizationContext 的控制台应用程序中。
- 在 WinForms 应用程序中,如果方法由后台线程调用。
- 在 WinForms 应用程序中,如果方法由另一个方法调用,而该方法又异步调用。
最后一个场景在实践中发生,当您需要调用一个由一系列阶段组成的操作,每个阶段都必须向用户报告其进度,而不会阻塞主线程。
生命周期管理
当代理和服务器通过 .NET Remoting 相互通信时,它们都受制于生命周期管理。如果其中一方在使用完另一方之前下线,您将收到以下异常:
"Object ... has been disconnected or does not exist at the server."
为了确保服务器不会过早下线,您应该通过以下方法之一设置其生命周期限制:
-
在服务器的配置文件中添加一个
<lifetime>
元素,例如:<configuration> <system.runtime.remoting> <application> <lifetime leaseTimeout="10M" renewOnCallTime="5M" /> </application> </system.runtime.remoting> </configuration>
-
重写从
MarshalByRefObject
继承的InitializeLifetimeService
方法。(您的服务器必须派生自MarshalByRefObject
才能被 .NET Remoting 访问。)例如:class Server : MarshalByRefObject { public override object InitializeLifetimeService() { ILease tmp = (ILease) base.InitializeLifetimeService(); if (tmp.CurrentState == LeaseState.Initial) { tmp.InitialLeaseTime = TimeSpan.FromSeconds(5); tmp.RenewOnCallTime = TimeSpan.FromSeconds(1); } return tmp; } }
为了确保代理在服务器下线之前不会下线,您的服务器应该赞助其代理。如果您的服务器实现 ISponsor 接口,它将自动被代理注册为赞助商。ISponsor
包含一个方法 Renewal,该方法由框架在代理租约即将到期时调用。Renewal
应该返回一个合理的时间量,以延长代理的租约,例如:
class Server : MarshalByRefObject, ISponsor
{
TimeSpan ISponsor.Renewal(ILease lease)
{
return TimeSpan.FromMinutes(2);
}
}
要了解有关生命周期管理的更多信息,请参阅[1]。
演示项目

要查看注解类型更改如何影响生成的代码,请使用 AsyncGen.sln 中的 TestAssembly.csproj。由于调试设置未保存在 *.csproj 文件中,您必须自行配置启动选项:
- 在解决方案资源管理器窗格中,右键单击 AsyncGen 并选择“属性”。
- 切换到调试选项卡。
- 确保启动操作设置为“启动项目”。
- 从配置下拉列表中,选择“所有配置”。
- 将命令行参数设置为:
testassembly.dll /language:cs
- 对于工作目录,单击省略号 (...) 并导航到:
..\..\..\TestAssembly\bin\Debug
完成此配置后,在 Visual Studio 中按 F5 或 Ctrl+F5 将编译 TestAssembly.csproj 并在 TestAssembly.dll 上运行 AsyncGen.exe。
要了解代理在分布式应用程序中如何工作,请打开演示解决方案 EventBasedAsync.sln。此解决方案包含三个项目:
- Interface — 包含
IServer
接口的定义以及生成的Proxy
类。 - Server — 一个控制台应用程序,它创建
Server
类的一个实例,该类实现IServer
,然后等待客户端通过 Remoting 连接到此对象。 - Client — 上面看到的 Windows Forms 应用程序。
同样,您必须配置一些未保存在 *.sln 和 *.csproj 文件中的设置:
- 在解决方案资源管理器中,右键单击解决方案并选择“设置启动项目...”
- 选择“多个启动项目”。
- 对于客户端和服务器,将操作更改为“启动”。对于接口,将操作设置为“无”。
在 Visual Studio 中按 F5 或 Ctrl+F5 时,服务器和客户端都会启动。当您按下“提交请求”时,客户端会根据“多少?”框中的数字向服务器发送一个或多个同时请求。对于每个提交的请求,都会向 ListView
添加一个新行。当服务器处理每个请求时,它会向客户端发送进度报告,客户端使用每行中嵌入的进度条显示这些报告。您可以通过选择任何仍在进行的 operación 并从右键菜单中选择“取消选定”来取消它。最后,您可以通过从右键菜单中选择“清除已完成”来清除 ListView
中所有已取消和已完成的 operaciones。
除了演示 AsyncGen 生成的代理的功能外,此项目还演示了 .NET 线程池的行为。服务器提供了一个原始命令行界面,允许您配置其线程池的大小限制。以下命令可用:
sminc n
: 将完成端口线程的最小数量设置为 n。smaxc n
: 将完成端口线程的最大数量设置为 n。sminw n
: 将工作线程的最小数量设置为 n。smaxw n
: 将工作线程的最大数量设置为 n。g
: 查询当前线程池大小限制。q
: 退出服务器。
在分布式应用程序中,客户端的请求由服务器线程池的完成端口部分中的线程处理,因此工作线程的数量对应用程序的性能没有影响。另一方面,如果您将服务器移动到与客户端相同的 AppDomain 中,请求将由工作线程处理。在这种情况下,您应该会看到性能仅受工作线程数量的影响,而不受完成端口线程数量的影响。
设计与实现
AsyncGenLib.dll
除了上面描述的属性类之外,AsyncGenLib.dll 还包含几个有助于实现生成的代理的类:
AsyncCompletedEventArgs
和AsyncCompletedEventHandler
的泛型版本,派生自 .NET 框架定义的非泛型版本。类型参数TOutput
表示保存操作输出的类型。如果操作具有单个输出值(例如,返回值但没有out
或ref
参数),则此值的类型将替换TOutput
。如果操作具有一个或多个out
或ref
参数,则这些参数的值以及方法的返回值将打包到一个值类型中(通常命名为DoSomethingOutput
,但可以使用GenerateAsyncOperationAttribute.OutputTypeName
属性进行自定义),该值类型将替换TOutput
。如果操作的返回类型为void
且没有out
或ref
参数,则使用AsyncCompletedEventArgs
的非泛型版本。- 类似地,还有 ProgressChangedEventArgs 和 ProgressChangedEventHandler 的泛型版本。
ClientBase
,用于生成代理的泛型基类。类型参数TServer
表示原始类或接口的类型。OperationTracker
,代理内部用于完成大部分繁重工作的基类(见下文)。实际上,OperationTracker
有两个版本,两者都是泛型的。更通用的版本有两个类型参数:TDelegate
,表示带注解方法的签名,和TOutput
,表示其输出。更专业化的版本只接受TDelegate
参数,用于没有任何输出的方法。拥有两个基类可供选择有助于简化生成的代码。
客户端类的实现方式
对于每个带注解的方法,AsyncGen.exe 在客户端类中生成两个 private
成员:
- 一个嵌套类,派生自 OperationTracker。
- 一个嵌套类类型的字段,以及在构造函数中初始化它的语句。
在内部,每个 OperationTracker
维护一个 OperationState
对象的字典,索引为应用程序在操作启动时提供的唯一任务 ID。(为了使其工作,原始方法的一个参数必须由 TaskIDAttribute
注解。)每个 OperationState
跟踪一个特定的调用。如果启动方法没有任务 ID 参数,则使用单个虚拟对象代替。在这种情况下,操作不支持多个并发调用。
如果一个方法被重载,每个重载都有自己的跟踪器类和相应的字段。这些成员的名称是使用为每个重载指定的基名构造的,因此每个方法的基名必须不同。(参见上面的“处理重载方法”)。程序确保如果多个重载为以下任何成员指定相同的名称,则实际上只会将一个成员添加到主类中:
- 完成事件
- 取消方法
- 每种进度事件(如果多个重载指定相同的回调接口)
跟踪器类(私有)为回调接口中每个带有 GenerateProgressEventAttribute
注解的方法声明一个事件,而主类(公共)声明一个相同的事件,并为跟踪器的事件注册一个处理程序,该处理程序引发公共事件。类似地,主类声明一个完成事件,并为每个跟踪器(继承自 OperationTracker
)的 OperationCompleted
事件注册一个处理程序,以引发公共事件。
作为通过主类中的处理程序传递事件的优雅替代方案,主类可以为每个 public
事件定义一个 add
访问器,该访问器立即将传递给它的处理程序注册到适当跟踪器(或,在重载方法的情况下,跟踪器)的相应事件上,以及一个 remove
访问器,该访问器将给定的处理程序从所有跟踪器中取消注册。这种实现唯一的问题是 System.CodeDom 命名空间不支持带有自定义访问器的事件,因此必须使用 代码片段。因此,此实现目前仅在 C# 和 Visual Basic 中可用,而上一段中更朴素的实现则在具有 CodeDOM 提供程序的每种语言中都可用。您可以通过选择“Debug”或“Debug With Snippets”作为解决方案配置来切换两种替代实现。
关注点
.NET 远程处理
.NET Remoting 通过 IMessageSink.AsyncProcessMessage 和几个类似方法支持异步调用,这些方法由 Remoting 管道的每个阶段(在 Remoting 术语中称为“消息槽”)或“槽链”实现。相同的客户端代码可以处理本地服务器(由本地 AppDomain 托管的对象)和远程服务器(由不同 AppDomain、进程或机器托管的对象)。
例如,考虑以下代码:
DoSomethingDelegate d = new DoSomethingDelegate(server.DoSomething);
d.BeginInvoke(new AsyncCallbac(server_DoSomethingCompleted), this);
通常,BeginInvoke
调用 ThreadPool.QueueUserWorkItem 并将一个委托传递给 server.DoSomething
。但是,如果 server
是远程对象的透明代理,BeginInvoke
会创建 MethodCall 消息并将其传递给代理槽链中第一个槽的 IMessageSink.AsyncProcessMessage 方法,该方法最终通过 Remoting 通道将其发送到服务器。它还注册一个回复槽来处理操作完成时由服务器发送的 ReturnMessage。回复槽调用传递给 BeginInvoke
的 AsyncCallback
。
需要注意的是,底层连接仍然是同步的。这意味着对于每个并发运行的异步调用,都将建立单独的连接。[1]
实际上,当服务器位于不同的进程中时,操作是使用服务器的线程池执行的。在客户端,每个通道只有一个线程被阻塞,而不是每个并发操作调用一个线程。客户端的线程池仅用于执行回调。理想情况下,回调应该足够快地返回,以避免同时使用多个线程,这在 .NET 2.0 中意味着它们必须在 500 毫秒内返回(请参阅 Marc Clifton 的.NET 的 ThreadPool 类 - 幕后)。
WCF
WCF 有一个类似的机制,但您必须使用带有 /async 开关的 SvcUtil 来生成代理类中的两个额外方法,而不是委托(它们与 .NET Remoting 硬连接在一起):[2]
[OperationContract(AsyncPattern = true,
Action = "<original action name>",
ReplyAction = "<original response name>")]
IAsyncResult Begin<Operation>(<in arguments>,
AsyncCallback callback, object asyncState);
<returned type> End<Operation>(<out arguments>, IAsyncResult result);
目前 AsyncGen 不支持 WCF。添加这种支持将需要对 AsyncGenLib.dll 进行一些扩展,并且 AsyncGen.exe 将要么自行调用 SvcUtil /async,要么依赖于所有已处理类型的 WCF 异步代理的预先存在。
如果您使用 WCF 实现事件驱动模式,您可能还需要利用 WCF 内置的回调接口支持。
完整示例
以下是根据上面声明的 IServer
接口生成的代码:
C#
//------------------------------------------------------------------------------
// <auto-generated>
// This code was generated by a tool.
// Runtime Version:2.0.50727.1433
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
// </auto-generated>
//------------------------------------------------------------------------------
namespace TestAssembly
{
using System;
using System.ComponentModel;
using System.Diagnostics;
using AsyncGen;
public partial class Client : ClientBase<TestAssembly.IServer>
{
private CalculateTracker _calculateTracker;
public Client(TestAssembly.IServer server) :
base(server)
{
this._calculateTracker = new CalculateTracker(this.server, this);
this._calculateTracker.OperationCompleted +=
new AsyncGen.AsyncCompletedEventHandler<double>
(this._calculateTracker_OperationCompleted);
this._calculateTracker.CalculationProgressChanged +=
new AsyncGen.ProgressChangedEventHandler<double>
(this._calculateTracker_CalculationProgressChanged);
}
public event AsyncGen.AsyncCompletedEventHandler<double> CalculationComplete;
public event AsyncGen.ProgressChangedEventHandler<double>
CalculationProgressChanged;
public double Calculate(double argument, object userState)
{
double value;
this._calculateTracker.CreateOperation(userState);
try
{
value = this.server.Calculate(argument, userState,
this._calculateTracker);
}
finally
{
this._calculateTracker.CompleteOperation(userState);
}
return value;
}
public void CalculateAsync(double argument, object userState)
{
this._calculateTracker.CreateOperation(userState);
CalculateDelegate d = new CalculateDelegate(this.server.Calculate);
d.BeginInvoke(argument, userState, this._calculateTracker,
new System.AsyncCallback(this._calculateTracker.PostOperationCompleted),
userState);
}
public void CalculateAsyncCancel(object userState)
{
if (this._calculateTracker.TryCancelOperation(userState))
{
return;
}
throw new System.ArgumentException();
}
private void _calculateTracker_CalculationProgressChanged
(object sender, AsyncGen.ProgressChangedEventArgs<double> args)
{
if ((this.CalculationProgressChanged != null))
{
this.CalculationProgressChanged(this, args);
}
}
private void _calculateTracker_OperationCompleted
(object sender, AsyncGen.AsyncCompletedEventArgs<double> args)
{
if ((this.CalculationComplete != null))
{
this.CalculationComplete(this, args);
}
}
private class CalculateTracker : OperationTracker<IServer, Client,
CalculateDelegate, double>, TestAssembly.IServerCallbacks
{
public CalculateTracker(IServer server, Client client) :
base(server, client)
{
}
public event AsyncGen.ProgressChangedEventHandler<double>
CalculationProgressChanged;
protected override void CallEndInvoke(CalculateDelegate d,
System.IAsyncResult iar, out double output)
{
output = d.EndInvoke(iar);
}
protected virtual void OnCalculationProgressChanged(object args)
{
if ((this.CalculationProgressChanged != null))
{
this.CalculationProgressChanged(this.client,
((AsyncGen.ProgressChangedEventArgs<double>)(args)));
}
}
bool IServerCallbacks.ReportCalculationProgress
(double approximateResult, int percentage, object userState)
{
this.PostProgress<double>(new System.Threading.SendOrPostCallback
(this.OnCalculationProgressChanged), percentage,
approximateResult, userState);
return this.IsOperationCancelled(userState);
}
}
delegate double CalculateDelegate(double argument, object userState,
TestAssembly.IServerCallbacks callbacks);
}
}
Visual Basic
'------------------------------------------------------------------------------
' <auto-generated>
' This code was generated by a tool.
' Runtime Version:2.0.50727.1433
'
' Changes to this file may cause incorrect behavior and will be lost if
' the code is regenerated.
' </auto-generated>
'------------------------------------------------------------------------------
Option Strict Off
Option Explicit On
Imports TestAssembly
Imports AsyncGen
Imports System
Imports System.ComponentModel
Imports System.Diagnostics
Namespace TestAssembly
Partial Public Class Client
Inherits ClientBase(Of IServer)
Private _calculateTracker As CalculateTracker
Public Sub New(ByVal server As IServer)
MyBase.New(server)
Me._calculateTracker = New CalculateTracker(Me.server, Me)
AddHandler Me._calculateTracker.OperationCompleted, _
AddressOf Me._calculateTracker_OperationCompleted
AddHandler Me._calculateTracker.CalculationProgressChanged, _
AddressOf Me._calculateTracker_CalculationProgressChanged
End Sub
Public Event CalculationComplete As _
AsyncGen.AsyncCompletedEventHandler(Of Double)
Public Event CalculationProgressChanged As _
AsyncGen.ProgressChangedEventHandler(Of Double)
Public Function Calculate(ByVal argument As Double, _
ByVal userState As Object) As Double
Dim value As Double
Me._calculateTracker.CreateOperation(userState)
Try
value = Me.server.Calculate(argument, userState, Me._calculateTracker)
Finally
Me._calculateTracker.CompleteOperation(userState)
End Try
Return value
End Function
Public Sub CalculateAsync(ByVal argument As Double, ByVal userState As Object)
Me._calculateTracker.CreateOperation(userState)
Dim d As CalculateDelegate = AddressOf Me.server.Calculate
d.BeginInvoke(argument, userState, Me._calculateTracker, _
AddressOf Me._calculateTracker.PostOperationCompleted, userState)
End Sub
Public Sub CalculateAsyncCancel(ByVal userState As Object)
If Me._calculateTracker.TryCancelOperation(userState) Then
Return
End If
Throw New System.ArgumentException
End Sub
Private Sub _calculateTracker_CalculationProgressChanged_
(ByVal sender As Object, ByVal args As _
AsyncGen.ProgressChangedEventArgs(Of Double))
RaiseEvent CalculationProgressChanged(Me, args)
End Sub
Private Sub _calculateTracker_OperationCompleted(ByVal sender As Object, _
ByVal args As AsyncGen.AsyncCompletedEventArgs(Of Double))
RaiseEvent CalculationComplete(Me, args)
End Sub
Private Class CalculateTracker
Inherits OperationTracker(Of IServer, Client, CalculateDelegate, Double)
Implements IServerCallbacks
Public Sub New(ByVal server As IServer, ByVal client As Client)
MyBase.New(server, client)
End Sub
Public Event CalculationProgressChanged As _
AsyncGen.ProgressChangedEventHandler(Of Double)
Protected Overrides Sub CallEndInvoke(ByVal d As CalculateDelegate, _
ByVal iar As System.IAsyncResult, ByRef output As Double)
output = d.EndInvoke(iar)
End Sub
Protected Overridable Sub OnCalculationProgressChanged(ByVal args As Object)
RaiseEvent CalculationProgressChanged(Me.client, CType_
(args,AsyncGen.ProgressChangedEventArgs(Of Double)))
End Sub
Public Function ReportCalculationProgress_
(ByVal approximateResult As Double, ByVal percentage As Integer, _
ByVal userState As Object) As Boolean Implements _
IServerCallbacks.ReportCalculationProgress
Me.PostProgress(Of Double)(AddressOf Me.OnCalculationProgressChanged, _
percentage, approximateResult, userState)
Return Me.IsOperationCancelled(userState)
End Function
End Class
Delegate Function CalculateDelegate(ByVal argument As Double, _
ByVal userState As Object, ByVal callbacks As IServerCallbacks) As Double
End Class
End Namespace
C++/CLI
// --- Source file: Client.h ---
#pragma once
#using <mscorlib.dll>
using namespace System::Security::Permissions;
[assembly:SecurityPermissionAttribute(SecurityAction::RequestMinimum,
SkipVerification=false)];
namespace TestAssembly {
using namespace System;
using namespace System::ComponentModel;
using namespace System::Diagnostics;
using namespace AsyncGen;
using namespace System;
ref class Client;
public ref class Client : public ClientBase<TestAssembly::IServer^ >
{
private : ref class CalculateTracker;
private: TestAssembly::Client::CalculateTracker^ _calculateTracker;
private : delegate System::Double CalculateDelegate
(System::Double argument, System::Object^ userState,
TestAssembly::IServerCallbacks^ callbacks);
public: event AsyncGen::AsyncCompletedEventHandler
<System::Double >^ CalculationComplete;
public: event AsyncGen::ProgressChangedEventHandler
<System::Double >^ CalculationProgressChanged;
public: Client(TestAssembly::IServer^ server);
public: System::Double Calculate
(System::Double argument, System::Object^ userState);
public: System::Void CalculateAsync
(System::Double argument, System::Object^ userState);
public: System::Void CalculateAsyncCancel(System::Object^ userState);
private: System::Void _calculateTracker_CalculationProgressChanged
(System::Object^ sender,
AsyncGen::ProgressChangedEventArgs<System::Double >^ args);
private: System::Void _calculateTracker_OperationCompleted
(System::Object^ sender,
AsyncGen::AsyncCompletedEventArgs<System::Double >^ args);
private : ref class CalculateTracker :
public OperationTracker<IServer^, TestAssembly::Client^,
TestAssembly::Client::CalculateDelegate^, System::Double >,
public TestAssembly::IServerCallbacks
{
public: event AsyncGen::ProgressChangedEventHandler
<System::Double >^ CalculationProgressChanged;
public: CalculateTracker(IServer^ server, TestAssembly::Client^ client);
protected: virtual System::Void CallEndInvoke
(TestAssembly::Client::CalculateDelegate^ d,
System::IAsyncResult^ iar, System::Double %output) override;
protected: virtual System::Void OnCalculationProgressChanged
(System::Object^ args);
public: virtual System::Boolean ReportCalculationProgress
(System::Double approximateResult, System::Int32 percentage,
System::Object^ userState) sealed;
};
};
}
// --- Source file: Client.cpp ---
#include "StdAfx.h"
#include "Client.h"
namespace TestAssembly
{
inline Client::Client(TestAssembly::IServer^ server) :
ClientBase<TestAssembly::IServer^ >(server)
{
this->_calculateTracker = (gcnew TestAssembly::Client::CalculateTracker
(this->server, this));
this->_calculateTracker->OperationCompleted +=
gcnew AsyncGen::AsyncCompletedEventHandler<System::Double >
(this, &TestAssembly::Client::_calculateTracker_OperationCompleted);
this->_calculateTracker->CalculationProgressChanged +=
gcnew AsyncGen::ProgressChangedEventHandler<System::Double >(this,
&TestAssembly::Client::_calculateTracker_CalculationProgressChanged);
}
inline System::Double Client::Calculate(System::Double argument,
System::Object^ userState)
{
System::Double __identifier(value);
this->_calculateTracker->CreateOperation(userState);
try
{
__identifier(value) = this->server->Calculate
(argument, userState, this->_calculateTracker);
}
finally
{
this->_calculateTracker->CompleteOperation(userState);
}
return __identifier(value);
}
inline System::Void Client::CalculateAsync(System::Double argument,
System::Object^ userState)
{
this->_calculateTracker->CreateOperation(userState);
TestAssembly::Client::CalculateDelegate^
d = gcnew TestAssembly::Client::CalculateDelegate
((cli::safe_cast<TestAssembly::IServer^ >(this->server)),
&TestAssembly::IServer::Calculate);
d->BeginInvoke(argument, userState, this->_calculateTracker,
gcnew System::AsyncCallback(this->_calculateTracker,
&TestAssembly::Client::CalculateTracker::PostOperationCompleted),
userState);
}
inline System::Void Client::CalculateAsyncCancel(System::Object^ userState)
{
if (this->_calculateTracker->TryCancelOperation(userState))
{
return;
}
throw (gcnew System::ArgumentException());
}
inline System::Void Client::_calculateTracker_CalculationProgressChanged
(System::Object^ sender,
AsyncGen::ProgressChangedEventArgs<System::Double >^ args)
{
this->CalculationProgressChanged(this, args);
}
inline System::Void Client::_calculateTracker_OperationCompleted
(System::Object^ sender,
AsyncGen::AsyncCompletedEventArgs<System::Double >^ args)
{
this->CalculationComplete(this, args);
}
inline Client::CalculateTracker::CalculateTracker
(IServer^ server, TestAssembly::Client^ client) :
OperationTracker<IServer^, TestAssembly::Client^,
TestAssembly::Client::CalculateDelegate^,
System::Double >(server, client)
{
}
inline System::Void Client::CalculateTracker::CallEndInvoke
(TestAssembly::Client::CalculateDelegate^ d,
System::IAsyncResult^ iar, System::Double %output)
{
output = d->EndInvoke(iar);
}
inline System::Void Client::CalculateTracker::OnCalculationProgressChanged
(System::Object^ args)
{
this->CalculationProgressChanged(this->client,
(cli::safe_cast<AsyncGen::ProgressChangedEventArgs
<System::Double >^ >(args)));
}
inline System::Boolean Client::CalculateTracker::ReportCalculationProgress
(System::Double approximateResult, System::Int32 percentage,
System::Object^ userState)
{
this->PostProgress<System::Double >
(gcnew System::Threading::SendOrPostCallback(this,
&TestAssembly::Client::CalculateTracker::OnCalculationProgressChanged),
percentage, approximateResult, userState);
return this->IsOperationCancelled(userState);
}
}
历史
- 2008 年 11 月:初始版本
参考文献
[1] Ingo Rammer,《高级 .NET Remoting (C# 版)》,Apress © 2002
[2] Juval Löwy,《编程 WCF 服务》,O'Reilly © 2007