65.9K
CodeProject 正在变化。 阅读更多。
Home

用于并行计算的轻量级框架

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.92/5 (70投票s)

2014年1月13日

CPOL

14分钟阅读

viewsIcon

84009

downloadIcon

2599

本文介绍了一个适用于管理机器和过程控制、游戏、模拟器等的运行流程的并行计算的简单框架。

关于文章更新

最初,我写这篇文章是为了介绍一个用于机器和过程控制的小型框架。但根据一位读者的明智建议,我决定扩大其范围。事实上,该框架提供了简单的并行计算手段。

引言

在我多年的软件开发生涯中,曾多次处理过与过程/机器控制相关的项目。每次接手项目,我都会观察到一个惊人相似的景象。一家相对较小的公司设计和制造某种需要控制的机器或过程。该公司在其活动领域拥有非常好的专家、先进的电子和软件工程师。机器的机械部分在低级别上得到了充分控制。但当涉及到基于 PC 的流程控制软件时,项目就会遇到问题。这种软件很难理解和维护,更不用说适应新版本/新一代机器了。那么,流程控制软件出了什么问题呢?

依我看,问题根源在于此类软件的设计和开发的一般方法。我所见过的每家公司都从头开始开发自己的流程控制软件。而且,乍一看确实很简单:只需在定义明确的操作之间实现一些连接!PC 软件工程师的主要精力集中在开发操作本身——实现与控制器连接的协议、零件运动编程、摄像机控制及后续图像分析等。当所有这些开发阶段都成功完成后,突然发现它们之间的相互关系并不如预期。在尝试实现适当的机器行为时,开发人员面临着僵化、临时编写的代码,这些代码极难调试和修复。更糟糕的是,要调试软件,所有开发人员都需要共享相同的昂贵硬件。在某些情况下,在硬件上进行调试根本不可能。例如,考虑一个伺服电机,它的工作周期是n秒,然后空闲m秒。增加其连续工作时间会导致其过热并随后损坏。因此,开发人员无法在断点处停止软件进行调试,让电机长时间处于负载状态。

未能开发出足够的操纵流控制软件会导致公司遭受巨大的(相对于其预算而言)损失。那么,应该采取什么措施来防止这些损失并提高流程控制软件的质量呢?

背景

为了实现这一目标,我建议在开发流程控制软件时遵循简单的规则。

  • 将整个过程分解为独立的操作(以下称为命令)。
  • 使用通用的流程控制引擎。该引擎处理命令并分析处理结果。该引擎基于通用框架。该框架为顺序和并行命令执行、机器状态分析以及基于该分析生成新命令提供了统一的机制。此外,该框架还支持命令优先级、命令执行暂停/恢复、错误处理和日志记录等功能。
  • 清晰地将命令与其处理器分离。除了更清晰的逻辑和更容易理解之外,这条规则还提供了巨大的组织优势,通过将开发任务分配给乐于处理并行处理和线程同步的软件人员,以及理解命令性质并能够编写命令执行方法的工程师。
  • 每个命令都提供其执行和错误处理的虚拟方法。这些方法在派生类中实现,用于特定命令,并由命令处理器调用。
  • 每个命令都可以以实际模式或模拟模式执行。命令模拟允许开发人员通过模拟部分或全部命令执行来测试其软件。模拟方法可以针对机器模拟器,也可以模拟机器本身。
  • 控制引擎由一个单例处理器管理器管理。它主要负责创建和维护处理器,以及处理由命令执行引起的已更改状态事件。
  • 内置的错误处理和日志记录机制对引擎至关重要。

我相信,通过遵循上述规则,一家小型机器制造公司将能够在其机器操作流程控制软件的开发中取得良好成果。在本文中,我将介绍一个基于上述规则的简单通用框架,并提供两个使用示例。

