Amazon Web Services 简单队列服务 (AWS SQS) 使用 Java 2 软件开发工具包





5.00/5 (1投票)
简介 AWS SQS 消息队列是应用程序之间交换消息的一种方式。发送者将数据对象发送到队列,接收者从队列接收对象。
引言
AWS SQS 消息队列是应用程序之间交换消息的一种方式。发送者将数据对象发送到队列,接收者从队列接收对象。Amazon 的简单队列服务 (AWS SQS) 是 AWS 提供的一项服务,通过在 Amazon 各地分布式部署提供可伸缩性和可靠性。
消息队列解耦应用程序。消息生产者只知道队列,而不知道队列的消费者。同样,消息消费者只知道队列,而不知道队列的其他消费者或生产者。此外,生产者和消费者不知道时序,并且是异步的。
有关队列和消息传递的更多信息,网上有许多资源。这里是麻省理工学院的一个很好的参考资料:阅读 22:队列和消息传递。
用例
抛开不信,或者更准确地说,无论您对商业计划的合理性有何看法,都只需构建系统。来自华盛顿州的著名企业家 John Bunyan 有一个致富成名的计划,那就是最终 conclusively 证明大脚怪(或称野人,对有文化的人而言)存在,并且利用广泛的徒步旅行路径系统进行移动。
不顾会计师的建议,他清算了一半的财富,在华盛顿州的徒步旅行路径上安装了一系列隐藏摄像头,每十五分钟拍摄一张照片。由于他是个忙碌的人,他没有时间亲自分析所有照片,因此他希望图像分析软件来分析图像。如果软件识别出野人,他希望图像亲自发送到他的电子邮件帐户,以便他可以确认图像是否为野人。
现在,如果 10,000 个摄像头每 15 分钟拍摄一张照片,那么每小时就是 600,000 张图像。假设每张图像处理需要长达五分钟。希望您能看到,我们存在可伸缩性问题。
有多种方法可以解决此可伸缩性问题,但由于这是一个关于 SQS 的教程,我们使用 AWS SQS。而且,正如我在所有教程中都喜欢告诫的那样,如果“业务案例”看起来可疑,那么就抛开不信,专注于 AWS 代码。
设计
对业务案例的歉意足够了,让我们专注于应用程序的设计。下图说明了困境。
- 每隔
n
分钟,一个Station
会向 AWS 队列发送一个观察结果。 - 有一个或多个
SquatchFinder
组件,其工作是从队列中获取观察结果并处理该观察结果。 Station
是生产者,而SasquatchFinder
是消费者。


我们可以用一个简单的类图来形式化我们的需求。一个 Station
创建一个 Observation
。一个 SasquatchFinder
处理一个 Observation
。

