65.9K
CodeProject 正在变化。 阅读更多。
Home

FloatingBridge

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2017年8月23日

CPOL

14分钟阅读

viewsIcon

13845

downloadIcon

166

一个基于MySQL和.NET的简单消息\工作流\ETL系统

引言

FloatingBridge 是一个基于MySQL和.NET的简单消息\工作流\ETL系统。MySQL用作系统的后端引擎。前端使用.NET WPF(Windows Presentation Foundation)设计。业务逻辑由.NET Windows服务处理。

本文档简要介绍了该系统。安装文件和详细文档随本文档提供。

背景

我曾大量使用过Microsoft BizTalk Server。我也使用过一些其他替代系统。我的初衷是构建一个基于现代RDS的简单消息\工作流\ETL系统,该系统使用JSON数据类型而非XML。

特点

系统附带一个管理控制台,可帮助您配置消息\工作流系统。管理控制台具有以下功能:

  • 后端配置 - 首次启动控制台时,会运行所有必需的脚本来初始化MySQL数据库。
  • 服务配置 - 系统提供3种服务。它们将在下文介绍。但是,我们可以配置任意数量的服务实例。通常,我们会为每个应用程序域配置一个。
  • 消息系统配置 - 可以通过配置相应的消息类型、发布者、订阅者、发布和订阅来配置。
  • 工作流配置 - 配置一个由分支任务组成的序列,其业务逻辑使用JSON定义。
  • 管理消息和工作流 - 旧消息可被标记为冗余(孤立),等待中的工作流可被重新启动。
  • 历史记录 - 可查看消息和工作流的历史记录。
  • 转换 - 可以使用JUST - https://codeproject.org.cn/Articles/1187172/JUST-JSON-Under-Simple-Transformation 创建转换JSON的任务。
  • 自定义任务 - 系统自带一些已知的自定义任务。
  • API - 系统提供一个API,外部系统可调用该API执行系统操作,如发布消息、拉取订阅消息、发布工作流、运行同步工作流和重新启动等待中的工作流。核心API可用于实现您自己的自定义任务。

系统架构

下图说明了系统的不同组件。

数据库引擎

数据库引擎是系统的核心组件。MySQL数据库用作引擎。引擎存储系统的配置和跟踪数据。

存储以下配置数据:

  1. 消息配置 - 消息类型,其应用程序域
  2. 工作流配置 - 工作流类型,应用程序域和业务逻辑
  3. 能够发布消息和工作流的发布者
  4. 能够订阅消息的订阅者
  5. 既可以是工作流的一部分,也可以是监听器或订阅者的任务

此外,所有与消息和工作流相关的历史数据都将存储起来用于跟踪。

订阅者服务

这是一个.NET Windows服务,它轮询数据库以查找订阅任务。订阅任务只是从数据库读取已发布的消息并进行处理。系统提供了内置订阅任务,将在最后部分介绍。

监听器服务

这也是一个.NET Windows服务,用于运行与监听器相关的任务。监听器是一种任务,它从外部源读取数据,并根据从外部源收集的数据向数据库发布消息。

工作流服务

这是.NET Windows服务,它按照工作流的业务逻辑定义的顺序运行与工作流相关的各种任务。它执行以下任务:

  • 从数据库读取工作流消息,并根据业务逻辑和当前状态执行下一步操作。
  • 运行下一个同步任务,并将状态保存到引擎。
  • 运行下一个出站任务,并等待入站任务。
  • 根据重试次数重试任务。
  • 发生故障时运行错误处理程序或错误通知任务。

管理控制台

这是一个用WPF(Windows Presentation Foundation)编写的应用程序,使用户能够管理消息和工作流的配置和跟踪。

FloatingBridge API

FloatingBridge API是一个简单的DLL,可以在外部项目中引用。它位于安装文件夹中,也可作为NuGet包提供。通过引用该API,外部实体或系统可以执行以下任务:

  • 发布消息
  • 拉取订阅消息
  • 发布工作流
  • 运行同步工作流
  • 重新启动等待中的工作流

FloatingBridge 术语

