65.9K
CodeProject 正在变化。 阅读更多。
Home

Amazon Web Services 简单队列服务 (AWS SQS) 使用 Java 2 软件开发工具包

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2020 年 4 月 6 日

CPOL

20分钟阅读

viewsIcon

5694

简介 AWS SQS 消息队列是应用程序之间交换消息的一种方式。发送者将数据对象发送到队列,接收者从队列接收对象。

引言

AWS SQS 消息队列是应用程序之间交换消息的一种方式。发送者将数据对象发送到队列,接收者从队列接收对象。Amazon 的简单队列服务 (AWS SQS) 是 AWS 提供的一项服务,通过在 Amazon 各地分布式部署提供可伸缩性和可靠性。

消息队列解耦应用程序。消息生产者只知道队列,而不知道队列的消费者。同样,消息消费者只知道队列,而不知道队列的其他消费者或生产者。此外,生产者和消费者不知道时序,并且是异步的。

有关队列和消息传递的更多信息,网上有许多资源。这里是麻省理工学院的一个很好的参考资料:阅读 22:队列和消息传递

用例

抛开不信,或者更准确地说,无论您对商业计划的合理性有何看法,都只需构建系统。来自华盛顿州的著名企业家 John Bunyan 有一个致富成名的计划,那就是最终 conclusively 证明大脚怪(或称野人,对有文化的人而言)存在,并且利用广泛的徒步旅行路径系统进行移动。

不顾会计师的建议,他清算了一半的财富,在华盛顿州的徒步旅行路径上安装了一系列隐藏摄像头,每十五分钟拍摄一张照片。由于他是个忙碌的人,他没有时间亲自分析所有照片,因此他希望图像分析软件来分析图像。如果软件识别出野人,他希望图像亲自发送到他的电子邮件帐户,以便他可以确认图像是否为野人。

现在,如果 10,000 个摄像头每 15 分钟拍摄一张照片,那么每小时就是 600,000 张图像。假设每张图像处理需要长达五分钟。希望您能看到,我们存在可伸缩性问题。

有多种方法可以解决此可伸缩性问题,但由于这是一个关于 SQS 的教程,我们使用 AWS SQS。而且,正如我在所有教程中都喜欢告诫的那样,如果“业务案例”看起来可疑,那么就抛开不信,专注于 AWS 代码。

设计

对业务案例的歉意足够了,让我们专注于应用程序的设计。下图说明了困境。

  • 每隔 n 分钟,一个 Station 会向 AWS 队列发送一个观察结果。
  • 有一个或多个 SquatchFinder 组件,其工作是从队列中获取观察结果并处理该观察结果。
  • Station 是生产者,而 SasquatchFinder 是消费者。
Station 将观察结果发送到队列,而 SasquatchFinder 从队列获取观察结果。
队列实现了异步生产者/消费者设计模式。

我们可以用一个简单的类图来形式化我们的需求。一个 Station 创建一个 Observation。一个 SasquatchFinder 处理一个 Observation

类图说明了设计。

与 AWS 的所有通信都通过其 REST API 进行。AWS SQS 也不例外。此外,SQS 队列只接受文本数据。但常见需求是队列接受二进制数据,例如图像。此外,JSON 是一种文本数据传输格式。

我们可以将 Observation 转换为 JSON 文档。图像被转换为 base64 编码,因此可以表示为文本。请注意,本教程中的 encodedImage 始终以 <snip> 截断,因为 base64 字符串相当长。

{ 
  timestamp: “1558493503”,
  latitude:”46.879967”,
  longitude:”-121.726906”,
  encodedImage:"/9j/4AA <snip> 3QEUBGX/9k="
}

Base64 编码

图像是二进制的。然而,只要正确编码和解码,所有二进制都可以用字符串表示。Base64 是一种将二进制转换为字符串的编码方案。它很有用,因为它允许将二进制数据(例如图像)嵌入到文本文件(例如网页或 JSON 文档)中。AWS SQS 队列只允许文本数据,因此如果您希望将图像存储在 AWS SQS 队列中,则必须将其转换为字符串。实现此目的最简单的方法是使用 Base64 格式在传输数据时将二进制数据编码为字符串,并在存储数据时将字符串解码为二进制数据。有关 Base64 和 DynamoDB 的示例,请参阅本网站的教程:使用 AWS DynamoDB 低级 Java API – Sprint Boot Rest 应用程序

Station – 生产者

在编写应用程序代码之前,让我们创建一个队列。您可以通过 Java 2 API SDK 创建队列;但是,这里我们手动创建队列,然后使用此队列发送和接收消息。

创建 SQS 队列

  • 导航到 SQS 控制台并选择标准队列。
  • 点击“配置队列”按钮。

  • 将队列命名为 SasquatchImageQueue

  • 接受队列属性的默认值。

  • 创建队列后,您应该会看到类似以下内容的屏幕。

  • 点击“权限”选项卡,注意我们尚未创建任何权限。在创建两个必需用户后,我们再返回“权限”选项卡。

AWS SQS 提供两种类型的队列:标准队列和先进先出 (FIFO) 队列。标准队列提供所谓的尽力而为的排序。尽管消息通常以它们收到的顺序传递,但不能保证。此外,消息也可能被处理多次。相比之下,FIFO 队列保证先进先出传递和仅处理一次。

在本教程中,我们主要使用标准队列。但是,在本教程的最后,我们将演示如何使用 FIFO 队列。

创建 SQS 队列用户

我们需要创建两个用户,一个用于与队列交互以发送消息,另一个用于接收消息。如果您以前创建过 IAM 用户,请注意我们不会将用户分配给任何组或分配任何策略。相反,我们允许队列决定其权限。当然,我们为用户分配编程访问权限并下载凭据文件。

  • 导航到 IAM 控制台并创建一个名为 SasquatchProducerUser 的新用户,该用户具有编程访问权限。
  • 将用户凭据保存在本地。

  • 创建第二个用户 SasquatchConsumerUser,该用户也具有编程访问权限。
  • 将用户凭据保存在本地。
  • 您应该已经创建了两个具有编程访问权限的用户。

