65.9K
CodeProject 正在变化。 阅读更多。
Home

数据驱动的流程调度

starIconstarIconstarIconstarIconstarIcon

5.00/5 (3投票s)

2019年12月23日

CPOL

10分钟阅读

viewsIcon

3907

downloadIcon

112

本文提出并讨论了一种新的数据驱动的编程思想和方法,旨在提高编程的灵活性、构建性和共享性。

引言

数据驱动的编程方法可以根据不同的数据类型灵活地控制执行逻辑,但目前这种编程方法并未引入调度技术,要执行的逻辑顺序是固定编码在执行程序中的。虽然数据与进程的对应关系可以提取出来单独放在数据库或者通过脚本动态语言来控制以增加灵活性,但如果数据与进程的对应关系无法固定(例如,根据某种规则执行的进程),或者进程的状态需要存储(例如,等待其他数据,待定),现有的处理方法就不再适用,需要引入调度机制。

数据驱动的流程调度包括数据对象、进程(函数)对象和调度器。数据和进程(函数)都可以被调度为对象。进程(函数)不再固定在某一个或几个逻辑处理序列中。进程对象的调度是由数据对象驱动的,并带有输入输出数据对象的类型描述和调度上下文。进程对象的描述类似于函数原型定义,不同的是,在函数原型定义中,输入输出类型没有限制,可能过于笼统(例如:函数输入字符串,这个字符串可能有很多意义,它可以是URL,可以是格式化数据,可以是普通文本,也可以是具有某种意义的序列),且函数执行逻辑与环境之间没有约束(例如:引用或修改全局变量),也不存储运行状态,结果等与调度相关的信息,导致可调度性非常弱。目前大多数调度都是基于线程粒度的。许多现代编程语言支持反射方法,可以获得函数原型的表示。虽然实现起来比较复杂,但可以支持函数的调度。但是,由于对输入输出数据对象意义的定义不清晰,以及与调度相关的上下文缺失,通过反射机制进行的调度缺乏灵活性,需要严格的逻辑控制。另一种微服务编程方法是将复杂的逻辑拆分成小的服务单元,但其初衷和目的更多是为了降低发布、维护和升级的复杂度和效率,而不是微服务自身的调度。

调度器根据数据对象的类型搜索能够处理该数据对象的进程(函数)对象,并将数据对象分配给该进程对象执行,得到输出数据对象。在查找进程对象时,应用可以设置期望的输出数据对象类型。如果不存在单个进程输出该类型,应用可以根据进程对象的输入输出对象类型,查找输出目标数据类型的进程对象序列。该序列形成一个进程链,处理输入数据对象并得到期望的输出数据对象。

进程链编程pchain(process chain)编程实现了此处提到的调度和编程方法,目前支持python语言,可通过pip安装。

pip install pchain

定义数据对象

数据应该有类型,如果数据丢失了类型,数据的原始意义就丢失了。此时,这种数据的处理就需要通过预设的逻辑序列来处理,因为只有设计者知道数据的含义,无法实现自动调度。支持大量数据对象类型的灵活定义是有意义的,正如我们生活中有数百万种昆虫一样。
就像传统的调度有线程的上下文一样,数据对象的调度也需要上下文,记录该数据对象被分配给了哪些进程,以防止数据对象被重复分配,如下图所示。

进程对象列表记录了数据对象已经被分配的进程对象。数据指向特定的数据内容或值。数据对象有了调度上下文后,可以记录更多信息,例如源对象列表:表示数据对象依赖于哪些对象;所有者进程对象:表示数据对象由哪个进程对象生成。这些记录反映了数据与进程的关系。

编程语言支持的类型是原始类型,如整数、字符串、浮点类型等。这些原始类型没有特定的含义,例如整数,它可以代表年龄、体重、大小等。编程语言支持结构或类。这些结构和类有特定的含义,它们的实例可以用作数据对象,成为其他进程(处理函数)的输入数据或输出数据。

当前编程语言中定义的类的实例缺乏这种支持调度的上下文管理结构,无法记录数据对象是否已被分配给进程对象,从而无法自动分配和调度数据对象。因此,需要一种新的数据对象定义方法。

在pchain实现中,python数据对象的定义由pydata模块支持。它通过DefineType / DefineSubType这两个函数实现。在定义python数据对象类型时,可以指定原始python类型。

DefineType(tpname,rawtype=None)
DefineSubType(parenttype,tpname,rawtype = None)

示例

pydata.DefineType("NumberClass",float)
pydata.DefineSubType(NumberClass,'IntClass')

创建数据对象实例时,应用程序必须指定一个值或一个原始实例。数据对象创建后,其内容不应被修改。应用程序可以调用Lock方法。如果数据对象对应于python类的原生实例,此方法无法阻止修改实例中的属性,这将在使用时得到保证。

数据对象不仅记录了已分配给哪些进程,以支持调度器的数据对象分配过程,还记录了数据对象之间的关系,包括数据对象由哪个进程对象生成,它又基于哪些数据对象,以及又由哪个数据对象生成。这些关系可用于提取与数据对象相关的规则。为了支持规则系统,数据对象需要有唯一的关键字。通过GetTag函数可以获得数据对象与规则网络节点之间的一一对应关系。

d = NumberClass(12.3)
key = d.GetTag()
print(key)

输出是:

6d5450b62cb0b8d2b4c0ace38eef4b4df697129e

数据对象可以存储为JSON字符串,并可在其他计算机上恢复以供共享。

buf=Service._ServiceGroup._NewParaPkg()
realm.SaveObject(buf,d)
val = buf._ToJSon()
print(val)

输出是:

{"PackageInfo":[],"ObjectList":[{"PackageInfo":[],
                  "Value":[null,"gANHQCiZmZmZmZou"],"ClassName":"NumberClass"}]}