理解FloatingBridge中使用的术语非常重要,因为管理控制台完全基于这些术语,并且围绕这些术语进行各种操作。

  • 应用程序域 - 应用程序的逻辑分组,关联了消息类型和工作流。每个服务都在一个应用程序域下运行。
  • 服务 - 一个.NET Windows服务(监听器、订阅者或工作流),在一个应用程序域下运行。服务需要两个输入参数:
    1. 应用程序域
    2. MySQL数据库引擎的连接字符串
  • 消息类型 - 定义消息的类型。发布者可以发布此类型的消息,订阅者可以拉取此类型的消息。
  • 发布者 - 有权向系统发布消息的客户端。
  • 订阅者 - 有权订阅系统消息的客户端。
  • 任务 - 系统运行的一个函数。任务可以是发布任务、订阅任务或工作流任务。
  • 监听器 - 由监听器服务定期运行,监听需要发布的消息。例如,一个读取目录中文件的任务,一旦找到文件,就会根据文件内容发布消息。
  • 发布 - 此配置决定了发布者或监听器是否可以发布特定消息类型。
  • 订阅 - 此配置决定了订阅者或任务是否可以从系统中拉取消息。
  • 活动消息 - 已发布到系统但正在等待被拉取的消息。
  • 孤立消息 - 在管理控制台中,活动消息可以被标记为孤立。一旦被标记为孤立,该消息就无法被拉取。
  • 工作流(工作流类型) - 要执行的任务序列。这些任务以JSON格式定义,称为业务逻辑。工作流与应用程序域相关联。工作流服务执行工作流中的任务。
  • 孤立工作流 - 一个等待手动外部输入或入站任务的工作流。

如上所述,任务是系统运行的一个函数。共有8种不同类型的任务。它们是简单的C#类,需要实现FloatinBridge.Core程序集中的8个接口之一。
这8种不同的任务每种都有不同的目的:

  • 发布者任务 - 由监听器服务运行的任务。该任务返回一个将被发布到系统中的消息。
  • 订阅者任务 - 由订阅者服务运行的任务。服务拉取该任务订阅的所有消息并将其传递给该任务。
  • 工作流任务 - 由工作流服务运行的同步工作流步骤。服务将消息发送给任务,任务将消息返回给服务。
  • 工作流拆分器任务 - 与上一个任务类似,唯一不同的是在此情况下,任务将多个消息返回给服务。当我们需要将工作流拆分为多个工作流时,可以使用此任务。当工作流被拆分为多个工作流时,会为每个工作流分配一个称为SplitID的值。此ID决定了从原始工作流分离出来的独立工作流。
  • 出站任务 - 异步工作流步骤。执行此任务后,工作流将进入等待状态。工作流需要通过API手动恢复,或通过入站任务恢复。
  • 入站任务 - 异步工作流步骤,用于恢复等待中的工作流。
  • 错误处理程序任务 - 这是一个补偿性任务,与工作流步骤相关联,并在步骤失败时执行。一旦执行此任务,工作流将继续执行此任务的输出。
  • 错误通知任务 - 与上一个任务类似,只是工作流在完成其任务后不会继续执行。工作流需要通过API手动恢复。

安装与配置

安装过程是一个正常的安装过程,您只需双击MSI文件,然后选择系统上的应用程序文件夹。安装程序会将所有组件复制到应用程序文件夹。

1202556/Screen3.png

可以使用菜单上的各种选项并选择正确的工件来配置消息\工作流系统。本文档不附带每个配置选项的截图。

下图显示了用于配置工件的各种选项。右侧面板显示了消息历史记录

1202556/Screen1.png

已配置的工作流

1202556/Screen2.png

工作流业务逻辑

工作流的流程和功能由业务逻辑定义。业务逻辑是一个JSON对象。下面是上面部分已配置工作流的JSON表示。