在设计框架时,开发人员通常会面临两难境地:要么为用户提供最大程度的工具和方法,要么只提供最关键和最复杂的方法,并让用户拥有最大的灵活性。在现实生活中,这总是一种权衡。对于这个小型的流程控制框架,我选择了一种更接近后一种方法的设计。决定设计一个“极简”框架。这意味着框架应仅包含对操作流程管理至关重要的功能。我认为在大多数情况下,代码的简单性和可维护性比某些性能优势更重要。例如,可以考虑 Microsoft Robotics Developer Studio [1]。尽管它具有非常有用的高级功能(例如其分布式轻量级服务、可视化编程语言、出色的可视化模拟环境等),但由于安装、使用和学习的复杂性,这个庞大的框架在行业中的普及度相对较低。

设计

以下框图说明了设计。

该框架主要由五个主要类型组成,即

  1. ProcessorManager
  2. Processor
  3. 命令
  4. ParallelCommand
  5. Log

ProcessorManager 是一个单例类,负责创建和管理Processor。它有一个内部类ProcessorManager.PriorityQueue,该类根据优先级处理处理器。对于每个优先级,ProcessorManager都有一个PriorityQueue类型的实例,提供可用处理器的队列以及该优先级当前正在运行的处理器的字典。PriorityQueue类型还支持暂停/恢复执行低于当前运行优先级的所有处理器的机制。此功能默认激活,但可以通过创建带有 `Initialize()` 方法的 ProcessorManager 来禁用,并将其参数设置为 `false`。实际根据优先级挂起/恢复处理器是由 ProcessorManager 执行的。ProcessorManager类型的一个关键功能是根据受控过程的当前状态决定命令执行流程。为此使用 `OnStateChanged()` 方法。它实现了由用户提供的决策制定委托的同步调用。通过适当的 ProcessorManager 类型索引器提供委托。

用户可以决定同时创建多个具有相同优先级的 Processor 实例(通常在应用程序开始时)。这可能很有用,因为创建新处理器意味着创建其线程,这是一个相对昂贵的操作。ProcessorManager类型的 `IncreaseProcessorPool()` 方法创建新处理器。PriorityQueue类型支持处理器池。可以从池中取出一个处理器并将其返回到池中。这样做是为了避免创建新线程,而新线程在创建新处理器时是必需的。

Processor类型的主要工作是处理其自己的 Command 队列并执行出队命令。命令的执行发生在每个 Processor 类型实例拥有的单独线程中。实例的特征是其唯一的 Id 和优先级,优先级本质上是一个整数值(但为了方便起见,可以使用适当的枚举器来表示一些已知值)。Processor 实例在其线程中封装了命令执行的整个机制,包括队列管理、同步、错误处理、日志记录和适当的回调调用。在执行每个命令后,处理器在其线程上下文中调用同步方法 ProcessorManager.OnStateChanged()(如上所述)。由于此方法由所有处理器同步调用,因此执行的委托应快速,以确保良好的性能。在调用 ProcessorManager.OnStateChanged() 之后,如果用户提供了该回调,处理器可以调用其自己的命令执行后回调 OnStateChangedEvent。因此,ProcessorManagerProcessor 的协作提供了具有用户提供的回调的命令执行流程,有效地向用户隐藏了队列和线程同步的一些棘手细节。应在处理器准备好执行命令之前(即在调用 `StartProcessing()` 方法之前)将其分配给该委托。这是通过获取 Processor 类型的实例,将第二个参数设置为 `false`,然后分配 OnStateChangedEvent 委托,最后调用 `StartProcessing()` 方法来实现的。这在代码示例中有所展示。命令入队到处理器意味着它们的异步执行,这意味着入队方法立即返回,并且实际命令处理将在以后由处理器的线程进行。

上面描述的 ProcessorManagerProcessor 类型不由用户更改——只需要提供决策回调。派生自 Command 类型的类型提供过程/机器特定的功能。与 ProcessorManagerProcessor 不同,基于 Command 的类免于同步和其他棘手的事情。它们相当直接。命令仅与被控制对象交互,因此可以由领域专家开发,即使软件技能有限。命令的一个重要特性是它们能够以实际模式或模拟模式执行。派生类应实现适当的虚拟方法 ProcessReal()ProcessMock()