队列权限

最初,只有队列的创建者或所有者才能读取或写入队列。创建者必须授予权限。我们使用队列策略来完成此操作。我们使用 ASW SQS 控制台编写策略,尽管如果您愿意,也可以手动编写。

消费者权限

  • 导航到 SasquatchConsumerUser 摘要屏幕并复制 Amazon 资源名称 (ARN)。

    ARN 应类似于以下内容。

    arn:aws:iam::743327341874:user/SasquatchConsumer

    Amazon 资源编号 (ARN) 唯一标识 Amazon 资源,在本例中是 SasquatchConsumer 用户。

  • 返回 SQS 控制台,选择 SasquatchImageQueue 并点击“权限”选项卡。

  • 点击“添加权限”。
  • 在弹出的结果窗口中,将 ARN 粘贴到“主体”文本框中。
  • 勾选 DeleteMessageGetQueueUrlReceiveMessage 操作。
  • 点击“保存更改”。

  • 创建 SasquatchConsumerUser 后,导航到 SasquatchProducerUser 并复制生产者的 ARN。
    arn:aws:iam::743327341874:user/SasquatchProducerUser
  • 返回 SQS 队列并将此用户作为权限添加到队列。允许 ChangeMessageVisibilityDeleteMessageGetQueueAttributesGetQueueUrlPurgeQueueSendMessage 操作。

  • 添加两个用户的权限后,队列应如下图所示。

如果您仍然不确定如何向队列添加权限,这里有一个 Amazon 的教程:向 Amazon SQS 队列添加权限。您还可以添加服务器端加密,本教程对此进行了说明:使用服务器端加密 (SSE) 创建 Amazon SQS 队列

虽然我们不讨论策略文档,但以下内容说明了我们使用控制台设置的底层是 JSON 文档。但是,了解策略文档很重要,因为它们是 AWS 安全的核心。有关 AWS SQS 策略的更多信息,请参阅此文档:将基于身份 (IAM) 的策略用于 Amazon SQS

需要注意的是,这里我们使用 AWS SQS 而不是我们创建的消费者或生产者用户向队列分配了权限。正如前一段链接中的文档所讨论的,我们也可以轻松地使用 IAM 策略。

通过控制台发送消息

尽管可能很少有商业原因,但出于测试目的,您可以手动向队列添加消息。虽然我们不会使用该消息,但让我们探索使用 AWS SQS 控制台发送消息。

  • 参考 observations.json 文档并复制其中一个观察结果。当然,在下面的代码列表中,图像被截断了。
        {
          "stationid": 221,
          "date": "1992-03-12",
          "time": "091312",
          "image": "/9j/4AA <snip> 0Wxkf/9k="
        }
  • 选择队列,然后从队列操作中选择“发送消息”。

  • 从 observations.json 复制一条消息并将其添加到消息正文。

  • 点击“发送消息”,一分钟内,“可用消息”列应显示队列中有一条消息。

  • 从队列操作中选择“清除队列”以清除队列。

Java 项目 – 生产者

如前所述,生产者会生成消息。如果完全实现了上述设计,我们将有许多 Station 和许多。但是,为了使本教程保持简单,我们只在一个项目中限制一个 Station。

项目设置

尽管我使用 Eclipse 开发了本教程,但您可以使用自己的 IDE 甚至命令行。但是,您确实应该使用 Maven 或 Gradle。这里我们使用 Maven。假定您熟悉使用 Maven 构建 Java 项目。

POM

  • 创建一个名为 SQSTutorialProducer 的新项目。
  • 使用以下 POM 创建或覆盖 POM 文件。
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.tutorial.aws</groupId>
      <artifactId>SQSTutorialProducer</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
      <dependencyManagement>
        <dependencies>
          <dependency>
    	<groupId>software.amazon.awssdk</groupId>
    	<artifactId>bom</artifactId>
    	<version>2.5.25</version>
      	  <type>pom</type>
      	  <scope>import</scope>
    	  </dependency>
        </dependencies>
      </dependencyManagement>
      <dependencies>
        <dependency>
          <artifactId>auth</artifactId>
          <groupId>software.amazon.awssdk</groupId>
        </dependency>
        <dependency>
          <artifactId>aws-core</artifactId>
          <groupId>software.amazon.awssdk</groupId>
        </dependency>
        <dependency>
          <groupId>software.amazon.awssdk</groupId>
          <artifactId>auth</artifactId>
        </dependency>
        <dependency>
          <artifactId>sqs</artifactId>
          <groupId>software.amazon.awssdk</groupId>
        </dependency>
       <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-api</artifactId>
           <version>1.7.5</version>
       </dependency>
       <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-simple</artifactId>
           <version>1.6.4</version>
       </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
    	<artifactId>maven-dependency-plugin</artifactId>
    	<executions>
      	  <execution>
    	    <id>copy-dependencies</id>
    	    <phase>prepare-package</phase>
    	    <goals>
    	      <goal>copy-dependencies</goal>
    	    </goals>
    	    <configuration>				 <outputDirectory>${project.build.directory}/lib</outputDirectory>				 
                  <overWriteReleases>false</overWriteReleases>
                  <overWriteSnapshots>false</overWriteSnapshots>						 
                  <overWriteIfNewer>true</overWriteIfNewer>
    	    </configuration>
            </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
    	  <addClasspath>true</addClasspath>
    	  <classpathPrefix>lib/</classpathPrefix>
    	  <mainClass>com.aws.tutorial.sqs.main.Station</mainClass>
    	</manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
    </build>
    </project>

在 POM 中,我们使用 AWS BOM,这样可以避免指定 AWS 库版本。我们添加了所需 AWS 库的依赖项。我们还指定 maven 将构建一个可执行的 jar,并将所需的依赖项打包在 jar 中。

请注意以下事项。

  <properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

如果我们不指定 Java 1.8 或更高版本,编译将失败,因为 AWS 构建器是静态接口方法,不适用于旧版 Java。尽管在您的机器上代码可能可以编译,但如果您的计算机上安装了多个 Java SDK,则可能会出现问题。通过显式设置版本、源和目标,我们可以避免任何潜在的编译问题。

