RESTGrid






4.79/5 (12投票s)
一个简单的.NET Core工作流\ETL系统,它使用REST服务与外部世界交互
- GitHub源代码 - http://github.com/WorkMaze/RESTGrid
- MySql提供商的Orchestration Engine Docker容器 http://hub.docker.com/r/workmaze/restgrid.mysqlengine.workflow/
- MySql提供商的REST API Docker容器 http://hub.docker.com/r/workmaze/restgrid.mysqlengine.api/
- AWS DynamoDB(AWS的NoSQL服务)的Orchestration Engine Docker容器 https://hub.docker.com/r/workmaze/restgrid.dynamodbengine.workflow/
- AWS DynamoDB提供商的REST API Docker容器 http://hub.docker.com/r/workmaze/restgrid.dynamodbengine.api/
引言
RESTGrid是一个简单的.NET Core工作流\ETL系统,它使用REST服务与外部世界交互。它具有以下特点
- 该系统能够使用JSON格式定义工作流。
- 工作流可以是同步的,也可以是长时间运行的异步的。
- 工作流中的每一步都是对REST服务的调用。
- 步骤可以是同步的,即前一个步骤执行完毕后调用下一个步骤。
- 步骤也可以是异步的,即在执行完一个步骤后,工作流进入等待阶段,等待外部调用。
- 可以通过REST API启动、重新启动工作流。
- 通过REST API进行管理。
- 能够使用JUST定义JSON转换 - https://codeproject.org.cn/Articles/1187172/JUST-JSON-Under-Simple-Transformation。
- 基于接口的系统,因此可以接入不同的数据源。
- 提供MySql提供商的Docker容器。
- 提供AWS DynamoDB(AWS的NoSQL服务)提供商的Docker容器。
背景
我之前的一篇文章(https://codeproject.org.cn/Articles/1202556/FloatingBridge)侧重于基于.NET Framework和MySql的消息传递\ETL\工作流系统。
我创建这个软件的目的是提供一个更简单的替代方案。它不包含丰富的Windows UI功能。但是,也有一些额外的功能
- 源代码是.NET Core,并且源文件中包含docker文件。
- MySql提供商的Docker容器可在Docker Hub上获取。
- 尽管缺少丰富的Windows UI功能,但包含了用于管理的REST API。
- 目前,只包含了MySql提供商。但是,可以很容易地使用提供的接口开发新的提供商。
- AWS(DynamoDB)的NoSQL服务提供商也已包含在此解决方案中。
系统架构
对系统各个组件的简要描述。下面是一个代表系统架构的示意图

编排引擎
这是系统的核心,它将任务入队,读取工作流的业务逻辑并运行任务。
REST API
外部系统使用它来启动\重新启动工作流,用户也用它来管理\设置工作流。
后端
这是存储配置和历史数据的存储。RESTGrid是定义系统接口、对象和编排逻辑的主项目。
由于这是一个基于接口的项目,只要我们实现了定义的接口,就可以接入任何后端。
RESTGrid的核心库可作为Nuget包获取:https://nuget.net.cn/packages/RESTGrid/
业务逻辑
系统的业务逻辑使用标准的JSON格式定义。该格式与FloatingBridge非常相似。但是,为了使其更简单,省略了一些内容。
root对象包含以下两个属性
- Start(工作流的起始任务)
- Tasks(工作流中所有后续任务的数组)
任务
Task JSON包含以下属性
- Identifier- 唯一标识符,用于标识任务
- Type- 任务类型
- Next- 任务标识符数组,指向此任务之后可能运行的任务
- TaskRetries- 任务在失败前可以重试的次数
- TaskProperties- 包含任务如何运行信息的JSON对象(可以使用- JUST进行转换)
- RunCondition- 任务运行必须满足的条件
运行条件
run条件决定了任务运行必须满足的条件。它具有以下属性
- Evaluator- 需要评估的表达式(可以使用- JUST进行转换)。
- Evaluated- 需要与之进行评估的表达式(可以使用- JUST进行转换)。
- Operator- 标准运算符:- stringequals
- stringcontains
- mathequals
- mathgreaterthan
- mathlessthan
- mathgreaterthanorequalto
- mathlessthanorequalto
 
任务类型
任务可以是以下四种类型之一
- Sync- 同步REST调用(同步调用后执行下一步)。
- Async- 异步REST调用(执行异步调用后,工作流等待外部输入)。
- Transformer- 转换消息体(- JUST转换)
- Splitter- 根据JSON中的数组分割消息体(- JUST分割)
REST任务(同步和异步)的任务属性
REST任务的任务属性JSON包含以下属性
- Url -REST服务的- Url
- Method -- GET、- POST、- PUT或- DELETE
- Headers -键值对JSON(可选)
- 正文
- 查询字符串(QueryString)
Transformer的任务属性
- TransformerID- 标识transformer JSON的整数ID(对于MySql,这是- transformer表的主键)
Splitter的任务属性
- ArrayPath- 指向数组的JSONPath。
业务逻辑JSON示例
"Start": {
    "Identifier": "CreateUser",
    "Next": [
     "CreateRole"
    ],
    "RunCondition": null,
    "Type": "Transformer",
    "TaskProperties": {
     "TransformerID": "3"
    }
 },
 "Tasks": [
    {
     "Identifier": "CreateRole",
     "Next": [
       "AddApplication",
       "Notify"
     ],
     "Type": "Splitter",
     "TaskProperties": {
       "ArrayPath": "$.Organization.Employee"
     }
    },
    {
     "Identifier": "AddApplication",
     "Type": "Sync",
     "Next": [
       "Approve"
     ],
     "RunCondition": {
       "Evaluated": "CreditCard",       
                    "Evaluator": "#valueof($.MessageBodyJson.Organization.Employee.PaymentMode)",
       "Operator": "stringequals"
     },
     "TaskProperties": {
       "Url": "https://:5001/",
       "Method": "POST",
       "Headers": null,
       "Body": "#valueof($.MessageBodyJson.Organization.Employee.Details)",
       "QueryString": "api/table/user"
     },
     "TaskRetries": 0
    },
    {
     "Identifier": "Notify",
     "Next": [
       "Approve"
     ],
     "Type": "Async",
     "RunCondition": {
       "Evaluated": "Cash",
       "Evaluator": "#valueof($.MessageBodyJson.Organization.Employee.PaymentMode)",
       "Operator": "stringequals"
     },
     "TaskProperties": {
       "Url": "https://:5001/",
       "Method": "POST",
       "Headers": null,
       "Body": "#valueof($.MessageBodyJson.Organization.Employee.Details)",
       "QueryString": "api/table/user"
     },
     "TaskRetries": 0
    },
    {
     "Identifier": "Approve",
     "Type": "Sync",
     "TaskProperties": {
       "Url": "https://:5001/",
       "Method": "POST",
       "Headers": null,
       "Body": {
         "Message": "Your payment has been approved"
       },
       "QueryString": "api/table/user"
     },
     "TaskRetries": 0
    }
 ]
}
上面的业务逻辑JSON代表了一个工作流,该工作流看起来像下面的示意图

