65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Apache Camel 进行企业应用程序集成

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.67/5 (5投票s)

2015年5月6日

CPOL

10分钟阅读

viewsIcon

26115

downloadIcon

240

一篇关于企业应用集成及其使用 Apache Camel 框架实现的文章。

什么是 EIP(企业集成模式)

模式语言教读者如何在有限的问题空间内解决无限种类的问题。因为每次解决的总体问题都不同,所以通过模式的路径以及它们的应用方式也是独一无二的。

企业集成不仅仅是创建一个具有分布式 n 层架构的单一应用程序,这种架构允许单一应用程序分布在多台计算机上。然而,分布式应用程序中的一层不能独立运行,而集成应用程序是独立的程序,它们都可以独立运行,但通过松散耦合的方式相互协调来发挥作用。

随着时间的推移,我们过去常常通过四种主要模式来解决集成问题。这些模式随着技术进步而演进。

  1. 文件传输(大型机时代)
  2. 共享数据库(RDBMS 时代)
  3. 远程过程调用(分布式时代)
  4. 面向消息的中间件(高度分布式异构时代)

这些模型如下图所示。

 

EIP Modes

为什么需要 EAI(企业应用集成)

根据我的经验,任何集成项目最终都会涉及多个应用程序在多个平台上通过多种传输方式进行通信。正如我们可以想象的,在大型企业应用中,这会很快变得非常复杂。大部分复杂性源于三个问题:

  1. 处理应用程序和传输的细节
  2. 为集成问题提供良好的解决方案
  3. 适应新兴技术趋势

这在下图中有所体现。

EAI Model

由于新产品和新应用的出现,几乎所有公司都需要企业应用集成。集成这些应用程序会带来几个问题。每十年都会出现新的范式,例如客户端/服务器通信、面向服务架构(SOA)或云计算。

过去(许多年前)将数据存储在文件中,而如今通常使用 SQL 数据库。有时,在某些用例中甚至需要 NoSQL 数据库。同步远程过程调用或异步消息传递用于通过多种技术(如 RMI、SOAP Web 服务、REST 或 JMS)进行通信。存在许多软件孤岛。然而,这几十年来的所有应用程序和产品都必须相互通信才能完美地协同工作。

Apache Camel

Apache Camel 是一个开源的 Java 框架,致力于让集成对开发人员来说更简单、更容易实现。它通过以下方式实现这一点:

  • 提供所有广泛使用的 EIP 的具体实现
  • 连接到各种传输和 API
  • 易于使用的领域特定语言(DSL)来连接 EIP 和传输

Camel Pack

它是著名的开源 EAI 框架,涵盖了:

架构

逻辑

Apache Camel 架构包含如下所示的七个列出的构建块。Camel Context 是该框架的核心,它是运行时平台。

Camel Building Blocks

这些构建块用于协调核心功能。

  1. 注册表:它允许您查找 bean;默认情况下是 JNDI 注册表
  2. 数据格式:加载的数据格式,如 JSON、RSS、CSV、SOAP 等。
  3. 端点:创建的端点,如 TCP、HTTP 等。
  4. 路由:处理器链;Camel 的核心抽象
  5. 语言:用于定义领域特定语言的表达式语言
  6. 类型转换器:数据格式转换组件
  7. 组件:动态加载组件

 

物理

Camel Flow

由于 CamelContext 是系统的核心,它包含所有语句,如连接器、转换、路由等。它是一个简单的 bean,在 Spring 上下文中不一定是唯一的。