定义进程对象

进程对象包含一个描述部分,该部分描述了输入和输出数据的类型,以及一个逻辑执行部分。进程对象的输入和输出必须是上述定义的数据对象。基于这些数据对象拥有的管理结构,可以实现数据对象驱动的调度。如果进程的输入不是数据对象,调度器就无法确定可以分配给该进程的数据对象。而如果进程对象的输出不是数据对象,调度器在进程产生输出后就无法进一步处理输出数据。

在进程执行过程中,有时整个执行不能一次完成。例如,进程可能数据不全,需要更多的数据对象,可能需要重复执行,或者由于等待而被挂起。因此,进程对象也需要一个调度上下文。现有的方法(函数)定义不支持这一点,需要定义新的进程对象。

在pchain中,python进程对象的定义由pyproc模块支持,通过DefineProc函数实现。

pyproc.DefineProc (tpname, InputDataType, OutputDataType, PyFunc)

例如

@pyproc.DefineProc('HelloWorldProc',None,None)
def Execute(self) :   
  print('Hello world !')
  return (0,1,None)

进程对象在使用时,必须创建实例。上述方法定义的进程对象是python进程对象,它也对应于CLE进程对象,并负责记录进程执行的上下文。

cle_p = HelloWorldProc().Wrap()

进程对象还有一个唯一的Tag,通过GetTag函数获取,所有实例的标签都相同。

key = cle_p.GetTag()
print(key)

输出是:

proc_global_HelloWorldProc

进程对象可以存储为JSON string,并在其他计算机上恢复以供共享。

buf=Service._ServiceGroup._NewParaPkg()
realm.SaveObject(buf,cle_p)
val = buf._ToJSon()
print(val)

输出是:

{"PackageInfo":[],"ObjectList":[{"ClassName":"HelloWorldProc",
                  "ObjectID":"fa3d7eb9-5720-490f-a47a-b7a0103c6e38","Type":"PCProc"}]}

调度执行

通过将数据对象和进程对象放在一起,应用程序可以调度执行。在pchain中,单元对象(cell object)管理调度,单元必须被添加到区域(realm)中才能执行。多个单元可以添加到区域中执行。这是一个简单的例子,即从键盘输入两个数字并计算它们的和。

  1. 定义数据对象
    pydata.DefineType('NumberClass')
  2. 定义进程对象

    这里定义了两个进程对象,一个进程对象负责从键盘输入数字,一个进程对象负责计算两个数字的和。

    1. 输入进程对象

      该进程对象不需要输入数据对象,它是从键盘输入的;输出一个数据对象。定义如下:

      @pyproc.DefineProc('InputProc',None,NumberClass)
      def Execute(self) :   
        Context = self.Context   
        if Context['SelfObj'].Status < 0 :
          return None
        val = input('input a number : ')
        return (4,1,NumberClass(val))
    2. 求和进程对象

      该进程对象输入两个数据对象,计算两个数的和,并打印出来。没有输出数据对象。定义如下:

      @pyproc.DefineProc('OutputProc',(NumberClass,NumberClass),None)
      def Execute(self,num1,num2) :   
        Context = self.Context
        print('sum = ', num1.value() + num2.value())
        Context['Cell'].Finish()
        return (0,1,None)

      执行完成后,调用Cell.Finish()函数结束执行并退出。

  3. 创建CellRealm,将进程对象添加到Cell中执行。
    cell = Service.PCCellBase()
    cell.AddProc(InputProc,OutputProc)
    realm.AddCell(cell)
    realm.Execute()

    结果如下:

    input a number : 1
    input a number : 2
    sum =  12
  4. 调度过程的描述如下:

    调度分为两个步骤:数据对象分配和进程对象执行。首先处理数据对象分配。如果数据可以分配给进程,则创建一个进程的执行实例,将数据对象分配给进程,然后依次调度执行。

    在上面的例子中,单元中有两个进程。由于“InputProc”不需要输入数据并且满足调度条件,因此可以调度它执行。“OutputProc”需要两个NumberClass对象作为输入。由于此时单元中没有数据对象,调度条件不满足,第一次调度只能执行“InputProc”。

    InputProc”将从键盘输入一个数据,将其转换为数字对象,并将其放入Cell中。对于下一轮调度,此时“OutputProc”仍然不满足调度条件,只能再次执行“InputProc”来输入新的数字对象。

    单元中有两个数字对象。“OutputProc”可以被调度执行。“OutputProc”计算两个数字的和,将其打印到屏幕上,然后调用Cell.Finish()函数结束执行并退出。

关注点

通过定义数据对象类型和进程对象类型,并添加支持调度的参数,可以实现数据驱动的流程调度方法。在这种编程方法中,进程对象与各种编程语言当前支持的进程(函数)不同。进程对象是一个可调度、可分配的对象,不再是执行代码中固定的逻辑序列。这种方法具有极大的灵活性、构建性和共享性。

  1. 灵活性。如果数据对象随时需要某些处理,应用程序可以将数据对象和进程对象放在一起以获得处理结果。
  2. 构建性,根据输入数据对象和输出数据对象类型,可以获得一个或多个进程序列,以获得期望的输出数据类型。类似于自动编程。
  3. 可共享性,这一点在上面的例子中没有体现。进程对象和数据对象都可以以JSON格式的字符串形式存储,可以在其他计算单元上恢复和执行。

每个进程对象和数据对象都有一个唯一的Tag,可以作为Key将对象映射到图网络或数据库中的节点。进程链的构建可以由图网络的规则系统指导。随时可以创建单元来调度数据和进程对象的执行。

历史

  • 2019年12月23日:初始版本
© . All rights reserved.