Using the Code
RESTGrid是一个基于接口的系统。核心库提供了可以继承以实现您自己的后端提供商的接口。
提供了以下接口
IAdministration
namespace RESTGrid.Interfaces
{
 public interface IAdministration
    {
        void CreateWorkflowType(string workflowTypeName, JObject businessLogicJson);
        void CreateTransformer(JObject transformerJson);
        WorkflowHistory GetHistory(string workflowID);
    }
}
IOrchestration
namespace RESTGrid.Interfaces
{
    public interface IOrchestration
    {
  
        void PublishWorkflowStep(string workflowTypeName, Guid workflowID, 
           JObject messageBodyJson, JObject customPropertiesJson, string stepIdentifier,
    bool stepSucceeded, bool workflowCompleted, int retries, bool active, 
           string runStepIdentifier, string splitID);
        void SetWorkflowActive(JObject messageBodyJson, string customPropertyName, 
             string customPropertyValue);
        List<RESTGrid.Models.Queue> Enqueue();
        JObject GetTransformer(int transformerID);
    }
}
一旦实现了IOrchestration接口,您就可以轻松开发自己的Orchestration\Workflow引擎。MySql引擎的实现方式如下
MySqlOrchestration orchestration = new MySqlOrchestration(connectionString);
OrchestrationEngine engine = new OrchestrationEngine(orchestration);
Console.WriteLine("Running orchestration engine...");
while (true)
{                   
	engine.Run();
}
MySql提供商的REST API具有以下API
管理API
GET {url}/api/Administration/History{workflowID}
Returns 200 OK with a JSON containing the entire workflow history.
POST {url}/api/Administration/WorkflowType/{workflowTypeName}
Body containing the Business Logic JSON.
Returns 204 No Content.
POST {url}/api/Administration/Transformer
Returns 204 No Content.
编排API
PUT {url}/api/Orchestration/Workflow/{customPropertyName}/{customPropertyValue}
Body (Optional) - <JSON containing the new input message>.
Returns 204 No Content.
POST {url}/api/Orchestration/Workflow/{workflowTypeName}
Body containing the JSON message in the following format:-
{
"MessageBody": <JSON containing the input message>,
"CustomProperties": <One-level JSON representing a key value pair>,
}
Returns 204 No Content
关注点
- 由于此项目使用.NET Core和docker容器,因此可以轻松地将其托管在支持docker的云提供商上。
- MySql数据库可以是独立的系统,也可以托管在支持它的云提供商上。
- 系统调用的REST服务可以托管在云端,从而有可能将整个系统托管在云端。
历史
- RESTGrid的第一个版本(RESTGrid的MySql提供商)
- 添加了AWS DynamoDB(AWS的NoSQL服务)提供商