端点通过 URI 实现与应用程序的接口,URI 描述了所使用的协议和组件的地址信息。它可以隐式声明(例如 <from uri="uri =" file:="" in="" "="">),但在当前版本的 Camel 中,只有允许使用 PropertyPlaceHolderConfigurer 将常量外包到属性文件中。每个 URI 可能包含额外的参数来调整行为,例如 jms: queue: testqueue jmsMessageType = Text; ReceiveTimeout = 5000

如上图所示,路由是两个或多个端点之间的通信通道。每个路由都有一个入口点通向一个或多个输出。可以插入执行处理的组件,如处理器;但无需 necesariamente 实现特定接口,框架会执行自省并能够注入消息体。

每条消息都包含一个头部(HashMap 属性)和一个主体(任何可序列化的 Java 对象)。传递的消息被封装在一个 Exchange 对象中,如下图所示。

 

Camel Message

垂直可扩展性

可扩展性是衡量性能和业务扩展的关键参数。在垂直可扩展性方面,可以设置一台机器来增加 CPU、增加内存(RAM)或 JVM 大小,但在任何情况下我们都不会配置框架。

仍然可以作用于其速度组件,特别是在轮询器上。您可以重新定义侦听线程的数量或减少两次轮询之间的延迟,这些设置可以在编写 URI 端点时进行配置。

水平可扩展性

在横向扩展方面,Camel 运行时可以在多台机器或多个容器中冗余,但它们是分隔的,它们互不感知,从这个意义上说,它不是真正的集群。如果两个 Camel 运行时指向同一个应用程序的同一个端点,它们很可能会错误地重复消费同一条消息。Apache Camel 的配置应适当。

实施 - 预设置

项目类型

要开始实施 Apache Camel 模型,开发人员需要创建一个新的 Maven 项目,如下图所示。在此示例中,我正在创建一个新的 Maven 项目,组 ID 为 org.codeproject,Artifact ID 为 camelsample。

Camel pre set up

pom.xml

POM 是 Project Object Model 的缩写,它是 Maven 中最基本的 XML 文件单元。在执行任务或目标时,Maven 会从本地或给定路径提取所需的配置信息来执行目标。通过这种方式,Apache Camel 相关库会从公共仓库中拉取。

我们的 POM 需要包含上面列出的一些基本的 Camel 库。

实现 - 文件模式

在第一个示例应用程序中,Apache Camel 将处理文件到文件的处理。

布局

问题陈述是扫描特定文件夹 (c://temp/in) 中的所有文件,使用 Apache Camel 框架处理它们,并将文件内容传递到输出文件夹 (c://temp/out)。

Camel Context 是 Apache Camel 执行的运行时平台。使用 DefaultCamelContext,路由是使用 RouteBuilder 工厂构建的。在定义的路由中,消息被嵌入以使用定义的自定义处理器进行处理。

Camel File Mode

源代码

Camel 支持使用 Java DSL(领域特定语言)定义路由规则,这避免了使用 RouteBuilder 繁琐的 XML。

RouteBuilder 是一个基类,通过继承它来使用 DSL 创建路由规则。然后将 RouteBuilder 的实例添加到 CamelContext 中。

Camel 使用一个名为 Service 的简单生命周期接口,该接口只有一个 start() 和 stop() 方法。各种类都实现了 Service,例如 CamelContext 以及许多 Component 和 Endpoint 类。当您使用 Camel 时,通常需要启动 CamelContext,它将启动所有各种组件和端点,并激活路由规则,直到上下文再次停止。

如果文件组件的noop参数设置为true,则文件不会以任何方式移动或删除。在这里,Camel也会将idempotent=true设置为true,以避免重复消费相同的文件。这可能非常适合ETL类型的需求。

	private static void camelCopy() throws Exception {
		CamelContext context = new DefaultCamelContext();
		context.addRoutes(new RouteBuilder() {
			public void configure() {
				from("file:c://temp/in?noop=true")
				.process(new Processor() {
					public void process(Exchange exchange) throws Exception {
						System.out.println("Within Interim Processor ");		
						System.out.println("We just processed the file named: "
								+ exchange.getIn().getHeader("CamelFileName"));						
					}
				})
				.to("file:c://temp/out");
			}
		});
		System.out.println("Context starts here..");
		context.start();
		Thread.sleep(10000);
		context.stop();
		System.out.println("Context ends here..");
	}

处理器代码

Processor 接口用于实现消息交换的消费者或实现消息转换器。该进程可以在路由中用作匿名内部类,如下所示:

	public void process(Exchange exchange) throws Exception {
		System.out.println("Within Processor bean");		
		System.out.println("We just processed the file named: "
				+ exchange.getIn().getHeader("CamelFileName"));
	}

这种内部模式适用于快速编写一些代码。如果内部类中的代码变得有点复杂,当然建议将其重构为一个单独的类。

项目结构

基于文件的 Apache Camel 处理的 Maven 项目结构已附在此处。

Camel Project Structure

执行后,给定的问题陈述成功运行。在源文件夹中,有 2 个文件;一个 txt 文件和一个 xml 文件。它们由 Apache Camel 处理;然后交付到输出文件夹。结果反映在控制台窗口中。

配置驱动流程

Spring 是目前最流行的控制反转(IoC)Java 容器。其核心框架允许您将“bean”连接起来,形成应用程序。这种连接是通过 XML 配置文件完成的。

	private static void camelConfigCopy() throws Exception {
		System.out.println("Context starts here..");
		new ClassPathXmlApplicationContext("/FileCopy.xml");
		Thread.sleep(10000);
		System.out.println("Context ends here..");
	}

有几种不同的方法来配置组件和端点。当使用 XML 配置在 Camel 中定义路由时,您可能希望定义一些路由。当您使用 routeContext 标签时,它们是分开的,不能重用 camelContext 中定义的现有标签 onException、intercept、dataFormats 和类似的横切功能。换句话说,routeContext 标签目前是隔离的。

routeContext 中定义的路由可以被多个 camelContext 重用。然而,它只是被重用的定义。在运行时,每个 CamelContext 将根据定义创建自己的路由实例。

Camel Config driven File Processor

实现 - 消息模式

消息是系统在使用消息通道时相互通信的实体。消息从发送方流向接收方,方向是单向的。第二个示例是由基于消息的处理驱动的。

布局

第二个问题是扫描给定文件夹中的所有文件,将它们打包成 JMS 消息,根据收到的内容进行路由,并处理数据以完成。该模型如下图所示。

目标是展示使用 Apache Camel 框架进行基于消息的处理。

Camel Message Mode

内容基于路由(CBR)是一种消息路由器,它根据消息的内容将消息路由到目的地。内容可以是消息头、有效负载数据类型、有效负载本身的一部分——几乎是消息交换中的任何内容。

在给定的示例中,IncomingOrder 消息被注入到 CBR 处理中,以通过三个通道(xml、csv、bad)进行路由。

源代码

为了实现第二个问题,源代码是使用 CamelContext、RouteBuilder、Processor 等相同的方法编写的。

在这里,我们使用文件名扩展名来确定特定的订单消息应该发送到 CSV 订单队列还是 XML 订单队列。这在下面的源代码中有所描述。

	private static void FileToMsg() throws Exception {
		CamelContext context = new DefaultCamelContext();
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://");
        context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                // load files into the JMS queue
                from("file:c://temp/in?noop=true").to("jms:incomingOrders");
        
                // content-based router
                from("jms:incomingOrders")
                .choice()
                    .when(header("CamelFileName").endsWith(".xml"))
                        .to("jms:xmlOrders")  
                    .when(header("CamelFileName").regex("^.*(csv|csl)$"))
                        .to("jms:csvOrders")
                    .otherwise()
                        .to("jms:badOrders").stop()
                .end()
                .to("jms:continuedProcessing");
                
                from("jms:xmlOrders").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Received XML order: " 
                                + exchange.getIn().getHeader("CamelFileName"));   
                    }
                });
                from("jms:csvOrders").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Received CSV order: " 
                                + exchange.getIn().getHeader("CamelFileName"));   
                    }
                });
                from("jms:badOrders").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Received bad order: " 
                                + exchange.getIn().getHeader("CamelFileName"));   
                    }
                });   
                
                // test that our route is working
                from("jms:continuedProcessing").process(new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("Received continued order: " 
                                + exchange.getIn().getHeader("CamelFileName"));   
                    }
                });                        
            }
        });
		System.out.println("Content Based Route starts here..");
		context.start();
		Thread.sleep(10000);
		System.out.println("Content Based Route ends here..");
		context.stop();
	}

