65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Spring AMQP 和 RabbitMQ 创建微服务 - 简单介绍

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.50/5 (2投票s)

2016年5月19日

MIT

20分钟阅读

viewsIcon

30234

downloadIcon

312

本文将介绍如何使用 Spring AMQP 和 RabbitMQ 创建微服务。

引言

RabbitMQ 是小型应用程序之间相互通信的绝佳方式。这就是为什么它成为开发微服务的理想技术。微服务是小型应用程序,可以快速开发、轻松隔离测试,并且易于监控和维护。为了促进通信,使用 RabbitMQ 或任何其他消息代理技术是快速建立微服务生态系统的方法之一。相比其他消息代理技术,我更偏爱 RabbitMQ 有以下几点原因:

  • RabbitMQ 主要支持基于队列和异步的消息交换。这允许发送方尽可能多地发送消息,而无需关心它们是否在目的地被处理。
  • RabbitMQ 可以用作临时消息存储和重试机制,你可以编写微服务,使得任何未确认的消息都会被重试,直到它被处理或被强制从队列中移除。正确设置的 RabbitMQ 永远不会丢失消息(我相信其他许多替代方案也是如此,因此类似的技术也可以应用于那些消息代理)。
  • 你可以将系统设计为使用 RabbitMQ 作为企业服务总线(ESB)。RabbitMQ 支持多种不同的消息路由方式,可以利用这些方式像 ESB 一样移动消息。
  • 它拥有一个良好、简单易用的安全系统。
  • 它的设置和管理相当容易。

人们可能会问的一个问题是,同步消息处理怎么办?我曾经认为同步处理是重要且必要的。但事实并非如此。据我所知,两个服务应用之间的任何同步操作都可以用异步方式表达,缺点可能是操作可能 थोड़ा 难以理解。这样做的好处是,两个服务应用不必同时在线才能互相通信,任何一方都可以尽快发送消息,并期望另一方成功处理。而这将会实现,因为 RabbitMQ 保证消息会被送达。这基本上是异步消息服务架构的入门介绍。

当我开始使用 RabbitMQ 时,我用的是 .NET,我不太喜欢它。我需要的是一个拥有良好文档和对 RabbitMQ 基础 API 进行了良好封装的成熟框架。在我离开那份工作之前,我开始尝试 Spring AMQP。我喜欢我所看到的。这篇教程文章是我所学知识的总结。

先决条件

为了运行这个示例项目及其相关的客户端应用程序,你必须完成以下操作:

  • 安装最新更新的 JDK 1.7。
  • 安装最新的 Maven 3.3.x。
  • 安装 Erlang 和 RabbitMQ。你可以访问 RabbitMQ 官网查看安装指南。
  • 熟悉 RabbitMQ,你需要创建一个用户、一个虚拟主机,并将该用户与虚拟主机关联。

本教程面向对 Spring 框架和 RabbitMQ 消息框架有先前经验的开发人员。如果你不熟悉这些,请跳过本教程。此外,了解一些 Maven 构建工具的知识也有助于你理解本教程。

对于这个示例项目,我为 RabbitMQ 创建了一个名为“testuser”的用户,密码是“secret”。这仅用于测试目的。请不要在生产系统中使用此凭证。我给我的“testuser”分配了管理员角色。然后我创建了一个名为“hanbo1”的虚拟主机,并给予该用户对这个虚拟主机的完全访问权限。这就是我为这个示例项目必须做的预开发设置。再次强调,如果你觉得这些简单的准备步骤难以理解,你应该在继续阅读之前先熟悉相关技术。

目标

我对这个示例项目的目标如下:

  • 我想创建一个示例微服务,可以接收并处理模拟的用户请求。示例请求应以 POJO(Plain Old Java Object)的形式接收。
  • 我想创建一个示例客户端应用程序,可以以 POJO 的形式发送请求,该请求将被示例微服务接收。

这两个目标应该足以展示 Spring AMQP 的能力。微服务和客户端都将请求(和响应)作为常规 POJO 处理,请求和响应的传输将以 JSON 字符串的形式进行。POJO 和 JSON 字符串之间的转换由 Spring 或其他第三方组件自动处理。

RabbitMQ 快速入门

