65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 AWS Step Function 组合混凝土诗歌

starIconstarIconstarIconstarIconstarIcon

5.00/5 (4投票s)

2024 年 2 月 28 日

CPOL

7分钟阅读

viewsIcon

3575

使用 AWS Step Function 创建分布式 map-reduce 工作流来编写自动重新生成诗歌。

概念

在观察到俄乌战争疲劳的迹象后,我决定创作一首名为“战争结束了吗?”的诗。我将其标记为“具体诗歌”,这是与具体音乐(一种由非音乐声音片段组成的音乐流派)相类比。

同样,我的诗歌由上一周关于“俄罗斯战争罪行”的搜索引擎结果组成,然后通过情感分析模型进行处理,提取最能突出俄罗斯暴行的句子。经过机器学习处理后,这些句子被组装成诗歌。每周日,诗歌都会用新的战争罪行重新生成。

战争将在搜索引擎不再返回结果、诗歌一片空白的那一天结束。

有人可能会争辩说,即使敌对行动结束后很长一段时间,搜索引擎仍然可能返回结果。这正是重点。乌克兰的许多人将不得不终生面对战争的后果。考虑战争退伍军人、受创伤的儿童以及失去亲人的家庭。

您可以通过此处访问通往诗歌的页面。

高层架构

我决定采用无服务器方案,因为它允许我按执行次数付费,而对于这个项目来说,执行次数很低。虽然我在此项目中的经验主要与 .NET 堆栈相关,但我决定使用 JavaScript,因为它在 Web 方面的能力超出了我所熟悉的任何其他语言。

高层架构图如下所示:

整个过程由 EventBridge Scheduler 启动,它会启动一系列 Lambda 函数,每个函数都有自己的职责:- 抓取搜索引擎 - 从网页中提取文章内容 - 分析文章中每个句子的情感 - 从情感最强的句子中组装诗歌,并将其放入 S3 存储桶,然后由 CloudFront 提供给客户端。

由于源代码存储在Github上,我决定通过 Github Actions 进行部署。在下面的文章中,我们将重点关注代码中的一些要点。

抓取搜索引擎

Google 爬虫服务的算法是:

  1. https://www.google.com/search?q=russian+war+crimes&tbs=qdr:w 发出请求
  2. 遍历 HTML 页面以获取网页链接。

在着手这项任务时,我原本以为我会利用 Document API 来遍历 HTML。然而,它依赖于浏览器,而 Lambda 不是浏览器。 JSDom 来拯救了我。

有了它的帮助,提取必要的值就像这样简单:

const dom = new jsdom.JSDOM(data);
const anchors = dom.window.document.querySelectorAll('a[data-ved]');

首次部署

我从一开始就决定设计项目以方便部署,因此下一步是引入 GitHub 上的持续构建。

name: Google Crawler Build

on:
  push:
    branches: [ "*" ]
  pull_request:
    branches: [ "master" ]

jobs:
  build:

    runs-on: ubuntu-latest

    strategy:
      matrix:
        node-version: [20.x]
        # See supported Node.js release schedule at https://node.org.cn/en/about/releases/

    steps:
    - uses: actions/checkout@v3
    - name: Use Node.js ${{ matrix.node-version }}
      uses: actions/setup-node@v3
      with:
        node-version: ${{ matrix.node-version }}
        cache: 'npm'
        cache-dependency-path: "./src/google-crawler/package-lock.json"
    - run: cd ./src/google-crawler && npm ci
    - run: cd ./src/google-crawler && npm test
    - run: cd ./src/google-crawler && npm run lint

我认为代码不言自明,但我们还是来看一些要点。

在这里,我们依赖于 ubuntu-latest 环境和 node-version: [20.x]

首先,我们使用 actions/checkout@v3 签出源代码。为了使 npm 正确工作,我们必须使用 cache-dependency-path: "./src/google-crawler/package-lock.json" 指定 package-lock.json 文件的路径。除了使用 npm ci 恢复包之外,我们还运行单元测试和 linter,这些是代码库必要的质量门。

