用于任务流控制和并行计算的紧凑型框架。






4.97/5 (18投票s)
本文介绍了一个用于各种控制、模拟和测试应用程序中操作流程管理的紧凑框架。
目录
引言
任务(操作)流程控制在许多软件项目中起着重要作用。其可靠有效的实现可能对项目成功至关重要。几年前,我曾发表过一篇题为《用于并行计算的微型框架》的文章。虽然该作品的主要方法保持不变,但一些概念正在发展,设计也明显不同。主要类型的实现已得到简化,并且为基础架构添加了更多重要实用的功能。我的目的是使本文自成一体,独立于其前身,因此其中一些概念会重复。本框架的主题是控制由独立操作组成的进程,这些操作可以顺序或并行执行。此类进程的示例包括机器控制、复杂软件测试、金融流程等。代码是用 C# 编写的,目标是 .NET Core,但可以轻松移植到其他语言。它已在 Windows 和 Linux(Ubuntu 18.04)上进行了测试。
主要概念
本框架的两个主要实体是命令(Command)和处理器(Processor)(因此我称之为处理器-命令框架,简称PCF)。命令是指实现ICommand
接口的任何类。该接口的主要方法是Do()
。此方法执行命令的有用活动。处理器是指负责命令的正确排队和执行其Do()
方法的类型。根据它们的排队方式,可以顺序或并行调用这些方法。处理器负责执行Do()
执行所需的所有操作,例如错误处理、日志记录等。
本框架的核心思想是清晰严格地分离命令和处理器。这种方法在技术和组织上都有几个优点。这些优点是
- 命令实现高度灵活。包含命令的程序集(DLL)可以在运行时加载。
- 与处理器相关的代码相对稳定。一旦编写完成,几乎不需要更改。这将大大减少测试量。
- 虽然处理器开发需要多线程和同步,但命令代码的编写通常(但不总是!)更简单,但需要更多关于活动领域知识。因此,在许多情况下,命令可以由技术支持和 QA 人员开发。
- 该框架适用于各种领域的任务流程。其活动领域由其命令集定义,通常在运行时动态加载。
设计
框架主要元素之间的关系如下图所示
基础设施组件
下表显示了基础设施组件及其简要说明
组件 (Component) | 描述 |
AsyncLockLib | 用于async -await 方法的同步类型 |
CommandLib | 与Command 和State 相关的接口和类型 |
CommandsAssemblyLoaderLib | 动态命令 DLL 加载器 |
ConfigurationLib | 应用程序配置支持 |
LogInterfaceLib | ILog 接口 |
LogLib | 基于 NLog 的日志记录实现 |
ProcessorLib | 与Processor 和队列相关的类型 |
SerializationLib | 对象序列化支持 |
SignalRBaseHubClientLib | SignalR 中心客户端的基类 |
SignalRBaseHubServerLib | 提供对象流的 SignalR 中心基类 |
StreamingDataProviderLib | 流数据提供程序的接口和基类 |
TcpHelperLib | TCP通信组件 |
命令
在本框架的范围内,每个动作都是一个命令。它可以是与域相关的过程,例如进行一些测量或与数据存储交互。或者命令可以调用远程服务。这可以通过阻塞模式(远程方法调用获取服务响应)或异步模式(响应稍后由另一个对象接收)来完成。异步调用将在下面的状态章节中讨论。命令还可以切换某些操作模式。它可以创建新命令,甚至创建新处理器。本框架中的日志记录也作为顺序命令操作(请参阅下面的详细信息)。
所有从实现ICommand
接口的abstract
基类Command
派生的命令,其主要方法Do()
如上所述。要执行命令,首先应该由相应的处理器将其入队。为了灵活性,命令可以放置在单独的程序集中,并在运行时动态加载。这种加载是通过CommandsAssemblyLoaderLib
基础设施组件的CommandsAssemblyLoader
类进行的。该类还提供了实现ICommandFactory
接口的public
方法CreateCommand()
,用于从加载的程序集中创建命令。
当动态加载包含命令的程序集时,调用者代码中不存在包含命令的构造函数。因此,命令可以通过三种方式创建。第一种方式是使用反射和Activator.CreateInstance()
方法。这种方法不需要额外的代码,但速度非常慢,因此不适合频繁创建的命令。第二种方法是使用Expression.Lambda()
方法,该方法在CommandLib
基础设施组件的static
类CommandCreator
中实现。与直接使用反射相比,这种方法速度更快,但与构造函数调用相比效率仍然较低。因此,如果命令类型需要频繁实例化,那么实现CommandFactory : ICommandFactory
类将非常有用,其CreateCommand()
方法会调用所需的命令的构造函数。CommandsAssemblyLoader
类型创建命令的方式如下。首先,在其Load()
方法中,加载命令程序集后,它会尝试在该程序集中查找实现ICommandFactory
接口的命令工厂。如果找到该类型,则使用反射实例化一次,并将其保留在CommandsAssemblyLoader
实例中。创建命令时,CreateCommand()
方法会尝试使用此命令工厂(如果找到)。如果未找到命令工厂或命令工厂未能创建所需的命令,则使用 Expression Lambda 技术。
Processor
ProcessorLib
组件负责运行命令。它为命令的排队和执行提供了环境。命令根据其优先级通过Processor
和ProcessorQueue
类型进行排队。Processor.ProcessAsync()
执行Command.Do()
方法。
private Task<ParallelLoopResult> ProcessAsync(params ICommand[] commands)
{
if (commands == null)
return null;
return Task.Run(() =>
{
return Parallel.ForEach(commands, command =>
{
if (command == null || command.IsProcessed || command.Err != null)
return;
var commandDescription =
$"COMMAND Type: \"{command.GetType()}\",
Id:\"{command.Id}\", Priority: {command.Priority} ";
_log?.Debug($"{_logPrefix}{commandDescription} - BEGIN.");
try
{
command.Do();
command.IsProcessed = true;
_log?.Debug($"{_logPrefix}{commandDescription} - END.");
}
catch (Exception e)
{
command.Err = new Error { Ex = e };
_log?.Error($"{_logPrefix}{commandDescription} produced the following exception: ",e);
try
{
_actionOnException?.Invoke(command, e);
}
catch (Exception ex)
{
var msg = $"{_logPrefix}Exception in exception handler for
command \"{command.Id}\".";
_log?.Fatal(msg, ex);
throw new Exception(msg, ex);
}
}
finally
{
if (command.IsProcessed)
{
var now = DateTime.Now;
command.ProcessingLag = now - command.TimeStamp;
command.TimeStamp = now;
SetMaxProcessingLag(command.GetType(), command.ProcessingLag);
}
_log?.Debug($"{_logPrefix}{command}");
}
});
});
}
在其构造函数中,Processor
类获取日志记录实现(如果需要),以避免与LogLib
组件发生循环引用,并可选地在执行Command.Do()
方法时发生的异常上执行操作。Processor
的方法
public void Enqueue(params ICommand[] commands)
和
public void EnqueueParallel(params ICommand[] commands)
允许调用者顺序或并行地入队命令数组。
注意:通过同一个Enqueue()
方法入队的顺序执行命令将根据其各自的优先级执行。然而,通过同一个EnqueueParallel()
方法入队的并行执行命令将根据其最高优先级执行。
状态
类State
属于CommandLib
组件。顾名思义,它维护系统的状态。这是一个具有受保护多线程访问属性的字典单例。
private static readonly ConcurrentDictionary<string, object> _cdctProperties =
new ConcurrentDictionary<string, object>();
命令从State
获取数据进行处理,并将其部分输出放入其中以供后续命令使用。State
还包含用于接收和处理异步消息(事件)以及来自服务的流数据的处理程序。这些处理程序对象由命令创建,用于与服务建立连接。在收到来自服务的异步消息后,它们通常会入队其他命令来处理这些消息。这将在我们下面的软件示例讨论中展示。
实用功能
配置
ConfigurationLib
基础设施组件提供对从专用 JSON 文件读取配置的支持。类Configuration
将其构造函数的参数设置为该文件的路径,并提供方便的方法从文件中读取数据。默认情况下,JSON 配置文件名的模式如下:<应用程序名称>.config.json。
序列化
SerializationLib
基础设施组件提供使用Stream
(static
类SerializationBin
)进行二进制序列化的方法,以及一组有用的字节和 JSON 序列化扩展方法。
日志记录
日志记录由两个组件提供,即LogInterfaceLib
和LogLib
。前者定义了接口ILog
,而后者使用类型Log : ILog
提供了其实现。此实现基于众所周知的 NLog 产品,并使用适当的配置文件<应用程序名称>nlog.config。日志记录器在内部使用处理器-命令范例,及其专用的处理器和命令类LogCommand : Command
。
沟通
为命令和服务之间的通信提供易于使用的组件非常重要。本基础结构支持开箱即用的 TCP 套接字和 SignalR 技术通信。
TCP套接字
组件TcpHelperLib
提供了 TCP 套接字通信的基础结构,包括远程方法调用和数据流的可能性。它还可以使用上面讨论的Configuration
类型通过 JSON 进行配置。服务器和客户端的主要公共类型是TcpHelper
。此 TCP 通信机制在此组件中实现,并在我的 Code Project 文章《TCP Socket Off-the-shelf - Revisited with Async-Await and .NET Core》中进行了详细描述。
SignalR
SignalR
是一个库,允许用户通过 WebSockets(可用时首选)或 HTTP 长轮询组织双工通信。组件SignalRBaseHubServerLib
提供 SignalR 服务器基础结构,而组件SignalRBaseHubClientLib
代表客户端。这里使用的 SignalR 通信技术(经过非常小的更改)在我之前的 Code Project 文章《Simple SignalR Data Streaming Infrastructure Out-of-the-Box》中有详细描述。
数据流
本文中使用的以及上面讨论的两种通信技术都为用户提供了数据流的基础结构。流式传输是指客户端向服务器注册并异步订阅服务器发送的数据的过程。服务器拥有负责生成数据的提供商。一旦生成了新的数据,数据提供商就会将其发送给所有订阅了这些数据的客户端。基础结构组件StreamingDataProviderLib
提供了通用的基类StreamingDataProvider<T>
供用户流数据提供商使用。此基类使用将要流式传输的数据类型进行参数化。调用StreamingDataProvider<T>.Current
的 setter 会触发将新数据发送给订阅者。在此过程中,会检查订阅者的有效性,并将无效订阅者从订阅者列表中清除。
同步
由于大多数地方的多线程都基于async
-await
范例,因此需要适当的同步机制。基础结构组件AsyncLockLib
提供了来自此处的适当同步类AsyncLock
和来自此源代码的AsyncAutoResetEvent
。
Machine
MachineLib
组件不是框架基础结构的一部分。但它是唯一的类型Machine
,它在Processor
之上提供了一些有用(但非必需)的超级结构。Machine
在其当前实现中的构造函数加载基本配置,创建主处理器并入队第一个命令。
测试示例
讨论
文件夹Tests包含两个控制台测试应用程序。应用程序CommanderTest
包含三个命令类。其Main()
方法以各种组合方式,使用具有不同优先级的处理器TestCommand
入队。命令SuspendCommand
和ResumeCommand
用于暂停然后恢复处理器功能。请注意,暂停命令由主处理器执行。SuspendCommand
的Do()
方法创建一个新处理器,稍后执行ResumeCommand
,并将此处理器放入State
。为了恢复主处理器,ResumeCommand
会与恢复处理器一起入队。该命令从State
获取主处理器并执行其Resume()
方法。当然,在这种简单情况下,我们可以直接执行主处理器的resume
方法,而无需额外的处理器和命令,但在更复杂的情况下,使用额外的处理器和命令可能更合适。分析命令执行顺序然后尝试不同的命令入队组合很有趣。所有命令几乎立即入队,因为Processor
类的Enqueue()
和EnqueueParallel()
方法依次调用内部类ProcessorQueue
的Enqueue()
方法。后者将命令入队,调用Run()
方法,该方法在线程池的线程中执行,并立即返回。然而,通过第一个Processor.Enqueue()
方法调用入队的首个最高优先级命令将首先执行(在测试中是命令S-12
)。其余命令将按照其优先级和入队顺序执行。测试的预期输出作为注释在文件ProgramCommanderTest.cs的末尾提供。
第二个测试是MainApp
应用程序。它与TcpSvc
和SignalRSvc
服务一起工作。其Main()
方法创建Machine
类型的实例。Machine
类的构造函数读取 JSON 配置文件MainApp.config.json。从Machines
数组中,它选择具有给定"id": "Machine1"
的机器。根据配置,构造函数从目录$(OutputDir)/CommandLibs加载两个外部命令程序集(DLL)GeneralCommandsLib.dll和StreamHandlersCommandsLib.dll,并创建MainProcessor
。最后,Machine
构造函数将InitCommand
与MainProcessor
一起入队。
注意:动态加载的程序集可能引用其他程序集,尤其是基础结构程序集。因此,建议主应用程序引用所有基础结构程序集。
InitCommand
的Do()
方法以并行方式将命令CreateTcpClientCommand
和CreateSignalRClientCommand
与MainProcessor
一起入队。每个命令都会创建一个适当的通信客户端对象,以连接到TcpSvc
和SignalRSvc
服务。两个命令的Do()
方法都会调用其各自的异步方法async void ConnectAsync()
并立即返回。这样,冗长的连接建立过程(可能包含多次重试)不会阻塞MainProcessor
的执行流程(实际上,由于命令的设计方式,无论它们是顺序入队还是并行入队,都不会有影响)。在建立与服务的连接后,相应的客户端对象会被放置在State
中。
TCP 和 SignalR 连接对象虽然本质上不同,但从用户角度来看,操作方式基本相同。它们提供了双工远程方法调用以及从服务到客户端的数据流功能。为了强调这种相似性,TcpSvc
和SignalRSvc
服务都使用相同的数据提供程序组件DtoProviderLib
。它流式传输ModelLib
组件中定义的Dto
对象。TCP 和 SignalR 通信均可在文件<应用程序名称>.config.json中配置。对于 TCP,TcpHelper
类的构造函数会在服务器和客户端读取配置文件的相应部分。对于 SignalR,基础结构类HubClient
的构造函数会读取客户端的配置部分,而创建中心的服务(在本例中是SignalRSvc
)会读取服务器的配置部分。SignalR 配置提供了一个布尔参数"isSecureProtocol"
,它定义了是创建 HTTP("isSecureProtocol": false
)还是 HTTPS("isSecureProtocol": true
)连接。
如何运行?
源代码可以在 Visual Studio 2017 中使用PCF.sln解决方案加载和构建。MainApp
的输出目录将是$(OutputDir) = $(SolutionDir)/_bin/netcoreapp<version>。根据MainApp.config.json文件中的配置,应该在运行时加载的程序集将被放置在目录$(OutputDir)/CommandLibs中。成功构建后,您可以通过其各自的输出目录中的标准命令dotnet,在 Visual Studio 中或使用控制台运行服务TcpSvc.dll、SignalRSvc.dll和应用程序MainApp.dll。根据其 NLog 配置文件<DLL 名称>.nlog.config,应用程序将记录到各自的控制台,并写入文件$(OutputDir)/Log/<DLL 名称>-<日期 YYYY-MM-DD>.txt。
本文的演示同时也是部署和运行在 Linux 上的所有必需文件的集合。这种集合是通过运行服务和MainApp
的_publish.cmd文件生成的。这些文件包含以下命令
dotnet publish -o publish -c Release
此命令会在publish目录中生成运行应用程序所需的所有文件(出于我不知道的原因,MainApp.config.json配置文件未自动包含,需要手动复制)。这些文件的内容,对于TcpSvc
、SignalRSvc
和MainApp
,应该被复制到一个单独的演示文件夹中。此外,在同一个文件夹中,应该放置包含主应用程序运行时加载的程序集(发布版本)的CommandLibs目录。然后,可以使用普通的dotnet命令从演示文件夹中启动服务和主应用程序。
要在 Linux 上运行演示,首先需要在 Linux 环境中安装适当版本的 .NET Core(目前是 2.2 版本)。Ubuntu 18.04 x64 的安装过程在此处进行了描述。我已在 Oracle VirtualBox 中安装的 Ubuntu 18.04 上使用MobaXterm
应用程序来部署和运行软件,对TcpSvc
、SignalRSvc
和MainApp
进行了测试。
结论
本文介绍了一个可扩展的处理器-命令框架(PCF),用于操作流程控制。该框架提供了灵活的命令排队和执行机制,同时强制区分处理器和命令。处理器部分是稳定的,而命令通过实现各种操作来确保灵活性。PCF 还提供了开箱即用的配置、日志记录、TCP 和 SignalR 通信等基础设施。这个紧凑且易于使用的框架适用于具有操作流程的活动领域,并可与 Actor 模型产品和消息代理结合使用。