控制框架的一个重要特性是它支持命令的并行执行。这是通过特殊命令类型 ParallelCommand : Command 实现的。在其由 ProcessReal()ProcessMock() 调用的 `ProcessParallel()` 方法中,ParallelCommand 将每个并行执行的命令排队到新获得的单独处理器中。这些处理器在命令执行完成后会自动返回到处理器池。

日志记录使用相同的处理器-命令方法实现。通用类 Log 实现接口 ILog,该接口通过用户提供的派生自 abstract class CommandLogBase : Command 的类进行参数化。在构造函数中,Log 类创建一个专用的日志处理器,并且 ILog.Write() 的每个重载方法都将 CommandLogBase 类型的命令排队到该处理器中。日志处理器的优先级是固定的,用户创建的处理器的优先级应选择以确保控制命令执行和日志记录之间所需的比例。在使用框架一段时间后,我建议除了普通的日志文件外,再使用一个特殊格式的日志文件。每个命令在其开始和结束时都会向此附加日志文件输出一条记录。每个处理器的记录都放在一个单独的列中。因此,此日志文件按时间顺序说明了每个处理器的命令执行情况。尽管目前此类日志记录不是框架的一部分,但它对于理解命令流程和调试非常有用。代码 SampleA 会生成此类文件。

代码示例

上面描述的框架类位于 ParallelCompLib 项目中。位于 Samples 文件夹下的两个示例使用了该框架。两个应用程序都实现了活动领域和日志相关的命令类,这些类派生自 Command,由具有不同优先级的处理器执行。SampleA 应用程序演示了多个顺序和并行命令的执行。命令具有不同的处理持续时间(由 Thread.Sleep() 方法定义)。该示例还显示了处理器返回到池中。SampleA 的日志记录会生成文件 _test.log(包含常规日志输出)和文件 _flow.log(显示每个处理器命令的执行时间,并说明处理器返回到池中及其后续重用)。文件 _flow.log 中的每一行都表示相应命令处理的开始或结束。处理器 ID 在有关每个命令的信息之前以括号显示。文件中的行按时间顺序排列。因此,如果最初只有一个具有特定优先级的处理器被放入池中,那么就可以清楚地看到处理器的重用。有趣的是,通过注释和取消注释文件 Program.cs 开头的条件编译符号,可以观察到命令执行流程的差异。要从 Visual Studio 运行 SampleA,请构建并启动 SampleA 项目。

让我们仔细看看 SampleA。其代码(省略了一些细微之处)如下所示。

//#define _NO_LOWER_PRIORITY_SUSPEND
//#define _BIG_PROCESSOR_POOL

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Drawing;
using System.Threading;
using System.Threading.Tasks;
using IL.ParallelCompLib;