部署如下所示:

name: Google Crawler Deploy

on:
  push:
    branches: [ "master" ]
jobs:
  lambda:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        node-version: [20.x]
        # See supported Node.js release schedule at https://node.org.cn/en/about/releases/

    steps:
      - uses: actions/checkout@v3
      - name: Use Node.js ${{ matrix.node-version }}
        uses: actions/setup-node@v3
        with:
          node-version: ${{ matrix.node-version }}
          cache: 'npm'
          cache-dependency-path: "./src/google-crawler/package-lock.json"
      - uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-central-1
      - run: cd ./src/google-crawler && npm ci
      - run: cd ./src/google-crawler && zip -r lambda1.zip ./
      - run: cd ./src/google-crawler && aws lambda update-function-code --function-name=google-crawler --zip-file=fileb://lambda1.zip

这看起来与构建作业非常相似,但是,我们还压缩了代码并通过 aws lambda update-function-code 命令进行部署。

提取文章内容

为了从网页中提取文章内容,我使用了 Readability 包。以下是我如何从网页下载文章内容并将其分割成句子。

const res = await fetch(url);
const html = await res.text();
const doc = new jsdom.JSDOM(html);
const reader = new readability.Readability(doc.window.document);
const article = reader.parse();
const sentences = splitIntoSentences(article.textContent);

从一个 Lambda 调用另一个 Lambda

有很多关于如何同步调用一个 Lambda 的建议。但是,AWS 文档在这方面更为谨慎,而且是有充分理由的。

引用

虽然这种同步流程可以在服务器上的单个应用程序中正常工作,但在分布式无服务器架构中,它会带来几个可避免的问题。

成本:使用 Lambda,您需要为调用时长付费。在此示例中,当 Create invoice 函数运行时,另外两个函数也处于等待状态,在图中以红色显示。

错误处理:在嵌套调用中,错误处理会变得复杂得多。错误要么被抛给父函数在顶层函数中处理,要么函数需要自定义处理。例如,Create invoice 中的错误可能需要 Process payment 函数来撤销收费,或者它可能重试 Create invoice 过程。

紧密耦合:处理付款通常比创建发票花费的时间长。在此模型中,整个工作流程的可用性受限于最慢的函数。

扩展性:这三个函数的并发性必须相等。在繁忙的系统中,这会比其他情况使用更多的并发。

其中一个替代方案是使用 AWS Step Functions 来协调 Lambda 的执行。事实证明,我的问题是一个很好的分布式映射示例。考虑:我们提取所有必要的链接 - 我们并行映射每个链接,提取其内容并分析其情感。 - 我们将其减少到单个 S3 存储桶。

这是整个系统的定义。

{
  "Comment": "A Step Functions workflow that processes an array of strings concurrently",
  "StartAt": "Extract links from google",
  "States": {
    "Extract links from google": {
      "Type": "Task",
      "Resource": "<google crawler arn>",
      "ResultPath": "$",
      "Next": "ProcessArray",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ]
    },
    "ProcessArray": {
      "Type": "Map",
      "ItemsPath": "$",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "Extract article content",
        "States": {
          "Extract article content": {
            "Type": "Task",
            "Resource": "<article extractor arn>",
            "InputPath": "$",
            "Next": "Analyze sentiment",
            "Retry": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "IntervalSeconds": 1,
                "MaxAttempts": 2,
                "BackoffRate": 2
              }
            ],
            "Catch": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "Next": "Analyze sentiment"
              }
            ]
          },
          "Analyze sentiment": {
            "Type": "Task",
            "Resource": "<sentiment analyzer arn>",
            "InputPath": "$",
            "End": true,
            "Retry": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "IntervalSeconds": 1,
                "MaxAttempts": 2,
                "BackoffRate": 2
              }
            ]
          }
        }
      },
      "Next": "Reducer",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Reducer"
        }
      ]
    },
    "Reducer": {
      "Type": "Task",
      "Resource": "<reducer arn>",
      "InputPath": "$",
      "ResultPath": "$",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 2,
          "BackoffRate": 2
        }
      ]
    }
  }
}