为了能够向 RabbitMQ 消息代理发送或接收消息,你需要以下信息:

  • 服务器 URI、用户名、密码和虚拟主机。这些帮助服务应用程序连接到 RabbitMQ 消息代理。
  • 交换机名称、队列名称和路由键。

RabbitMQ 和基于 JMS 的消息代理的区别在于,RabbitMQ 主要基于队列,非常适合异步消息处理。JMS 消息代理同时支持同步和异步消息处理。消息流经 RabbitMQ 消息代理的方式与 JMS 消息代理不同。消息首先到达一个由用户手动或应用程序自动设置的交换机。然后,交换机存储一个或多个路由键,每个路由键会将消息路由到一个指定的队列。订阅该队列的任何消费者随后会收到消息。每个队列可以有多个监听器,但一条消息一次只能被一个消费者消费。

关于 RabbitMQ 以及可能其他一些类型的消息系统,一个很酷的特性是,如果客户端接收到的消息未被确认(acknowledge),消息会返回到队列中。在 RabbitMQ 的情况下,消息会返回到队列的顶部。这作为一个重试机制非常棒。为了利用这个优势,你需要做的是配置你的应用程序上下文,使得每当发生异常时,未处理的消息都会返回到队列。本文将演示如何(以最小的努力)实现这一点。但这样做也可能会适得其反。想象一下,当一个未处理的异常重复发生时,消息也会重复地返回到队列,应用程序会陷入一个循环。纠正这个问题最简单的方法是为所有已知的异常设置一个 catch 块,并希望没有未处理的异常会导致这类问题。如果未捕获的异常是一个担忧,那么就需要找到一种方法来捕获所有异常。但这样做,你可能会失去使用 RabbitMQ 作为重试机制的优势。

这差不多就是快速入门的全部内容了。接下来,我将解释我创建这个可用的服务和客户端程序的每一步。

步骤 1 -- 创建项目文件夹结构

本教程包含两个项目,一个是服务项目,另一个是客户端程序。两个项目应该有相似的文件结构。以下是服务应用程序的文件结构:

<project base dir>
|
 -- src
     |
      -- main
          |
           -- java
               |
                -- org
                    |
                     -- hanbo
                         |
                          -- amqp
                              |
                               -- sample
                                   |
                                    -- messages
                                        |
                                         -- UserDetail.java
                                   |
                                    -- services
                                        |
                                         -- UserDetailService.java
                                   |
                                    -- Main.java
                                   |
                                    -- RabbitMessageListener.java
          |
           -- resources
               |
                -- application-context.xml
               |
                -- log4j.properties
|
 ---- pom.xml

客户端程序的项目文件结构几乎相同。主要区别在于包目录。下载示例 zip 压缩包(在顶部)后,你应该能够查看到。

步骤 2 - 设置 Maven POM 文件

我做的第一件事是为我的项目创建一个 Maven pom.xml。在里面我定义了所有我需要的依赖项。你可能会问,一个人怎么知道要使用哪些依赖项。我是这样找出它们的:

  • 这是一个基于 Spring 的应用程序,所以我将包含一些最核心的 Spring 依赖。比如 spring-core、spring-context 和 spring-beans。一旦我有了这三个核心依赖,我就可以添加 Spring AMQP 的依赖。
  • 我只是在尝试 Spring AMQP,通过在线搜索我发现我唯一需要的是 spring-amqp 和 spring-rabbit。
  • 之后,我需要使用 Jackson 框架来进行常规 Java 对象和 JSON 格式字符串之间的转换。所以我添加了一些 Jackson 相关的依赖项。
  • 我还添加了几个与日志相关的依赖项,这样当我运行示例项目时,我可以通过查看一个格式美观的日志文件来进行故障排查(如果需要的话)。