{ 
   "Start":{ 
      "Next":[ 
         "Branch1",
         "Branch2",
         "Branch3"
      ],
      "TaskID":41,
      "OnFailure":null,
      "Identifier":"FirstTask",
      "TaskRetries":0,
      "RunCondition":null,
      "TaskProperties":null,
      "IncommingTaskID":0,
      "IncommingTaskProperties":null
   },
   "Tasks":[ 
      { 
         "Next":[ 
            "Final"
         ],
         "TaskID":23,
         "OnFailure":"NotifierTask",
         "Identifier":"Branch1",
         "TaskRetries":5,
         "RunCondition":{ 
            "Operator":"stringequals",
            "Evaluated":"Norway",
            "Evaluator":"#valueof($.MessageBody.Country)"
         },
         "TaskProperties":{ 
            "FileName":"D:/Test/Branch1.json"
         },
         "IncommingTaskID":31,
         "IncommingTaskProperties":{ 
            "FileName":"D:/Test/Branch1In.json"
         }
      },
      { 
         "Next":[ 
            "Final"
         ],
         "TaskID":23,
         "OnFailure":null,
         "Identifier":"Branch3",
         "TaskRetries":-1,
         "RunCondition":{ 
            "Operator":"stringequals",
            "Evaluated":"Denmark",
            "Evaluator":"#valueof($.MessageBody.Country)"
         },
         "TaskProperties":{ 
            "FileName":"D:/Test/branch3.json"
         },
         "IncommingTaskID":0,
         "IncommingTaskProperties":null
      },
      { 
         "Next":[ 
            "Final"
         ],
         "TaskID":35,
         "OnFailure":null,
         "Identifier":"Branch2",
         "TaskRetries":-1,
         "RunCondition":{ 
            "Operator":"stringequals",
            "Evaluated":"Sweden",
            "Evaluator":"#valueof($.MessageBody.Country)"
         },
         "TaskProperties":{ 
            "FileName":"D:/Test/Branch2.json"
         },
         "IncommingTaskID":0,
         "IncommingTaskProperties":null
      },
      { 
         "Next":null,
         "TaskID":41,
         "OnFailure":null,
         "Identifier":"Final",
         "TaskRetries":-1,
         "RunCondition":null,
         "TaskProperties":null,
         "IncommingTaskID":0,
         "IncommingTaskProperties":null
      }
   ],
   "OnFailure":null,
   "TaskRetries":0
}

上述JSON初看可能令人望而生畏,但它是使用管理控制台配置的。

业务逻辑JSON包含以下属性:

  • Start - 工作流的第一步或起点。这是一个Task JSON对象。
  • Tasks - 工作流中所有其他步骤的数组。一个包含Task JSON对象的JSON数组。
  • TaskRetries - 全局设置,指定一个步骤在失败前应重试的次数。此设置会被单个步骤设置覆盖。
  • OnFailure - 全局设置,指定在发生故障时应执行的ErrorHandlerErrorNotification任务步骤。此设置会被单个步骤设置覆盖。

任务

任务代表工作流中的单个步骤。它具有以下属性:

  • Identifier - 一个简单的string,用于在工作流中标识任务。在工作流中必须唯一。
  • TaskID - 这是数据库中任务的实际ID。创建任务时会自动设置。可以修改。
  • OnFailure - 如果已定义,则覆盖工作流的相应属性。
  • TaskRetries - 如果已定义,则覆盖工作流的相应属性。
  • Next - 一个string数组,包含此task之后需要执行的task标识符列表。
  • RunCondition - 一个JSON对象,定义了此任务的执行条件。当我们需要根据不同条件分支工作流时使用。
  • TaskProperties - 一个JSON对象,包含执行步骤的属性。例如,对于JSON写入器,它将包含要写入的文件名。
  • IncommingTaskID - 如果此步骤是单向出站步骤,则为入站任务步骤的标识符。
  • IncommingTaskProperties - 此入站任务的相应JSON任务属性。

任务属性

Task属性是一个JSON对象,定义了任务执行的属性。每个任务都有自己的JSON模式。例如,对于JSON写入器tasktask属性如下:

{ 
   "FileName":"D:/Test/Out.json"
}

Task属性可以通过JUST - JSON Under Simple Transformationhttps://codeproject.org.cn/Articles/1187172/JUST-JSON-Under-Simple-Transformation)动态化。

例如,我们想根据message body中的value来设置“FileName”。

{ 
   "FileName":"#valueof($.MessageBody.FileName)"
}

条件