在这里,我们通过 "Type": "Map", 进入映射阶段,其中 Article extractorSentiment analyzer 都作为 Iterator。一旦映射阶段完成,我们就通过 "Next": "Reducer" 进入归约阶段。

另一件值得一提的事情是通过添加错误处理来提高系统的可靠性。最直接的方法是添加重试,通过

"Retry": [
  {
    "ErrorEquals": [
      "States.ALL"
    ],
    "IntervalSeconds": 1,
    "MaxAttempts": 2,
    "BackoffRate": 2
  }
]

我们还使用了另一种策略——让单个映射实例失败,而不是让整个映射阶段失败。

"Catch": [
  {
    "ErrorEquals": [
      "States.ALL"
    ],
    "Next": "Analyze sentiment"
  }
]

使用 Layers 优化 Monorepo 结构

此时,我们存储库的结构看起来不理想,每个函数都有 package.json 和单独的构建步骤。更重要的是:单独的 package.json 意味着单独的 node_modules 文件夹,这会导致大量磁盘空间浪费,因为许多模块是重复的。

当添加更多函数时,这不会奏效。但是,有一种方法可以一次性构建和打包所有依赖项,即使用 Lambda Layers。这种方法允许我们将所有依赖项打包到一个单独的层中,并将其视为我们函数的通用运行时。

我们将重新组织我们的存储库,使其看起来像这样:

让我们来看一个单独的部署层的 Action:

name: Deploy Modules Layer

on:
  workflow_call:
    secrets:
      AWS_ACCESS_KEY_ID:
        required: true
      AWS_SECRET_ACCESS_KEY:
        required: true

jobs:
  layer:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        node-version: [20.x]
        # See supported Node.js release schedule at https://node.org.cn/en/about/releases/

    steps:
      - uses: actions/checkout@v3
      - name: Use Node.js ${{ matrix.node-version }}
        uses: actions/setup-node@v3
        with:
          node-version: ${{ matrix.node-version }}
          cache: 'npm'
          cache-dependency-path: "./src/package-lock.json"
      - uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-central-1
      - run: cd ./src && npm ci
      - run: cd ./src && zip -r layer.zip node_modules
      - run: cd ./src && aws lambda publish-layer-version --layer-name poeme-concrete-modules --zip-file fileb://layer.zip

这里没有特别的事情发生,除了我们现在正在使用 aws lambda publish-layer-version。现在让我们在部署函数时继续使用已部署的层。

name: Article Extractor Deploy

on:
  push:
    branches: [ "master" ]
jobs:
  layer:
    uses: ./.github/workflows/modules-layer-deploy.yml
    secrets: inherit

  lambda:
    runs-on: ubuntu-latest
    needs: layer
    strategy:
      matrix:
        node-version: [20.x]
        # See supported Node.js release schedule at https://node.org.cn/en/about/releases/

    steps:
      - uses: actions/checkout@v3
      - name: Use Node.js ${{ matrix.node-version }}
        uses: actions/setup-node@v3
        with:
          node-version: ${{ matrix.node-version }}
          cache: 'npm'
          cache-dependency-path: "./src/package-lock.json"
      - uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-central-1
      - run: cd ./src && npm ci
      - run: cd ./src/article-extractor && zip -r lambda1.zip ./
      - run: cd ./src/article-extractor && aws lambda update-function-code --function-name=article-extractor --zip-file=fileb://lambda1.zip
      - run: echo "layer-arn=$(aws lambda list-layer-versions --layer-name poeme-concrete-modules --region eu-central-1 --query 'LayerVersions[0].LayerVersionArn')" >> $GITHUB_ENV
      - run: aws lambda update-function-configuration --function-name=article-extractor --layers="${{ env.layer-arn }}"

