RESTGrid






4.79/5 (12投票s)
一个简单的.NET Core工作流\ETL系统,它使用REST服务与外部世界交互
- GitHub源代码 - http://github.com/WorkMaze/RESTGrid
- MySql提供商的Orchestration Engine Docker容器 http://hub.docker.com/r/workmaze/restgrid.mysqlengine.workflow/
- MySql提供商的REST API Docker容器 http://hub.docker.com/r/workmaze/restgrid.mysqlengine.api/
- AWS DynamoDB(AWS的NoSQL服务)的Orchestration Engine Docker容器 https://hub.docker.com/r/workmaze/restgrid.dynamodbengine.workflow/
- AWS DynamoDB提供商的REST API Docker容器 http://hub.docker.com/r/workmaze/restgrid.dynamodbengine.api/
引言
RESTGrid是一个简单的.NET Core工作流\ETL系统,它使用REST服务与外部世界交互。它具有以下特点
- 该系统能够使用JSON格式定义工作流。
- 工作流可以是同步的,也可以是长时间运行的异步的。
- 工作流中的每一步都是对REST服务的调用。
- 步骤可以是同步的,即前一个步骤执行完毕后调用下一个步骤。
- 步骤也可以是异步的,即在执行完一个步骤后,工作流进入等待阶段,等待外部调用。
- 可以通过REST API启动、重新启动工作流。
- 通过REST API进行管理。
- 能够使用JUST定义JSON转换 - https://codeproject.org.cn/Articles/1187172/JUST-JSON-Under-Simple-Transformation。
- 基于接口的系统,因此可以接入不同的数据源。
- 提供MySql提供商的Docker容器。
- 提供AWS DynamoDB(AWS的NoSQL服务)提供商的Docker容器。
背景
我之前的一篇文章(https://codeproject.org.cn/Articles/1202556/FloatingBridge)侧重于基于.NET Framework和MySql的消息传递\ETL\工作流系统。
我创建这个软件的目的是提供一个更简单的替代方案。它不包含丰富的Windows UI功能。但是,也有一些额外的功能
- 源代码是.NET Core,并且源文件中包含docker文件。
- MySql提供商的Docker容器可在Docker Hub上获取。
- 尽管缺少丰富的Windows UI功能,但包含了用于管理的REST API。
- 目前,只包含了MySql提供商。但是,可以很容易地使用提供的接口开发新的提供商。
- AWS(DynamoDB)的NoSQL服务提供商也已包含在此解决方案中。
系统架构
对系统各个组件的简要描述。下面是一个代表系统架构的示意图
编排引擎
这是系统的核心,它将任务入队,读取工作流的业务逻辑并运行任务。
REST API
外部系统使用它来启动\重新启动工作流,用户也用它来管理\设置工作流。
后端
这是存储配置和历史数据的存储。RESTGrid是定义系统接口、对象和编排逻辑的主项目。
由于这是一个基于接口的项目,只要我们实现了定义的接口,就可以接入任何后端。
RESTGrid的核心库可作为Nuget包获取:https://nuget.net.cn/packages/RESTGrid/
业务逻辑
系统的业务逻辑使用标准的JSON格式定义。该格式与FloatingBridge
非常相似。但是,为了使其更简单,省略了一些内容。
root
对象包含以下两个属性
Start
(工作流的起始任务)Tasks
(工作流中所有后续任务的数组)
任务
Task
JSON包含以下属性
Identifier
- 唯一标识符,用于标识任务Type
- 任务类型Next
- 任务标识符数组,指向此任务之后可能运行的任务TaskRetries
- 任务在失败前可以重试的次数TaskProperties
- 包含任务如何运行信息的JSON对象(可以使用JUST
进行转换)RunCondition
- 任务运行必须满足的条件
运行条件
run
条件决定了任务运行必须满足的条件。它具有以下属性
Evaluator
- 需要评估的表达式(可以使用JUST
进行转换)。Evaluated
- 需要与之进行评估的表达式(可以使用JUST
进行转换)。Operator
- 标准运算符:stringequals
stringcontains
mathequals
mathgreaterthan
mathlessthan
mathgreaterthanorequalto
mathlessthanorequalto
任务类型
任务可以是以下四种类型之一
Sync
- 同步REST调用(同步调用后执行下一步)。Async
- 异步REST调用(执行异步调用后,工作流等待外部输入)。Transformer
- 转换消息体(JUST
转换)Splitter
- 根据JSON中的数组分割消息体(JUST
分割)
REST任务(同步和异步)的任务属性
REST任务的任务属性JSON包含以下属性
Url -
REST服务的Url
Method -
GET
、POST
、PUT
或DELETE
Headers -
键值对JSON(可选)正文
查询字符串(QueryString)
Transformer的任务属性
TransformerID
- 标识transformer JSON的整数ID(对于MySql,这是transformer
表的主键)
Splitter的任务属性
ArrayPath
- 指向数组的JSONPath。
业务逻辑JSON示例
"Start": {
"Identifier": "CreateUser",
"Next": [
"CreateRole"
],
"RunCondition": null,
"Type": "Transformer",
"TaskProperties": {
"TransformerID": "3"
}
},
"Tasks": [
{
"Identifier": "CreateRole",
"Next": [
"AddApplication",
"Notify"
],
"Type": "Splitter",
"TaskProperties": {
"ArrayPath": "$.Organization.Employee"
}
},
{
"Identifier": "AddApplication",
"Type": "Sync",
"Next": [
"Approve"
],
"RunCondition": {
"Evaluated": "CreditCard",
"Evaluator": "#valueof($.MessageBodyJson.Organization.Employee.PaymentMode)",
"Operator": "stringequals"
},
"TaskProperties": {
"Url": "https://:5001/",
"Method": "POST",
"Headers": null,
"Body": "#valueof($.MessageBodyJson.Organization.Employee.Details)",
"QueryString": "api/table/user"
},
"TaskRetries": 0
},
{
"Identifier": "Notify",
"Next": [
"Approve"
],
"Type": "Async",
"RunCondition": {
"Evaluated": "Cash",
"Evaluator": "#valueof($.MessageBodyJson.Organization.Employee.PaymentMode)",
"Operator": "stringequals"
},
"TaskProperties": {
"Url": "https://:5001/",
"Method": "POST",
"Headers": null,
"Body": "#valueof($.MessageBodyJson.Organization.Employee.Details)",
"QueryString": "api/table/user"
},
"TaskRetries": 0
},
{
"Identifier": "Approve",
"Type": "Sync",
"TaskProperties": {
"Url": "https://:5001/",
"Method": "POST",
"Headers": null,
"Body": {
"Message": "Your payment has been approved"
},
"QueryString": "api/table/user"
},
"TaskRetries": 0
}
]
}
上面的业务逻辑JSON代表了一个工作流,该工作流看起来像下面的示意图
Using the Code
RESTGrid是一个基于接口的系统。核心库提供了可以继承以实现您自己的后端提供商的接口。
提供了以下接口
IAdministration
namespace RESTGrid.Interfaces
{
public interface IAdministration
{
void CreateWorkflowType(string workflowTypeName, JObject businessLogicJson);
void CreateTransformer(JObject transformerJson);
WorkflowHistory GetHistory(string workflowID);
}
}
IOrchestration
namespace RESTGrid.Interfaces
{
public interface IOrchestration
{
void PublishWorkflowStep(string workflowTypeName, Guid workflowID,
JObject messageBodyJson, JObject customPropertiesJson, string stepIdentifier,
bool stepSucceeded, bool workflowCompleted, int retries, bool active,
string runStepIdentifier, string splitID);
void SetWorkflowActive(JObject messageBodyJson, string customPropertyName,
string customPropertyValue);
List<RESTGrid.Models.Queue> Enqueue();
JObject GetTransformer(int transformerID);
}
}
一旦实现了IOrchestration
接口,您就可以轻松开发自己的Orchestration
\Workflow
引擎。MySql引擎的实现方式如下
MySqlOrchestration orchestration = new MySqlOrchestration(connectionString);
OrchestrationEngine engine = new OrchestrationEngine(orchestration);
Console.WriteLine("Running orchestration engine...");
while (true)
{
engine.Run();
}
MySql提供商的REST API具有以下API
管理API
GET {url}/api/Administration/History{workflowID}
Returns 200 OK with a JSON containing the entire workflow history.
POST {url}/api/Administration/WorkflowType/{workflowTypeName}
Body containing the Business Logic JSON.
Returns 204 No Content.
POST {url}/api/Administration/Transformer
Returns 204 No Content.
编排API
PUT {url}/api/Orchestration/Workflow/{customPropertyName}/{customPropertyValue}
Body (Optional) - <JSON containing the new input message>.
Returns 204 No Content.
POST {url}/api/Orchestration/Workflow/{workflowTypeName}
Body containing the JSON message in the following format:-
{
"MessageBody": <JSON containing the input message>,
"CustomProperties": <One-level JSON representing a key value pair>,
}
Returns 204 No Content
关注点
- 由于此项目使用.NET Core和docker容器,因此可以轻松地将其托管在支持docker的云提供商上。
- MySql数据库可以是独立的系统,也可以托管在支持它的云提供商上。
- 系统调用的REST服务可以托管在云端,从而有可能将整个系统托管在云端。
历史
- RESTGrid的第一个版本(RESTGrid的MySql提供商)
- 添加了AWS DynamoDB(AWS的NoSQL服务)提供商