我的 Maven POM 文件如下所示:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.hanbo.spring-amqp-test</groupId>
   <artifactId>spring-amqp-testapp</artifactId>
   <packaging>jar</packaging>
   <version>1.0-SNAPSHOT</version>
   <name>spring-amqp-testapp</name>
   <url>http://maven.apache.org</url>

   <properties>
      <spring.version>3.2.11.RELEASE</spring.version>
      <spring.amqp.version>1.3.9.RELEASE</spring.amqp.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-beans</artifactId>
         <version>${spring.version}</version>
      </dependency>
      <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-context</artifactId>
         <version>${spring.version}</version>
      </dependency>
      <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-core</artifactId>
         <version>${spring.version}</version>
      </dependency>
      <dependency>
         <groupId>org.springframework.amqp</groupId>
         <artifactId>spring-amqp</artifactId>
	      <version>${spring.amqp.version}</version>
      </dependency>
      <dependency>
	      <groupId>org.springframework.amqp</groupId>
         <artifactId>spring-rabbit</artifactId>
	      <version>${spring.amqp.version}</version>
      </dependency>
      <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
      </dependency>
      <dependency>
         <groupId>commons-logging</groupId>
         <artifactId>commons-logging</artifactId>
         <version>1.1.3</version>
      </dependency>
      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
         <version>1.9.13</version>
      </dependency>
      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-mapper-asl</artifactId>
         <version>1.9.13</version>
      </dependency>
   </dependencies>
   <build>
      <finalName>amqp-test</finalName>
      <plugins>
         <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
              <source>1.7</source>
              <target>1.7</target>
            </configuration>
         </plugin>
         <plugin>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
              <execution>
                <phase>install</phase>
                <goals>
                  <goal>copy-dependencies</goal>
                </goals>
                <configuration>
                  <outputDirectory>${project.build.directory}/lib</outputDirectory>
                </configuration>
              </execution>
            </executions>
          </plugin>
      </plugins>
   </build>
</project>

POM 文件的第一部分定义了所有必要的依赖项。Maven 的一个很酷的地方在于,在构建二进制可执行文件时,它不仅会下载指定的依赖项,还会下载 POM 文件中未指定的任何所需依赖项。POM 文件的第二部分定义了构建过程。我不仅选择了编译器插件,还选择了依赖插件,以便它将所有需要的 jar 包放到一个指定的位置。这使我可以轻松地将我的 jar 包以及所有依赖的 jar 包复制到我想要的任何地方。

请注意,我使用的是旧版本的 spring-amqp 依赖。当你下载新版本的依赖时,这个示例项目中的某些东西可能无法工作。如果发生这种情况,请尽力在网上查找答案。

步骤 3 - 主入口

服务项目最简单的部分是主入口类,这是源代码:

package org.hanbo.amqp.sample;

import org.springframework.context.support.GenericXmlApplicationContext;

public class Main
{
   @SuppressWarnings("resource")
   public static void main(String[] args)
   {
      GenericXmlApplicationContext context =
         new GenericXmlApplicationContext("classpath:/application-context.xml");
      context.registerShutdownHook();
   }
}

在这个类中,只有一个方法,即整个应用程序的主入口。它只做两件事,首先从类路径加载应用程序上下文文件。然后它会注册关闭钩子。关闭钩子用于当应用程序从系统接收到关闭信号时,在退出前执行清理工作。

当应用程序启动时,它将永远运行。当用户按下 Ctrl+C 时,应用程序将捕获关闭信号并执行清理。

步骤 4 - 应用程序上下文

在我详细介绍如何创建用于定义应用程序上下文的 XML 文件之前,我只想指出,拥有应用程序上下文的 XML 定义并非必需。作为替代方案,你可以为依赖注入配置创建一个 Java 类(POJO)。我选择使用 XML 的原因是我有很多项目使用基于 XML 的应用程序上下文定义,从这些现有项目中创建一个示例项目对我来说非常容易。我也喜欢将配置与 Java 源代码分离。它使源代码更清晰。这只是个人偏好。

这方面缺少文档。我曾经在网上做了大量的研究,试图找出我需要哪些元素来创建这样一个应用程序上下文。当我找到 XSD (spring-rabbit-1.3.xsd) 时,一切都变了,创建 XML 文件对我来说毫无问题。稍后,我将解释如何利用 XSD 文件来找到你需要的元素。

