65.9K
CodeProject 正在变化。 阅读更多。
Home

RESTGrid

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.79/5 (12投票s)

2017年9月2日

CPOL

5分钟阅读

viewsIcon

28721

一个简单的.NET Core工作流\ETL系统,它使用REST服务与外部世界交互

引言

RESTGrid是一个简单的.NET Core工作流\ETL系统,它使用REST服务与外部世界交互。它具有以下特点

  • 该系统能够使用JSON格式定义工作流。
  • 工作流可以是同步的,也可以是长时间运行的异步的。
  • 工作流中的每一步都是对REST服务的调用。
  • 步骤可以是同步的,即前一个步骤执行完毕后调用下一个步骤。
  • 步骤也可以是异步的,即在执行完一个步骤后,工作流进入等待阶段,等待外部调用。
  • 可以通过REST API启动、重新启动工作流。
  • 通过REST API进行管理。
  • 能够使用JUST定义JSON转换 - https://codeproject.org.cn/Articles/1187172/JUST-JSON-Under-Simple-Transformation
  • 基于接口的系统,因此可以接入不同的数据源。
  • 提供MySql提供商的Docker容器。
  • 提供AWS DynamoDB(AWS的NoSQL服务)提供商的Docker容器。

背景

我之前的一篇文章(https://codeproject.org.cn/Articles/1202556/FloatingBridge)侧重于基于.NET Framework和MySql的消息传递\ETL\工作流系统。

我创建这个软件的目的是提供一个更简单的替代方案。它不包含丰富的Windows UI功能。但是,也有一些额外的功能

  • 源代码是.NET Core,并且源文件中包含docker文件。
  • MySql提供商的Docker容器可在Docker Hub上获取。
  • 尽管缺少丰富的Windows UI功能,但包含了用于管理的REST API。
  • 目前,只包含了MySql提供商。但是,可以很容易地使用提供的接口开发新的提供商。
  • AWS(DynamoDB)的NoSQL服务提供商也已包含在此解决方案中。

系统架构

对系统各个组件的简要描述。下面是一个代表系统架构的示意图

编排引擎

这是系统的核心,它将任务入队,读取工作流的业务逻辑并运行任务。

REST API

外部系统使用它来启动\重新启动工作流,用户也用它来管理\设置工作流。

后端

这是存储配置和历史数据的存储。RESTGrid是定义系统接口、对象和编排逻辑的主项目。

由于这是一个基于接口的项目,只要我们实现了定义的接口,就可以接入任何后端。

RESTGrid的核心库可作为Nuget包获取:https://nuget.net.cn/packages/RESTGrid/

业务逻辑

系统的业务逻辑使用标准的JSON格式定义。该格式与FloatingBridge非常相似。但是,为了使其更简单,省略了一些内容。

root对象包含以下两个属性

  • Start(工作流的起始任务)
  • Tasks(工作流中所有后续任务的数组)

任务

Task JSON包含以下属性

  • Identifier - 唯一标识符,用于标识任务
  • Type - 任务类型
  • Next - 任务标识符数组,指向此任务之后可能运行的任务
  • TaskRetries - 任务在失败前可以重试的次数
  • TaskProperties - 包含任务如何运行信息的JSON对象(可以使用JUST进行转换)
  • RunCondition - 任务运行必须满足的条件

运行条件

run条件决定了任务运行必须满足的条件。它具有以下属性

  • Evaluator - 需要评估的表达式(可以使用JUST进行转换)。
  • Evaluated - 需要与之进行评估的表达式(可以使用JUST进行转换)。
  • Operator - 标准运算符:
    1. stringequals
    2. stringcontains
    3. mathequals
    4. mathgreaterthan
    5. mathlessthan
    6. mathgreaterthanorequalto
    7. mathlessthanorequalto

任务类型

任务可以是以下四种类型之一

  • Sync - 同步REST调用(同步调用后执行下一步)。
  • Async - 异步REST调用(执行异步调用后,工作流等待外部输入)。
  • Transformer - 转换消息体(JUST转换)
  • Splitter - 根据JSON中的数组分割消息体(JUST分割)

REST任务(同步和异步)的任务属性

REST任务的任务属性JSON包含以下属性

  • Url - REST服务的Url
  • Method - GETPOSTPUTDELETE
  • Headers - 键值对JSON(可选)
  • 正文
  • 查询字符串(QueryString)

Transformer的任务属性

  • TransformerID - 标识transformer JSON的整数ID(对于MySql,这是transformer表的主键)

Splitter的任务属性

  • ArrayPath - 指向数组的JSONPath。

业务逻辑JSON示例

"Start": {
    "Identifier": "CreateUser",
    "Next": [
     "CreateRole"
    ],
    "RunCondition": null,
    "Type": "Transformer",
    "TaskProperties": {
     "TransformerID": "3"
    }
 },
 "Tasks": [
    {
     "Identifier": "CreateRole",
     "Next": [
       "AddApplication",
       "Notify"
     ],
     "Type": "Splitter",
     "TaskProperties": {
       "ArrayPath": "$.Organization.Employee"
     }
    },
    {
     "Identifier": "AddApplication",
     "Type": "Sync",
     "Next": [
       "Approve"
     ],
     "RunCondition": {
       "Evaluated": "CreditCard",       
                    "Evaluator": "#valueof($.MessageBodyJson.Organization.Employee.PaymentMode)",
       "Operator": "stringequals"
     },
     "TaskProperties": {
       "Url": "https://:5001/",
       "Method": "POST",
       "Headers": null,
       "Body": "#valueof($.MessageBodyJson.Organization.Employee.Details)",
       "QueryString": "api/table/user"
     },
     "TaskRetries": 0
    },
    {
     "Identifier": "Notify",
     "Next": [
       "Approve"
     ],
     "Type": "Async",
     "RunCondition": {
       "Evaluated": "Cash",
       "Evaluator": "#valueof($.MessageBodyJson.Organization.Employee.PaymentMode)",
       "Operator": "stringequals"
     },
     "TaskProperties": {
       "Url": "https://:5001/",
       "Method": "POST",
       "Headers": null,
       "Body": "#valueof($.MessageBodyJson.Organization.Employee.Details)",
       "QueryString": "api/table/user"
     },
     "TaskRetries": 0
    },
    {
     "Identifier": "Approve",
     "Type": "Sync",
     "TaskProperties": {
       "Url": "https://:5001/",
       "Method": "POST",
       "Headers": null,
       "Body": {
         "Message": "Your payment has been approved"
       },
       "QueryString": "api/table/user"
     },
     "TaskRetries": 0
    }
 ]
}