与 AWS 的所有通信都通过其 REST API 进行。AWS SQS 也不例外。此外,SQS 队列只接受文本数据。但常见需求是队列接受二进制数据,例如图像。此外,JSON 是一种文本数据传输格式。
我们可以将 Observation
转换为 JSON 文档。图像被转换为 base64 编码,因此可以表示为文本。请注意,本教程中的 encodedImage
始终以 <snip>
截断,因为 base64 字符串相当长。
{
timestamp: “1558493503”,
latitude:”46.879967”,
longitude:”-121.726906”,
encodedImage:"/9j/4AA <snip> 3QEUBGX/9k="
}
Base64 编码
图像是二进制的。然而,只要正确编码和解码,所有二进制都可以用字符串表示。Base64 是一种将二进制转换为字符串的编码方案。它很有用,因为它允许将二进制数据(例如图像)嵌入到文本文件(例如网页或 JSON 文档)中。AWS SQS 队列只允许文本数据,因此如果您希望将图像存储在 AWS SQS 队列中,则必须将其转换为字符串。实现此目的最简单的方法是使用 Base64 格式在传输数据时将二进制数据编码为字符串,并在存储数据时将字符串解码为二进制数据。有关 Base64 和 DynamoDB 的示例,请参阅本网站的教程:使用 AWS DynamoDB 低级 Java API – Sprint Boot Rest 应用程序。
Station – 生产者
在编写应用程序代码之前,让我们创建一个队列。您可以通过 Java 2 API SDK 创建队列;但是,这里我们手动创建队列,然后使用此队列发送和接收消息。
创建 SQS 队列
- 导航到 SQS 控制台并选择标准队列。
- 点击“配置队列”按钮。
- 将队列命名为
SasquatchImageQueue
。 - 接受队列属性的默认值。
- 创建队列后,您应该会看到类似以下内容的屏幕。
- 点击“权限”选项卡,注意我们尚未创建任何权限。在创建两个必需用户后,我们再返回“权限”选项卡。
AWS SQS 提供两种类型的队列:标准队列和先进先出 (FIFO) 队列。标准队列提供所谓的尽力而为的排序。尽管消息通常以它们收到的顺序传递,但不能保证。此外,消息也可能被处理多次。相比之下,FIFO 队列保证先进先出传递和仅处理一次。
在本教程中,我们主要使用标准队列。但是,在本教程的最后,我们将演示如何使用 FIFO 队列。
创建 SQS 队列用户
我们需要创建两个用户,一个用于与队列交互以发送消息,另一个用于接收消息。如果您以前创建过 IAM 用户,请注意我们不会将用户分配给任何组或分配任何策略。相反,我们允许队列决定其权限。当然,我们为用户分配编程访问权限并下载凭据文件。
- 导航到 IAM 控制台并创建一个名为
SasquatchProducerUser
的新用户,该用户具有编程访问权限。 - 将用户凭据保存在本地。
- 创建第二个用户
SasquatchConsumerUser
,该用户也具有编程访问权限。 - 将用户凭据保存在本地。
- 您应该已经创建了两个具有编程访问权限的用户。
队列权限
最初,只有队列的创建者或所有者才能读取或写入队列。创建者必须授予权限。我们使用队列策略来完成此操作。我们使用 ASW SQS 控制台编写策略,尽管如果您愿意,也可以手动编写。
消费者权限
- 导航到
SasquatchConsumerUser
摘要屏幕并复制 Amazon 资源名称 (ARN)。ARN 应类似于以下内容。
arn:aws:iam::743327341874:user/SasquatchConsumer
Amazon 资源编号 (ARN) 唯一标识 Amazon 资源,在本例中是
SasquatchConsumer
用户。 - 返回 SQS 控制台,选择
SasquatchImageQueue
并点击“权限”选项卡。 - 点击“添加权限”。
- 在弹出的结果窗口中,将 ARN 粘贴到“主体”文本框中。
- 勾选
DeleteMessage
、GetQueueUrl
和ReceiveMessage
操作。 - 点击“保存更改”。
- 创建
SasquatchConsumerUser
后,导航到SasquatchProducerUser
并复制生产者的 ARN。arn:aws:iam::743327341874:user/SasquatchProducerUser
- 返回 SQS 队列并将此用户作为权限添加到队列。允许
ChangeMessageVisibility
、DeleteMessage
、GetQueueAttributes
、GetQueueUrl
、PurgeQueue
和SendMessage
操作。 - 添加两个用户的权限后,队列应如下图所示。
如果您仍然不确定如何向队列添加权限,这里有一个 Amazon 的教程:向 Amazon SQS 队列添加权限。您还可以添加服务器端加密,本教程对此进行了说明:使用服务器端加密 (SSE) 创建 Amazon SQS 队列。
虽然我们不讨论策略文档,但以下内容说明了我们使用控制台设置的底层是 JSON 文档。但是,了解策略文档很重要,因为它们是 AWS 安全的核心。有关 AWS SQS 策略的更多信息,请参阅此文档:将基于身份 (IAM) 的策略用于 Amazon SQS。
需要注意的是,这里我们使用 AWS SQS 而不是我们创建的消费者或生产者用户向队列分配了权限。正如前一段链接中的文档所讨论的,我们也可以轻松地使用 IAM 策略。
通过控制台发送消息
尽管可能很少有商业原因,但出于测试目的,您可以手动向队列添加消息。虽然我们不会使用该消息,但让我们探索使用 AWS SQS 控制台发送消息。
- 参考 observations.json 文档并复制其中一个观察结果。当然,在下面的代码列表中,图像被截断了。
{ "stationid": 221, "date": "1992-03-12", "time": "091312", "image": "/9j/4AA <snip> 0Wxkf/9k=" }
- 选择队列,然后从队列操作中选择“发送消息”。
- 从 observations.json 复制一条消息并将其添加到消息正文。
- 点击“发送消息”,一分钟内,“可用消息”列应显示队列中有一条消息。
- 从队列操作中选择“清除队列”以清除队列。
Java 项目 – 生产者
如前所述,生产者会生成消息。如果完全实现了上述设计,我们将有许多 Station 和许多。但是,为了使本教程保持简单,我们只在一个项目中限制一个 Station。
项目设置
尽管我使用 Eclipse 开发了本教程,但您可以使用自己的 IDE 甚至命令行。但是,您确实应该使用 Maven 或 Gradle。这里我们使用 Maven。假定您熟悉使用 Maven 构建 Java 项目。
POM
- 创建一个名为
SQSTutorialProducer
的新项目。 - 使用以下 POM 创建或覆盖 POM 文件。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorial.aws</groupId> <artifactId>SQSTutorialProducer</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.5.25</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <artifactId>auth</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <artifactId>aws-core</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>auth</artifactId> </dependency> <dependency> <artifactId>sqs</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.6.4</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy-dependencies</id> <phase>prepare-package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> <overWriteReleases>false</overWriteReleases> <overWriteSnapshots>false</overWriteSnapshots> <overWriteIfNewer>true</overWriteIfNewer> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.aws.tutorial.sqs.main.Station</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
在 POM 中,我们使用 AWS BOM,这样可以避免指定 AWS 库版本。我们添加了所需 AWS 库的依赖项。我们还指定 maven 将构建一个可执行的 jar,并将所需的依赖项打包在 jar 中。
请注意以下事项。
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
如果我们不指定 Java 1.8 或更高版本,编译将失败,因为 AWS 构建器是静态接口方法,不适用于旧版 Java。尽管在您的机器上代码可能可以编译,但如果您的计算机上安装了多个 Java SDK,则可能会出现问题。通过显式设置版本、源和目标,我们可以避免任何潜在的编译问题。
Station
让我们创建一个简单的可执行 Java 类,名为 Station
。这将模拟一个真正的消息生产者。
- 首先创建一个
com.aws.tutorial.sqs.main
包。 - 在创建的包中创建一个名为
Station
的类,其中包含一个main
方法。 - 让
main
方法打印出类已执行的消息。package com.aws.tutorial.sqs.main; public class Station { public static void main(String[] args) { System.out.println("Station running...."); } }
可执行 Jar
- 编译并打包项目。如果从命令行运行,您将输入以下内容。
$ mvn clean compile package
- 构建后,从命令行执行程序。应该会出现打印输出。
$ java -jar SQSTutorialProducer-0.0.1-SNAPSHOT.jar Station running....
现在我们已经创建了消费者的基本结构,我们可以修改它来发送 SQS 消息。
发送消息
在此示例中,我们使用 SDK 将消息发送到队列。数据负载是 JSON 数据字符串。您使用硬编码数据发送到队列。显然,在实际应用程序中,数据将来自不同的来源。为了模拟从真正的生产者发送消息,在发送每条消息之间引入了延迟。
- 在修改程序之前,在
com.aws.tutorial.sqs.main
包中创建一个名为TestData
的新类。 - 从 observations.json 文件中复制三个观察结果。
- 或者,如果您不想自己转义字符串,请使用本教程 Git 项目中的
TestData.java
。注意:如果您使用 Eclipse,当您在开引号后立即粘贴字符串时,它会自动为您转义字符串。图像的 base64 代码已缩短,以便于显示。package com.aws.tutorial.sqs.main; public class TestData { public static String observationOne = " {\n" + " \"stationid\": 221,\n" + " \"date\": \"2019-03-12\",\n" + " \"time\": \"091312\",\n" + " \"image\": \"/9j/4A <snip> \"\n" + " }"; public static String observationTwo = " {\n" + " \"stationid\": 222,\n" + " \"date\": \"2016-02-09\",\n" + " \"time\": \"091312\",\n" + " \"image\": \"/9j/4A <snip> \"\n" + " }"; public static String observationThree = " {\n" + " \"stationid\": 223,\n" + " \"date\": \"2017-12-22\",\n" + " \"time\": \"091312\",\n" + " \"image\": \"/9j/4A <snip> \"\n" + " }"; }
- 修改
Station
,使其具有一个接受三个字符串(键、密钥和队列 URL)的构造函数。 - 创建两个成员变量,一个类型为
SqsClient
,另一个类型为String
。 - 在
Station
构造函数中初始化SqsClient
。 - 创建一个名为
sendMessage
的方法,该方法将消息发送到队列。 - 最后,修改
main
以发送TestData.java
中的所有三条消息,并在发送每条消息之间暂停。package com.aws.tutorial.sqs.main; import javax.annotation.PreDestroy; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; public class Station { SqsClient sqsClient; String queueUrl; public Station(String key, String secretKey, String queueUrl) { AwsBasicCredentials awsCreds = AwsBasicCredentials.create(key, secretKey); this.sqsClient = SqsClient.builder() .credentialsProvider(StaticCredentialsProvider .create(awsCreds)).region(Region.US_EAST_1).build(); this.queueUrl = queueUrl; } public String sendMessage(String message) { SendMessageRequest request = SendMessageRequest.builder() .queueUrl(this.queueUrl).messageBody(message) .delaySeconds(5).build(); SendMessageResponse response = this.sqsClient.sendMessage(request); return response.messageId(); } @PreDestroy public void preDestroy() { this.sqsClient.close(); } public static void main(String[] args) { System.out.println("Station running...."); Station station = new Station("AKIA22EODDUZONNX2EMP", "LUXJ5WQjW0p4bk1gC5oGBUi41rxA7oSvWWA/8SqH", "https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue"); String id = station.sendMessage(TestData.observationOne); System.out.println("sent message: " + id); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } id = station.sendMessage(TestData.observationTwo); System.out.println("sent message: " + id); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } id = station.sendMessage(TestData.observationThree); System.out.println("sent message: " + id); } }
- 编译并运行应用程序,您应该会看到以下输出。
Station running.... sent message: b861220e-a37a-424d-880c-5dd67a052967 sent message: 5185e68b-a16f-4300-8ee5-7ef5cca0eb53 sent message: 161f7444-ae7b-4890-b022-0447933054c3
- 导航到 AWS 控制台中的队列,您应该在“可用消息”列中看到三条消息。
消费者只有一个 SqsClient
实例,该实例在 Station
构造函数中初始化,并在使用 <a href="https://docs.oracle.com/javaee/7/api/javax/annotation/PreDestroy.html" rel="noreferrer noopener" target="_blank">@PreDestroy</a>
注解的方法中关闭。此注解用于标记应在类即将被垃圾回收时调用的方法。
Credentials
客户端需要凭据才能操作。这是应用程序用于向 AWS SDK 验证自身的帐户。为了简单起见,这里我们硬编码了凭据。有关 AWS Java 2 SDK 和凭据的更多信息,请参阅 SDK 文档。
SqsClient
<a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/SqsClient.html" rel="noreferrer noopener" target="_blank">SqsClient</a>
是一个扩展 SdkClient
的接口,是访问 AWS SQS 服务的客户端。您可以使用 <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/SqsClientBuilder.html" rel="noreferrer noopener" target="_blank">SqsClientBuilder</a>
来构建客户端。您通过传递凭据和区域来构建客户端。
this.sqsClient = SqsClient.builder()
.credentialsProvider(StaticCredentialsProvider
.create(awsCreds)).region(Region.US_EAST_1).build()
所有对 SQS 的请求都必须通过客户端。不同类型的请求有相应的命名。例如,请求发送消息需要 SendMessageRequest
,请求删除消息需要 DeleteMessageRequest
。如果您曾使用 Java 2 SDK 提供的其他服务,例如 DynamoDb 或 S3,那么这种模式应该很熟悉。
SendMessageRequest
<a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/pinpoint/model/SendMessagesRequest.html" rel="noreferrer noopener" target="_blank">SendMessageRequest</a>
封装了向客户端发送消息的请求。您使用 SendMessageRequestBuilder
构建请求。上面我们设置了队列的 URL、消息的正文以及发送消息前的延迟时间。我们从 AWS SDK 控制台获取了队列的 URL。
SendMessageRequest request = SendMessageRequest.builder()
.queueUrl(this.queueUrl).messageBody(message)
.delaySeconds(5).build();