让我向你展示整个应用程序上下文的定义:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:tx="http://www.springframework.org/schema/tx"
   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
   
   xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/context 
         http://www.springframework.org/schema/context/spring-context-3.0.xsd
         http://www.springframework.org/schema/tx 
         http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
         http://www.springframework.org/schema/rabbit 
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
         
   <context:component-scan base-package="org.hanbo.amqp.sample" />

   <rabbit:connection-factory id="connectionFactory"
      host="localhost"
      port="5672"
      virtual-host="hanbo1"
      username="testuser" password="secret"/>

   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                       message-converter="defaultMessageConverter" />
   <rabbit:admin connection-factory="connectionFactory"/>
   
   <rabbit:queue name="testQueue2" durable="true"/>
   <rabbit:direct-exchange name="test-exchange2" durable="true">
      <rabbit:bindings >
         <rabbit:binding queue="testQueue2" key="org.hanbo.amqp.test2"/>
      </rabbit:bindings>
   </rabbit:direct-exchange>
   
   <rabbit:listener-container
      connection-factory="connectionFactory"
      acknowledge="auto"
      requeue-rejected="true"
      message-converter="defaultMessageConverter">
      <rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
   </rabbit:listener-container>
   
   <bean id="messageListener" class="org.hanbo.amqp.sample.RabbitMessageListener"></bean>
   <bean id="defaultMessageConverter"
            class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
</beans>

这个应用程序上下文是服务应用程序的核心。它将所有组件联系在一起以实现一个共同的目标。让我逐一介绍这个文件的所有组件,你就会明白了。首先是这个 XML 文件的根元素:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:tx="http://www.springframework.org/schema/tx"
   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
   
   xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/context 
         http://www.springframework.org/schema/context/spring-context-3.0.xsd
         http://www.springframework.org/schema/tx 
         http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
         http://www.springframework.org/schema/rabbit 
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
...
</beans>

这个根元素(名为“beans”)在其定义中包含许多命名空间,这是一种从各种 xsd 文件导入额外 xsd 元素定义的方式。其中之一是 `xmlns:rabbit`。

   xsi:schemaLocation="
         ...
         http://www.springframework.org/schema/rabbit 
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd"

这导入了一些我可以使用的 spring-amqp 特定的 xml 对象定义。我花了一段时间才弄清楚如何使用我需要的元素。如果你试图从官方 Spring AMQP 参考文档中找到每个元素的正式定义,那是在浪费时间。这是我学到的技巧,通过将这个命名空间的 URI (http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd) 复制并粘贴到浏览器导航栏,我能够获得完整的 XSD。从那里我能够找到我需要的元素。为了利用这个 XSD 文件,你需要知道 XSD 语法是如何工作的。此外,你需要能够将这些元素与 RabbitMQ 的配置联系起来。

这个文件的下一部分是显而易见的。我希望 Spring 扫描我定义的任何 Java 包以查找可注入的 bean。

   <context:component-scan base-package="org.hanbo.amqp.sample" />

这样做的好处是 Spring 不仅会扫描指定的包,还会扫描其下的子包。

接下来的几个块是关于 RabbitMQ 相关的 bean。第一个叫做 `rabbit:connection-factory`。它定义了一个连接工厂,用于生产到 RabbitMQ 代理的连接。我没有在我的代码中直接使用这个 bean,而是将它传递给下一个 bean。在讨论下一个 bean 之前,让我们检查一下 `rabbit:connection-factory` 的定义:

   <rabbit:connection-factory id="connectionFactory"
      host="localhost"
      port="5672"
      virtual-host="hanbo1"
      username="testuser" password="secret"/>

这个 bean 的定义不难理解:

  • 属性 id 定义了 bean 的名称。其他 bean 可以使用这个 id 来引用这个 bean。
  • 属性 host 定义了服务器的位置。在这种情况下是 localhost。
  • 属性 port 定义了 AMQP 代理正在监听的端口号。
  • 属性 virtual-host(我在 xsd 中找到的)定义了源或目标虚拟主机。
  • username 和 password 属性定义了可用于连接到 RabbitMQ 代理的用户凭证。请注意,一个用户可以被分配给一个特定的虚拟主机,而不能分配给另一个。在这种情况下,用户 "testuser" 被分配给虚拟主机 "hanbo1"。如果我指定一个不同的虚拟主机,授权将会失败。

下一个块定义了两个 bean,1) 一个叫做 `rabbit:template`,2) 另一个叫做 `rabbit:admin`。

   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                       message-converter="defaultMessageConverter" />
   <rabbit:admin connection-factory="connectionFactory"/>

