FloatingBridge





5.00/5 (4投票s)
一个基于MySQL和.NET的简单消息\工作流\ETL系统
引言
FloatingBridge
是一个基于MySQL和.NET的简单消息\工作流\ETL系统。MySQL用作系统的后端引擎。前端使用.NET WPF(Windows Presentation Foundation)设计。业务逻辑由.NET Windows服务处理。
本文档简要介绍了该系统。安装文件和详细文档随本文档提供。
背景
我曾大量使用过Microsoft BizTalk Server。我也使用过一些其他替代系统。我的初衷是构建一个基于现代RDS的简单消息\工作流\ETL系统,该系统使用JSON数据类型而非XML。
特点
系统附带一个管理控制台,可帮助您配置消息\工作流系统。管理控制台具有以下功能:
- 后端配置 - 首次启动控制台时,会运行所有必需的脚本来初始化MySQL数据库。
- 服务配置 - 系统提供3种服务。它们将在下文介绍。但是,我们可以配置任意数量的服务实例。通常,我们会为每个应用程序域配置一个。
- 消息系统配置 - 可以通过配置相应的消息类型、发布者、订阅者、发布和订阅来配置。
- 工作流配置 - 配置一个由分支任务组成的序列,其业务逻辑使用JSON定义。
- 管理消息和工作流 - 旧消息可被标记为冗余(孤立),等待中的工作流可被重新启动。
- 历史记录 - 可查看消息和工作流的历史记录。
- 转换 - 可以使用JUST - https://codeproject.org.cn/Articles/1187172/JUST-JSON-Under-Simple-Transformation 创建转换JSON的任务。
- 自定义任务 - 系统自带一些已知的自定义任务。
- API - 系统提供一个API,外部系统可调用该API执行系统操作,如发布消息、拉取订阅消息、发布工作流、运行同步工作流和重新启动等待中的工作流。核心API可用于实现您自己的自定义任务。
系统架构
下图说明了系统的不同组件。
数据库引擎
数据库引擎是系统的核心组件。MySQL数据库用作引擎。引擎存储系统的配置和跟踪数据。
存储以下配置数据:
- 消息配置 - 消息类型,其应用程序域
- 工作流配置 - 工作流类型,应用程序域和业务逻辑
- 能够发布消息和工作流的发布者
- 能够订阅消息的订阅者
- 既可以是工作流的一部分,也可以是监听器或订阅者的任务
此外,所有与消息和工作流相关的历史数据都将存储起来用于跟踪。
订阅者服务
这是一个.NET Windows服务,它轮询数据库以查找订阅任务。订阅任务只是从数据库读取已发布的消息并进行处理。系统提供了内置订阅任务,将在最后部分介绍。
监听器服务
这也是一个.NET Windows服务,用于运行与监听器相关的任务。监听器是一种任务,它从外部源读取数据,并根据从外部源收集的数据向数据库发布消息。
工作流服务
这是.NET Windows服务,它按照工作流的业务逻辑定义的顺序运行与工作流相关的各种任务。它执行以下任务:
- 从数据库读取工作流消息,并根据业务逻辑和当前状态执行下一步操作。
- 运行下一个同步任务,并将状态保存到引擎。
- 运行下一个出站任务,并等待入站任务。
- 根据重试次数重试任务。
- 发生故障时运行错误处理程序或错误通知任务。
管理控制台
这是一个用WPF(Windows Presentation Foundation)编写的应用程序,使用户能够管理消息和工作流的配置和跟踪。
FloatingBridge API
FloatingBridge
API是一个简单的DLL,可以在外部项目中引用。它位于安装文件夹中,也可作为NuGet包提供。通过引用该API,外部实体或系统可以执行以下任务:
- 发布消息
- 拉取订阅消息
- 发布工作流
- 运行同步工作流
- 重新启动等待中的工作流
FloatingBridge 术语
理解FloatingBridge
中使用的术语非常重要,因为管理控制台完全基于这些术语,并且围绕这些术语进行各种操作。
- 应用程序域 - 应用程序的逻辑分组,关联了消息类型和工作流。每个服务都在一个应用程序域下运行。
- 服务 - 一个.NET Windows服务(监听器、订阅者或工作流),在一个应用程序域下运行。服务需要两个输入参数:
- 应用程序域
- MySQL数据库引擎的连接字符串
- 消息类型 - 定义消息的类型。发布者可以发布此类型的消息,订阅者可以拉取此类型的消息。
- 发布者 - 有权向系统发布消息的客户端。
- 订阅者 - 有权订阅系统消息的客户端。
- 任务 - 系统运行的一个函数。任务可以是发布任务、订阅任务或工作流任务。
- 监听器 - 由监听器服务定期运行,监听需要发布的消息。例如,一个读取目录中文件的任务,一旦找到文件,就会根据文件内容发布消息。
- 发布 - 此配置决定了发布者或监听器是否可以发布特定消息类型。
- 订阅 - 此配置决定了订阅者或任务是否可以从系统中拉取消息。
- 活动消息 - 已发布到系统但正在等待被拉取的消息。
- 孤立消息 - 在管理控制台中,活动消息可以被标记为孤立。一旦被标记为孤立,该消息就无法被拉取。
- 工作流(工作流类型) - 要执行的任务序列。这些任务以JSON格式定义,称为业务逻辑。工作流与应用程序域相关联。工作流服务执行工作流中的任务。
- 孤立工作流 - 一个等待手动外部输入或入站任务的工作流。
如上所述,任务是系统运行的一个函数。共有8种不同类型的任务。它们是简单的C#类,需要实现FloatinBridge.Core
程序集中的8个接口之一。
这8种不同的任务每种都有不同的目的:
- 发布者任务 - 由监听器服务运行的任务。该任务返回一个将被发布到系统中的消息。
- 订阅者任务 - 由订阅者服务运行的任务。服务拉取该任务订阅的所有消息并将其传递给该任务。
- 工作流任务 - 由工作流服务运行的同步工作流步骤。服务将消息发送给任务,任务将消息返回给服务。
- 工作流拆分器任务 - 与上一个任务类似,唯一不同的是在此情况下,任务将多个消息返回给服务。当我们需要将工作流拆分为多个工作流时,可以使用此任务。当工作流被拆分为多个工作流时,会为每个工作流分配一个称为
SplitID
的值。此ID决定了从原始工作流分离出来的独立工作流。 - 出站任务 - 异步工作流步骤。执行此任务后,工作流将进入等待状态。工作流需要通过API手动恢复,或通过入站任务恢复。
- 入站任务 - 异步工作流步骤,用于恢复等待中的工作流。
- 错误处理程序任务 - 这是一个补偿性任务,与工作流步骤相关联,并在步骤失败时执行。一旦执行此任务,工作流将继续执行此任务的输出。
- 错误通知任务 - 与上一个任务类似,只是工作流在完成其任务后不会继续执行。工作流需要通过API手动恢复。
安装与配置
安装过程是一个正常的安装过程,您只需双击MSI文件,然后选择系统上的应用程序文件夹。安装程序会将所有组件复制到应用程序文件夹。
可以使用菜单上的各种选项并选择正确的工件来配置消息\工作流系统。本文档不附带每个配置选项的截图。
下图显示了用于配置工件的各种选项。右侧面板显示了消息历史记录。
已配置的工作流
工作流业务逻辑
工作流的流程和功能由业务逻辑定义。业务逻辑是一个JSON对象。下面是上面部分已配置工作流的JSON表示。
{
"Start":{
"Next":[
"Branch1",
"Branch2",
"Branch3"
],
"TaskID":41,
"OnFailure":null,
"Identifier":"FirstTask",
"TaskRetries":0,
"RunCondition":null,
"TaskProperties":null,
"IncommingTaskID":0,
"IncommingTaskProperties":null
},
"Tasks":[
{
"Next":[
"Final"
],
"TaskID":23,
"OnFailure":"NotifierTask",
"Identifier":"Branch1",
"TaskRetries":5,
"RunCondition":{
"Operator":"stringequals",
"Evaluated":"Norway",
"Evaluator":"#valueof($.MessageBody.Country)"
},
"TaskProperties":{
"FileName":"D:/Test/Branch1.json"
},
"IncommingTaskID":31,
"IncommingTaskProperties":{
"FileName":"D:/Test/Branch1In.json"
}
},
{
"Next":[
"Final"
],
"TaskID":23,
"OnFailure":null,
"Identifier":"Branch3",
"TaskRetries":-1,
"RunCondition":{
"Operator":"stringequals",
"Evaluated":"Denmark",
"Evaluator":"#valueof($.MessageBody.Country)"
},
"TaskProperties":{
"FileName":"D:/Test/branch3.json"
},
"IncommingTaskID":0,
"IncommingTaskProperties":null
},
{
"Next":[
"Final"
],
"TaskID":35,
"OnFailure":null,
"Identifier":"Branch2",
"TaskRetries":-1,
"RunCondition":{
"Operator":"stringequals",
"Evaluated":"Sweden",
"Evaluator":"#valueof($.MessageBody.Country)"
},
"TaskProperties":{
"FileName":"D:/Test/Branch2.json"
},
"IncommingTaskID":0,
"IncommingTaskProperties":null
},
{
"Next":null,
"TaskID":41,
"OnFailure":null,
"Identifier":"Final",
"TaskRetries":-1,
"RunCondition":null,
"TaskProperties":null,
"IncommingTaskID":0,
"IncommingTaskProperties":null
}
],
"OnFailure":null,
"TaskRetries":0
}
上述JSON初看可能令人望而生畏,但它是使用管理控制台配置的。
业务逻辑JSON包含以下属性:
Start
- 工作流的第一步或起点。这是一个Task
JSON对象。Tasks
- 工作流中所有其他步骤的数组。一个包含Task
JSON对象的JSON数组。TaskRetries
- 全局设置,指定一个步骤在失败前应重试的次数。此设置会被单个步骤设置覆盖。OnFailure
- 全局设置,指定在发生故障时应执行的ErrorHandler
或ErrorNotification
任务步骤。此设置会被单个步骤设置覆盖。
任务
任务代表工作流中的单个步骤。它具有以下属性:
Identifier
- 一个简单的string
,用于在工作流中标识任务。在工作流中必须唯一。TaskID
- 这是数据库中任务的实际ID。创建任务时会自动设置。可以修改。OnFailure
- 如果已定义,则覆盖工作流的相应属性。TaskRetries
- 如果已定义,则覆盖工作流的相应属性。Next
- 一个string
数组,包含此task
之后需要执行的task
标识符列表。RunCondition
- 一个JSON对象,定义了此任务的执行条件。当我们需要根据不同条件分支工作流时使用。TaskProperties
- 一个JSON对象,包含执行步骤的属性。例如,对于JSON写入器,它将包含要写入的文件名。IncommingTaskID
- 如果此步骤是单向出站步骤,则为入站任务步骤的标识符。IncommingTaskProperties
- 此入站任务的相应JSON任务属性。
任务属性
Task
属性是一个JSON对象,定义了任务执行的属性。每个任务都有自己的JSON模式。例如,对于JSON写入器task
,task
属性如下:
{
"FileName":"D:/Test/Out.json"
}
Task
属性可以通过JUST - JSON Under Simple Transformation(https://codeproject.org.cn/Articles/1187172/JUST-JSON-Under-Simple-Transformation)动态化。
例如,我们想根据message
body
中的value
来设置“FileName
”。
{
"FileName":"#valueof($.MessageBody.FileName)"
}
条件
这决定了一个步骤是否应该运行。它是一个具有这3个属性的JSON对象:
Evaluator
- 一个将要被评估的表达式。Evaluated
- 与评估器进行比较的表达式。Operator
- 评估运算符。支持以下运算符:stringequals
stringcontains
mathequals
mathgreaterthan
mathlessthan
mathgreaterthanorequalto
Mathlessthanorequalto
以上所有3个属性都可以使用JUST实现动态化。
{
"Operator":"stringequals",
"Evaluated":"Norway",
"Evaluator":"#valueof($.MessageBody.Country)"
}
转换器
JSON转换器任务是系统的一个内置任务。该任务使用JUST将一个JSON转换为另一个JSON。系统中一些转换可能相当复杂。因此,我决定将转换器添加为独立功能,而不是将其添加到task
属性中。
Using the Code
FloatingBridge
程序集随安装包一起提供。它也可通过Nuget获得。
Install-Package FloatingBridge -Version 1.0.0
可以按以下方式初始化一个新的FloatingBridgeClient
实例:
FloatingBridgeClient client = new FloatingBridgeClient(connectionString)
发布消息
可以通过API并调用PublishMessage
方法来发布消息。
User user = new User() { Country = "Norway", Language = "Norsk", Name = "Ola" };
client.PublishMessage("UserAppDomain", "UserMessage", "UserPublisher",
"123",JsonConvert.SerializeObject(user), null);
拉取订阅消息
可以通过API并调用Getsubscribedmessages
方法来拉取Message
。
var messages = client.GetSubscribedMessages("UserAppDomain",
"UserMessage", "UserSubscriber", "123");
foreach (Message message in messages)
{
Console.WriteLine(JsonConvert.SerializeObject(message));
}
结果是:
{
"ID":"71ca9177-19d0-4aba-bdb0-6d8e414114d4",
"Body":{"Name":"Ola","Country":"Norway","Language":"Norsk"},
"CustomProperties":{"PublisherIdentity":"UserPublisher"},
"Timestamp":"2017-08-09T14:44:41",
"UniqueID":"46cd3ec4-7d11-11e7-b4fe-a08cfd1a3662",
"MessageTypeID":1,
"ConditionExpression":{"Operator":"stringequals",
"Evaluated":"Norway","Evaluator":"#valueof($. Body.Country)"}
}
发布工作流
client.PublishWorkflow("UserAppDomain", "UserWorkflow", "UserPublisher",
"123", "{\"Country\":\"Sweden\"}", null);
同步运行工作流
需要指定等待完成的步骤的步骤标识符。一旦此步骤完成,将返回此步骤的结果。
client.RunWorkflow("pinecone", "SimpleWorkflow",
"CreditCard", "abcdefg", "{}", null,"AddApplication");
根据工作流ID重新启动工作流
处于等待(孤立)状态的工作流可以通过API使用workflow
ID重新启动。
client.RestartWorkflow("UserPublisher", "123",
"{}", "56eaa328-b8fe-43fb-87db-80b118eb8ee4");
根据自定义属性重新启动工作流
在上面的示例中,了解工作流的workflow
ID很重要。如果外部系统想要使用API重新启动workflow
,那么系统可能希望基于某个关联属性来执行此操作。
因此,我们还需要能够根据自定义属性重新启动workflow
的功能。
这是一个使用自定义属性名称和值来启动所有等待中的workflows
的示例代码:
client.RestartWorkflowsByProperty("UserPublisher", "123", "LastExecutedTask", "Branch3", "{}");
使用FloatingBridge Core创建自定义任务
必须引用应用程序文件夹中的FloatingBridge.Core.dll。也提供Nuget。
Install-Package FloatingBridge.Core -Version 1.0.0
提供了总共8个接口,它们都具有相同的Run
方法,但参数和返回类型不同。下表概述了这些接口。
接口名称 | 输入参数 | 返回类型 |
IPublisherTask |
| List<TaskMessage> |
ISubscriberTask |
| void |
IWorkflowTask |
| TaskMessage |
IWorkflowSplitterTask |
| SplitTaskMessage |
IIncommingTask |
| TaskMessage |
IOutgoingTask |
| void |
IErrorHandlerTask |
| TaskMessage |
IErrorNotificationTask |
| void |
内置任务
上一节介绍了Core中提供的各种接口,这些接口有助于我们开发自己的任务。
但是,系统已提供了一些内置任务。
IErrorHandlerTask
命名空间 - FloatingBridge.Core.Tasks.ErrorHandler
JsonWriter
XmlWriter
FlatFileWriter
JsonTransformer
MessagePublisher
MySqlDataConnector
MySqlDataWriter
Passthrough
RESTConnector
WorkflowPublisher
IErrorNotificationTask
命名空间 - FloatingBridge.Core.Tasks.ErrorNotification
JsonWriter
- 将MessageBody
写入JSON文件XmlWriter
- 将MessageBody
转换为XML并写入XML文件FlatFileWriter
- 将MessageBody
转换为平面文件格式并写入文件MessagePublisher
- 将MessageBody
发布到FloatingBridge
发布MySqlDataWriter
- 在配置的MySQL上执行配置的存储过程或文本RESTConnector
- 执行配置的REST APIWorkflowPublisher
- 将MessageBody
发布为新的FloatingBridge
工作流
IIncommingTask
命名空间 - FloatingBridge.Core.Tasks.Incomming
JsonReader
- 从指定的JSON文件读取XmlReader
- 从指定的XML文件读取FlatFileReader
- 从指定文件读取平面文件格式MySqlDataConnector
- 连接到配置的MySQL并从SP或文本读取RESTConnector
MessageSubscriber
- 订阅配置的FloatingBridge
订阅
IOutgoingTask
命名空间 - FloatingBridge.Core.Tasks.Outgoing
JsonWriter
XmlWriter
FlatFileWriter
MessagePublisher
MySqlDataWriter
RESTConnector
WorkflowPublisher
IPublisherTask
命名空间 - FloatingBridge.Core.Tasks.Publisher
JsonReader
XmlReader
FlatFileReader
MySqlDataConnector
RESTConnector
JsonMultiFileReader
- 从文件夹读取多个JSON文件XmlMultiFileReader
- 从文件夹读取多个XML文件MultiFlatFileReader
- 从文件夹读取多个平面文件
ISubscriberTask
命名空间 - FloatingBridge.Core.Tasks.Subscriber
JsonWriter
XmlWriter
FlatFileWriter
MySqlDataWriter
RESTConnector
IWorkflowTask
命名空间 - FloatingBridge.Core.Tasks.Workflow
JsonWriter
XmlWriter
FlatFileWriter
MessagePublisher
MySqlDataWriter
RESTConnector
WorkflowPublisher
Passthrough
- 将消息传递给下一个任务。当您想为分支配置多个条件时,此任务很有用。每个通过任务都应有一个必需的运行条件。SyncWorkflowRunner
- 同步运行另一个FloatingBridge
工作流。DependentWorkflowStarter
- 重新启动另一个FloatingBridge
工作流。JsonTransformer
- 使用JUST转换MessageBody
。MySqlDataConnector
IWorkflowSplitterTask
命名空间 - FloatingBridge.Core.Tasks.Workflow
JsonSplitter
- 使用JUST拆分MessageBody
。MySqlDataRowReader
- 连接到配置的MySQL,并按读取的每一行拆分消息。
任务属性
每个任务都有其预定义的任务属性配置。以下是初始化内置任务属性的示例。在创建自己的自定义任务时,需要定义相应的任务属性。
任务 | 任务属性示例 |
JsonWriter | {“FileName” : “D:/test/out.json”} |
XmlWriter | {“FileName” : “D:/test/out.xml”, ”RootElementName” : “Root”} |
FlatFileWriter | {“FileName” : “D:/test/out.txt”,<br> ”FieldDelimiter” : “, ” , ”RecordDelimiter” : “\r\n”} |
Passthrough | |
JsonMultiFileReader | {“DirectoryName” : “D:/test”} |
XmlMultiFileReader | {“DirectoryName” : “D:/test”} |
MultiFlatFileReader | {“DirectoryName” : “D:/test”, “Pattern” : “*.csv”} |
RESTConnector | {
"Url":"Http://www.yahoo.com",
"Method":"POST",
"Headers":{"header1":"value1","header2":"value2"},
"Body":"#valueof($.MessageBody.RestRequest)"
}
|
MySqlDataWriter | { // Example for stored procedure
"ConnectionString": “xxxxxxxx”,
"CommandText": null,
"StoredProcedureName": “StoreValues”,
"StoredProcedureParamaters": [
{“Name”:”param1”,”Value”:”Value1”},
{“Name”:”param2”,”Value”:”Value2”}
]
}
{ // Example for text
"ConnectionString": “xxxxxxxx”,
"CommandText": "#xconcat(UPDATE user SET username =
,#valueof($.value.Window), WHERE id = 4)",
"StoredProcedureName": null,
"StoredProcedureParamaters": null
}
|
MySqlDataConnector | 与MySqlDataWriter 配置相同。 |
MySqlDataRowReader | 与MySqlDataWriter 配置相同。 |
JsonReader | {“FileName” : “D:/test/in.json”} |
XmlReader | {“FileName” : “D:/test/in.xml”} |
FlatFileReader | {
“FileName” : “D:/test/in.txt”,
”FieldDelimiter” : [“,”,”;”] ,
”RecordDelimiter” : [“\r\n”,”\n”],
“HeaderIdentifier”:[“header”,”start”], //All records
//containing these strings are considered header
“FooterIdentifier”:[“footer”,”end”] // All records
//containing these strings are considered footers
}
|
JsonTransformer | {“TransformerID” : 2} |
JsonSplitter | {“ArrayPath” : “$.MessageBody.Organisation.Users”} |
MessagePublisher | {
"ConnectionString":"xxxxxxx",
"ApplicationDomainName":"UserAppDomain",
"MessageTypeName":"UserMessage",
"PublisherIdentity":"UserPubliser",
"PublisherSecret":"123",
"CustomProperties":{"source":"scanner","OS":"windows"}
}
|
MessageSubscriber | {
"ConnectionString":"xxxxxxx",
"ApplicationDomainName":"UserAppDomain",
"MessageTypeName":"UserMessage",
"SubscriberIdentity":"UserPubliser",
"SubscriberSecret":"123",
"CustomProperties":{"source":"scanner","OS":"windows"}
}
|
WorkflowPublisher | {
"ConnectionString":"xxxxxxx",
"ApplicationDomainName":"UserAppDomain",
"WorkflowTypeName":"UserMessage",
"PublisherIdentity":"UserPubliser",
"PublisherSecret":"123",
"CustomProperties":{"source":"scanner","OS":"windows"}
}
|
SyncWorkflowRunner | {
"ConnectionString":"xxxxxxx",
"ApplicationDomainName":"UserAppDomain",
"WorkflowTypeName":"UserMessage",
"PublisherIdentity":"UserPubliser",
"PublisherSecret":"123",
"CustomProperties":{"source":"scanner","OS":"windows"},
“StepIdentifier” :”AddRolesStep”
}
|
DependentWorkflowStarter | { //RestartWorkflowsByProperty
"ConnectionString":"xxxxxxx",
"PublisherIdentity":"UserPubliser",
"PublisherSecret":"123",
“SplitID” :null,
“WorkflowID” :null,
“CustomPropertyName” :”LastExecutedTask”,
“CustomPropertyValue” :”Branch3”
}
{ //RestartWorkflow
"ConnectionString":"xxxxxxx",
"PublisherIdentity":"UserPubliser",
"PublisherSecret":"123",
“SplitID” :1,
“WorkflowID” :”56eaa328-b8fe-43fb-87db-80b118eb8ee4”,
“CustomPropertyName” :null,
“CustomPropertyValue” :null
}
|
历史
FloatingBridge
的第一个版本- 添加了源代码链接