namespace SampleA
{
    class Program
    {
        static void Main(string[] args)
        {
#if _NO_LOWER_PRIORITY_SUSPEND
            ProcessorManager.Initialize(false);
#endif
            ProcessorManager.Instance.Logger = new Log<CommandLog>(LogLevel.Debug);

           
            var priority = PriorityLevel.Low;

#if _BIG_PROCESSOR_POOL
            ProcessorManager.Instance.IncreaseProcessorPool(priority, 15);
#endif
            
            var evSynch = new AutoResetEvent(false);

            CommandLog.dlgtEndLogging = (s) => 
                {
                    if (!string.IsNullOrEmpty(s) && s.Contains("Z") && s.Contains("End"))
                        evSynch.Set();
                };

            var processor1 = ProcessorManager.Instance[priority];

            ProcessorManager.Instance[ProcessorState.CommandProcessed] = (pr, e) =>
                {
                    if (e.Cmd != null && e.Cmd.ProcessState != Command.State.NotYetProcessed && 
                        !string.IsNullOrEmpty(e.Cmd.Name))
                    {
                        // This event handler is always called after command has been processed.
                        // The call is performed in context of thread of the processor 
                        //     caused the event.
                        if (e.Cmd.Name == "M")
                        {
                            processor1.EnqueueCommand(new CommandS("Z"));
                            return;
                        }

                        if (e.Cmd.Name.Contains("->1"))
                            // After end of appropriate command processing processor is 
                            //    returned to pool.
                            // This has to be done as parallel task, outside of the processor's 
                            //    thread context.
                            ProcessorManager.Instance.ReturnToPoolAsync(e.Cmd.Priority, 
                                                                        e.Cmd.ProcessorId);
                    }
                };

            // After being taken from pool, processor2 constitutes different 
            //    instance of Processor type.
            var processor2 = ProcessorManager.Instance[priority, false];
            
            processor2.OnStateChangedEvent += (pr, e) =>
                {
                    if (e.Cmd != null && e.Cmd.ProcessState != Command.State.NotYetProcessed && 
                        !string.IsNullOrEmpty(e.Cmd.Name))
                    {
                        // This event handler is called only before return of this processor 
                        //   to the processor pool.
                        if (e.Cmd.Name == "_ParallelCommand" && 
                            e.Cmd.ProcessState == Command.State.ProcessedOK)
                                processor1.EnqueueCommand(new CommandS("H"));

                        // Usage of evSynch makes command "K" synchronous
                        if (e.Cmd.Name == "K")
                            evSynch.Set();
                    }
                };

            processor2.StartProcessing();

            int si = 0;
            int pi = 0;

            processor2.EnqueueCommand(new CommandS(GetName("S", si++)));
            processor2.EnqueueCommands(new Command[] 
                        { new CommandS(GetName("S", si++)), new CommandS(GetName("S", si++)) });
            processor2.EnqueueCommandsForParallelExecution(new CommandP[] 
                        { new CommandP(GetName("P", pi++)), new CommandP(GetName("P", pi++)) });
            processor2.EnqueueCommands(new CommandS[2] 
                        { new CommandS(GetName("S", si++)), new CommandS(GetName("S", si++)) });
            processor2.EnqueueCommand(new CommandS("K"));

            evSynch.WaitOne();

            ProcessorManager.Instance.ReturnToPool(ref processor2);

            // After processor returned to the pool it is stripped from its 
            //    previous OnStateChangedEvent handler.
                
            int processorsInPool = ProcessorManager.Instance.ProcessorsCount(priority);

            processor2 = ProcessorManager.Instance[priority]; // command processor

            processor2.EnqueueCommands(new CommandS[2] 
                        { new CommandS(GetName("S", si++)), new CommandS(GetName("S", si++)) });
            processor2.EnqueueCommandsForParallelExecution(new CommandP[] 
                        { new CommandP(GetName("P", pi++)), new CommandP(GetName("P", pi++)) });
            processor2.EnqueueCommand(new CommandS("M"));

            evSynch.WaitOne();

            ProcessorManager.Instance.Dispose();
        }

        static string GetName(string name, int n) 
        {
            return string.Format("{0}{1}", name, n);
        }
    }
}

现在,我们来讨论 SampleA_flow.log 日志文件。该文件最信息部分的格式非常简单。每一列代表一个单独的处理器及其线程。处理器 ID 在第一个字段中以括号给出。然后,适当的词语表示每个命令的开始和结束,后面跟着(破折号后)命令 ID 和命令名称。为了简单起见,日志处理器(在本例中其 ID 为 0)、日志命令和 ParallelCommand 已从日志中省略。每个处理器和命令通过递增前一个 ID 来获得其唯一 ID。文件中给出的命令 ID 不是连续的,因为“缺失”的数字属于日志命令。

下面是当 _BIG_PROCESSOR_POOL 被注释掉时,SampleA_flow.log 日志文件的一个片段。在这种情况下,没有预先创建给定优先级的处理器池。