这决定了一个步骤是否应该运行。它是一个具有这3个属性的JSON对象:

  • Evaluator - 一个将要被评估的表达式。
  • Evaluated - 与评估器进行比较的表达式。
  • Operator - 评估运算符。支持以下运算符:
    • stringequals
    • stringcontains
    • mathequals
    • mathgreaterthan
    • mathlessthan
    • mathgreaterthanorequalto
    • Mathlessthanorequalto

以上所有3个属性都可以使用JUST实现动态化。

{ 
   "Operator":"stringequals",
   "Evaluated":"Norway",
   "Evaluator":"#valueof($.MessageBody.Country)"
}

转换器

JSON转换器任务是系统的一个内置任务。该任务使用JUST将一个JSON转换为另一个JSON。系统中一些转换可能相当复杂。因此,我决定将转换器添加为独立功能,而不是将其添加到task属性中。

Using the Code

FloatingBridge程序集随安装包一起提供。它也可通过Nuget获得。

Install-Package FloatingBridge -Version 1.0.0  

可以按以下方式初始化一个新的FloatingBridgeClient实例:

FloatingBridgeClient client = new FloatingBridgeClient(connectionString)

发布消息

可以通过API并调用PublishMessage方法来发布消息。

User user = new User() {  Country = "Norway",  Language = "Norsk",  Name = "Ola"  }; 
client.PublishMessage("UserAppDomain", "UserMessage", "UserPublisher", 
"123",JsonConvert.SerializeObject(user), null);  

拉取订阅消息

可以通过API并调用Getsubscribedmessages方法来拉取Message

var messages = client.GetSubscribedMessages("UserAppDomain", 
"UserMessage", "UserSubscriber", "123");   
foreach (Message message in messages) 
{            
	 Console.WriteLine(JsonConvert.SerializeObject(message));
}  

结果是:

{   
	"ID":"71ca9177-19d0-4aba-bdb0-6d8e414114d4",  
	"Body":{"Name":"Ola","Country":"Norway","Language":"Norsk"}, 
	"CustomProperties":{"PublisherIdentity":"UserPublisher"},  
	"Timestamp":"2017-08-09T14:44:41",
	"UniqueID":"46cd3ec4-7d11-11e7-b4fe-a08cfd1a3662",  
	"MessageTypeID":1, 
	"ConditionExpression":{"Operator":"stringequals",
	"Evaluated":"Norway","Evaluator":"#valueof($. Body.Country)"}
} 

发布工作流

client.PublishWorkflow("UserAppDomain", "UserWorkflow", "UserPublisher", 
"123", "{\"Country\":\"Sweden\"}", null);

同步运行工作流

需要指定等待完成的步骤的步骤标识符。一旦此步骤完成,将返回此步骤的结果。

client.RunWorkflow("pinecone", "SimpleWorkflow", 
"CreditCard", "abcdefg", "{}", null,"​AddApplication​");

根据工作流ID重新启动工作流

处于等待(孤立)状态的工作流可以通过API使用workflow ID重新启动。

client.RestartWorkflow("UserPublisher", "123", 
"{}", "56eaa328-b8fe-43fb-87db-80b118eb8ee4");

根据自定义属性重新启动工作流

在上面的示例中,了解工作流的workflow ID很重要。如果外部系统想要使用API重新启动workflow,那么系统可能希望基于某个关联属性来执行此操作。

因此,我们还需要能够根据自定义属性重新启动workflow的功能。

这是一个使用自定义属性名称和值来启动所有等待中的workflows的示例代码:

client.RestartWorkflowsByProperty("UserPublisher", "123", "LastExecutedTask", "Branch3", "{}");

使用FloatingBridge Core创建自定义任务

必须引用应用程序文件夹中的FloatingBridge.Core.dll。也提供Nuget。

Install-Package FloatingBridge.Core -Version 1.0.0

提供了总共8个接口,它们都具有相同的Run方法,但参数和返回类型不同。下表概述了这些接口。

接口名称 输入参数 返回类型
IPublisherTask
  • string taskPropertiesJson
List<TaskMessage>
ISubscriberTask
  • TaskMessage input
  • string taskPropertiesJson
void
IWorkflowTask
  • TaskMessage input
  • string taskPropertiesJson
TaskMessage
IWorkflowSplitterTask
  • TaskMessage input
  • string taskPropertiesJson