SendMessageResponse
客户端发送请求并接收响应。<a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/pinpoint/model/SendMessagesResponse.html" rel="noreferrer noopener" target="_blank">SendMessageResponse</a>
封装了响应。该方法然后返回 messageId
,main
将该值打印到控制台。
SendMessageResponse response = this.sqsClient.sendMessage(request);
return response.messageId();
现在我们已经创建了三条消息并将其发送到 SQS,我们可以编写一个消费者来消费这些消息。现在让我们创建一个名为 SQSTutorialConsumer
的 Java 项目。
Java 项目 – 消费者
消费者会消费消息。让我们为队列中的消息创建一个消费者。与生产者一样,我们通过创建一个从命令行运行的可执行类来大大简化消费者。
项目设置
让我们为消费者创建一个 Java Maven 项目。
POM
- 创建一个名为 SQSTutorialConsumer 的 Java 项目作为 Maven 项目。
- 创建一个包含以下内容的 POM 文件。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorial.aws</groupId> <artifactId>SQSTutorialConsumer</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.5.25</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <artifactId>auth</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <artifactId>aws-core</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>auth</artifactId> </dependency> <dependency> <artifactId>sqs</artifactId> <groupId>software.amazon.awssdk</groupId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.6.4</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy-dependencies</id> <phase>prepare-package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${project.build.directory}/lib</outputDirectory> <overWriteReleases>false</overWriteReleases> <overWriteSnapshots>false</overWriteSnapshots> <overWriteIfNewer>true</overWriteIfNewer> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass>com.aws.tutorial.sqs.main.SasquatchFinder</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
SasquatchFinder
- 首先创建一个
com.aws.tutorial.sqs.main
包。 - 接下来,在该包中创建一个名为
SasquatchFinder
的类。 - 在该类中创建一个
main
方法,并让它打印出它已运行。package com.aws.tutorial.sqs.main; public class SasquatchFinder { public static void main(String[] args) { System.out.println("SasquatchFinder running...."); } }
- 构建项目。
$ mvn clean compile package
- 构建项目后,从命令行执行程序。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar SasquatchFinder running....
现在我们已经有了项目的基本大纲,我们可以添加代码来接收消息。
接收消息
- 与
SQSTutorialProducer
项目中的Station
一样,创建成员变量。 - 创建一个
main
方法来初始化SqsClient
。请务必使用消费者的凭据,而不是生产者的凭据。 - 创建一个名为
processMessage
的新方法,并使其使用ReceiveMessageRequest
来接收消息。 - 创建一个名为
deleteMessage
的新方法,并使其使用DeleteMessageRequest
来删除消息。 - 修改
processMessage
,使其在延迟后调用deleteMessage
。 - 修改
main
以连续循环处理消息。package com.aws.tutorial.sqs.main; import java.util.List; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; public class SasquatchFinder { private SqsClient sqsClient; private String queueUrl; public static int finderId = 1; public SasquatchFinder(String key, String secretKey, String queueUrl) { AwsBasicCredentials awsCreds = AwsBasicCredentials.create(key, secretKey); this.sqsClient = SqsClient.builder().credentialsProvider(StaticCredentialsProvider.create(awsCreds)) .region(Region.US_EAST_1).build(); this.queueUrl = queueUrl; } public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl) .maxNumberOfMessages(1).build(); List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).messages(); if(messages == null || messages.size() == 0) return; messages.stream().map(s -> s.body()).forEach(System.out::println); try { System.out.println("sleeping for 10 seconds..."); Thread.sleep(10000); this.deleteMessage(messages); } catch (InterruptedException e) { e.printStackTrace(); } } public void deleteMessage(List<Message> messages) { String receiptHandle = messages.get(0).receiptHandle(); DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(this.queueUrl) .receiptHandle(receiptHandle).build(); this.sqsClient.deleteMessage(deleteRequest); } public static void main(String[] args) { System.out.println("SasquatchFinder " + SasquatchFinder.finderId + " running...."); SasquatchFinder finder = new SasquatchFinder("AKIA22EODDUZAMDPWSX7", "805hbufO3Sn18eDsBDrOzCgB/eT5KVPM/AIkIpoZ", "https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue"); try { while (true) { finder.processMessage(); } } catch (Exception e) { e.printStackTrace(); } System.out.println("SasquatchFinder " + SasquatchFinder.finderId + " stopped."); } }
- 编译并运行生产者,如果您在上一节中运行了消费者,您应该会看到以下输出。
SasquatchFinder 1 running.... { "stationid": 221, "date": "2019-03-12", "time": "091312", "image": "/9j/4AAQ <snip> kf/9k=" } sleeping for 10 seconds... { "stationid": 223, "date": "2017-12-22", "time": "091312", "image": "/9j/4AAQ <snip> kf/9k=" } sleeping for 10 seconds... { "stationid": 222, "date": "2016-02-09", "time": "091312", "image": "/9j/4AAQ <snip> kf/9k=" } sleeping for 10 seconds...
- 导航到 AWS 控制台中的队列,您应该看不到任何消息,因为它们在处理后已被删除。
在这个简单的消费者中,我们首先创建一个用于与队列交互的客户端。然后我们从队列中获取一条消息。程序暂停以模拟处理。然后它使用 receiptHandle
从队列中删除消息。
因为程序循环运行,所以它会处理我们创建消费者时放入队列中的所有三条消息。
ReceiveMessageRequest
ReceiveMessageRequest
封装了从 SQS 队列接收消息的请求。我们使用构建器创建请求。然后我们指定队列 URL 和要获取的最大消息数。最后,我们指定了一条消息;但是,如果需要,您可以指定多条消息。
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl)
.maxNumberOfMessages(1).build();
DeleteMessageRequest
处理完消息后,您应该将其从队列中删除。我们通过获取收到的消息的 receiptHandle 来完成此操作,然后使用它来删除消息。
String receiptHandle = messages.get(0).receiptHandle();
DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(this.queueUrl)
.receiptHandle(receiptHandle).build();
程序处理队列中的所有消息。这是一个简单的消费者,但您可以有多个消费者从同一个队列中消费消息。
消息可见性
使用标准队列时,消息可能会被处理两次。当消息被消费者拾取进行处理时,它会在可配置的时间内变得不可见。当我们创建队列时,我们接受了 30 秒的可见性超时。但是,如果处理时间超过可见性超时,则消息可能会被另一个消费者处理。下图说明了这一点。