我不知道他们为什么选择 `rabbit:template` 这个名字。这个 bean 既是发送者也是接收者,非常像 JMS 消费者和生产者的组合,既能发送消息也能接收消息。这个 bean 有一个 id,允许它在 Java 代码中作为对象使用。它还需要一个对连接工厂的引用,这个工厂在前面已经定义了。最后,这个 bean 接受一个消息转换器的引用。这样做的原因是为了实现 POJO 和非普通 Java 对象格式(如 XML 或 JSON)之间的自动转换。有了这个转换器,服务应用程序或客户端可以处理 Java 对象而无需担心显式转换。虽然显式转换并没有什么错。你需要做的是确保你为此类转换创建一个公共方法或对象,并在需要转换的关键位置使用它。你不希望做的是在你的应用程序中到处复制相同的转换代码段。如果你这样做,并且有人告诉你需要更改转换过程,你将很难找到所有地方并进行更改。

下一个是 `rabbit:admin`。顾名思义,它用于对 RabbitMQ 代理进行管理。之所以需要它,是因为当服务/客户端应用程序首次登录 RabbitMQ 代理时,队列和路由键可能尚未注册(创建)。与其手动创建队列和路由键,你的服务应用程序和客户端可以使用这个 `rabbit:admin` 来自动创建它们。这个 bean 只接受连接工厂 bean 的 id。

接下来,我将配置交换机和队列,以及消息如何通过路由键进行路由。有几种类型的交换机:

  • 直接交换机(Direct exchange)根据路由键将所有消息路由到特定的队列。我用的就是这种。
  • 扇形交换机(Fanout exchange)将相同的消息路由到多个队列,它会忽略路由键。
  • 主题交换机(Topic based exchange)根据路由键和一些匹配模式将消息路由到队列。基本上是通过路由键和与队列相关联的一些主题来路由消息。
  • 头部交换机(Header based exchange)通过查看头部数据并将消息与特定队列匹配来路由消息。

我使用的是直接交换机,它允许我通过路由键来路由消息。其 bean 定义如下:

   <rabbit:queue name="testQueue2" durable="true"/>
   <rabbit:direct-exchange name="test-exchange2" durable="true">
      <rabbit:bindings >
         <rabbit:binding queue="testQueue2" key="org.hanbo.amqp.test2"/>
      </rabbit:bindings>
   </rabbit:direct-exchange>

第一行声明了队列。它有一个名字,还有一个名为 durable 的属性被设置为 "true"。在 RabbitMQ 端,这将创建一个持久化队列,即当它不存在时,应用程序会创建它;并且如果应用程序退出,它不会消失。队列 bean 后面的行是定义交换机 bean。定义非常易读,该 bean 将创建一个名为 "test-exchange2" 的交换机(如果不存在)。同样,该交换机将是一个持久化交换机。在 bean 定义内部,有绑定属性,我在这里将队列绑定到这个交换机并分配路由键。我相信可以为同一个队列分配多个路由键。但在这个例子中我没有尝试。

下一个是监听器的定义。监听器(或观察者)被放入一个监听器容器中。定义监听器容器 bean 是最耗时的工作,我不得不在网上查询如何正确配置这个 bean。监听器可以从队列中消费消息,然后将其传递给服务对象进行处理。这个过程应该尽可能透明。也就是说,监听器应该自动确认消息;如果异常阻止了消息被处理,应自动将消息返回到队列;最后,基于文本的消息应该被转换成一个 POJO,供后端服务处理。这是我的监听器容器和监听器 bean 的定义:

   <rabbit:listener-container
      connection-factory="connectionFactory"
      acknowledge="auto"
      requeue-rejected="true"
      message-converter="defaultMessageConverter">
      <rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
   </rabbit:listener-container>

监听器容器 bean 的定义首先接受一个连接工厂对象的引用。它被设置为自动确认消息(acknowledge = "auto")。如果监听器服务对象由于未处理的异常而未能处理消息,消息应该被返回到队列(requeue-rejected = "true")。正如我之前提到的,我会向你展示如何配置你的应用程序以使用 RabbitMQ 作为临时存储和重试机制,就是这个了!

然后我将消息转换器关联为容器的默认消息转换器。这个默认消息转换器将 JSON 格式的消息转换为 POJO。这是消息转换器的定义:

   <bean id="defaultMessageConverter"
            class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>

在监听器容器内部,可以有一个或多个监听器。每个监听器都是一个用户定义的对象,可以处理传入的消息对象(在它被转换为 POJO 之后)。监听器容器中的 listener 属性定义了对从用户定义对象类型实例化的 bean 的引用。listener 属性还指定了用户定义类中将处理消息的方法。listener 行的最后一个属性指定了从哪个队列中拉取消息。