在这种情况下,我们可以看到返回到处理器池的处理器被重用了。processor1 是第一个创建的处理器,但命令(即 HZ)是在一段时间后,在更改事件处理程序中分配给它的。所以这个处理器有 ID (1),但只出现在最后一列。processor2 (ID (2)) 开始对 CommandS 类型的三个命令(即 S0、S1S2)进行顺序处理。然后,该处理器被分配了两个 CommandP 命令(P0P1)用于并行执行。ParallelCommand 实例分别在新建的处理器 (3) 和 (4) 中执行了这些命令。每个 CommandP 命令会创建另一个处理器——(5) 和 (6)——来执行两个连续的 CommandS 类型命令。有趣的是,处理器 (2) 在所有并行命令都结束后才继续处理其下一个连续命令 S3S4K。这是 ParallelCommand 设计所导致的期望行为。然而,在并行命令结束后,由并行命令在处理器 (5) 和 (6) 中启动的顺序命令仍然在运行。ParallelCommand 命令的结束导致将命令 H 入队到处理器 (1)。入队操作发生在处理器 (2) 开始命令 S3 之前,但处理器 (1) 执行命令 H 的实际开始时间更晚。因此,S3 的开始早于 H 的开始。所有处理器 (3)-(6) 都已返回到处理器池。处理器 (3) 和 (4) 由 ParallelCommand 返回,而处理器 (5) 和 (6) 由 ProcessorManager 更改事件处理程序调用方法 ReturnToPoolAsync() 返回。由于以上四个处理器是异步返回到处理器池的,因此它们在池中的新顺序是不可预测的。在示例中,`AutoResetEvent evSynch.WaitOne()` 导致执行等待直到命令 K 结束。然后通过调用 ProcessorManager 的方法 ReturnToPool() 将处理器 (2) 返回到处理器池。接下来的命令由我们已知的从池中取回的处理器执行。

_BIG_PROCESSOR_POOL 被取消注释时,为给定优先级创建了一个包含 15 个处理器的处理器池。现在已使用的处理器也已返回到池中。但预先创建的处理器数量足够多,以至于观察不到这一点。有趣的是,在这种情况下,显示的命令开始编号远高于前一种情况。这是因为创建处理器导致了许多日志命令,这些命令未显示在 _flow.log 文件中。

SampleB 应用程序控制 Simulator WinForms 应用程序。Simulator 使用 [2] 中描述的 DynamicHotSpotControl(经过一些现代化改造)。Simulator 作为 WCF 服务的宿主,接收来自 SampleB 的命令。为了观察 Simulator 状态的变化,使用了 [3] 中描述的“智能轮询”技术。选择这种方法是为了说明处理器 OnStateChangedEvent 和低优先级(低于日志)的持久命令的使用。开始时,命令会导致 Simulator 创建其具有适当状态和动态的视觉对象。然后 Simulator 会通知 SampleB 应用程序关于鼠标左键单击视觉对象的每个事件。ProcessorManager 在其适当的回调中处理 Simulator 状态更改,并以 CommandChangeStateCommandChangeDynamics 响应 Simulator。要在 Visual Studio 中运行 SampleBSimulator,您应该同时构建并将它们作为多个启动项目运行。

演示

解压缩包含演示的文件。要运行 SampleA 的演示,应启动文件 SampleA.exe。对于 SampleB,请先启动 Simulator.exe(因为它托管 WCF 服务器),然后启动 SampleB.exe

讨论

应谨慎使用所呈现框架的灵活性。在应用程序的许多点使用过多的处理器和命令入队可能会导致性能下降,甚至意外的操作流程。即使我们有工具可以使事情复杂化,也不应忽略“保持简单”原则。

结论

本文介绍了一个针对并行计算的简单框架,适用于机器和过程控制、游戏、模拟器等的运行流程管理。该框架提供了顺序和并行命令执行、受控过程状态分析、错误处理和日志记录的机制。它的使用允许开发人员清晰地将命令与执行流程分离,模拟(模拟)部分命令,同时实际执行其他命令。建议的通用框架易于使用,并且可以作为各种活动领域并行计算应用程序的基础。

参考文献

© . All rights reserved.