还有一个问题。当消息第二次从队列中删除时会发生什么?
- 打开 SQS 控制台并向队列发送一条消息。
- 修改
SasquatchFinder
,使其在每条消息之间暂停 40 秒。public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest .builder().queueUrl(this.queueUrl).maxNumberOfMessages(1).build(); List<Message> messages = this.sqsClient .receiveMessage(receiveMessageRequest).messages(); if(messages == null || messages.size() == 0){ return; } messages.stream().map(s -> s.body()).forEach(System.out::println); try { System.out.println("sleeping for 40 seconds..."); Thread.sleep(40000); this.deleteMessage(messages); } catch (InterruptedException e) { e.printStackTrace(); } }
- 构建应用程序后,打开两个命令行窗口并同时在两个不同的窗口中执行程序。
一个正在运行的实例从队列中获取消息。消息的可见性超时设置为 30 秒开始。该实例休眠 40 秒以模拟处理。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 2 SasquatchFinder 2 running.... mymessage sleeping for 40 seconds...
另一个实例在队列中找不到消息,因为消息不可见。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1
SasquatchFinder 1 running....
然而,三十秒后,消息再次在队列中可见,并被另一个实例拾取并处理。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1
SasquatchFinder 1 running....
mymessage
sleeping for 40 seconds...
同时,第一个获取消息的实例完成处理并删除消息。实际上,它尝试删除消息。但是,由于另一个进程已经请求了消息并签发了新的收据句柄,因此消息并未真正删除。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 2
SasquatchFinder 2 running....
mymessage
sleeping for 40 seconds...
Deleted message AQEB3/lhW8cY2cTsl2gd/GOsyPrt1J/SQn+ZR06ngf24aL5C8SqfUSPZfAl4uc2IwuZuLhZ/5BXYLWVU7AvmgSf0kb4zm0owKh01EXC4pGhvtNSsioLnk3nd4KiS5YEUO/EssCnRM1we7rXw0eLyd2LehOpPOZ49893lIJ6opy1vamQxxk6C+7iGcWbY0dMNTvrZqVaZw2JW/eZV5wI99rdUwRP16+RFj7XWsxEI5KJcExgnWY3jDRQv1mXqe5ZgWI9M7mqPH/rrx8afBdV2P53B7OK0uRm3vUGMzmW/xUgbsxsy5UB0+DZGLaccUAbegtC74LQ6BLZs64VlFxc8jAC2sp2gheLAZ849j4JkMrA8nWf+P+xKCjqdALeGrN754DcxnvhZv79R6sOGcp2lBtTOsA== by SasquatchFinder 2
由于消息仍在被第二个实例处理,第一个实例看不到该消息。第二个实例然后删除该消息。
$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1
SasquatchFinder 1 running....
mymessage
sleeping for 40 seconds...
Deleted message AQEBgZK7kq12asCcVVNbFQNlQmmgYTXXO8OzgoJzHpAnqdsBtMnaBxSBvjjgyVqO3nqYwuhFoxPWgXhUoUcgDzejHHSG6dM/VNG1Wdv3Q93THsJPj6BSQSH/sLjX7qvdFYT20Es0jdhN4dQTNMPyaA3sA7a2x025cUYLsegKfMlWVfCDThABbn+0evwgkn3hmzwLBvAWZEGIp0mooZvYf6WiLcblbqCnx+Gh5j5/XvmIpWuT9ux3DQSTYH+f+XdfUxclXP6exwAYyyFm7xHJnlF9LXcRcKmv2QitpQjgjK3yQBLrogU6dPf8Zp34K8iwMr1TBXEi5mZnfPSA7Cl3a4N2c+MxB+OupGIGGY6uoy2gFLSiaaunsij/weB0FFaYaE/MFhMsXdMMhNho2o/lrq6SOA== by SasquatchFinder 1
请注意,两条消息具有不同的 receiptHandle
。队列有一个内部机制,可以在消息被处理并随后删除两次时避免错误。但是,它不能阻止多次处理消息。如果操纵处理时间或可见性超时,我们可以让消息被处理更多次。
要实际删除底层消息,必须提供最新的接收句柄。因此,在上面的示例中,第一次尝试删除消息是在第二个接收句柄返回之后进行的,因此消息未被删除。但第二次尝试删除消息是最新接收句柄,因此消息被删除了。要删除消息,您必须传递最近签发的接收句柄。
您应该设计您的系统,使其不依赖于消息被处理的次数。您的系统应该是幂等的。如果您需要严格的一次性处理,那么请使用 FIFO 队列。
消息属性和死信队列
让我们探讨在使用 AWS SQS 队列时重要的两个主题:消息属性和死信队列。消息可以具有关联的元数据。但是,要接收具有关联元数据的消息,ReceiveMessageRequest
必须明确指示除了消息本身之外还要获取关联的元数据。消息可能无法成功处理。与其让消息无限期地停留在队列中失败,不如配置死信队列,以便发送失败次数达到可配置次数的消息。
死信队列
- 创建一个名为
DeadLetterQueue
的新标准队列。 SelectSasquatchImageQueue
并从“队列操作”下拉菜单中选择“配置队列”。- 修改
SasquatchImageQueue
,使其使用DeadLetterQueue
作为其死信队列。
消息属性
- 选择
SasquatchImageQueue
并发送新消息。 - 创建消息时,添加两个消息属性。
- 打开
SQSTutorialConsumer
项目并修改SasquatchFinder
中的processMessage
方法。请注意,您注释掉了删除消息的调用。public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl) .maxNumberOfMessages(1).messageAttributeNames("*").build(); List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest) .messages(); if (messages == null || messages.size() == 0) { return; } messages.stream().map(s -> s.body()).forEach(System.out::println); for (Message message : messages) { System.out.println(message.messageId()); Map<String, MessageAttributeValue> attributes = message .messageAttributes(); Set<String> keys = attributes.keySet(); for (String key : keys) { System.out.println(key + ":" + attributes.get(key).stringValue()); } } try { System.out.println("sleeping for 10 seconds..."); Thread.sleep(10000); //this.deleteMessage(messages); } catch (InterruptedException e) { e.printStackTrace(); } }
- 编译并运行应用程序。消息应该处理三次。
SasquatchFinder 1 running.... abc e6ede972-9a6d-4c86-8c00-b16fe18977ff attribute1:abc attribute2:ded sleeping for 10 seconds... abc e6ede972-9a6d-4c86-8c00-b16fe18977ff attribute1:abc attribute2:ded sleeping for 10 seconds... abc e6ede972-9a6d-4c86-8c00-b16fe18977ff attribute1:abc attribute2:ded sleeping for 10 seconds...
- 返回 AWS 控制台,您应该会看到消息已放置在
DeadLetterQueue
上。
要接收消息属性,我们需要使用明确的指令构建 ReceiveMessageRequest
,通过指定 messageAttributeNames
来接收消息属性。该方法可以接受一个或多个属性名称,或者一个 * 来表示所有属性。
该消息已发送到 DeadLetterQueue
,该队列被配置为 SasquatchImageQueue
死信队列。
如果您想了解更多关于消息属性的信息,这里是 Amazon 网站上的一个教程:向 Amazon SQS 队列发送带有属性的消息。
如果您想了解更多关于死信队列的信息,这里是 Amazon 网站上的一个教程:配置 Amazon SQS 死信队列。
maxNumberOfMessages
如果队列上有多个消息可用,ReceiveMessageRequest
可以一次接收多条消息。上面我们设置了最大消息数为一。让我们探讨一下将设置更改为更多消息时会发生什么。
- 修改
SasquatchFinder
类,创建一个名为deleteMessages
的新方法。 - 让该方法迭代所有接收到的消息。
public void deleteMessages(List<Message> messages) { for(Message message:messages) { String receiptHandle = message.receiptHandle(); DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(this.queueUrl) .receiptHandle(receiptHandle).build(); this.sqsClient.deleteMessage(deleteRequest); System.out.println("Deleted message " + receiptHandle + " by SasquatchFinder " + SasquatchFinder.finderId); } }
- 修改
processMessage
,使其调用deleteMessages
而不是deleteMessage
。public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl) .maxNumberOfMessages(10).messageAttributeNames("*").build(); List<Message> messages = this.sqsClient .receiveMessage(receiveMessageRequest).messages(); if (messages == null || messages.size() == 0) { return; } messages.stream().map(s -> s.body()).forEach(System.out::println); for (Message message : messages) { System.out.println(message.messageId()); Map<String, MessageAttributeValue> attributes = message .messageAttributes(); Set<String> keys = attributes.keySet(); for (String key : keys) { System.out.println(key + ":" + attributes.get(key).stringValue()); } } try { System.out.println("sleeping for 10 seconds..."); Thread.sleep(10000); this.deleteMessages(messages); } catch (InterruptedException e) { e.printStackTrace(); } }
- 编译应用程序。
- 编译后,导航到 AWS SQS 控制台并向队列添加五条消息,消息正文分别为 a1、a2、a3、a4 和 a5。
- 运行应用程序,您应该会看到类似以下内容的输出。
SasquatchFinder 1 running.... a4 98a42736-e4b5-4dfd-9428-3e32d2ea145d sleeping for 10 seconds... Deleted message AQEBqmAqpGs85ERM2Y8EnD4zjBPO1KxomlhJgQCPQ+JO3gjYhRcZbflS1gKJT1kas0JId7bX4X+OmFWQfC8r+gZGr02jwBcKlhvSUIv0tx13Q88EPpzMJDNbB9w9oKbgR+hc8c0nZQPPjJ2uHu7KeQfTmIdK/dt49cs/GHFRZeq3pIUWN2jJO8h0UdlpLeFKbB96WjPvakAnXDFd46meejQvBod0x18L1Y1dBt6cZc5+9AbB6eb4bJjV5dKvyDCtIUP2XFZ8iwtZF1lxntzqXxdMGYCjzaQ/oqQ5EmVJ/pFMTgWlUTks+qVFMu7a/sOCfQm7bFwE3AofXQROAK3B0crssZTbzoqQ9oJv+nj0kn596gidN+gygrISvF9vESIG1M5Ll+Lk2ADWQeO+2UA/AJax3A== by SasquatchFinder 1 a1 a5 c167bb7a-f356-4d5b-aa0f-ea90075cef50 f0d79263-05da-485e-bf6a-fa6b3f9fe92a sleeping for 10 seconds... Deleted message AQEBGwtlQPM080KnHDAOWUsZKUQ4PWfLP2g/AFn0sr9ERDOJFssjl7rNXl3mL6ryqoH9EgiPEGyGXwPm6n/FSsfbPA9OSMJYLq0Fho9qtpkcoI0mmAqRPQ/7h0J++zAmmf3bflcD9BqJS+hz4a/Di8Eo6GB0oWJUFZEFYcKWnIUGMNgnQfY3xs1DF9UuNZdsu7h3KN9hGGy3vSTuLvJJox7DDHSgY+QU3nisT5dTSfltKc9vJMQq2mPxB/f2EUmgwKQ82f10A6lPlSjVuiyNtGkKVau3BorKINz3dtG+xAHd5wWfALFExyip7zFZl6wVsnzfKox9QBaxRSrukIfx3+w5rIilq1QujPpNqLKItlxOvaXvDvxi/8lWv31S5UNlY7ooEOYSIkh1wnNwXKY7ZP4aQQ== by SasquatchFinder 1 Deleted message AQEBLIUJqmODdigrnQ88hzta9Zr+PaQnctLqmYrQT0iU5ZxvaLPy0PGNTe7eKwLHbBvc+WdDbLXK951WaPYWoY9dbMJZMyRNnjEj3doGoUkmBOm0LzTs1xDkV+QPb3fGH3s+mxh2TFhX3KFOwXrvf4uqkpx9mHdGioMWa86NSsCUUEQ3vXGUXprSdGsSqXUsoAug7v6wBU3QIPzeQm8pRLmjbZPdx+ndeV80FwnFkxDfNx/mtpAibum4ON4CxDUB66jLC7nVRe0XxXBllM2G/brS7jseqbz+Q61qbFjLNWKo96kTBIrYDjvZEmcSQdp37cYMf4rO/vsr+/XCNUtbtcD8h9Xk8Fc+atcIsuQSlrLbYMplVgN3EwogYlXJsB9GSOlVQVpO+gwOLBXonXJ6i3EAbQ== by SasquatchFinder 1 a2 a5 e65fbcc2-2c4a-42f6-8b61-ca97dad4826e b2bc665c-4c1c-42c7-b3d2-c1d5bf048ee9 sleeping for 10 seconds... Deleted message AQEB2FZyDGQEOUgLxR9wIxAiJbk++Ktec9RLon3nAZr7bPeQu2QJ8iVxRMNg92ZgvoPY5qsBndcRGEQjI5zKHQ/r62tg4+LMWwFLSDBhDF3d55w6OosgLf+K7AIBICGAeTJanTkhCzQlWYM+HCDFEve+NhPsr5+/zabaeZrkKwSBh8E2jTCmr29LmNR6ld9Bz0NSboj5gi+Gxa3dTu+xPGMLMjANVQ1Qa1BhoYEI0QP8kl9gL8aBpLhkeW1eWXgRaRtRcTAVpjxF73ZlUEFVNyYeE/Mwz9ZT2lWRftj6dv5p2PUG5Z6VtbbBw/9AXQElJUTgfHKGd4iGEjo4A3l6ff6g/NVJzm/LkGq6909txbTIk8PSp5istS4bM318W6VG2ten9jYSU7+pj8H809AHoW3VEw== by SasquatchFinder 1 Deleted message AQEBMdzd33/uz7jNQMnBJu1ne7GRh9g2xHx6X0cPWLsU0emEN0G5SGbr3nF/9QklDrrW42BX1HW6IDWxvhlI4/bOByZobYOfjmv5Cr8rDEJYnNKWxqxBZeQqjArKTy90WeEs0puUw4l6PouEZOv35daHO0h01A8Dpk/oMlVBi/OZFCIM4fetG2tUxwa7eU15WiEF4mklZqqJx2bVTbdiZqwhOucgqXlyXK3IJ5FtBFd6ACtEyX1tQmIBn6njmk/CBuX0v5+LzaxlntHy9Q+FpjuPLEyyE5wGqIk9B8Kcqv469pnaE3UJJaCK7DxgG70rF/7M1kYzaDRbRBYJB9jS3W9b8qZpj1JU4JM4euH9xBP4j59MvdwgIs4lSPvO1F3NtdCuNeOOMF15/n1WvU2U31jSeg== by SasquatchFinder 1
如示例所示,您可以指定要处理的最大消息数,但不能指定消息数量。这应该看起来合理,因为消费者在处理之前不知道队列中有多少消息。顺便说一下,请注意,消息在上面的列表中没有按照它们接收的相同顺序进行处理。
先进先出 (FIFO) 队列
让我们修改项目以使用 FIFO 队列,并同时重新运行两个消费者实例。请注意,消费者和生产者都不知道队列的类型。它们只知道队列的 URL。
- 创建一个名为
SasquatchImageQueue.fifo
的新队列,类型为 FIFO 队列。 - 点击“快速创建队列”。
- 创建一个新权限,但为了省事,我们勾选“所有人”复选框和“所有 SQS 操作”复选框。显然,在生产环境中您不会这样做。
- 修改消费者和生产者,使其都使用此队列的 URL。
https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue.fifo
- 修改生产者中的
sendMessage
方法。请注意移除了delaySeconds
并添加了messageGroupId
。public String sendMessage(String message) { SendMessageRequest request = SendMessageRequest.builder() .queueUrl(this.queueUrl).messageBody(message) .messageGroupId("mygroup").build(); SendMessageResponse response = this.sqsClient.sendMessage(request); return response.messageId(); }
- 更改队列后编译并运行生产者应用程序,三条消息将发送到队列。
- 编译并运行消费者应用程序,三条消息将按其接收顺序进行处理。
消息可见性
- 修改
SasquatchFinder
的processMessage
,使其通过休眠 40 秒来模拟处理。public void processMessage() { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl) .maxNumberOfMessages(1).build(); List<Message> messages = this.sqsClient .receiveMessage(receiveMessageRequest).messages(); if(messages == null || messages.size() == 0) { return; } messages.stream().map(s -> s.body()).forEach(System.out::println); try { System.out.println("sleeping for 40 seconds..."); Thread.sleep(40000); this.deleteMessage(messages); } catch (InterruptedException e) { e.printStackTrace(); } }
- 编译并运行应用程序。请注意,您会收到一个
SqsException
。SasquatchFinder 2 running.... messageMine sleeping for 40 seconds... software.amazon.awssdk.services.sqs.model.SqsException: Value AQEBBJL+BlwyhRLnQGxaIKDkkrEv1sU6VnHzYM51Q0UFdx2lDyWvKoI/JYcs7MktVJ1Nmyr1mCVX/cpcqS9dMqq7Ual92VLEXDS9hEYM/qg1vdEGHB60OktMzpidyWBenQQyybzXofO+pAdKOYpC/wiEw8GBPsmFDCHpVn1hxHeLSNJyw10SwNv3DTXQXk4Pe+v3yGf23bf8sDk7Rx7ApqWYi8n8z9uijZAQBdwuFpUrZslivMWCzid6AFOXI/k83+/tKnSMyT0/Mx0rng0v1k4WliSgv5YJo5HyEZTt+cOBwfA= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired. (Service: Sqs, Status Code: 400, Request ID: 845b9538-4104-5428-aa2f-c05092244385) at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.handl <snip> at com.aws.tutorial.sqs.main.SasquatchFinder.main(SasquatchFinder.java:58) SasquatchFinder 2 stopped.
如果使用 FIFO 队列,在可见性超时窗口之后执行删除消息会失败。
结论
在本教程中,我们创建了一个 Amazon SQS 队列。创建队列后,我们使用 AWS Java 2 SDK 创建了一个消息生产者和一个消息消费者。然后我们探讨了几个主题,例如消息属性、死信队列和消息可见性。我们还创建了一个 FIFO 队列。
Amazon 的 SQS 队列是一个易于使用的队列,它消除了组织的基础设施管理麻烦。在本教程中,我们只探讨了 SQS 的基础知识。有关更多信息,请参阅 Java 2 SDK 开发人员指南和 SQS 开发人员指南。请记住,API 从版本 1 更改为版本 2,因此如有疑问,请假定您需要一个对象的构建器,并且在构建对象时必须配置它。但是,API 是一致的,一旦您开始使用 API,将 1.1 版代码转换为 2 版代码是直观的。
GitHub 项目
GitHub 项目 SQSTutorial 可在此处获取:这里。
文章 Amazon Web Services 简单队列服务 (AWS SQS) 使用 Java 2 软件开发工具包 最早发布于 BTS 编程博客。