SplitTaskMessage
IIncommingTask
  • string taskPropertiesJson
TaskMessage
IOutgoingTask
  • TaskMessage input
  • string taskPropertiesJson
void
IErrorHandlerTask
  • TaskMessage input
  • string taskPropertiesJson
TaskMessage
IErrorNotificationTask
  • TaskMessage input
  • string taskPropertiesJson
void

内置任务

上一节介绍了Core中提供的各种接口,这些接口有助于我们开发自己的任务。

但是,系统已提供了一些内置任务。

IErrorHandlerTask

命名空间 - FloatingBridge.Core.Tasks.ErrorHandler

  • JsonWriter
  • XmlWriter
  • FlatFileWriter
  • JsonTransformer
  • MessagePublisher
  • MySqlDataConnector
  • MySqlDataWriter
  • Passthrough
  • RESTConnector
  • WorkflowPublisher

IErrorNotificationTask

命名空间 - FloatingBridge.Core.Tasks.ErrorNotification

  • JsonWriter - 将MessageBody写入JSON文件
  • XmlWriter - 将MessageBody转换为XML并写入XML文件
  • FlatFileWriter - 将MessageBody转换为平面文件格式并写入文件
  • MessagePublisher - 将MessageBody发布到FloatingBridge发布
  • MySqlDataWriter - 在配置的MySQL上执行配置的存储过程或文本
  • RESTConnector - 执行配置的REST API
  • WorkflowPublisher - 将MessageBody发布为新的FloatingBridge工作流

IIncommingTask

命名空间 - FloatingBridge.Core.Tasks.Incomming

  • JsonReader - 从指定的JSON文件读取
  • XmlReader - 从指定的XML文件读取
  • FlatFileReader - 从指定文件读取平面文件格式
  • MySqlDataConnector - 连接到配置的MySQL并从SP或文本读取
  • RESTConnector
  • MessageSubscriber - 订阅配置的FloatingBridge订阅

IOutgoingTask

命名空间 - FloatingBridge.Core.Tasks.Outgoing

  • JsonWriter
  • XmlWriter
  • FlatFileWriter
  • MessagePublisher
  • MySqlDataWriter
  • RESTConnector
  • WorkflowPublisher

IPublisherTask

命名空间 - FloatingBridge.Core.Tasks.Publisher

  • JsonReader
  • XmlReader
  • FlatFileReader
  • MySqlDataConnector
  • RESTConnector
  • JsonMultiFileReader - 从文件夹读取多个JSON文件
  • XmlMultiFileReader - 从文件夹读取多个XML文件
  • MultiFlatFileReader - 从文件夹读取多个平面文件

ISubscriberTask

命名空间 - FloatingBridge.Core.Tasks.Subscriber

  • JsonWriter
  • XmlWriter
  • FlatFileWriter
  • MySqlDataWriter
  • RESTConnector

IWorkflowTask

命名空间 - FloatingBridge.Core.Tasks.Workflow

  • JsonWriter
  • XmlWriter
  • FlatFileWriter
  • MessagePublisher
  • MySqlDataWriter
  • RESTConnector
  • WorkflowPublisher
  • Passthrough - 将消息传递给下一个任务。当您想为分支配置多个条件时,此任务很有用。每个通过任务都应有一个必需的运行条件。
  • SyncWorkflowRunner - 同步运行另一个FloatingBridge工作流。
  • DependentWorkflowStarter - 重新启动另一个FloatingBridge工作流。
  • JsonTransformer - 使用JUST转换MessageBody
  • MySqlDataConnector

IWorkflowSplitterTask

命名空间 - FloatingBridge.Core.Tasks.Workflow

  • JsonSplitter - 使用JUST拆分MessageBody
  • MySqlDataRowReader - 连接到配置的MySQL,并按读取的每一行拆分消息。

任务属性

每个任务都有其预定义的任务属性配置。以下是初始化内置任务属性的示例。在创建自己的自定义任务时,需要定义相应的任务属性。