上面的业务逻辑JSON代表了一个工作流,该工作流看起来像下面的示意图

Using the Code

RESTGrid是一个基于接口的系统。核心库提供了可以继承以实现您自己的后端提供商的接口。

提供了以下接口

IAdministration

namespace RESTGrid.Interfaces
{
 public interface IAdministration
    {
        void CreateWorkflowType(string workflowTypeName, JObject businessLogicJson);

        void CreateTransformer(JObject transformerJson);

        WorkflowHistory GetHistory(string workflowID);
    }
}

IOrchestration

namespace RESTGrid.Interfaces
{
    public interface IOrchestration
    {
  
        void PublishWorkflowStep(string workflowTypeName, Guid workflowID, 
           JObject messageBodyJson, JObject customPropertiesJson, string stepIdentifier,
    bool stepSucceeded, bool workflowCompleted, int retries, bool active, 
           string runStepIdentifier, string splitID);

        void SetWorkflowActive(JObject messageBodyJson, string customPropertyName, 
             string customPropertyValue);

        List<RESTGrid.Models.Queue> Enqueue();

        JObject GetTransformer(int transformerID);
    }
}

一旦实现了IOrchestration接口,您就可以轻松开发自己的Orchestration\Workflow引擎。MySql引擎的实现方式如下

MySqlOrchestration orchestration = new MySqlOrchestration(connectionString);
OrchestrationEngine engine = new OrchestrationEngine(orchestration);
Console.WriteLine("Running orchestration engine...");
while (true)
{                   
	engine.Run();
}

MySql提供商的REST API具有以下API

管理API

GET {url}/api/Administration/History{workflowID}
Returns 200 OK with a JSON containing the entire workflow history.

POST {url}/api/Administration/WorkflowType/{workflowTypeName}
Body containing the Business Logic JSON.
Returns 204 No Content.

POST {url}/api/Administration/Transformer
Returns 204 No Content.

编排API

PUT {url}/api/Orchestration/Workflow/{customPropertyName}/{customPropertyValue}
Body (Optional) - <JSON containing the new input message>.
Returns 204 No Content.

POST {url}/api/Orchestration/Workflow/{workflowTypeName}
Body containing the JSON message in the following format:-
{
"MessageBody": <JSON containing the input message>,
"CustomProperties": <One-level JSON representing a key value pair>,
}
Returns 204 No Content

关注点

  • 由于此项目使用.NET Core和docker容器,因此可以轻松地将其托管在支持docker的云提供商上。
  • MySql数据库可以是独立的系统,也可以托管在支持它的云提供商上。
  • 系统调用的REST服务可以托管在云端,从而有可能将整个系统托管在云端。

历史

  1. RESTGrid的第一个版本(RESTGrid的MySql提供商)
  2. 添加了AWS DynamoDB(AWS的NoSQL服务)提供商
© . All rights reserved.