在上述代码中,CamelFileName 头由 FTP 消费者设置以获取文件名。为了实现 CBR 所需的条件路由,Camel 在 DSL 中引入了一些关键字。choice 方法创建一个 CBR 处理器,条件通过在 choice 之后结合 when 方法和谓词来添加。

在这个示例中,Camel 的创建者选择了 contentBasedRouter 作为方法名,以匹配 EIP,但他们仍然坚持使用 choice,因为它读起来更自然。

流程图

流程图提供了流程步骤的视觉表示。

在我们的消息示例程序中,运行时层 CamelContext 和消息层 JMS/ActiveMQ 作为第一步被实例化。然后,根据我们的用例,路由和配置被覆盖。

Camel Message Process

对于传入的文件信息,使用 JMS 消息进行基于内容的路由处理。内容根据 XML 或 CSV 文件类型进行拆分。执行相关处理器后,程序将继续执行 ContinuedProcessor。

其余文件类型被路由到 BAD 类别并停止处理。

Camel Execution Result

在 @in 文件夹中执行包含两个文件(firstmeet.txt 和 testxml2.xml)的给定示例程序后,结果如上所示。根据 CBR 逻辑,txt 文件被路由为 BAD 类别,第二个文件根据给定的文件类型被处理为 XML 类别。

 

关注点

Apache Camel 支持企业集成项目中的所有要求,例如错误处理、事务、可伸缩性和监控。它应该始终被评估为重量级 ESB 的轻量级替代方案。

这篇实践文章帮助您理解 EAI、Camel 的核心概念和实现步骤。

历史

初始版本

© . All rights reserved.