应用长期运行的活动对象模式
如何使用活动对象模式来执行长期运行的任务
引言
我曾经遇到过一个问题,需要将文件从客户端应用程序上传到应用程序服务器。文件大小可能各不相同,并且服务可能会因为数据超出最大大小而拒绝文件或导致超时。客户端应用程序在执行长时间运行的文件上传任务时,需要对其他高优先级事件保持响应,例如处理应用程序服务器的事件、渲染用户界面等。该任务需要在完成前取消,以显示任务的完成进度。重要的是,传输要从错误发生的地方继续,而不是从头开始。

问题
一个进程需要执行一个长时间运行的任务(一个持续时间不可预测的任务),同时对高优先级事件保持响应。该任务应该被分解成更小的子任务。该任务可以随时中断、暂停,然后恢复。
解决方案
长运行活跃对象模式是解决这个问题以及其他类似任务的最佳方案。
结构
活跃对象与传统(被动)对象非常相似。它们有私有字段并提供操作这些数据的方法(图 2)。

唯一明显的区别在于,每个活跃对象都在其自己的控制线程中运行,并且被调用的方法不会阻塞客户端(调用者),而是异步执行。活跃对象具有公共接口和内部实现。公共接口通常被称为代理。它对客户端可用,负责接受方法调用,并且它与被动对象具有理想上相同的接口。代理将方法调用转换为消息并将它们放入队列(消息队列),并以未来对象的形式异步返回可能的结果。消息包含执行方法所需的所有必要信息、任务优先级等等。一个通常称为调度器或计划器的特殊对象会根据特定算法从队列中取出并处理每个传入消息,然后调用服务对象的实际方法。服务对象本身是实际的方法实现,或者换句话说,是在应用活跃对象模式之前使用的被动对象方法。活跃对象通过顺序地从队列中取出并处理排队的请求,并且始终在单个控制线程中执行服务对象的方法,从而本质上是线程安全的。活跃对象的线程安全性在很大程度上消除了手动锁定和互斥机制的需要(图 3)。

客户端可以随时尝试获取未来对象的结果。如果结果尚未计算,客户端将被迫暂停执行并等待活跃对象执行的结果。或者,可能会出现活跃对象执行导致错误的情况。在这种情况下,未来对象将进入错误状态并获取有关发生异常的完整信息。客户端可能会尝试取消方法的执行。如果这发生在消息被处理之前,调度器将取消服务对象的方法,并将未来对象的状态设置为已取消。未来对象的可能状态如图 4 所示。

动态
在对活动对象进行任何工作之前,必须创建一个调度器或计划器。调度器需要一个算法(策略)来从队列中获取下一条消息。该算法可以在启动调度器之前或期间设置。调度器与代理一起实现了生产者-消费者模式。调度器与代理共享一个公共消息队列并处理传入消息。调度器可以暂停然后恢复。图 5 显示了调度器的活动图。

如果一个或多个消息进入队列,调度器将启动工作线程(工作流)来处理它们。如果队列中没有消息,工作线程将保持待机模式。调度器根据选定的算法,从队列中接收第一条消息并调用其 `AllowInvoke()` 方法。此方法可以允许或拒绝执行服务对象的方法。因此,为安全调用服务对象实现了安全机制(守卫)。如果此时无法处理该消息,调度器将从队列中获取下一条消息。如果守卫允许执行,调度器将调用消息的 `Invoke()` 方法。如果执行出错,将应用另一个 `Failed()` 方法,并且 Future 对象将被设置为 `HasFailed` 状态。如果执行成功,调度器将 Future 对象的状态设置为 `HasSuccessed`,并且 Future 对象将分配 `Invoke()` 方法的执行结果。客户端也有可能取消消息的处理(`Cancelled`)。一旦调度器安装完毕,我们就可以执行活跃对象的方法。
Using the Code
为了解决一个特定问题,即在后台将不定大小的文件上传到应用程序服务并具有取消操作的能力,开发了以下类
`ProcessFileByPiecesCommand` 是一个实现了 `IServant` 接口的服务对象。这个类的主要目标是将文件分成相等的部分并连续读取它们。在服务对象将上一段文件发送到应用程序服务之后(此实现在此示例中省略),代理会自动创建下一条消息并将其添加到消息队列中。这个过程一直持续,直到文件的所有部分被完全读取和发送,或者由于用户的取消或发生错误而中止。
`ActiveObjectProxyServant` 是一个类适配器,用于将实现 `IServant` 接口的对象作为活跃对象来处理。代理的纯粹实现是不存在的。我们可以使用 Castle Project 或其他框架的动态代理来创建代理。这个适配器可以由代理调用。同一个适配器可以用于复制另一个文件、第二个、第三个等等。因此,该适配器有一个未来对象列表,用于存储每个方法调用的所有最终和中间结果。
`ActiveScheduler` 是一个处理消息的调度器。调度器可以执行系统中其他活跃对象的消息。例如,上传文件的任务优先级较低,因此如果队列中有一个优先级更高的新消息,调度器将优先处理它。因此,调度器将首先处理关键任务,然后才处理低优先级任务。
`PriorityAndLIFO` 是一个实现了 `IQueueSortStrategy` 接口的类。它根据消息的优先级和在队列中的序号对消息进行排序。
`Future` 及其派生类用于存储活跃对象方法执行的状态和结果。
`ServantOperation` 是一条消息(`IOperation`、`Operation`)。它包含有关操作的信息、其优先级以及对未来对象的引用。活跃对象的动态如图 6 所示

关注点
在实现过程中,除了 `Active Object` 之外,还使用了其他设计模式,它们是:`Strategy`、`Factory method`、`"Fail First"`(断言,Assertion)、`Command` 和 `Proxy`。在开发过程中,使用了以下 .NET Framework 功能,特别是版本 3.5:反射、自定义属性、运算符重载、多线程、锁(`ReaderWriterLockSlim`、`Monitor`),当然,还有强大的面向对象编程。此外,还使用了任何优秀代码的属性,如单元测试(NUnit)和代码文档。