最后,这是消息监听器 bean 的定义:

   <bean id="messageListener" class="org.hanbo.amqp.sample.RabbitMessageListener"></bean>

这些是我的应用程序上下文的基本元素。在下一节中,我将快速浏览客户端程序的主入口和应用程序上下文;然后是服务应用程序的源代码。

步骤 5 -- 客户端程序的主入口和应用程序上下文

客户端程序是我自从第一份工作以来写过的最简单的程序。这是代码:

package org.hanbo.amqp.sample.client;

import org.hanbo.amqp.sample.messages.UserDetail;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.support.GenericXmlApplicationContext;

public class ClientMain
{
   public static void main(String[] args)
   {
      GenericXmlApplicationContext context =
         new GenericXmlApplicationContext("classpath:/application-context.xml");
      
      AmqpTemplate sender = context.getBean(AmqpTemplate.class);
      for (int i = 0; i < 100; i++)
      {
         sendAmqpMessage(i, sender);
      }
      
      context.close();
   }
   
   private static void sendAmqpMessage(int id, AmqpTemplate sender)
   {
      UserDetail userDetail = new UserDetail();
      userDetail.setUserActive(id % 2 == 1? false : true);
      userDetail.setUserEmail(String.format("testuser%d@test.org", id));
      userDetail.setUserId(123 + id);
      userDetail.setUserName(String.format("testuser%d", id));

      sender.convertAndSend("test-exchange2", "org.hanbo.amqp.test2", userDetail);
   }
}

这是客户端程序唯一的 Java 源代码,它所做的就是获取一个消息发送器(即 `rabbit:template`),然后发送 100 条消息。消息被创建为一个 POJO,发送器会将消息对象转换为 JSON 格式并发送出去。你可以在上面代码的第二个方法中找到这一点。第一个参数是交换机的名称;第二个参数是路由键;最后一个是消息对象。代码简单且不言自明。这个 amqp 模板对象类型(即 Java 类 `RabbitTemplate`)有很多方法。我选择用来发送消息的那个是我觉得对我的设计最有用/最合适的。你也应该这样做。

我还为这个客户端程序创建了应用程序上下文。我所做的是拿上一节的应用程序上下文,去掉客户端不需要的任何东西,然后测试它。这就是客户端程序所需要的。它在这里:

<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:tx="http://www.springframework.org/schema/tx"
   xmlns:rabbit="http://www.springframework.org/schema/rabbit"
   
   xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/context 
         http://www.springframework.org/schema/context/spring-context-3.0.xsd
         http://www.springframework.org/schema/tx 
         http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
         http://www.springframework.org/schema/rabbit 
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
         
   <context:component-scan base-package="org.hanbo.amqp.sample" />

   <rabbit:connection-factory id="connectionFactory"
      host="localhost"
      port="5672"
      virtual-host="hanbo1"
      username="testuser" password="secret"/>

   <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"  message-converter="defaultMessageConverter" />
   <rabbit:admin connection-factory="connectionFactory"/>
   
   <bean id="defaultMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
</beans>

你应该能够将上面的与前一节中的进行比较,区别很明显——没有监听器容器,也没有队列和交换机。应用程序只接收。现在我们已经看到了示例客户端的源代码,让我们看看服务应用程序的源代码。

步骤 6 -- 消息对象定义

为了有客户端应用程序和服务应用程序,应该定义一个公共的消息对象类型。两个应用程序都应该包含相同的消息对象类型。这样,发送方应该能够创建消息对象,将其传递给 RabbitTemplate 类的 `convertAndSend()` 方法。消息被转换为 JSON 字符串(或者,如果你使用 XML 的序列化框架,则是 XML)并发送到接收端。

package org.hanbo.amqp.sample.messages;

public class UserDetail
{
   private String userName;
   private String userEmail;
   private int userId;
   private boolean userActive;
   
   public String getUserName()
   {
      return userName;
   }
   
   public void setUserName(String userName)
   {
      this.userName = userName;
   }
   
   public String getUserEmail()
   {
      return userEmail;
   }
   
   public void setUserEmail(String userEmail)
   {
      this.userEmail = userEmail;
   }
   
   public int getUserId()
   {
      return userId;
   }
   