任务 任务属性示例
JsonWriter {“FileName” : “D:/test/out.json”}
XmlWriter {“FileName” : “D:/test/out.xml”, ”RootElementName” : “Root”}
FlatFileWriter {“FileName” : “D:/test/out.txt”,<br> ”FieldDelimiter” : “, ” , ”RecordDelimiter” : “\r\n”}
Passthrough  
JsonMultiFileReader {“DirectoryName” : “D:/test”}
XmlMultiFileReader {“DirectoryName” : “D:/test”}
MultiFlatFileReader {“DirectoryName” : “D:/test”, “Pattern” : “*.csv”}
RESTConnector
{
	"Url":"Http://www.yahoo.com", 
	"Method":"POST",
	"Headers":{"header1":"value1","header2":"value2"}, 
    "Body":"#valueof($.MessageBody.RestRequest)"
}
MySqlDataWriter
{ // Example for stored procedure
	"ConnectionString": “xxxxxxxx”,
	 "CommandText": null,
	 "StoredProcedureName": “StoreValues”,
	 "StoredProcedureParamaters":   [
	  {“Name”:”param1”,”Value”:”Value1”},
	  {“Name”:”param2”,”Value”:”Value2”}
	 ]
	}

	{ // Example for text
	 "ConnectionString": “xxxxxxxx”,
	 "CommandText": "#xconcat(UPDATE user SET username = 
                    ,#valueof($.value.Window), WHERE id = 4)",
	 "StoredProcedureName": null,
	 "StoredProcedureParamaters":   null
	}
MySqlDataConnector MySqlDataWriter配置相同。
MySqlDataRowReader MySqlDataWriter配置相同。
JsonReader {“FileName” : “D:/test/in.json”}
XmlReader {“FileName” : “D:/test/in.xml”}
FlatFileReader
{
	 “FileName” : “D:/test/in.txt”, 
	 ”FieldDelimiter” : [“,”,”;”] , 
	 ”RecordDelimiter” : [“\r\n”,”\n”],
	 “HeaderIdentifier”:[“header”,”start”], //All records 
	                    //containing these strings are considered header
	 “FooterIdentifier”:[“footer”,”end”] // All records 
	                    //containing these strings are considered footers
						}
JsonTransformer {“TransformerID” : 2}
JsonSplitter {“ArrayPath” : “$.MessageBody.Organisation.Users”}
MessagePublisher
{
	"ConnectionString":"xxxxxxx", 
	"ApplicationDomainName":"UserAppDomain",
	"MessageTypeName":"UserMessage",
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	"CustomProperties":{"source":"scanner","OS":"windows"}
}
MessageSubscriber
{
	"ConnectionString":"xxxxxxx", 
	"ApplicationDomainName":"UserAppDomain",
	"MessageTypeName":"UserMessage",
	"SubscriberIdentity":"UserPubliser",
	"SubscriberSecret":"123",
	"CustomProperties":{"source":"scanner","OS":"windows"}
}
WorkflowPublisher
{
	"ConnectionString":"xxxxxxx", 
	"ApplicationDomainName":"UserAppDomain",
	"WorkflowTypeName":"UserMessage",
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	"CustomProperties":{"source":"scanner","OS":"windows"}
}
SyncWorkflowRunner
{
	"ConnectionString":"xxxxxxx", 
	"ApplicationDomainName":"UserAppDomain",
	"WorkflowTypeName":"UserMessage",
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	"CustomProperties":{"source":"scanner","OS":"windows"},
	“StepIdentifier” :”AddRolesStep”
}
DependentWorkflowStarter
{ //RestartWorkflowsByProperty
	"ConnectionString":"xxxxxxx", 
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	“SplitID” :null,
	“WorkflowID” :null,
	“CustomPropertyName” :”LastExecutedTask”,
	“CustomPropertyValue” :”Branch3”
	}
						 
	{ //RestartWorkflow
	"ConnectionString":"xxxxxxx", 
	"PublisherIdentity":"UserPubliser",
	"PublisherSecret":"123",
	“SplitID” :1,
	“WorkflowID” :”56eaa328-b8fe-43fb-87db-80b118eb8ee4”,
	“CustomPropertyName” :null,
	“CustomPropertyValue” :null
}

历史

  • FloatingBridge的第一个版本
  • 添加了源代码链接
© . All rights reserved.