Station

让我们创建一个简单的可执行 Java 类,名为 Station。这将模拟一个真正的消息生产者。

  • 首先创建一个 com.aws.tutorial.sqs.main 包。
  • 在创建的包中创建一个名为 Station 的类,其中包含一个 main 方法。
  • main 方法打印出类已执行的消息。
    package com.aws.tutorial.sqs.main;
    
    public class Station {
      public static void main(String[] args) {
        System.out.println("Station running....");
      }
    }

可执行 Jar

  • 编译并打包项目。如果从命令行运行,您将输入以下内容。
    $ mvn clean compile package
  • 构建后,从命令行执行程序。应该会出现打印输出。
    $ java -jar SQSTutorialProducer-0.0.1-SNAPSHOT.jar 
    Station running....

现在我们已经创建了消费者的基本结构,我们可以修改它来发送 SQS 消息。

发送消息

在此示例中,我们使用 SDK 将消息发送到队列。数据负载是 JSON 数据字符串。您使用硬编码数据发送到队列。显然,在实际应用程序中,数据将来自不同的来源。为了模拟从真正的生产者发送消息,在发送每条消息之间引入了延迟。

  • 在修改程序之前,在 com.aws.tutorial.sqs.main 包中创建一个名为 TestData 的新类。
  • 从 observations.json 文件中复制三个观察结果。
  • 或者,如果您不想自己转义字符串,请使用本教程 Git 项目中的 TestData.java。注意:如果您使用 Eclipse,当您在开引号后立即粘贴字符串时,它会自动为您转义字符串。图像的 base64 代码已缩短,以便于显示。
    package com.aws.tutorial.sqs.main;
    
    public class TestData {
    
    	public static String observationOne = "    {\n" + 
    			"      \"stationid\": 221,\n" + 
    			"      \"date\": \"2019-03-12\",\n" + 
    			"      \"time\": \"091312\",\n" + 
       		        "      \"image\": \"/9j/4A <snip> \"\n" + 
    			"    }";
    	
    	public static String observationTwo = "    {\n" + 
    			"      \"stationid\": 222,\n" + 
    			"      \"date\": \"2016-02-09\",\n" + 
    			"      \"time\": \"091312\",\n" + 
    			"      \"image\": \"/9j/4A <snip> \"\n" +  
    			"    }";
    	
    	public static String observationThree = "    {\n" + 
    			"      \"stationid\": 223,\n" + 
    			"      \"date\": \"2017-12-22\",\n" + 
    			"      \"time\": \"091312\",\n" + 
    			"      \"image\": \"/9j/4A <snip> \"\n" + 
    			"    }";
    }
  • 修改 Station,使其具有一个接受三个字符串(键、密钥和队列 URL)的构造函数。
  • 创建两个成员变量,一个类型为 SqsClient,另一个类型为 String
  • Station 构造函数中初始化 SqsClient
  • 创建一个名为 sendMessage 的方法,该方法将消息发送到队列。
  • 最后,修改 main 以发送 TestData.java 中的所有三条消息,并在发送每条消息之间暂停。
    package com.aws.tutorial.sqs.main;
    
    import javax.annotation.PreDestroy;
    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
    import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
    
    public class Station {
    
      SqsClient sqsClient;
      String queueUrl;
    	
      public Station(String key, String secretKey, String queueUrl) {
        AwsBasicCredentials awsCreds = AwsBasicCredentials.create(key, secretKey);
    
        this.sqsClient = SqsClient.builder()
          .credentialsProvider(StaticCredentialsProvider
          .create(awsCreds)).region(Region.US_EAST_1).build();
    
        this.queueUrl = queueUrl;
      }
    	
      public String sendMessage(String message) {
        SendMessageRequest request = SendMessageRequest.builder()
          .queueUrl(this.queueUrl).messageBody(message)
          .delaySeconds(5).build();		
    
        SendMessageResponse response = this.sqsClient.sendMessage(request);
        return response.messageId();
      }
    
      @PreDestroy
      public void preDestroy() {
        this.sqsClient.close();
      }
    
      public static void main(String[] args) {
        System.out.println("Station running....");
        Station station = new Station("AKIA22EODDUZONNX2EMP",
          "LUXJ5WQjW0p4bk1gC5oGBUi41rxA7oSvWWA/8SqH",
          "https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue");
    
        String id = station.sendMessage(TestData.observationOne);
        System.out.println("sent message: " + id);
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
          }
        id = station.sendMessage(TestData.observationTwo);
        System.out.println("sent message: " + id);
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        id = station.sendMessage(TestData.observationThree);
        System.out.println("sent message: " + id);
        }
    }
  • 编译并运行应用程序,您应该会看到以下输出。
    Station running....
    sent message: b861220e-a37a-424d-880c-5dd67a052967
    sent message: 5185e68b-a16f-4300-8ee5-7ef5cca0eb53
    sent message: 161f7444-ae7b-4890-b022-0447933054c3
  • 导航到 AWS 控制台中的队列,您应该在“可用消息”列中看到三条消息。

消费者只有一个 SqsClient 实例,该实例在 Station 构造函数中初始化,并在使用 <a href="https://docs.oracle.com/javaee/7/api/javax/annotation/PreDestroy.html" rel="noreferrer noopener" target="_blank">@PreDestroy</a> 注解的方法中关闭。此注解用于标记应在类即将被垃圾回收时调用的方法。

Credentials

客户端需要凭据才能操作。这是应用程序用于向 AWS SDK 验证自身的帐户。为了简单起见,这里我们硬编码了凭据。有关 AWS Java 2 SDK 和凭据的更多信息,请参阅 SDK 文档

SqsClient

<a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/SqsClient.html" rel="noreferrer noopener" target="_blank">SqsClient</a> 是一个扩展 SdkClient 的接口,是访问 AWS SQS 服务的客户端。您可以使用 <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/SqsClientBuilder.html" rel="noreferrer noopener" target="_blank">SqsClientBuilder</a> 来构建客户端。您通过传递凭据和区域来构建客户端。

 this.sqsClient = SqsClient.builder()
      .credentialsProvider(StaticCredentialsProvider
      .create(awsCreds)).region(Region.US_EAST_1).build()

