使用 Spring AMQP 和 RabbitMQ 创建微服务 - 简单介绍
本文将介绍如何使用 Spring AMQP 和 RabbitMQ 创建微服务。
引言
RabbitMQ 是小型应用程序之间相互通信的绝佳方式。这就是为什么它成为开发微服务的理想技术。微服务是小型应用程序,可以快速开发、轻松隔离测试,并且易于监控和维护。为了促进通信,使用 RabbitMQ 或任何其他消息代理技术是快速建立微服务生态系统的方法之一。相比其他消息代理技术,我更偏爱 RabbitMQ 有以下几点原因:
- RabbitMQ 主要支持基于队列和异步的消息交换。这允许发送方尽可能多地发送消息,而无需关心它们是否在目的地被处理。
- RabbitMQ 可以用作临时消息存储和重试机制,你可以编写微服务,使得任何未确认的消息都会被重试,直到它被处理或被强制从队列中移除。正确设置的 RabbitMQ 永远不会丢失消息(我相信其他许多替代方案也是如此,因此类似的技术也可以应用于那些消息代理)。
- 你可以将系统设计为使用 RabbitMQ 作为企业服务总线(ESB)。RabbitMQ 支持多种不同的消息路由方式,可以利用这些方式像 ESB 一样移动消息。
- 它拥有一个良好、简单易用的安全系统。
- 它的设置和管理相当容易。
人们可能会问的一个问题是,同步消息处理怎么办?我曾经认为同步处理是重要且必要的。但事实并非如此。据我所知,两个服务应用之间的任何同步操作都可以用异步方式表达,缺点可能是操作可能 थोड़ा 难以理解。这样做的好处是,两个服务应用不必同时在线才能互相通信,任何一方都可以尽快发送消息,并期望另一方成功处理。而这将会实现,因为 RabbitMQ 保证消息会被送达。这基本上是异步消息服务架构的入门介绍。
当我开始使用 RabbitMQ 时,我用的是 .NET,我不太喜欢它。我需要的是一个拥有良好文档和对 RabbitMQ 基础 API 进行了良好封装的成熟框架。在我离开那份工作之前,我开始尝试 Spring AMQP。我喜欢我所看到的。这篇教程文章是我所学知识的总结。
先决条件
为了运行这个示例项目及其相关的客户端应用程序,你必须完成以下操作:
- 安装最新更新的 JDK 1.7。
- 安装最新的 Maven 3.3.x。
- 安装 Erlang 和 RabbitMQ。你可以访问 RabbitMQ 官网查看安装指南。
- 熟悉 RabbitMQ,你需要创建一个用户、一个虚拟主机,并将该用户与虚拟主机关联。
本教程面向对 Spring 框架和 RabbitMQ 消息框架有先前经验的开发人员。如果你不熟悉这些,请跳过本教程。此外,了解一些 Maven 构建工具的知识也有助于你理解本教程。
对于这个示例项目,我为 RabbitMQ 创建了一个名为“testuser”的用户,密码是“secret”。这仅用于测试目的。请不要在生产系统中使用此凭证。我给我的“testuser”分配了管理员角色。然后我创建了一个名为“hanbo1”的虚拟主机,并给予该用户对这个虚拟主机的完全访问权限。这就是我为这个示例项目必须做的预开发设置。再次强调,如果你觉得这些简单的准备步骤难以理解,你应该在继续阅读之前先熟悉相关技术。
目标
我对这个示例项目的目标如下:
- 我想创建一个示例微服务,可以接收并处理模拟的用户请求。示例请求应以 POJO(Plain Old Java Object)的形式接收。
- 我想创建一个示例客户端应用程序,可以以 POJO 的形式发送请求,该请求将被示例微服务接收。
这两个目标应该足以展示 Spring AMQP 的能力。微服务和客户端都将请求(和响应)作为常规 POJO 处理,请求和响应的传输将以 JSON 字符串的形式进行。POJO 和 JSON 字符串之间的转换由 Spring 或其他第三方组件自动处理。
RabbitMQ 快速入门
为了能够向 RabbitMQ 消息代理发送或接收消息,你需要以下信息:
- 服务器 URI、用户名、密码和虚拟主机。这些帮助服务应用程序连接到 RabbitMQ 消息代理。
- 交换机名称、队列名称和路由键。
RabbitMQ 和基于 JMS 的消息代理的区别在于,RabbitMQ 主要基于队列,非常适合异步消息处理。JMS 消息代理同时支持同步和异步消息处理。消息流经 RabbitMQ 消息代理的方式与 JMS 消息代理不同。消息首先到达一个由用户手动或应用程序自动设置的交换机。然后,交换机存储一个或多个路由键,每个路由键会将消息路由到一个指定的队列。订阅该队列的任何消费者随后会收到消息。每个队列可以有多个监听器,但一条消息一次只能被一个消费者消费。
关于 RabbitMQ 以及可能其他一些类型的消息系统,一个很酷的特性是,如果客户端接收到的消息未被确认(acknowledge),消息会返回到队列中。在 RabbitMQ 的情况下,消息会返回到队列的顶部。这作为一个重试机制非常棒。为了利用这个优势,你需要做的是配置你的应用程序上下文,使得每当发生异常时,未处理的消息都会返回到队列。本文将演示如何(以最小的努力)实现这一点。但这样做也可能会适得其反。想象一下,当一个未处理的异常重复发生时,消息也会重复地返回到队列,应用程序会陷入一个循环。纠正这个问题最简单的方法是为所有已知的异常设置一个 catch 块,并希望没有未处理的异常会导致这类问题。如果未捕获的异常是一个担忧,那么就需要找到一种方法来捕获所有异常。但这样做,你可能会失去使用 RabbitMQ 作为重试机制的优势。
这差不多就是快速入门的全部内容了。接下来,我将解释我创建这个可用的服务和客户端程序的每一步。
步骤 1 -- 创建项目文件夹结构
本教程包含两个项目,一个是服务项目,另一个是客户端程序。两个项目应该有相似的文件结构。以下是服务应用程序的文件结构:
<project base dir> | -- src | -- main | -- java | -- org | -- hanbo | -- amqp | -- sample | -- messages | -- UserDetail.java | -- services | -- UserDetailService.java | -- Main.java | -- RabbitMessageListener.java | -- resources | -- application-context.xml | -- log4j.properties | ---- pom.xml
客户端程序的项目文件结构几乎相同。主要区别在于包目录。下载示例 zip 压缩包(在顶部)后,你应该能够查看到。
步骤 2 - 设置 Maven POM 文件
我做的第一件事是为我的项目创建一个 Maven pom.xml。在里面我定义了所有我需要的依赖项。你可能会问,一个人怎么知道要使用哪些依赖项。我是这样找出它们的:
- 这是一个基于 Spring 的应用程序,所以我将包含一些最核心的 Spring 依赖。比如 spring-core、spring-context 和 spring-beans。一旦我有了这三个核心依赖,我就可以添加 Spring AMQP 的依赖。
- 我只是在尝试 Spring AMQP,通过在线搜索我发现我唯一需要的是 spring-amqp 和 spring-rabbit。
- 之后,我需要使用 Jackson 框架来进行常规 Java 对象和 JSON 格式字符串之间的转换。所以我添加了一些 Jackson 相关的依赖项。
- 我还添加了几个与日志相关的依赖项,这样当我运行示例项目时,我可以通过查看一个格式美观的日志文件来进行故障排查(如果需要的话)。
我的 Maven POM 文件如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.hanbo.spring-amqp-test</groupId>
<artifactId>spring-amqp-testapp</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>spring-amqp-testapp</name>
<url>http://maven.apache.org</url>
<properties>
<spring.version>3.2.11.RELEASE</spring.version>
<spring.amqp.version>1.3.9.RELEASE</spring.amqp.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
</dependencies>
<build>
<finalName>amqp-test</finalName>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
POM 文件的第一部分定义了所有必要的依赖项。Maven 的一个很酷的地方在于,在构建二进制可执行文件时,它不仅会下载指定的依赖项,还会下载 POM 文件中未指定的任何所需依赖项。POM 文件的第二部分定义了构建过程。我不仅选择了编译器插件,还选择了依赖插件,以便它将所有需要的 jar 包放到一个指定的位置。这使我可以轻松地将我的 jar 包以及所有依赖的 jar 包复制到我想要的任何地方。
请注意,我使用的是旧版本的 spring-amqp 依赖。当你下载新版本的依赖时,这个示例项目中的某些东西可能无法工作。如果发生这种情况,请尽力在网上查找答案。
步骤 3 - 主入口
服务项目最简单的部分是主入口类,这是源代码:
package org.hanbo.amqp.sample;
import org.springframework.context.support.GenericXmlApplicationContext;
public class Main
{
@SuppressWarnings("resource")
public static void main(String[] args)
{
GenericXmlApplicationContext context =
new GenericXmlApplicationContext("classpath:/application-context.xml");
context.registerShutdownHook();
}
}
在这个类中,只有一个方法,即整个应用程序的主入口。它只做两件事,首先从类路径加载应用程序上下文文件。然后它会注册关闭钩子。关闭钩子用于当应用程序从系统接收到关闭信号时,在退出前执行清理工作。
当应用程序启动时,它将永远运行。当用户按下 Ctrl+C 时,应用程序将捕获关闭信号并执行清理。
步骤 4 - 应用程序上下文
在我详细介绍如何创建用于定义应用程序上下文的 XML 文件之前,我只想指出,拥有应用程序上下文的 XML 定义并非必需。作为替代方案,你可以为依赖注入配置创建一个 Java 类(POJO)。我选择使用 XML 的原因是我有很多项目使用基于 XML 的应用程序上下文定义,从这些现有项目中创建一个示例项目对我来说非常容易。我也喜欢将配置与 Java 源代码分离。它使源代码更清晰。这只是个人偏好。
这方面缺少文档。我曾经在网上做了大量的研究,试图找出我需要哪些元素来创建这样一个应用程序上下文。当我找到 XSD (spring-rabbit-1.3.xsd) 时,一切都变了,创建 XML 文件对我来说毫无问题。稍后,我将解释如何利用 XSD 文件来找到你需要的元素。
让我向你展示整个应用程序上下文的定义:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
<context:component-scan base-package="org.hanbo.amqp.sample" />
<rabbit:connection-factory id="connectionFactory"
host="localhost"
port="5672"
virtual-host="hanbo1"
username="testuser" password="secret"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
message-converter="defaultMessageConverter" />
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="testQueue2" durable="true"/>
<rabbit:direct-exchange name="test-exchange2" durable="true">
<rabbit:bindings >
<rabbit:binding queue="testQueue2" key="org.hanbo.amqp.test2"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container
connection-factory="connectionFactory"
acknowledge="auto"
requeue-rejected="true"
message-converter="defaultMessageConverter">
<rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
</rabbit:listener-container>
<bean id="messageListener" class="org.hanbo.amqp.sample.RabbitMessageListener"></bean>
<bean id="defaultMessageConverter"
class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
</beans>
这个应用程序上下文是服务应用程序的核心。它将所有组件联系在一起以实现一个共同的目标。让我逐一介绍这个文件的所有组件,你就会明白了。首先是这个 XML 文件的根元素:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
...
</beans>
这个根元素(名为“beans”)在其定义中包含许多命名空间,这是一种从各种 xsd 文件导入额外 xsd 元素定义的方式。其中之一是 `xmlns:rabbit`。
xsi:schemaLocation="
...
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd"
这导入了一些我可以使用的 spring-amqp 特定的 xml 对象定义。我花了一段时间才弄清楚如何使用我需要的元素。如果你试图从官方 Spring AMQP 参考文档中找到每个元素的正式定义,那是在浪费时间。这是我学到的技巧,通过将这个命名空间的 URI (http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd) 复制并粘贴到浏览器导航栏,我能够获得完整的 XSD。从那里我能够找到我需要的元素。为了利用这个 XSD 文件,你需要知道 XSD 语法是如何工作的。此外,你需要能够将这些元素与 RabbitMQ 的配置联系起来。
这个文件的下一部分是显而易见的。我希望 Spring 扫描我定义的任何 Java 包以查找可注入的 bean。
<context:component-scan base-package="org.hanbo.amqp.sample" />
这样做的好处是 Spring 不仅会扫描指定的包,还会扫描其下的子包。
接下来的几个块是关于 RabbitMQ 相关的 bean。第一个叫做 `rabbit:connection-factory`。它定义了一个连接工厂,用于生产到 RabbitMQ 代理的连接。我没有在我的代码中直接使用这个 bean,而是将它传递给下一个 bean。在讨论下一个 bean 之前,让我们检查一下 `rabbit:connection-factory` 的定义:
<rabbit:connection-factory id="connectionFactory"
host="localhost"
port="5672"
virtual-host="hanbo1"
username="testuser" password="secret"/>
这个 bean 的定义不难理解:
- 属性 id 定义了 bean 的名称。其他 bean 可以使用这个 id 来引用这个 bean。
- 属性 host 定义了服务器的位置。在这种情况下是 localhost。
- 属性 port 定义了 AMQP 代理正在监听的端口号。
- 属性 virtual-host(我在 xsd 中找到的)定义了源或目标虚拟主机。
- username 和 password 属性定义了可用于连接到 RabbitMQ 代理的用户凭证。请注意,一个用户可以被分配给一个特定的虚拟主机,而不能分配给另一个。在这种情况下,用户 "testuser" 被分配给虚拟主机 "hanbo1"。如果我指定一个不同的虚拟主机,授权将会失败。
下一个块定义了两个 bean,1) 一个叫做 `rabbit:template`,2) 另一个叫做 `rabbit:admin`。
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
message-converter="defaultMessageConverter" />
<rabbit:admin connection-factory="connectionFactory"/>
我不知道他们为什么选择 `rabbit:template` 这个名字。这个 bean 既是发送者也是接收者,非常像 JMS 消费者和生产者的组合,既能发送消息也能接收消息。这个 bean 有一个 id,允许它在 Java 代码中作为对象使用。它还需要一个对连接工厂的引用,这个工厂在前面已经定义了。最后,这个 bean 接受一个消息转换器的引用。这样做的原因是为了实现 POJO 和非普通 Java 对象格式(如 XML 或 JSON)之间的自动转换。有了这个转换器,服务应用程序或客户端可以处理 Java 对象而无需担心显式转换。虽然显式转换并没有什么错。你需要做的是确保你为此类转换创建一个公共方法或对象,并在需要转换的关键位置使用它。你不希望做的是在你的应用程序中到处复制相同的转换代码段。如果你这样做,并且有人告诉你需要更改转换过程,你将很难找到所有地方并进行更改。
下一个是 `rabbit:admin`。顾名思义,它用于对 RabbitMQ 代理进行管理。之所以需要它,是因为当服务/客户端应用程序首次登录 RabbitMQ 代理时,队列和路由键可能尚未注册(创建)。与其手动创建队列和路由键,你的服务应用程序和客户端可以使用这个 `rabbit:admin` 来自动创建它们。这个 bean 只接受连接工厂 bean 的 id。
接下来,我将配置交换机和队列,以及消息如何通过路由键进行路由。有几种类型的交换机:
- 直接交换机(Direct exchange)根据路由键将所有消息路由到特定的队列。我用的就是这种。
- 扇形交换机(Fanout exchange)将相同的消息路由到多个队列,它会忽略路由键。
- 主题交换机(Topic based exchange)根据路由键和一些匹配模式将消息路由到队列。基本上是通过路由键和与队列相关联的一些主题来路由消息。
- 头部交换机(Header based exchange)通过查看头部数据并将消息与特定队列匹配来路由消息。
我使用的是直接交换机,它允许我通过路由键来路由消息。其 bean 定义如下:
<rabbit:queue name="testQueue2" durable="true"/>
<rabbit:direct-exchange name="test-exchange2" durable="true">
<rabbit:bindings >
<rabbit:binding queue="testQueue2" key="org.hanbo.amqp.test2"/>
</rabbit:bindings>
</rabbit:direct-exchange>
第一行声明了队列。它有一个名字,还有一个名为 durable 的属性被设置为 "true"。在 RabbitMQ 端,这将创建一个持久化队列,即当它不存在时,应用程序会创建它;并且如果应用程序退出,它不会消失。队列 bean 后面的行是定义交换机 bean。定义非常易读,该 bean 将创建一个名为 "test-exchange2" 的交换机(如果不存在)。同样,该交换机将是一个持久化交换机。在 bean 定义内部,有绑定属性,我在这里将队列绑定到这个交换机并分配路由键。我相信可以为同一个队列分配多个路由键。但在这个例子中我没有尝试。
下一个是监听器的定义。监听器(或观察者)被放入一个监听器容器中。定义监听器容器 bean 是最耗时的工作,我不得不在网上查询如何正确配置这个 bean。监听器可以从队列中消费消息,然后将其传递给服务对象进行处理。这个过程应该尽可能透明。也就是说,监听器应该自动确认消息;如果异常阻止了消息被处理,应自动将消息返回到队列;最后,基于文本的消息应该被转换成一个 POJO,供后端服务处理。这是我的监听器容器和监听器 bean 的定义:
<rabbit:listener-container
connection-factory="connectionFactory"
acknowledge="auto"
requeue-rejected="true"
message-converter="defaultMessageConverter">
<rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
</rabbit:listener-container>
监听器容器 bean 的定义首先接受一个连接工厂对象的引用。它被设置为自动确认消息(acknowledge = "auto")。如果监听器服务对象由于未处理的异常而未能处理消息,消息应该被返回到队列(requeue-rejected = "true")。正如我之前提到的,我会向你展示如何配置你的应用程序以使用 RabbitMQ 作为临时存储和重试机制,就是这个了!
然后我将消息转换器关联为容器的默认消息转换器。这个默认消息转换器将 JSON 格式的消息转换为 POJO。这是消息转换器的定义:
<bean id="defaultMessageConverter"
class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
在监听器容器内部,可以有一个或多个监听器。每个监听器都是一个用户定义的对象,可以处理传入的消息对象(在它被转换为 POJO 之后)。监听器容器中的 listener 属性定义了对从用户定义对象类型实例化的 bean 的引用。listener 属性还指定了用户定义类中将处理消息的方法。listener 行的最后一个属性指定了从哪个队列中拉取消息。
最后,这是消息监听器 bean 的定义:
<bean id="messageListener" class="org.hanbo.amqp.sample.RabbitMessageListener"></bean>
这些是我的应用程序上下文的基本元素。在下一节中,我将快速浏览客户端程序的主入口和应用程序上下文;然后是服务应用程序的源代码。
步骤 5 -- 客户端程序的主入口和应用程序上下文
客户端程序是我自从第一份工作以来写过的最简单的程序。这是代码:
package org.hanbo.amqp.sample.client;
import org.hanbo.amqp.sample.messages.UserDetail;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.support.GenericXmlApplicationContext;
public class ClientMain
{
public static void main(String[] args)
{
GenericXmlApplicationContext context =
new GenericXmlApplicationContext("classpath:/application-context.xml");
AmqpTemplate sender = context.getBean(AmqpTemplate.class);
for (int i = 0; i < 100; i++)
{
sendAmqpMessage(i, sender);
}
context.close();
}
private static void sendAmqpMessage(int id, AmqpTemplate sender)
{
UserDetail userDetail = new UserDetail();
userDetail.setUserActive(id % 2 == 1? false : true);
userDetail.setUserEmail(String.format("testuser%d@test.org", id));
userDetail.setUserId(123 + id);
userDetail.setUserName(String.format("testuser%d", id));
sender.convertAndSend("test-exchange2", "org.hanbo.amqp.test2", userDetail);
}
}
这是客户端程序唯一的 Java 源代码,它所做的就是获取一个消息发送器(即 `rabbit:template`),然后发送 100 条消息。消息被创建为一个 POJO,发送器会将消息对象转换为 JSON 格式并发送出去。你可以在上面代码的第二个方法中找到这一点。第一个参数是交换机的名称;第二个参数是路由键;最后一个是消息对象。代码简单且不言自明。这个 amqp 模板对象类型(即 Java 类 `RabbitTemplate`)有很多方法。我选择用来发送消息的那个是我觉得对我的设计最有用/最合适的。你也应该这样做。
我还为这个客户端程序创建了应用程序上下文。我所做的是拿上一节的应用程序上下文,去掉客户端不需要的任何东西,然后测试它。这就是客户端程序所需要的。它在这里:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
<context:component-scan base-package="org.hanbo.amqp.sample" />
<rabbit:connection-factory id="connectionFactory"
host="localhost"
port="5672"
virtual-host="hanbo1"
username="testuser" password="secret"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" message-converter="defaultMessageConverter" />
<rabbit:admin connection-factory="connectionFactory"/>
<bean id="defaultMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
</beans>
你应该能够将上面的与前一节中的进行比较,区别很明显——没有监听器容器,也没有队列和交换机。应用程序只接收。现在我们已经看到了示例客户端的源代码,让我们看看服务应用程序的源代码。
步骤 6 -- 消息对象定义
为了有客户端应用程序和服务应用程序,应该定义一个公共的消息对象类型。两个应用程序都应该包含相同的消息对象类型。这样,发送方应该能够创建消息对象,将其传递给 RabbitTemplate 类的 `convertAndSend()` 方法。消息被转换为 JSON 字符串(或者,如果你使用 XML 的序列化框架,则是 XML)并发送到接收端。
package org.hanbo.amqp.sample.messages;
public class UserDetail
{
private String userName;
private String userEmail;
private int userId;
private boolean userActive;
public String getUserName()
{
return userName;
}
public void setUserName(String userName)
{
this.userName = userName;
}
public String getUserEmail()
{
return userEmail;
}
public void setUserEmail(String userEmail)
{
this.userEmail = userEmail;
}
public int getUserId()
{
return userId;
}
public void setUserId(int userId)
{
this.userId = userId;
}
public boolean isUserActive()
{
return userActive;
}
public void setUserActive(boolean userActive)
{
this.userActive = userActive;
}
}
这个消息类型类 `UserDetail` 只是我创建的一个虚拟类,用来演示使用 JSON 在两端自动转换消息对象。更有趣的是服务应用程序的监听器类。那是这个例子的最后一部分。
步骤 7 -- 服务应用程序的监听器类
监听器类的源代码如下:
package org.hanbo.amqp.sample;
import org.hanbo.amqp.sample.messages.UserDetail;
import org.hanbo.amqp.sample.services.UserDetailService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMessageListener
{
@Autowired
private UserDetailService userDetailService;
public void handleIncoming(UserDetail userDetail)
throws Exception
{
userDetailService.processUserDetail(userDetail);
}
}
我使用 @Component 注解将该类标记为一个可以被注入到同一个 Spring 应用程序中其他类的类。在类内部,只有一个属性是由 Spring 框架注入的。我实现这个监听器类并不是作为主要的消息处理器。相反,消息被传递给了名为 `userDetailService` 的属性。
这个类只包含一个方法 `handleIncoming()`。如果你回到服务应用程序的应用程序上下文,你会看到这个方法被指定为消息处理方法。让我给你看那一段:
<rabbit:listener-container
connection-factory="connectionFactory"
acknowledge="auto"
requeue-rejected="true"
message-converter="defaultMessageConverter">
<rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
</rabbit:listener-container>
你应该能看到 `method="handleIncomong"` 这部分。这个 `handleIncoming()` 所做的就是调用 userDetailService 的 `processUserDetail()` 方法来处理消息。`UserDetailService` 的定义如下:
package org.hanbo.amqp.sample.services;
import org.hanbo.amqp.sample.messages.UserDetail;
import org.springframework.stereotype.Service;
@Service
public class UserDetailService
{
public void processUserDetail(UserDetail userDetail)
{
System.out.println(
String.format("User Name: %s", userDetail.getUserName())
);
System.out.println(
String.format("User Email: %s", userDetail.getUserEmail())
);
System.out.println(
String.format("User ID: %d", userDetail.getUserId())
);
System.out.println(
String.format("User Active: %b", userDetail.isUserActive())
);
}
}
我实现的这个 `UserDetailService` 类只是将 UserDetail 对象输出到命令行窗口。
基本上,这里列出的所有代码段都是构成这个简单演示项目的关键元素。接下来我将向你展示如何运行该应用程序。
步骤 8 -- 运行服务和客户端
假设你已经安装了 Java 1.7、Maven 和 RabbitMQ 代理,并且根据你自己的设置正确配置了示例项目,你应该准备好测试示例项目了。有几种方法可以做到这一点:
- 使用 Maven 运行
- 使用 Java 运行
使用 Maven 运行这两个程序,首先通过以下命令启动服务程序:
mvn exec:java -Dexec.mainClass="org.hanbo.amqp.sample.Main"
要停止服务程序,只需按 Ctrl + C,它就会停止。
一旦服务程序成功运行,就该尝试客户端程序了。打开一个新的命令行窗口并运行以下命令行:
mvn exec:java -Dexec.mainClass="org.hanbo.amqp.sample.client.ClientMain"
客户端将向服务程序发送 100 条测试消息。你会在运行服务程序的控制台上看到 UserDetail 被打印出来。一旦 100 条消息发送完毕,客户端程序将自动关闭。
如果你想冒险尝试,可以随意使用 Java 命令来运行这两个程序。在使用 Maven 命令构建后,你可以在 target/lib 中找到所有依赖的 jar 包。将依赖的 jar 包和示例程序的 jar 包分别放到两个位置(一个用于服务程序,一个用于客户端程序)。然后运行以下命令(先运行服务,再运行客户端程序):
java -cp<all the jars including the jar of the service or client program> "class of the main entry"
祝你好运。
参考文献
历史
- 2016年5月17日 初稿。