使用 Java 2 软件开发工具包的 Amazon Web Services Simple Queue Service





0/5 (0投票)
使用 Java 2 软件开发工具包的 Amazon Web Services Simple Queue Service
引言
消息队列是应用程序之间交换消息的一种方式。发送者将数据对象发送到队列,接收者从队列接收对象。Amazon 的 Simple Queue Service (SQS) 是 AWS 提供的一项服务,通过在 Amazon 各地分布式部署,提供可扩展性和可靠性。
消息队列将应用程序解耦。消息生产者只知道队列,而不知道队列的消费者。同样,消息消费者只知道队列,而不知道队列的其他消费者或生产者。此外,生产者和消费者不知道时序,它们是异步的。
关于队列和消息传递的更多信息,网上有许多资源。这里有一个来自麻省理工学院的良好参考资料:阅读 22:队列和消息传递。
用例
暂且放下怀疑,或者更准确地说,无论你对商业计划的合理性有何看法,都只是构建系统。著名的华盛顿州企业家 John Bunyan 有一个致富成名的计划,那就是最终 conclusively 证明大脚怪(或对有文化的人来说是 Sasquatch)确实存在,并且利用广泛的徒步旅行路径系统移动。
不顾会计师的建议,他清算了一半财产,在华盛顿州的徒步旅行路径沿线安装了一系列隐藏摄像头,每十五分钟拍摄一张照片。由于他很忙,没有时间亲自分析所有照片,因此他希望图像分析软件来分析图像。如果软件识别出大脚怪,他希望图像亲自发送到他的电子邮件账户,以便他可以确认图像是否为大脚怪。
现在,如果 10,000 个摄像头每 15 分钟拍摄一张照片,那么每小时就有 600,000 张图像。假设每张图像处理时间长达五分钟。希望您能看到,我们存在可扩展性问题。
有多种方法可以解决此可扩展性问题,但由于本教程是关于 SQS 的,我们使用 AWS SQS。而且,正如我在所有教程中都喜欢告诫的那样,如果“业务案例”看起来可疑,那么就暂时放下怀疑,专注于 AWS 代码。
设计
对业务案例的抱歉已经足够了,让我们专注于应用程序的设计。下图说明了困境。
- 每
n
分钟,一个Station
向 AWS 队列发送一个观测值。 - 有 1 个或多个
SquatchFinder
组件,其工作是从队列中获取观测值并处理观测值。 Station
是生产者,而SasquatchFinder
是消费者。
Station
将观测值发送到队列,SasquatchFinder
从队列获取观测值。我们可以用一个简单的类图来形式化我们的需求。一个 Station
创建一个 Observation
。一个 SasquatchFinder
处理一个 Observation
。
所有与外部进程与 AWS 的通信都通过其 REST API。SQS 也不例外。此外,SQS 队列只接受文本数据。但一个常见的需求是队列接受二进制数据,例如图像。此外,JSON 是一种文本数据传输格式。
我们可以将 Observation
转换为 JSON 文档。图像被转换为 base64
编码,以便可以表示为文本。请注意,本教程中的 encodedImage
总是用 <snip>
截断,因为 base64 字符串
相当长。
{
timestamp: “1558493503”,
latitude:”46.879967”,
longitude:”-121.726906”,
encodedImage:"/9j/4AA <snip> 3QEUBGX/9k="
}
Base64 编码
图像是二进制的。然而,所有二进制数据都可以通过正确编码和解码的 String
表示。Base64
是一种将二进制数据转换为 string
的编码方案。它很有用,因为它允许将二进制数据(例如图像)嵌入到文本文件(例如网页或 JSON 文档)中。AWS 队列只允许文本数据,因此如果您希望将图像存储在 SQS 队列中,您必须将其转换为 string
。最简单的实现方式是使用 Base64
格式在传输数据时将二进制数据编码为 string
,并在存储数据时将 string
解码为二进制数据。有关 Base64
和 DynamoDB
的示例,请参阅本网站的教程:使用 AWS DynamoDB 低级 Java API – Spring Boot Rest 应用程序。
站点 – 生产者
在编写应用程序代码之前,让我们先创建一个队列。您可以通过 Java 2 API SDK 创建队列;但是,这里我们手动创建队列,然后使用此队列发送和接收消息。
创建 SQS 队列
- 导航到 SQS 控制台并选择标准队列。
- 单击“配置队列”按钮。
- 将队列命名为
SasquatchImageQueue
。 - 接受
Queue
属性的默认值。 - 创建队列后,您应该会看到类似以下屏幕
- 单击“权限”选项卡,并注意我们尚未创建权限。我们将在创建两个必要的用户后返回“权限”选项卡。
AWS 提供两种类型的队列:标准队列和先进先出 (FIFO) 队列。标准队列提供所谓的尽力而为的排序。尽管消息通常按接收顺序传递,但不能保证。此外,消息也可能被处理多次。相比之下,FIFO 队列保证先进先出交付和只处理一次。
在本教程中,我们主要使用标准队列。但是,在本教程的末尾,我们将演示如何使用 FIFO 队列。
创建 SQS 队列用户
我们需要创建两个用户,一个用于与队列交互发送消息,另一个用于接收消息。如果您以前创建过 IAM 用户,请注意我们不将用户分配给任何组或分配任何策略。相反,我们允许队列决定其权限。当然,我们为用户分配编程访问权限并下载凭证文件。
- 导航到 IAM 控制台,创建一个名为
SasquatchProducerUser
的新用户,该用户具有编程访问权限。 - 将用户凭证本地保存。
- 创建第二个用户,名为
SasquatchConsumerUser
,该用户也具有编程访问权限。 - 将用户凭证本地保存。
- 您应该已创建两个具有编程访问权限的用户。
队列权限
最初,只有队列的创建者或所有者才能读写队列。创建者必须授予权限。我们使用队列策略来完成此操作。我们使用 ASW SQS 控制台编写策略,尽管如果您愿意,也可以手动编写。
消费者权限
- 导航到
SasquatchConsumerUser
摘要屏幕并复制 Amazon 资源名称 (ARN)。ARN 应该类似于以下内容
arn:aws:iam::743327341874:user/SasquatchConsumer
Amazon 资源编号 (ARN) 唯一标识 Amazon 资源,在此例中为
SasquatchConsumer
用户。 - 返回 SQS 控制台,选择
SasquatchImageQueue
,然后单击权限选项卡。 - 点击添加权限。
- 在弹出的窗口中,将 ARN 粘贴到 Principal 文本框中。
- 选中
DeleteMessage
、GetQueueUrl
和ReceiveMessage
操作。 - 单击保存更改。
- 创建
SasquatchConsumerUser
后,导航到SasquatchProducerUser
并复制生产者的 ARN。arn:aws:iam::743327341874:user/SasquatchProducerUser
- 导航回 SQS 队列并将此用户作为权限添加到队列中。允许
ChangeMessageVisibility
、DeleteMessage
、GetQueueAttributes
、GetQueueUrl
、PurgeQueue
和SendMessage
操作。 - 添加两个用户的权限后,队列应如下图所示。
如果您仍然不确定如何向队列添加权限,这里有 Amazon 的一个教程:向 Amazon SQS 队列添加权限。您还可以添加服务器端加密,本教程将对此进行说明:创建带有服务器端加密 (SSE) 的 Amazon SQS 队列。
尽管我们不讨论策略文档,但以下示例说明了 JSON 文档是我们使用控制台设置的基础。然而,理解策略文档很重要,因为它们是 AWS 安全的核心。有关 SQS 策略的更多信息,请参阅此文档:将基于身份 (IAM) 的策略用于 Amazon SQS。
需要注意的一点是,这里我们使用 SQS 而不是我们创建的消费者或生产者用户来为队列分配权限。正如上一段链接中的文档所讨论的,我们也可以轻松地使用 IAM 策略。
通过控制台发送消息
尽管出于业务原因可能很少需要这样做,但出于测试目的,您可以手动向队列添加消息。尽管我们不会使用该消息,但让我们探索如何使用 SQS 控制台发送消息。
- 请参阅
observations.json
文档并复制其中一个观测值。当然,在下面的代码清单中,图像被截断了。{ "stationid": 221, "date": "1992-03-12", "time": "091312", "image": "/9j/4AA <snip> 0Wxkf/9k=" }
- 选择队列,然后从“队列操作”中选择发送消息。
- 从 _observations.json_ 中复制一条消息,并将其添加到消息正文中。
- 点击发送消息,一分钟内,可用消息列应显示队列中有一条消息。
- 从队列操作中选择清除队列以清除队列。
Java 项目 – 生产者
如前所述,生产者会生成消息。如果完全实现了上述设计,我们将有许多站点。然而,为了保持教程的简单性,我们只在一个项目中限制一个站点。
项目设置
尽管我使用 Eclipse 开发了本教程,但您可以使用自己的 IDE 甚至命令行。但是,您确实应该使用 Maven 或 Gradle。这里我们使用 Maven。假设您熟悉使用 Maven 构建 Java 项目。
POM
- 创建一个名为
SQSTutorialProducer
的新项目。 - 使用以下 POM 创建或覆盖 POM 文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorial.aws</groupId> <artifactId>SQSTutorialProducer</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.5.25</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <artifactId>auth</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <artifactId>aws-core</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>auth</artifactId> </dependency> <dependency> <artifactId>sqs</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.6.4</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy-dependencies</id> <phase>prepare-package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> <overWriteReleases>false</overWriteReleases> <overWriteSnapshots>false</overWriteSnapshots> <overWriteIfNewer>true</overWriteIfNewer> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.aws.tutorial.sqs.main.Station</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
在 POM 中,我们使用 AWS BOM,这样可以避免指定 AWS 库版本。我们为所需的 AWS 库添加依赖项。我们还指定 maven 将构建一个可执行的 jar,其中包含所需的依赖项。
请注意以下事项
<properties> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties>
如果我们不指定 Java 1.8 或更高版本,编译将失败,因为 AWS 构建器是静态接口方法,不适用于旧版 Java。尽管在您的机器上,代码可能能够编译,但如果您的计算机上安装了多个 Java SDK,您可能会遇到问题。通过明确设置版本、源和目标,我们可以避免编译中可能出现的任何问题。
Station
让我们创建一个名为 Station
的简单可执行 Java 类。这将模拟一个真正的消息生产者。
package com.aws.tutorial.sqs.main;
public class Station {
public static void main(String[] args) {
System.out.println("Station running....");
}
}
可执行 Jar
现在我们已经创建了消费者的基本结构,我们可以修改它来发送 SQS 消息。
发送消息
在此示例中,我们使用 SDK 向队列发送消息。数据负载是 JSON 数据的 string
。您使用硬编码数据发送到队列。显然,在实际应用程序中,数据将来自不同的源。为了模拟从真正的生产者发送消息,在发送每条消息之间引入了延迟。
客户端只需要一个 SqsClient
实例,该实例在 Station
构造函数中初始化,并在使用 @PreDestroy 注解标记的方法中关闭。此注解用于标记一个在类即将被垃圾回收时应调用的方法。
Credentials
客户端需要凭证才能运行。这是应用程序用于向 AWS SDK 验证自身的用户帐户。在这里,我们为了简单起见硬编码凭证。有关 AWS Java 2 SDK 和凭证的更多信息,请参阅 SDK 文档。
SqsClient
SqsClient 是一个扩展 SdkClient
的接口,是访问 AWS SQS 服务的客户端。您可以使用 SqsClientBuilder 来构建客户端。您通过传递凭证和区域来构建客户端。
this.sqsClient = SqsClient.builder()
.credentialsProvider(StaticCredentialsProvider
.create(awsCreds)).region(Region.US_EAST_1).build()
所有对 SQS 的请求都必须通过客户端。不同类型的请求有相应的名称。例如,请求发送消息需要 SendMessageRequest
,请求删除消息需要 DeleteMessageRequest
。如果您曾使用过 Java 2 SDK 提供的其他服务,例如 DynamoDb 或 S3,那么这种模式应该很熟悉。
SendMessageRequest
SendMessageRequest 封装了向客户端发送消息的请求。您使用 SendMessageRequestBuilder
构建请求。上面,我们设置了队列的 URL、消息的正文以及发送消息前的延迟时间。我们从 AWS SDK 控制台获取了队列的 URL。
SendMessageRequest request = SendMessageRequest.builder()
.queueUrl(this.queueUrl).messageBody(message)
.delaySeconds(5).build();
SendMessageResponse
客户端发送请求并接收响应。SendMessageResponse 封装了响应。然后该方法返回 messageId
,main
将值打印到控制台。
SendMessageResponse response = this.sqsClient.sendMessage(request);
return response.messageId();
现在我们已经创建了三条消息并将其发送到 SQS,我们可以编写一个消费者来消费这些消息。现在让我们创建一个名为 SQSTutorialConsumer
的 Java 项目。
Java 项目 – 消费者
消费者,就是消费消息。让我们为队列中的消息创建一个消费者。与生产者一样,我们通过创建一个从命令行运行的可执行类来大大简化消费者。
项目设置
让我们为 Consumer
创建一个 Java Maven 项目。
POM
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tutorial.aws</groupId>
<artifactId>SQSTutorialConsumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.5.25</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<artifactId>auth</artifactId>
<groupId>software.amazon.awssdk</groupId>
</dependency>
<dependency>
<artifactId>aws-core</artifactId>
<groupId>software.amazon.awssdk</groupId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>
<dependency>
<artifactId>sqs</artifactId>
<groupId>software.amazon.awssdk</groupId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.aws.tutorial.sqs.main.SasquatchFinder</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
SasquatchFinder
接收消息
ReceiveMessageRequest
ReceiveMessageRequest
封装了从 SQS 队列接收消息的请求。我们使用构建器创建请求。我们指定 queueURL
和要获取的最大消息数。我们指定了一条消息;但是,如果需要,您可以指定多条消息。
ReceiveMessageRequest receiveMessageRequest =
ReceiveMessageRequest.builder().queueUrl(this.queueUrl)
.maxNumberOfMessages(1).build();
DeleteMessageRequest
处理完消息后,您应该将其从队列中删除。我们通过获取已接收消息的 receiptHandle
来完成此操作,然后使用它来删除消息。
String receiptHandle = messages.get(0).receiptHandle();
DeleteMessageRequest deleteRequest =
DeleteMessageRequest.builder().queueUrl(this.queueUrl)
.receiptHandle(receiptHandle).build();
该程序处理队列中的所有消息。这是一个简单的消费者,但您可以有多个消费者从同一个队列中消费消息。
消息可见性
使用标准队列时,消息可能会被处理两次。当消费者获取消息进行处理时,消息会在可配置的时间内变得不可见。当我们创建队列时,我们接受了 30 秒的可见性超时。但是,如果处理时间超过可见性超时,消息可能会被另一个消费者处理。下图说明了这一点
还有一个小问题。当消息第二次从队列中删除时会发生什么?
请注意,两条消息具有不同的 receiptHandle
。队列有一个内部机制,可以在消息被处理并随后删除两次时避免错误。但是,它不能防止消息被多次处理。如果我们操纵处理时间或可见性超时,我们可以让消息被处理更多次。
要实际删除底层消息,必须提供最新的收据句柄。因此,在我们的上述示例中,第一次尝试删除消息是在返回第二个收据句柄之后进行的,因此消息未被删除。但第二次尝试删除消息是最新收据句柄,因此消息被删除了。要删除消息,您必须传递最新颁发的收据句柄。
您应该设计您的系统,使其不依赖于消息被处理的次数。您的系统应该具有幂等性。如果您需要严格的一次且仅一次处理,请使用 FIFO 队列。
消息属性和死信队列
让我们探讨在使用 SQS 队列时两个重要主题:消息属性和死信队列。消息可以有相关的元数据。然而,要接收带有相关元数据的消息,ReceiveMessageRequest
必须明确指示除了消息本身之外,还要获取相关的元数据。消息可能无法成功处理。与其让消息无限期地在队列中失败,不如配置一个死信队列,以发送失败可配置次数的消息。
死信队列
消息属性
为了接收消息属性,我们必须通过指定 messageAttributeNames
来构建 ReceiveMessageRequest
,并明确指示接收消息属性。该方法可以接受一个或多个属性名称,或一个 * 来表示所有属性。
消息被发送到 DeadLetterQueue
,该队列被配置为 SasquatchImageQueue
的死信队列。
如果您想了解有关消息属性的更多信息,这里是 Amazon 网站上的一个教程:向 Amazon SQS 队列发送带有属性的消息。
如果您想了解更多关于死信队列的信息,这里是 Amazon 网站上的一个教程:配置 Amazon SQS 死信队列。
maxNumberOfMessages
如果队列中有更多消息可用,ReceiveMessageRequest
可以一次接收多条消息。上面,我们将最大消息数设置为一条。让我们探讨一下当我们更改设置为更多消息时会发生什么。
- 创建
com.aws.tutorial.sqs.main
包。 - 在创建的包中创建一个名为
Station
的类,其中包含main
方法。 - 让
main
方法打印出类已执行的消息。 - 编译并打包项目。如果从命令行运行,您将输入以下内容
$ mvn clean compile package
- 构建后,从命令行执行程序。应该会出现打印输出。
$ java -jar SQSTutorialProducer-0.0.1-SNAPSHOT.jar Station running....
- 在修改程序之前,在
com.aws.tutorial.sqs.main
包中创建一个名为TestData
的新类。 - 从 _observations.json_ 文件中复制三个观测值。
- 或者,如果您不想自己转义字符串,请使用本教程 Git 项目中的 _TestData.java_。**注意**:如果您使用 Eclipse,当您在开引号后立即粘贴字符串时,它会自动为您转义字符串。图像的 Base64 代码被缩短,以便于显示。
package com.aws.tutorial.sqs.main; public class TestData { public static String observationOne = " {\n" + " \"stationid\": 221,\n" + " \"date\": \"2019-03-12\",\n" + " \"time\": \"091312\",\n" + " \"image\": \"/9j/4A <snip> \"\n" + " }"; public static String observationTwo = " {\n" + " \"stationid\": 222,\n" + " \"date\": \"2016-02-09\",\n" + " \"time\": \"091312\",\n" + " \"image\": \"/9j/4A <snip> \"\n" + " }"; public static String observationThree = " {\n" + " \"stationid\": 223,\n" + " \"date\": \"2017-12-22\",\n" + " \"time\": \"091312\",\n" + " \"image\": \"/9j/4A <snip> \"\n" + " }"; }
- 修改
Station
,使其包含一个接受三个string
参数的构造函数:key、secret key 和队列的 URL。 - 创建两个成员变量,一个类型为
SqsClient
,另一个类型为String
。 - 在
Station
构造函数中,初始化SqsClient
。 - 创建一个名为
sendMessage
的方法,该方法将消息发送到队列。 - 最后,修改
main
以发送 _TestData.java_ 中的所有三条消息,并在发送每条消息之间暂停。package com.aws.tutorial.sqs.main; import javax.annotation.PreDestroy; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; public class Station { SqsClient sqsClient; String queueUrl; public Station(String key, String secretKey, String queueUrl) { AwsBasicCredentials awsCreds = AwsBasicCredentials.create(key, secretKey); this.sqsClient = SqsClient.builder() .credentialsProvider(StaticCredentialsProvider .create(awsCreds)).region(Region.US_EAST_1).build(); this.queueUrl = queueUrl; } public String sendMessage(String message) { SendMessageRequest request = SendMessageRequest.builder() .queueUrl(this.queueUrl).messageBody(message) .delaySeconds(5).build(); SendMessageResponse response = this.sqsClient.sendMessage(request); return response.messageId(); } @PreDestroy public void preDestroy() { this.sqsClient.close(); } public static void main(String[] args) { System.out.println("Station running...."); Station station = new Station("AKIA22EODDUZONNX2EMP", "LUXJ5WQjW0p4bk1gC5oGBUi41rxA7oSvWWA/8SqH", "https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue"); String id = station.sendMessage(TestData.observationOne); System.out.println("sent message: " + id); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } id = station.sendMessage(TestData.observationTwo); System.out.println("sent message: " + id); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } id = station.sendMessage(TestData.observationThree); System.out.println("sent message: " + id); } }
- 编译并运行应用程序,您应该会看到以下输出
Station running.... sent message: b861220e-a37a-424d-880c-5dd67a052967 sent message: 5185e68b-a16f-4300-8ee5-7ef5cca0eb53 sent message: 161f7444-ae7b-4890-b022-0447933054c3
- 导航到 AWS 控制台中的队列,您应该会在可用消息列中看到三条消息。
- 创建一个名为
SQSTutorialConsumer
的 Java 项目,作为一个 Maven 项目。 - 创建一个包含以下内容的 POM 文件。
- 创建
com.aws.tutorial.sqs.main
包。 - 在包中创建一个名为
SasquatchFinder
的类。 - 在类中创建一个
main
方法,并让它打印出它已运行。package com.aws.tutorial.sqs.main; public class SasquatchFinder { public static void main(String[] args) { System.out.println("SasquatchFinder running...."); } }
- 构建项目。
$ mvn clean compile package
- 构建项目后,从命令行执行程序。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar SasquatchFinder running....
现在我们已经有了项目的基本框架,我们可以添加代码来接收消息。
- 与
SQSTutorialProducer
项目中的 _Station_ 一样,创建成员变量。 - 创建一个
main
方法,该方法初始化SqsClient
。请务必使用消费者的凭证,而不是生产者的凭证。 - 创建一个名为
processMessage
的新方法,并使其使用ReceiveMessageRequest
接收消息。 - 创建一个名为
deleteMessage
的新方法,并使其使用DeleteMessageRequest
删除消息。 - 修改
processMessage
,使其在延迟后调用deleteMessage
。 - 修改
main
以连续循环处理消息。package com.aws.tutorial.sqs.main; import java.util.List; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; public class SasquatchFinder { private SqsClient sqsClient; private String queueUrl; public static int finderId = 1; public SasquatchFinder(String key, String secretKey, String queueUrl) { AwsBasicCredentials awsCreds = AwsBasicCredentials.create(key, secretKey); this.sqsClient = SqsClient.builder().credentialsProvider (StaticCredentialsProvider.create(awsCreds)) .region(Region.US_EAST_1).build(); this.queueUrl = queueUrl; } public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl) .maxNumberOfMessages(1).build(); List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).messages(); if(messages == null || messages.size() == 0) return; messages.stream().map(s -> s.body()).forEach(System.out::println); try { System.out.println("sleeping for 10 seconds..."); Thread.sleep(10000); this.deleteMessage(messages); } catch (InterruptedException e) { e.printStackTrace(); } } public void deleteMessage(List<Message> messages) { String receiptHandle = messages.get(0).receiptHandle(); DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(this.queueUrl) .receiptHandle(receiptHandle).build(); this.sqsClient.deleteMessage(deleteRequest); } public static void main(String[] args) { System.out.println("SasquatchFinder " + SasquatchFinder.finderId + " running...."); SasquatchFinder finder = new SasquatchFinder ("AKIA22EODDUZAMDPWSX7", "805hbufO3Sn18eDsBDrOzCgB/eT5KVPM/AIkIpoZ", "https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue"); try { while (true) { finder.processMessage(); } } catch (Exception e) { e.printStackTrace(); } System.out.println("SasquatchFinder " + SasquatchFinder.finderId + " stopped."); } }
- 编译并运行生产者,如果您在上一节中运行了消费者,您应该会看到以下输出
SasquatchFinder 1 running.... { "stationid": 221, "date": "2019-03-12", "time": "091312", "image": "/9j/4AAQ <snip> kf/9k=" } sleeping for 10 seconds... { "stationid": 223, "date": "2017-12-22", "time": "091312", "image": "/9j/4AAQ <snip> kf/9k=" } sleeping for 10 seconds... { "stationid": 222, "date": "2016-02-09", "time": "091312", "image": "/9j/4AAQ <snip> kf/9k=" } sleeping for 10 seconds...
- 导航到 AWS 控制台中的队列,您应该会看到没有消息,因为它们在处理后已被删除。
在这个简单的消费者中,我们首先创建一个客户端来与队列交互。然后我们从队列中获取一条消息。程序暂停以模拟处理。然后它通过使用
receiptHandle
从队列中删除消息。由于程序循环运行,它处理了我们在创建消费者时放入队列中的所有三条消息。
- 打开 SQS 控制台并向队列发送一条消息。
- 修改
SasquatchFinder
,使其在每条消息之间暂停 40 秒。public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest .builder().queueUrl(this.queueUrl).maxNumberOfMessages(1).build(); List<Message> messages = this.sqsClient .receiveMessage(receiveMessageRequest).messages(); if(messages == null || messages.size() == 0){ return; } messages.stream().map(s -> s.body()).forEach(System.out::println); try { System.out.println("sleeping for 40 seconds..."); Thread.sleep(40000); this.deleteMessage(messages); } catch (InterruptedException e) { e.printStackTrace(); } }
- 构建应用程序后,打开两个命令行窗口,同时在两个不同的窗口中执行程序。
一个运行实例从队列中获取消息。消息的可见性超时设置为 30 秒开始。实例休眠 40 秒以模拟处理。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 2 SasquatchFinder 2 running.... mymessage sleeping for 40 seconds...
另一个实例在队列上找不到消息,因为消息不可见。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1 SasquatchFinder 1 running....
然而,三十秒后,消息在队列上再次可见,并被另一个实例获取并处理。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1 SasquatchFinder 1 running.... mymessage sleeping for 40 seconds...
与此同时,首先获取消息的实例完成了处理并删除了消息。实际上,它尝试删除消息。但是,由于另一个进程已经请求了消息并颁发了新的收据句柄,因此消息并未真正删除。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 2 SasquatchFinder 2 running.... mymessage sleeping for 40 seconds... Deleted message AQEB3/lhW8cY2cTsl2gd/GOsyPrt1J/ SQn+ZR06ngf24aL5C8SqfUSPZfAl4uc2IwuZuLhZ/5BXYLWVU7AvmgSf0kb4zm0owKh01 EXC4pGhvtNSsioLnk3nd4KiS5YEUO/EssCnRM1we7rXw0eLyd2LehOpPOZ49893lIJ6opy 1vamQxxk6C+7iGcWbY0dMNTvrZqVaZw2JW/eZV5wI99rdUwRP16+RFj7XWsxEI5KJcExgn WY3jDRQv1mXqe5ZgWI9M7mqPH/rrx8afBdV2P53B7OK0uRm3vUGMzmW/xUgbsxsy5UB0+ DZGLaccUAbegtC74LQ6BLZs64VlFxc8jAC2sp2gheLAZ849j4JkMrA8nWf+P+xKCjqdAL eGrN754DcxnvhZv79R6sOGcp2lBtTOsA== by SasquatchFinder 2
由于消息仍由第二个实例处理,第一个实例看不到消息。然后第二个实例删除消息。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1 SasquatchFinder 1 running.... mymessage sleeping for 40 seconds... Deleted message AQEBgZK7kq12asCcVVNbFQNlQmmgYTXXO8OzgoJzHpAnqds BtMnaBxSBvjjgyVqO3nqYwuhFoxPWgXhUoUcgDzejHHSG6dM/VNG1Wdv3Q93THs JPj6BSQSH/sLjX7qvdFYT20Es0jdhN4dQTNMPyaA3sA7a2x025cUYLsegKfMlWV fCDThABbn+0evwgkn3hmzwLBvAWZEGIp0mooZvYf6WiLcblbqCnx+Gh5j5/XvmI pWuT9ux3DQSTYH+f+XdfUxclXP6exwAYyyFm7xHJnlF9LXcRcKmv2QitpQjgjK3 yQBLrogU6dPf8Zp34K8iwMr1TBXEi5mZnfPSA7Cl3a4N2c+MxB+OupGIGGY6uoy 2gFLSiaaunsij/weB0FFaYaE/MFhMsXdMMhNho2o/lrq6SOA== by SasquatchFinder 1
- 创建一个名为
DeadLetterQueue
的新标准队列。 选择 SasquatchImageQueue
并从队列操作下拉菜单中选择配置队列。- 修改
SasquatchImageQueue
,使其死信队列使用DeadLetterQueue
。 - 选择
SasquatchImageQueue
并发送新消息。 - 创建消息时,添加两个消息属性
- 打开
SQSTutorialConsumer
项目并修改SasquatchFinder
中的processMessage
方法。请注意,您注释掉了删除消息的调用。public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl) .maxNumberOfMessages(1).messageAttributeNames("*").build(); List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest) .messages(); if (messages == null || messages.size() == 0) { return; } messages.stream().map(s -> s.body()).forEach(System.out::println); for (Message message : messages) { System.out.println(message.messageId()); Map<String, MessageAttributeValue> attributes = message .messageAttributes(); Set<String> keys = attributes.keySet(); for (String key : keys) { System.out.println(key + ":" + attributes.get(key).stringValue()); } } try { System.out.println("sleeping for 10 seconds..."); Thread.sleep(10000); //this.deleteMessage(messages); } catch (InterruptedException e) { e.printStackTrace(); } }
- 编译并运行应用程序。消息应该处理三次。
SasquatchFinder 1 running.... abc e6ede972-9a6d-4c86-8c00-b16fe18977ff attribute1:abc attribute2:ded sleeping for 10 seconds... abc e6ede972-9a6d-4c86-8c00-b16fe18977ff attribute1:abc attribute2:ded sleeping for 10 seconds... abc e6ede972-9a6d-4c86-8c00-b16fe18977ff attribute1:abc attribute2:ded sleeping for 10 seconds...
- 返回 AWS 控制台,您应该会看到消息已放置在
DeadLetterQueue
上。 - 修改
SasquatchFinder
类,创建一个名为deleteMessages
的新方法。 - 让该方法迭代所有收到的消息
public void deleteMessages(List<Message> messages) { for(Message message:messages) { String receiptHandle = message.receiptHandle(); DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(this.queueUrl) .receiptHandle(receiptHandle).build(); this.sqsClient.deleteMessage(deleteRequest); System.out.println("Deleted message " + receiptHandle + " by SasquatchFinder " + SasquatchFinder.finderId); } }
- 修改
processMessage
,使其调用deleteMessages
而不是deleteMessage
。public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl) .maxNumberOfMessages(10).messageAttributeNames("*").build(); List<Message> messages = this.sqsClient .receiveMessage(receiveMessageRequest).messages(); if (messages == null || messages.size() == 0) { return; } messages.stream().map(s -> s.body()).forEach(System.out::println); for (Message message : messages) { System.out.println(message.messageId()); Map<String, MessageAttributeValue> attributes = message .messageAttributes(); Set<String> keys = attributes.keySet(); for (String key : keys) { System.out.println(key + ":" + attributes.get(key).stringValue()); } } try { System.out.println("sleeping for 10 seconds..."); Thread.sleep(10000); this.deleteMessages(messages); } catch (InterruptedException e) { e.printStackTrace(); } }
- 编译应用程序。
- 编译后,导航到 AWS SQS 控制台,向队列中添加五条消息,消息正文分别为
a1
、a2
、a3
、a4
和a5
。 - 运行应用程序,您应该会看到类似以下的输出。
SasquatchFinder 1 running.... a4 98a42736-e4b5-4dfd-9428-3e32d2ea145d sleeping for 10 seconds... Deleted message AQEBqmAqpGs85ERM2Y8EnD4zjBPO1KxomlhJgQCPQ+ JO3gjYhRcZbflS1gKJT1kas0JId7bX4X+ OmFWQfC8r+gZGr02jwBcKlhvSUIv0tx13Q88EPpzMJDNbB9w9oKbgR+ hc8c0nZQPPjJ2uHu7KeQfTmIdK/dt49cs/ GHFRZeq3pIUWN2jJO8h0UdlpLeFKbB96WjPvakAnXDFd46meejQvBod0x 18L1Y1dBt6cZc5+9AbB6eb4bJjV5dKvyDCt IUP2XFZ8iwtZF1lxntzqXxdMGYCjzaQ/oqQ5EmVJ/pFMTgWlUTks+ qVFMu7a/sOCfQm7bFwE3AofXQROAK3B0crssZT bzoqQ9oJv+nj0kn596gidN+gygrISvF9vESIG1M5Ll+Lk2ADWQeO+2UA/AJax3A== by SasquatchFinder 1 a1 a5 c167bb7a-f356-4d5b-aa0f-ea90075cef50 f0d79263-05da-485e-bf6a-fa6b3f9fe92a sleeping for 10 seconds... Deleted message AQEBGwtlQPM080KnHDAOWUsZKUQ4PWfLP2g/ AFn0sr9ERDOJFssjl7rNXl3mL6ryqoH9EgiPEGyGXwPm6n/ FSsfbPA9OSMJYLq0Fho9qtpkcoI0mmAqRPQ/7h0J++zAmmf3bflcD 9BqJS+hz4a/Di8Eo6GB0oWJUFZEFYcKWnIUGMNgnQfY3xs 1DF9UuNZdsu7h3KN9hGGy3vSTuLvJJox7DDHSgY+QU3nisT5dTSfl tKc9vJMQq2mPxB/f2EUmgwKQ82f10A6lPlSjVuiyNtGkKV au3BorKINz3dtG+xAHd5wWfALFExyip7zFZl6wVsnzfKox9QBaxRS rukIfx3+w5rIilq1QujPpNqLKItlxOvaXvDvxi/ 8lWv31S5UNlY7ooEOYSIkh1wnNwXKY7ZP4aQQ== by SasquatchFinder 1 Deleted message AQEBLIUJqmODdigrnQ88hzta9Zr+PaQnctLqmYrQT 0iU5ZxvaLPy0PGNTe7eKwLHbBvc+WdDbLXK951WaPYWoY9dbMJZMyRN njEj3doGoUkmBOm0LzTs1xDkV+QPb3fGH3s+mxh2TFhX3KFOwXrvf4uqk px9mHdGioMWa86NSsCUUEQ3vXGUXprSdGsSqXUsoAug7 v6wBU3QIPzeQm8pRLmjbZPdx+ndeV80FwnFkxDfNx/mtpAibum4ON4Cx DUB66jLC7nVRe0XxXBllM2G/brS7jseqbz+Q61qbFjLNW Ko96kTBIrYDjvZEmcSQdp37cYMf4rO/vsr+/XCNUtbtcD8h9Xk8Fc+ atcIsuQSlrLbYMplVgN3EwogYlXJsB9GSOlVQVpO+ gwOLBXonXJ6i3EAbQ== by SasquatchFinder 1 a2 a5 e65fbcc2-2c4a-42f6-8b61-ca97dad4826e b2bc665c-4c1c-42c7-b3d2-c1d5bf048ee9 sleeping for 10 seconds... Deleted message AQEB2FZyDGQEOUgLxR9wIxAiJbk++Ktec9RL on3nAZr7bPeQu2QJ8iVxRMNg92ZgvoPY5qsBndcRGEQjI5zKHQ/ r62tg4+LMWwFLSDBhDF3d55w6OosgLf+K7AIBICGAeTJanTkhCzQ lWYM+HCDFEve+NhPsr5+/zabaeZrkKwSBh8E2jTCmr29LmNR6ld 9Bz0NSboj5gi+Gxa3dTu+xPGMLMjANVQ1Qa1BhoYEI0QP8kl9gL8 aBpLhkeW1eWXgRaRtRcTAVpjxF73ZlUEFVNyYeE/Mwz9ZT2lWRf tj6dv5p2PUG5Z6VtbbBw/9AXQElJUTgfHKGd4iGEjo4A3l6ff6g/ NVJzm/LkGq6909txbTIk8PSp5istS4bM318W6VG2ten9jYSU7+ pj8H809AHoW3VEw== by SasquatchFinder 1 Deleted message AQEBMdzd33/uz7jNQMnBJu1ne7GRh9g2xHx6 X0cPWLsU0emEN0G5SGbr3nF/9QklDrrW42BX1HW6IDWxvhlI4/ bOByZobYOfjmv5Cr8 rDEJYnNKWxqxBZeQqjArKTy90WeEs0puUw4l6PouEZOv35daHO0h 01A8Dpk/oMlVBi/OZFCIM4fetG2tUxwa7eU15WiEF4mklZqqJx2b VTbdiZqwhOucgqXlyXK3IJ5FtBFd6ACtEyX1tQmIBn6njmk/CBuX 0v5+LzaxlntHy9Q+FpjuPLEyyE5wGqIk9B8Kcqv469pnaE3UJJaCK7 DxgG70rF/7M1kYzaDRbRBYJB9jS3W9b8qZpj1JU4JM4euH9xBP4j 59MvdwgIs4lSPvO1F3NtdCuNeOOMF15/ n1WvU2U31jSeg== by SasquatchFinder 1
正如示例所示,您可以指定要处理的最大消息数,但不能指定消息数。这应该看起来很合理,因为消费者在处理之前不知道队列中有多少消息。顺便说一句,请注意,在上面的列表中,消息的处理顺序与接收顺序不同。
先进先出 (FIFO) 队列
让我们修改项目以使用 FIFO 队列,并同时重新运行两个消费者实例。请注意,消费者和生产者都不知道队列的类型。它们只知道其 URL。
消息可见性
如果使用 FIFO 队列,在可见性超时窗口之后执行删除消息的操作会失败。
结论
在本教程中,我们创建了一个 Amazon SQS 队列。我们使用 AWS Java 2 SDK 创建了一个消息生产者和一个消息消费者。我们探讨了消息属性、死信队列和消息可见性等几个主题。我们还创建了一个 FIFO 队列。
Amazon 的 SQS 队列是一个易于使用的队列,它消除了组织管理基础设施的麻烦。在本教程中,我们只研究了 SQS 的基础知识。有关更多信息,请参阅 Java 2 SDK 开发人员指南和 SQS 开发人员指南。请记住,API 从版本 1 到版本 2 发生了变化,因此如有疑问,请假定您需要一个对象的构建器,并且在构建对象时必须对其进行配置。但是,API 是一致的,一旦您开始使用 API,将 1.1 代码转换为 2 就很直观了。
GitHub 项目
GitHub 项目 SQSTutorial
可在此处获取。
- 创建一个名为
SasquatchImageQueue.fifo
的新队列,类型为 FIFO 队列。 - 单击“快速创建队列”。
- 创建一个新权限,但为了省事,我们勾选“所有人”复选框和“所有 SQS 操作”复选框。在生产环境中,您显然不会这样做。
- 修改消费者和生产者以使用此队列的 URL。
https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue.fifo
- 修改生产者中的
sendMessage
方法。请注意,已删除delaySeconds
并添加了messageGroupId
。public String sendMessage(String message) { SendMessageRequest request = SendMessageRequest.builder() .queueUrl(this.queueUrl).messageBody(message) .messageGroupId("mygroup").build(); SendMessageResponse response = this.sqsClient.sendMessage(request); return response.messageId(); }
- 更改队列后编译并运行生产者应用程序,三条消息将发送到队列。
- 编译并运行消费者应用程序,三条消息将按接收顺序处理。
- 修改
SasquatchFinder processMessage
以通过睡眠 40 秒来模拟处理。public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl) .maxNumberOfMessages(1).build(); List<Message> messages = this.sqsClient .receiveMessage(receiveMessageRequest).messages(); if(messages == null || messages.size() == 0) { return; } messages.stream().map(s -> s.body()).forEach(System.out::println); try { System.out.println("sleeping for 40 seconds..."); Thread.sleep(40000); this.deleteMessage(messages); } catch (InterruptedException e) { e.printStackTrace(); } }
- 编译并运行应用程序。请注意,您会收到一个
SqsException
。SasquatchFinder 2 running.... messageMine sleeping for 40 seconds... software.amazon.awssdk.services.sqs.model.SqsException: Value AQEBBJL+BlwyhRLnQGxaIKDkkrEv1sU6VnHzYM51Q0UFdx2lDyWvKoI/JYcs7MktVJ1Nmyr1mCVX/ cpcqS9dMqq7Ual92VLEXDS9hEYM/qg1vdEGHB60OktMzpidyWBenQQyybzXofO+ pAdKOYpC/wiEw8GBPsmFDCHpVn1hxHeLSNJyw10SwNv3DTXQXk4Pe+v3yGf23bf8sDk7 Rx7ApqWYi8n8z9uijZAQBdwuFpUrZslivMWCzid6AFOXI/k83+/tKnSMyT0/Mx0rng0v1k4W liSgv5YJo5HyEZTt+cOBwfA= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired. (Service: Sqs, Status Code: 400, Request ID: 845b9538-4104-5428-aa2f-c05092244385) at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.handl <snip> at com.aws.tutorial.sqs.main.SasquatchFinder.main(SasquatchFinder.java:58) SasquatchFinder 2 stopped.
- Amazon Simple Queue Service:开发人员指南
- Amazon Simple Queue Service:API 参考
- 适用于 Java 2.0 的 AWS 开发工具包开发人员指南
- 适用于 Java 2.0 的 AWS 开发工具包 API 参考