   public void setUserId(int userId)
   {
      this.userId = userId;
   }
   
   public boolean isUserActive()
   {
      return userActive;
   }
   
   public void setUserActive(boolean userActive)
   {
      this.userActive = userActive;
   }
}

这个消息类型类 `UserDetail` 只是我创建的一个虚拟类,用来演示使用 JSON 在两端自动转换消息对象。更有趣的是服务应用程序的监听器类。那是这个例子的最后一部分。

步骤 7 -- 服务应用程序的监听器类

监听器类的源代码如下:

package org.hanbo.amqp.sample;

import org.hanbo.amqp.sample.messages.UserDetail;
import org.hanbo.amqp.sample.services.UserDetailService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMessageListener
{
   @Autowired
   private UserDetailService userDetailService;
   
   public void handleIncoming(UserDetail userDetail)
      throws Exception
   {
      userDetailService.processUserDetail(userDetail);
   }
}

我使用 @Component 注解将该类标记为一个可以被注入到同一个 Spring 应用程序中其他类的类。在类内部,只有一个属性是由 Spring 框架注入的。我实现这个监听器类并不是作为主要的消息处理器。相反,消息被传递给了名为 `userDetailService` 的属性。

这个类只包含一个方法 `handleIncoming()`。如果你回到服务应用程序的应用程序上下文,你会看到这个方法被指定为消息处理方法。让我给你看那一段:

   <rabbit:listener-container
      connection-factory="connectionFactory"
      acknowledge="auto"
      requeue-rejected="true"
      message-converter="defaultMessageConverter">
      <rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
   </rabbit:listener-container>

你应该能看到 `method="handleIncomong"` 这部分。这个 `handleIncoming()` 所做的就是调用 userDetailService 的 `processUserDetail()` 方法来处理消息。`UserDetailService` 的定义如下:

package org.hanbo.amqp.sample.services;

import org.hanbo.amqp.sample.messages.UserDetail;
import org.springframework.stereotype.Service;

@Service
public class UserDetailService
{
   public void processUserDetail(UserDetail userDetail)
   {
      System.out.println(
         String.format("User Name: %s", userDetail.getUserName())
      );
      System.out.println(
         String.format("User Email: %s", userDetail.getUserEmail())
      );
      System.out.println(
         String.format("User ID: %d", userDetail.getUserId())
      );
      System.out.println(
         String.format("User Active: %b", userDetail.isUserActive())
      );
   }
}

我实现的这个 `UserDetailService` 类只是将 UserDetail 对象输出到命令行窗口。

基本上,这里列出的所有代码段都是构成这个简单演示项目的关键元素。接下来我将向你展示如何运行该应用程序。

步骤 8 -- 运行服务和客户端

假设你已经安装了 Java 1.7、Maven 和 RabbitMQ 代理,并且根据你自己的设置正确配置了示例项目,你应该准备好测试示例项目了。有几种方法可以做到这一点:

  • 使用 Maven 运行
  • 使用 Java 运行

使用 Maven 运行这两个程序,首先通过以下命令启动服务程序:

mvn exec:java -Dexec.mainClass="org.hanbo.amqp.sample.Main"

要停止服务程序,只需按 Ctrl + C,它就会停止。

一旦服务程序成功运行,就该尝试客户端程序了。打开一个新的命令行窗口并运行以下命令行:

mvn exec:java -Dexec.mainClass="org.hanbo.amqp.sample.client.ClientMain"

客户端将向服务程序发送 100 条测试消息。你会在运行服务程序的控制台上看到 UserDetail 被打印出来。一旦 100 条消息发送完毕,客户端程序将自动关闭。

如果你想冒险尝试,可以随意使用 Java 命令来运行这两个程序。在使用 Maven 命令构建后,你可以在 target/lib 中找到所有依赖的 jar 包。将依赖的 jar 包和示例程序的 jar 包分别放到两个位置(一个用于服务程序,一个用于客户端程序)。然后运行以下命令(先运行服务,再运行客户端程序):

java -cp<all the jars including the jar of the service or client program> "class of the main entry"

祝你好运。

参考文献

历史

  • 2016年5月17日 初稿。
使用 Spring AMQP 和 RabbitMQ 创建微服务 - 一个简单的介绍 - CodeProject - 代码之家
© . All rights reserved.