所有对 SQS 的请求都必须通过客户端。不同类型的请求有相应的命名。例如,请求发送消息需要 SendMessageRequest,请求删除消息需要 DeleteMessageRequest。如果您曾使用 Java 2 SDK 提供的其他服务,例如 DynamoDb 或 S3,那么这种模式应该很熟悉。

SendMessageRequest

<a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/pinpoint/model/SendMessagesRequest.html" rel="noreferrer noopener" target="_blank">SendMessageRequest</a> 封装了向客户端发送消息的请求。您使用 SendMessageRequestBuilder 构建请求。上面我们设置了队列的 URL、消息的正文以及发送消息前的延迟时间。我们从 AWS SDK 控制台获取了队列的 URL。

SendMessageRequest request = SendMessageRequest.builder()
  .queueUrl(this.queueUrl).messageBody(message)
  .delaySeconds(5).build();
该 URL 位于 AWS 控制台中队列的“详细信息”选项卡中。

SendMessageResponse

客户端发送请求并接收响应。<a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/pinpoint/model/SendMessagesResponse.html" rel="noreferrer noopener" target="_blank">SendMessageResponse</a> 封装了响应。该方法然后返回 messageIdmain 将该值打印到控制台。

SendMessageResponse response = this.sqsClient.sendMessage(request);
return response.messageId();

现在我们已经创建了三条消息并将其发送到 SQS,我们可以编写一个消费者来消费这些消息。现在让我们创建一个名为 SQSTutorialConsumer 的 Java 项目。

Java 项目 – 消费者

消费者会消费消息。让我们为队列中的消息创建一个消费者。与生产者一样,我们通过创建一个从命令行运行的可执行类来大大简化消费者。

项目设置

让我们为消费者创建一个 Java Maven 项目。

POM

  • 创建一个名为 SQSTutorialConsumer 的 Java 项目作为 Maven 项目。
  • 创建一个包含以下内容的 POM 文件。
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.tutorial.aws</groupId>
      <artifactId>SQSTutorialConsumer</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>
      <dependencyManagement>
        <dependencies>
          <dependency>
    	<groupId>software.amazon.awssdk</groupId>
    	<artifactId>bom</artifactId>
    	<version>2.5.25</version>
      	  <type>pom</type>
      	  <scope>import</scope>
    	  </dependency>
        </dependencies>
      </dependencyManagement>
      <dependencies>
        <dependency>
          <artifactId>auth</artifactId>
          <groupId>software.amazon.awssdk</groupId>
        </dependency>
        <dependency>
          <artifactId>aws-core</artifactId>
          <groupId>software.amazon.awssdk</groupId>
        </dependency>
        <dependency>
          <groupId>software.amazon.awssdk</groupId>
          <artifactId>auth</artifactId>
        </dependency>
        <dependency>
          <artifactId>sqs</artifactId>
          <groupId>software.amazon.awssdk</groupId>
        </dependency>
       <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-api</artifactId>
           <version>1.7.5</version>
       </dependency>
       <dependency>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-simple</artifactId>
           <version>1.6.4</version>
       </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
    	<artifactId>maven-dependency-plugin</artifactId>
    	<executions>
      	  <execution>
    	    <id>copy-dependencies</id>
    	    <phase>prepare-package</phase>
    	    <goals>
    	      <goal>copy-dependencies</goal>
    	    </goals>
    	    <configuration>				 <outputDirectory>${project.build.directory}/lib</outputDirectory>				 
                  <overWriteReleases>false</overWriteReleases>
                  <overWriteSnapshots>false</overWriteSnapshots>						 
                  <overWriteIfNewer>true</overWriteIfNewer>
    	    </configuration>
            </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
    	  <addClasspath>true</addClasspath>
    	  <classpathPrefix>lib/</classpathPrefix>
    	  <mainClass>com.aws.tutorial.sqs.main.SasquatchFinder</mainClass>
    	</manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
    </build>
    </project>

SasquatchFinder

  • 首先创建一个 com.aws.tutorial.sqs.main 包。
  • 接下来,在该包中创建一个名为 SasquatchFinder 的类。
  • 在该类中创建一个 main 方法,并让它打印出它已运行。
    package com.aws.tutorial.sqs.main;
    
    public class SasquatchFinder {
      public static void main(String[] args) {
        System.out.println("SasquatchFinder running....");
      }
    }
  • 构建项目。
    $ mvn clean compile package
  • 构建项目后,从命令行执行程序。
    $ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 
    SasquatchFinder running....

现在我们已经有了项目的基本大纲,我们可以添加代码来接收消息。