这里有几点值得注意。

首先,是我们如何依赖于 deploy layers 作业。

jobs:
  layer:
    uses: ./.github/workflows/modules-layer-deploy.yml
    secrets: inherit

对于新手来说,可能不明显的是我们如何使用 secrets: inherit 将机密信息传递给部署层的 Action。人们可能会自然地认为它会从 GitHub 存储中推断机密信息,但这并非如此,子 Action 会从父工作流推断机密信息。

另一件重要的事情是强制新部署的函数使用已发布的层的最新版本。我们分两步实现这一点:

  1. 查询最新的层版本并将其存储在环境变量中
    echo "layer-arn=$(aws lambda list-layer-versions --layer-name poeme-concrete-modules --region eu-central-1 --query 'LayerVersions[0].LayerVersionArn')" >> $GITHUB_ENV
  2. 使用存储的值配置更新函数配置
    aws lambda update-function-configuration --function-name=article-extractor --layers="${{ env.layer-arn }}"

访问机密

在选择情感分析引擎时,自然的选择是 Amazon Comprehend。我为什么没有坚持使用它?我不喜欢它的结果。

相反,我选择了 text2data 服务。最终,它就像通过 HTTP 调用任何其他第三方服务一样,因此在本节中,我将简要介绍检索调用此 API 所需的机密信息。

import { SecretsManagerClient, GetSecretValueCommand } from 
         "@aws-sdk/client-secrets-manager";

async function getSentimentAnalysisApiKey() {
    const secret_name = "SENTIMENT_ANALYSIS_API_KEY";

    const client = new SecretsManagerClient({
        region: "eu-central-1",
    });

    let response;

    try {
        response = await client.send(
            new GetSecretValueCommand({
                SecretId: secret_name,
                VersionStage: "AWSCURRENT"
            })
        );
    } catch (error) {
        console.log(error);
        throw error;
    }

    return response.SecretString;
}

将结果写入 S3

CloudFront 从 S3 存储桶提供 HTML 内容。因此,为了发布诗歌,我们需要生成 HTML 并将其存储在存储桶中。

为了生成 HTML,我们将句子插入 mustache 模板中。

const formatted =
        poem
          .map(p => `<p>${p}</p>`)
          .join("\n");
const html = renderTemplate(formatted);

const renderTemplate = (poem) => {
  const template = fs.readFileSync('./template.html', 'utf8');

  return Mustache.render(template, {
    poem: poem
  });
}

模板中需要注意的是,我们必须使用三个花括号,以便插入的 HTML 不会被转义。

<html>
    //omitted for brevity
    <body>
        <article>
{{{poem}}}
        </article>
    </body>
</html>

现在我们可以使用以下代码将 HTML 存储在 S3 中:

const putParams = {
  Bucket: 'poeme-concrete',
  Key: 'index.html',
  Body: html,
  ContentType: 'text/html',
};

await s3.putObject(putParams).promise();

结论

通常,这时我会总结一下文章中涉及的技术。但是,这次我鼓励您阅读这首诗,并偶尔重温它。您可能会认为它太令人不安而置之不理,但对乌克兰的许多人来说,这是严峻的现实。我也不例外,因为我花了两年时间来治疗我现年 4 岁的儿子患有的 PTSD 和言语障碍。尽管如此,我的处境已经算幸运,因为我至少有地方住,并且生活在一个相对和平的地区。

如果过去 30 年的经历教会了我们任何东西,那就是您永远无法通过不惩罚侵略者来阻止他。俄罗斯在德涅斯特河沿岸、伊切里亚、阿布哈兹、克里米亚以及顿涅茨克和卢甘斯克地区的部分地区后并没有停下来,如果它认为可以免受惩罚地发动这场战争,现在也不会停下来。

历史

  • 2024 年 2 月 28 日 - 初始版本
© . All rights reserved.