接收消息

  • SQSTutorialProducer 项目中的 Station 一样,创建成员变量。
  • 创建一个 main 方法来初始化 SqsClient。请务必使用消费者的凭据,而不是生产者的凭据。
  • 创建一个名为 processMessage 的新方法,并使其使用 ReceiveMessageRequest 来接收消息。
  • 创建一个名为 deleteMessage 的新方法,并使其使用 DeleteMessageRequest 来删除消息。
  • 修改 processMessage,使其在延迟后调用 deleteMessage
  • 修改 main 以连续循环处理消息。
    package com.aws.tutorial.sqs.main;
    
    import java.util.List;
    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
    import software.amazon.awssdk.services.sqs.model.Message;
    import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
    
    public class SasquatchFinder {
    
    	private SqsClient sqsClient;
    	private String queueUrl;
    	public static int finderId = 1;
    
    	public SasquatchFinder(String key, String secretKey, String queueUrl) {
    		AwsBasicCredentials awsCreds = AwsBasicCredentials.create(key, secretKey);
    		this.sqsClient = SqsClient.builder().credentialsProvider(StaticCredentialsProvider.create(awsCreds))
    				.region(Region.US_EAST_1).build();
    		this.queueUrl = queueUrl;
    	}
    
    	public void processMessage() {
    
    		ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl)
    				.maxNumberOfMessages(1).build();
    		List<Message> messages = this.sqsClient.receiveMessage(receiveMessageRequest).messages();
    		if(messages == null || messages.size() == 0) return;
    		messages.stream().map(s -> s.body()).forEach(System.out::println);
    		try {
    			System.out.println("sleeping for 10 seconds...");
    			Thread.sleep(10000);
    		  this.deleteMessage(messages);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public void deleteMessage(List<Message> messages) {
    		String receiptHandle = messages.get(0).receiptHandle();
    		DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(this.queueUrl)
    				.receiptHandle(receiptHandle).build();
    		this.sqsClient.deleteMessage(deleteRequest);
    	}
    
    	public static void main(String[] args) {
    		System.out.println("SasquatchFinder " + SasquatchFinder.finderId + " running....");
    		SasquatchFinder finder = new SasquatchFinder("AKIA22EODDUZAMDPWSX7", "805hbufO3Sn18eDsBDrOzCgB/eT5KVPM/AIkIpoZ",
    				"https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue");
    		try {
    			while (true) {
    				finder.processMessage();
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		System.out.println("SasquatchFinder " + SasquatchFinder.finderId + " stopped.");
    	}
    }
  • 编译并运行生产者,如果您在上一节中运行了消费者,您应该会看到以下输出。
    SasquatchFinder 1 running....
        {
          "stationid": 221,
          "date": "2019-03-12",
          "time": "091312",
          "image": "/9j/4AAQ <snip> kf/9k="
        }
    sleeping for 10 seconds...
        {
          "stationid": 223,
          "date": "2017-12-22",
          "time": "091312",
          "image": "/9j/4AAQ <snip> kf/9k="
        }
    sleeping for 10 seconds...
        {
          "stationid": 222,
          "date": "2016-02-09",
          "time": "091312",
          "image": "/9j/4AAQ <snip> kf/9k="
        }
    sleeping for 10 seconds...
  • 导航到 AWS 控制台中的队列,您应该看不到任何消息,因为它们在处理后已被删除。

在这个简单的消费者中,我们首先创建一个用于与队列交互的客户端。然后我们从队列中获取一条消息。程序暂停以模拟处理。然后它使用 receiptHandle 从队列中删除消息。

因为程序循环运行,所以它会处理我们创建消费者时放入队列中的所有三条消息。

ReceiveMessageRequest

ReceiveMessageRequest 封装了从 SQS 队列接收消息的请求。我们使用构建器创建请求。然后我们指定队列 URL 和要获取的最大消息数。最后,我们指定了一条消息;但是,如果需要,您可以指定多条消息。

ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(this.queueUrl)
				.maxNumberOfMessages(1).build();

DeleteMessageRequest

处理完消息后,您应该将其从队列中删除。我们通过获取收到的消息的 receiptHandle 来完成此操作,然后使用它来删除消息。

String receiptHandle = messages.get(0).receiptHandle();
		DeleteMessageRequest deleteRequest = DeleteMessageRequest.builder().queueUrl(this.queueUrl)
				.receiptHandle(receiptHandle).build();

程序处理队列中的所有消息。这是一个简单的消费者,但您可以有多个消费者从同一个队列中消费消息。

消息可见性

使用标准队列时,消息可能会被处理两次。当消息被消费者拾取进行处理时,它会在可配置的时间内变得不可见。当我们创建队列时,我们接受了 30 秒的可见性超时。但是,如果处理时间超过可见性超时,则消息可能会被另一个消费者处理。下图说明了这一点。

还有一个问题。当消息第二次从队列中删除时会发生什么?

  • 打开 SQS 控制台并向队列发送一条消息。

  • 修改 SasquatchFinder,使其在每条消息之间暂停 40 秒。
    public void processMessage() {
      ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest
        .builder().queueUrl(this.queueUrl).maxNumberOfMessages(1).build();
    
      List<Message> messages = this.sqsClient
        .receiveMessage(receiveMessageRequest).messages();
    
      if(messages == null || messages.size() == 0){
       return;
     }
      messages.stream().map(s -> s.body()).forEach(System.out::println);
      try {
        System.out.println("sleeping for 40 seconds...");
        Thread.sleep(40000);
        this.deleteMessage(messages);
      } catch (InterruptedException e) {
          e.printStackTrace();
        }
    }
  • 构建应用程序后,打开两个命令行窗口并同时在两个不同的窗口中执行程序。

    一个正在运行的实例从队列中获取消息。消息的可见性超时设置为 30 秒开始。该实例休眠 40 秒以模拟处理。

    $ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 2
    SasquatchFinder 2 running....
    mymessage
    sleeping for 40 seconds...

另一个实例在队列中找不到消息,因为消息不可见。

$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1
SasquatchFinder 1 running....

然而,三十秒后,消息再次在队列中可见,并被另一个实例拾取并处理。

$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1
SasquatchFinder 1 running....
mymessage
sleeping for 40 seconds...

同时,第一个获取消息的实例完成处理并删除消息。实际上,它尝试删除消息。但是,由于另一个进程已经请求了消息并签发了新的收据句柄,因此消息并未真正删除。

$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 2
SasquatchFinder 2 running....
mymessage
sleeping for 40 seconds...
Deleted message AQEB3/lhW8cY2cTsl2gd/GOsyPrt1J/SQn+ZR06ngf24aL5C8SqfUSPZfAl4uc2IwuZuLhZ/5BXYLWVU7AvmgSf0kb4zm0owKh01EXC4pGhvtNSsioLnk3nd4KiS5YEUO/EssCnRM1we7rXw0eLyd2LehOpPOZ49893lIJ6opy1vamQxxk6C+7iGcWbY0dMNTvrZqVaZw2JW/eZV5wI99rdUwRP16+RFj7XWsxEI5KJcExgnWY3jDRQv1mXqe5ZgWI9M7mqPH/rrx8afBdV2P53B7OK0uRm3vUGMzmW/xUgbsxsy5UB0+DZGLaccUAbegtC74LQ6BLZs64VlFxc8jAC2sp2gheLAZ849j4JkMrA8nWf+P+xKCjqdALeGrN754DcxnvhZv79R6sOGcp2lBtTOsA== by SasquatchFinder 2

由于消息仍在被第二个实例处理,第一个实例看不到该消息。第二个实例然后删除该消息。

$ java -jar SQSTutorialConsumer-0.0.1-SNAPSHOT.jar 1
SasquatchFinder 1 running....
mymessage
sleeping for 40 seconds...
Deleted message AQEBgZK7kq12asCcVVNbFQNlQmmgYTXXO8OzgoJzHpAnqdsBtMnaBxSBvjjgyVqO3nqYwuhFoxPWgXhUoUcgDzejHHSG6dM/VNG1Wdv3Q93THsJPj6BSQSH/sLjX7qvdFYT20Es0jdhN4dQTNMPyaA3sA7a2x025cUYLsegKfMlWVfCDThABbn+0evwgkn3hmzwLBvAWZEGIp0mooZvYf6WiLcblbqCnx+Gh5j5/XvmIpWuT9ux3DQSTYH+f+XdfUxclXP6exwAYyyFm7xHJnlF9LXcRcKmv2QitpQjgjK3yQBLrogU6dPf8Zp34K8iwMr1TBXEi5mZnfPSA7Cl3a4N2c+MxB+OupGIGGY6uoy2gFLSiaaunsij/weB0FFaYaE/MFhMsXdMMhNho2o/lrq6SOA== by SasquatchFinder 1

请注意,两条消息具有不同的 receiptHandle。队列有一个内部机制,可以在消息被处理并随后删除两次时避免错误。但是,它不能阻止多次处理消息。如果操纵处理时间或可见性超时,我们可以让消息被处理更多次。

要实际删除底层消息,必须提供最新的接收句柄。因此,在上面的示例中,第一次尝试删除消息是在第二个接收句柄返回之后进行的,因此消息未被删除。但第二次尝试删除消息是最新接收句柄,因此消息被删除了。要删除消息,您必须传递最近签发的接收句柄。

您应该设计您的系统,使其不依赖于消息被处理的次数。您的系统应该是幂等的。如果您需要严格的一次性处理,那么请使用 FIFO 队列。

消息属性和死信队列

让我们探讨在使用 AWS SQS 队列时重要的两个主题:消息属性和死信队列。消息可以具有关联的元数据。但是,要接收具有关联元数据的消息,ReceiveMessageRequest 必须明确指示除了消息本身之外还要获取关联的元数据。消息可能无法成功处理。与其让消息无限期地停留在队列中失败,不如配置死信队列,以便发送失败次数达到可配置次数的消息。

死信队列

  • 创建一个名为 DeadLetterQueue 的新标准队列。
  • SelectSasquatchImageQueue 并从“队列操作”下拉菜单中选择“配置队列”。

  • 修改 SasquatchImageQueue,使其使用 DeadLetterQueue 作为其死信队列。

消息属性

  • 选择 SasquatchImageQueue 并发送新消息。
  • 创建消息时,添加两个消息属性。

  • 打开 SQSTutorialConsumer 项目并修改 SasquatchFinder 中的 processMessage 方法。请注意,您注释掉了删除消息的调用。
    public void processMessage() {
      ReceiveMessageRequest receiveMessageRequest = 
        ReceiveMessageRequest.builder().queueUrl(this.queueUrl)	 		 	 
         .maxNumberOfMessages(1).messageAttributeNames("*").build();		
    
    List<Message> messages = 
      this.sqsClient.receiveMessage(receiveMessageRequest)
        .messages();
    
      if (messages == null || messages.size() == 0) {			
        return;		
      }
    
      messages.stream().map(s -> s.body()).forEach(System.out::println);
    
      for (Message message : messages) {
        System.out.println(message.messageId());
        Map<String, MessageAttributeValue> attributes = message
          .messageAttributes();
    
        Set<String> keys = attributes.keySet();
        
        for (String key : keys) {
          System.out.println(key + ":" + attributes.get(key).stringValue());
        }
      }
      try {
        System.out.println("sleeping for 10 seconds...");
        Thread.sleep(10000);
        //this.deleteMessage(messages);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  • 编译并运行应用程序。消息应该处理三次。
    SasquatchFinder 1 running....
    abc
    e6ede972-9a6d-4c86-8c00-b16fe18977ff
    attribute1:abc
    attribute2:ded
    sleeping for 10 seconds...
    abc
    e6ede972-9a6d-4c86-8c00-b16fe18977ff
    attribute1:abc
    attribute2:ded
    sleeping for 10 seconds...
    abc
    e6ede972-9a6d-4c86-8c00-b16fe18977ff
    attribute1:abc
    attribute2:ded
    sleeping for 10 seconds...
  • 返回 AWS 控制台,您应该会看到消息已放置在 DeadLetterQueue 上。

要接收消息属性,我们需要使用明确的指令构建 ReceiveMessageRequest,通过指定 messageAttributeNames 来接收消息属性。该方法可以接受一个或多个属性名称,或者一个 * 来表示所有属性。

该消息已发送到 DeadLetterQueue,该队列被配置为 SasquatchImageQueue 死信队列。

如果您想了解更多关于消息属性的信息,这里是 Amazon 网站上的一个教程:向 Amazon SQS 队列发送带有属性的消息

如果您想了解更多关于死信队列的信息,这里是 Amazon 网站上的一个教程:配置 Amazon SQS 死信队列

maxNumberOfMessages

如果队列上有多个消息可用,ReceiveMessageRequest 可以一次接收多条消息。上面我们设置了最大消息数为一。让我们探讨一下将设置更改为更多消息时会发生什么。

  • 修改 SasquatchFinder 类,创建一个名为 deleteMessages 的新方法。
  • 让该方法迭代所有接收到的消息。
    public void deleteMessages(List<Message> messages) {
      for(Message message:messages) {
        String receiptHandle = message.receiptHandle();
        DeleteMessageRequest deleteRequest = 
          DeleteMessageRequest.builder().queueUrl(this.queueUrl)
          .receiptHandle(receiptHandle).build();
    
        this.sqsClient.deleteMessage(deleteRequest);
        System.out.println("Deleted message " + receiptHandle 
        + " by SasquatchFinder " + SasquatchFinder.finderId);
      }
    }
  • 修改 processMessage,使其调用 deleteMessages 而不是 deleteMessage
    public void processMessage() {
      ReceiveMessageRequest receiveMessageRequest = 
        ReceiveMessageRequest.builder().queueUrl(this.queueUrl)
          .maxNumberOfMessages(10).messageAttributeNames("*").build();
    
      List<Message> messages = this.sqsClient
        .receiveMessage(receiveMessageRequest).messages();
      if (messages == null || messages.size() == 0) {
        return;
      }
      messages.stream().map(s -> s.body()).forEach(System.out::println);
      for (Message message : messages) {
        System.out.println(message.messageId());
        Map<String, MessageAttributeValue> attributes = message
          .messageAttributes();
    
        Set<String> keys = attributes.keySet();
        for (String key : keys) {
          System.out.println(key + ":" + attributes.get(key).stringValue());
        }
      }
      try {
        System.out.println("sleeping for 10 seconds...");
        Thread.sleep(10000);
        this.deleteMessages(messages);
      } catch (InterruptedException e) {
          e.printStackTrace();
        }
    }
  • 编译应用程序。
  • 编译后,导航到 AWS SQS 控制台并向队列添加五条消息,消息正文分别为 a1、a2、a3、a4 和 a5。

  • 运行应用程序,您应该会看到类似以下内容的输出。
    SasquatchFinder 1 running....
    a4
    98a42736-e4b5-4dfd-9428-3e32d2ea145d
    sleeping for 10 seconds...
    Deleted message AQEBqmAqpGs85ERM2Y8EnD4zjBPO1KxomlhJgQCPQ+JO3gjYhRcZbflS1gKJT1kas0JId7bX4X+OmFWQfC8r+gZGr02jwBcKlhvSUIv0tx13Q88EPpzMJDNbB9w9oKbgR+hc8c0nZQPPjJ2uHu7KeQfTmIdK/dt49cs/GHFRZeq3pIUWN2jJO8h0UdlpLeFKbB96WjPvakAnXDFd46meejQvBod0x18L1Y1dBt6cZc5+9AbB6eb4bJjV5dKvyDCtIUP2XFZ8iwtZF1lxntzqXxdMGYCjzaQ/oqQ5EmVJ/pFMTgWlUTks+qVFMu7a/sOCfQm7bFwE3AofXQROAK3B0crssZTbzoqQ9oJv+nj0kn596gidN+gygrISvF9vESIG1M5Ll+Lk2ADWQeO+2UA/AJax3A== by SasquatchFinder 1
    a1
    a5
    c167bb7a-f356-4d5b-aa0f-ea90075cef50
    f0d79263-05da-485e-bf6a-fa6b3f9fe92a
    sleeping for 10 seconds...
    Deleted message AQEBGwtlQPM080KnHDAOWUsZKUQ4PWfLP2g/AFn0sr9ERDOJFssjl7rNXl3mL6ryqoH9EgiPEGyGXwPm6n/FSsfbPA9OSMJYLq0Fho9qtpkcoI0mmAqRPQ/7h0J++zAmmf3bflcD9BqJS+hz4a/Di8Eo6GB0oWJUFZEFYcKWnIUGMNgnQfY3xs1DF9UuNZdsu7h3KN9hGGy3vSTuLvJJox7DDHSgY+QU3nisT5dTSfltKc9vJMQq2mPxB/f2EUmgwKQ82f10A6lPlSjVuiyNtGkKVau3BorKINz3dtG+xAHd5wWfALFExyip7zFZl6wVsnzfKox9QBaxRSrukIfx3+w5rIilq1QujPpNqLKItlxOvaXvDvxi/8lWv31S5UNlY7ooEOYSIkh1wnNwXKY7ZP4aQQ== by SasquatchFinder 1
    Deleted message AQEBLIUJqmODdigrnQ88hzta9Zr+PaQnctLqmYrQT0iU5ZxvaLPy0PGNTe7eKwLHbBvc+WdDbLXK951WaPYWoY9dbMJZMyRNnjEj3doGoUkmBOm0LzTs1xDkV+QPb3fGH3s+mxh2TFhX3KFOwXrvf4uqkpx9mHdGioMWa86NSsCUUEQ3vXGUXprSdGsSqXUsoAug7v6wBU3QIPzeQm8pRLmjbZPdx+ndeV80FwnFkxDfNx/mtpAibum4ON4CxDUB66jLC7nVRe0XxXBllM2G/brS7jseqbz+Q61qbFjLNWKo96kTBIrYDjvZEmcSQdp37cYMf4rO/vsr+/XCNUtbtcD8h9Xk8Fc+atcIsuQSlrLbYMplVgN3EwogYlXJsB9GSOlVQVpO+gwOLBXonXJ6i3EAbQ== by SasquatchFinder 1
    a2
    a5
    e65fbcc2-2c4a-42f6-8b61-ca97dad4826e
    b2bc665c-4c1c-42c7-b3d2-c1d5bf048ee9
    sleeping for 10 seconds...
    Deleted message AQEB2FZyDGQEOUgLxR9wIxAiJbk++Ktec9RLon3nAZr7bPeQu2QJ8iVxRMNg92ZgvoPY5qsBndcRGEQjI5zKHQ/r62tg4+LMWwFLSDBhDF3d55w6OosgLf+K7AIBICGAeTJanTkhCzQlWYM+HCDFEve+NhPsr5+/zabaeZrkKwSBh8E2jTCmr29LmNR6ld9Bz0NSboj5gi+Gxa3dTu+xPGMLMjANVQ1Qa1BhoYEI0QP8kl9gL8aBpLhkeW1eWXgRaRtRcTAVpjxF73ZlUEFVNyYeE/Mwz9ZT2lWRftj6dv5p2PUG5Z6VtbbBw/9AXQElJUTgfHKGd4iGEjo4A3l6ff6g/NVJzm/LkGq6909txbTIk8PSp5istS4bM318W6VG2ten9jYSU7+pj8H809AHoW3VEw== by SasquatchFinder 1
    Deleted message AQEBMdzd33/uz7jNQMnBJu1ne7GRh9g2xHx6X0cPWLsU0emEN0G5SGbr3nF/9QklDrrW42BX1HW6IDWxvhlI4/bOByZobYOfjmv5Cr8rDEJYnNKWxqxBZeQqjArKTy90WeEs0puUw4l6PouEZOv35daHO0h01A8Dpk/oMlVBi/OZFCIM4fetG2tUxwa7eU15WiEF4mklZqqJx2bVTbdiZqwhOucgqXlyXK3IJ5FtBFd6ACtEyX1tQmIBn6njmk/CBuX0v5+LzaxlntHy9Q+FpjuPLEyyE5wGqIk9B8Kcqv469pnaE3UJJaCK7DxgG70rF/7M1kYzaDRbRBYJB9jS3W9b8qZpj1JU4JM4euH9xBP4j59MvdwgIs4lSPvO1F3NtdCuNeOOMF15/n1WvU2U31jSeg== by SasquatchFinder 1
    

如示例所示,您可以指定要处理的最大消息数,但不能指定消息数量。这应该看起来合理,因为消费者在处理之前不知道队列中有多少消息。顺便说一下,请注意,消息在上面的列表中没有按照它们接收的相同顺序进行处理。

先进先出 (FIFO) 队列

让我们修改项目以使用 FIFO 队列,并同时重新运行两个消费者实例。请注意,消费者和生产者都不知道队列的类型。它们只知道队列的 URL。

  • 创建一个名为 SasquatchImageQueue.fifo 的新队列,类型为 FIFO 队列。

  • 点击“快速创建队列”。
  • 创建一个新权限,但为了省事,我们勾选“所有人”复选框和“所有 SQS 操作”复选框。显然,在生产环境中您不会这样做。

  • 修改消费者和生产者,使其都使用此队列的 URL。
    https://sqs.us-east-1.amazonaws.com/743327341874/SasquatchImageQueue.fifo
  • 修改生产者中的 sendMessage 方法。请注意移除了 delaySeconds 并添加了 messageGroupId
    public String sendMessage(String message) {	
      SendMessageRequest request = SendMessageRequest.builder()
          .queueUrl(this.queueUrl).messageBody(message)
          .messageGroupId("mygroup").build();
    
      SendMessageResponse response = this.sqsClient.sendMessage(request);
      return response.messageId();
    }
  • 更改队列后编译并运行生产者应用程序,三条消息将发送到队列。
  • 编译并运行消费者应用程序,三条消息将按其接收顺序进行处理。

消息可见性

  • 修改 SasquatchFinderprocessMessage,使其通过休眠 40 秒来模拟处理。
    public void processMessage() {
      ReceiveMessageRequest receiveMessageRequest = 
        ReceiveMessageRequest.builder().queueUrl(this.queueUrl)
        .maxNumberOfMessages(1).build();
    
      List<Message> messages = this.sqsClient
        .receiveMessage(receiveMessageRequest).messages();
    
      if(messages == null || messages.size() == 0) {
        return;
      }
    
      messages.stream().map(s -> s.body()).forEach(System.out::println);
      try {
    	System.out.println("sleeping for 40 seconds...");
    	Thread.sleep(40000);
            this.deleteMessage(messages);
      } catch (InterruptedException e) {
            e.printStackTrace();
      }
    }
  • 编译并运行应用程序。请注意,您会收到一个 SqsException
    SasquatchFinder 2 running....
    messageMine
    sleeping for 40 seconds...
    software.amazon.awssdk.services.sqs.model.SqsException: Value AQEBBJL+BlwyhRLnQGxaIKDkkrEv1sU6VnHzYM51Q0UFdx2lDyWvKoI/JYcs7MktVJ1Nmyr1mCVX/cpcqS9dMqq7Ual92VLEXDS9hEYM/qg1vdEGHB60OktMzpidyWBenQQyybzXofO+pAdKOYpC/wiEw8GBPsmFDCHpVn1hxHeLSNJyw10SwNv3DTXQXk4Pe+v3yGf23bf8sDk7Rx7ApqWYi8n8z9uijZAQBdwuFpUrZslivMWCzid6AFOXI/k83+/tKnSMyT0/Mx0rng0v1k4WliSgv5YJo5HyEZTt+cOBwfA= for parameter ReceiptHandle is invalid. Reason: The receipt handle has expired. (Service: Sqs, Status Code: 400, Request ID: 845b9538-4104-5428-aa2f-c05092244385)
    	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.handl <snip> at com.aws.tutorial.sqs.main.SasquatchFinder.main(SasquatchFinder.java:58)
    SasquatchFinder 2 stopped.

如果使用 FIFO 队列,在可见性超时窗口之后执行删除消息会失败。

结论

在本教程中,我们创建了一个 Amazon SQS 队列。创建队列后,我们使用 AWS Java 2 SDK 创建了一个消息生产者和一个消息消费者。然后我们探讨了几个主题,例如消息属性、死信队列和消息可见性。我们还创建了一个 FIFO 队列。

Amazon 的 SQS 队列是一个易于使用的队列,它消除了组织的基础设施管理麻烦。在本教程中,我们只探讨了 SQS 的基础知识。有关更多信息,请参阅 Java 2 SDK 开发人员指南和 SQS 开发人员指南。请记住,API 从版本 1 更改为版本 2,因此如有疑问,请假定您需要一个对象的构建器,并且在构建对象时必须配置它。但是,API 是一致的,一旦您开始使用 API,将 1.1 版代码转换为 2 版代码是直观的。

GitHub 项目

GitHub 项目 SQSTutorial 可在此处获取:这里

文章 Amazon Web Services 简单队列服务 (AWS SQS) 使用 Java 2 软件开发工具包 最早发布于 BTS 编程博客

© . All rights reserved.