使用 AWS Step Function 组合混凝土诗歌





5.00/5 (4投票s)
使用 AWS Step Function 创建分布式 map-reduce 工作流来编写自动重新生成诗歌。
概念
在观察到俄乌战争疲劳的迹象后,我决定创作一首名为“战争结束了吗?”的诗。我将其标记为“具体诗歌”,这是与具体音乐(一种由非音乐声音片段组成的音乐流派)相类比。
同样,我的诗歌由上一周关于“俄罗斯战争罪行”的搜索引擎结果组成,然后通过情感分析模型进行处理,提取最能突出俄罗斯暴行的句子。经过机器学习处理后,这些句子被组装成诗歌。每周日,诗歌都会用新的战争罪行重新生成。
战争将在搜索引擎不再返回结果、诗歌一片空白的那一天结束。
有人可能会争辩说,即使敌对行动结束后很长一段时间,搜索引擎仍然可能返回结果。这正是重点。乌克兰的许多人将不得不终生面对战争的后果。考虑战争退伍军人、受创伤的儿童以及失去亲人的家庭。
您可以通过此处访问通往诗歌的页面。
高层架构
我决定采用无服务器方案,因为它允许我按执行次数付费,而对于这个项目来说,执行次数很低。虽然我在此项目中的经验主要与 .NET 堆栈相关,但我决定使用 JavaScript,因为它在 Web 方面的能力超出了我所熟悉的任何其他语言。
高层架构图如下所示:
整个过程由 EventBridge Scheduler 启动,它会启动一系列 Lambda 函数,每个函数都有自己的职责:- 抓取搜索引擎 - 从网页中提取文章内容 - 分析文章中每个句子的情感 - 从情感最强的句子中组装诗歌,并将其放入 S3 存储桶,然后由 CloudFront 提供给客户端。
由于源代码存储在Github上,我决定通过 Github Actions 进行部署。在下面的文章中,我们将重点关注代码中的一些要点。
抓取搜索引擎
Google 爬虫服务的算法是:
- 向 https://www.google.com/search?q=russian+war+crimes&tbs=qdr:w 发出请求
- 遍历 HTML 页面以获取网页链接。
在着手这项任务时,我原本以为我会利用 Document API 来遍历 HTML。然而,它依赖于浏览器,而 Lambda 不是浏览器。 JSDom 来拯救了我。
有了它的帮助,提取必要的值就像这样简单:
const dom = new jsdom.JSDOM(data);
const anchors = dom.window.document.querySelectorAll('a[data-ved]');
首次部署
我从一开始就决定设计项目以方便部署,因此下一步是引入 GitHub 上的持续构建。
name: Google Crawler Build
on:
push:
branches: [ "*" ]
pull_request:
branches: [ "master" ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [20.x]
# See supported Node.js release schedule at https://node.org.cn/en/about/releases/
steps:
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
cache: 'npm'
cache-dependency-path: "./src/google-crawler/package-lock.json"
- run: cd ./src/google-crawler && npm ci
- run: cd ./src/google-crawler && npm test
- run: cd ./src/google-crawler && npm run lint
我认为代码不言自明,但我们还是来看一些要点。
在这里,我们依赖于 ubuntu-latest
环境和 node-version: [20.x]
。
首先,我们使用 actions/checkout@v3
签出源代码。为了使 npm 正确工作,我们必须使用 cache-dependency-path: "./src/google-crawler/package-lock.json"
指定 package-lock.json 文件的路径。除了使用 npm ci
恢复包之外,我们还运行单元测试和 linter,这些是代码库必要的质量门。
部署如下所示:
name: Google Crawler Deploy
on:
push:
branches: [ "master" ]
jobs:
lambda:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [20.x]
# See supported Node.js release schedule at https://node.org.cn/en/about/releases/
steps:
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
cache: 'npm'
cache-dependency-path: "./src/google-crawler/package-lock.json"
- uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-central-1
- run: cd ./src/google-crawler && npm ci
- run: cd ./src/google-crawler && zip -r lambda1.zip ./
- run: cd ./src/google-crawler && aws lambda update-function-code --function-name=google-crawler --zip-file=fileb://lambda1.zip
这看起来与构建作业非常相似,但是,我们还压缩了代码并通过 aws lambda update-function-code
命令进行部署。
提取文章内容
为了从网页中提取文章内容,我使用了 Readability 包。以下是我如何从网页下载文章内容并将其分割成句子。
const res = await fetch(url);
const html = await res.text();
const doc = new jsdom.JSDOM(html);
const reader = new readability.Readability(doc.window.document);
const article = reader.parse();
const sentences = splitIntoSentences(article.textContent);
从一个 Lambda 调用另一个 Lambda
有很多关于如何同步调用一个 Lambda 的建议。但是,AWS 文档在这方面更为谨慎,而且是有充分理由的。
引用虽然这种同步流程可以在服务器上的单个应用程序中正常工作,但在分布式无服务器架构中,它会带来几个可避免的问题。
成本:使用 Lambda,您需要为调用时长付费。在此示例中,当 Create invoice 函数运行时,另外两个函数也处于等待状态,在图中以红色显示。
错误处理:在嵌套调用中,错误处理会变得复杂得多。错误要么被抛给父函数在顶层函数中处理,要么函数需要自定义处理。例如,Create invoice 中的错误可能需要 Process payment 函数来撤销收费,或者它可能重试 Create invoice 过程。
紧密耦合:处理付款通常比创建发票花费的时间长。在此模型中,整个工作流程的可用性受限于最慢的函数。
扩展性:这三个函数的并发性必须相等。在繁忙的系统中,这会比其他情况使用更多的并发。
其中一个替代方案是使用 AWS Step Functions 来协调 Lambda 的执行。事实证明,我的问题是一个很好的分布式映射示例。考虑:我们提取所有必要的链接 - 我们并行映射每个链接,提取其内容并分析其情感。 - 我们将其减少到单个 S3 存储桶。
这是整个系统的定义。
{
"Comment": "A Step Functions workflow that processes an array of strings concurrently",
"StartAt": "Extract links from google",
"States": {
"Extract links from google": {
"Type": "Task",
"Resource": "<google crawler arn>",
"ResultPath": "$",
"Next": "ProcessArray",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2
}
]
},
"ProcessArray": {
"Type": "Map",
"ItemsPath": "$",
"MaxConcurrency": 10,
"Iterator": {
"StartAt": "Extract article content",
"States": {
"Extract article content": {
"Type": "Task",
"Resource": "<article extractor arn>",
"InputPath": "$",
"Next": "Analyze sentiment",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Analyze sentiment"
}
]
},
"Analyze sentiment": {
"Type": "Task",
"Resource": "<sentiment analyzer arn>",
"InputPath": "$",
"End": true,
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2
}
]
}
}
},
"Next": "Reducer",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Reducer"
}
]
},
"Reducer": {
"Type": "Task",
"Resource": "<reducer arn>",
"InputPath": "$",
"ResultPath": "$",
"End": true,
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2
}
]
}
}
}
在这里,我们通过 "Type": "Map",
进入映射阶段,其中 Article extractor
和 Sentiment analyzer
都作为 Iterator
。一旦映射阶段完成,我们就通过 "Next": "Reducer"
进入归约阶段。
另一件值得一提的事情是通过添加错误处理来提高系统的可靠性。最直接的方法是添加重试,通过
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"IntervalSeconds": 1,
"MaxAttempts": 2,
"BackoffRate": 2
}
]
我们还使用了另一种策略——让单个映射实例失败,而不是让整个映射阶段失败。
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Analyze sentiment"
}
]
使用 Layers 优化 Monorepo 结构
此时,我们存储库的结构看起来不理想,每个函数都有 package.json 和单独的构建步骤。更重要的是:单独的 package.json 意味着单独的 node_modules 文件夹,这会导致大量磁盘空间浪费,因为许多模块是重复的。
当添加更多函数时,这不会奏效。但是,有一种方法可以一次性构建和打包所有依赖项,即使用 Lambda Layers。这种方法允许我们将所有依赖项打包到一个单独的层中,并将其视为我们函数的通用运行时。
我们将重新组织我们的存储库,使其看起来像这样:
让我们来看一个单独的部署层的 Action:
name: Deploy Modules Layer
on:
workflow_call:
secrets:
AWS_ACCESS_KEY_ID:
required: true
AWS_SECRET_ACCESS_KEY:
required: true
jobs:
layer:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [20.x]
# See supported Node.js release schedule at https://node.org.cn/en/about/releases/
steps:
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
cache: 'npm'
cache-dependency-path: "./src/package-lock.json"
- uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-central-1
- run: cd ./src && npm ci
- run: cd ./src && zip -r layer.zip node_modules
- run: cd ./src && aws lambda publish-layer-version --layer-name poeme-concrete-modules --zip-file fileb://layer.zip
这里没有特别的事情发生,除了我们现在正在使用 aws lambda publish-layer-version
。现在让我们在部署函数时继续使用已部署的层。
name: Article Extractor Deploy
on:
push:
branches: [ "master" ]
jobs:
layer:
uses: ./.github/workflows/modules-layer-deploy.yml
secrets: inherit
lambda:
runs-on: ubuntu-latest
needs: layer
strategy:
matrix:
node-version: [20.x]
# See supported Node.js release schedule at https://node.org.cn/en/about/releases/
steps:
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
cache: 'npm'
cache-dependency-path: "./src/package-lock.json"
- uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-central-1
- run: cd ./src && npm ci
- run: cd ./src/article-extractor && zip -r lambda1.zip ./
- run: cd ./src/article-extractor && aws lambda update-function-code --function-name=article-extractor --zip-file=fileb://lambda1.zip
- run: echo "layer-arn=$(aws lambda list-layer-versions --layer-name poeme-concrete-modules --region eu-central-1 --query 'LayerVersions[0].LayerVersionArn')" >> $GITHUB_ENV
- run: aws lambda update-function-configuration --function-name=article-extractor --layers="${{ env.layer-arn }}"
这里有几点值得注意。
首先,是我们如何依赖于 deploy layers 作业。
jobs:
layer:
uses: ./.github/workflows/modules-layer-deploy.yml
secrets: inherit
对于新手来说,可能不明显的是我们如何使用 secrets: inherit
将机密信息传递给部署层的 Action。人们可能会自然地认为它会从 GitHub 存储中推断机密信息,但这并非如此,子 Action 会从父工作流推断机密信息。
另一件重要的事情是强制新部署的函数使用已发布的层的最新版本。我们分两步实现这一点:
- 查询最新的层版本并将其存储在环境变量中
echo "layer-arn=$(aws lambda list-layer-versions --layer-name poeme-concrete-modules --region eu-central-1 --query 'LayerVersions[0].LayerVersionArn')" >> $GITHUB_ENV
- 使用存储的值配置更新函数配置
aws lambda update-function-configuration --function-name=article-extractor --layers="${{ env.layer-arn }}"
访问机密
在选择情感分析引擎时,自然的选择是 Amazon Comprehend。我为什么没有坚持使用它?我不喜欢它的结果。
相反,我选择了 text2data 服务。最终,它就像通过 HTTP 调用任何其他第三方服务一样,因此在本节中,我将简要介绍检索调用此 API 所需的机密信息。
import { SecretsManagerClient, GetSecretValueCommand } from
"@aws-sdk/client-secrets-manager";
async function getSentimentAnalysisApiKey() {
const secret_name = "SENTIMENT_ANALYSIS_API_KEY";
const client = new SecretsManagerClient({
region: "eu-central-1",
});
let response;
try {
response = await client.send(
new GetSecretValueCommand({
SecretId: secret_name,
VersionStage: "AWSCURRENT"
})
);
} catch (error) {
console.log(error);
throw error;
}
return response.SecretString;
}
将结果写入 S3
CloudFront 从 S3 存储桶提供 HTML 内容。因此,为了发布诗歌,我们需要生成 HTML 并将其存储在存储桶中。
为了生成 HTML,我们将句子插入 mustache 模板中。
const formatted =
poem
.map(p => `<p>${p}</p>`)
.join("\n");
const html = renderTemplate(formatted);
const renderTemplate = (poem) => {
const template = fs.readFileSync('./template.html', 'utf8');
return Mustache.render(template, {
poem: poem
});
}
模板中需要注意的是,我们必须使用三个花括号,以便插入的 HTML 不会被转义。
<html>
//omitted for brevity
<body>
<article>
{{{poem}}}
</article>
</body>
</html>
现在我们可以使用以下代码将 HTML 存储在 S3 中:
const putParams = {
Bucket: 'poeme-concrete',
Key: 'index.html',
Body: html,
ContentType: 'text/html',
};
await s3.putObject(putParams).promise();
结论
通常,这时我会总结一下文章中涉及的技术。但是,这次我鼓励您阅读这首诗,并偶尔重温它。您可能会认为它太令人不安而置之不理,但对乌克兰的许多人来说,这是严峻的现实。我也不例外,因为我花了两年时间来治疗我现年 4 岁的儿子患有的 PTSD 和言语障碍。尽管如此,我的处境已经算幸运,因为我至少有地方住,并且生活在一个相对和平的地区。
如果过去 30 年的经历教会了我们任何东西,那就是您永远无法通过不惩罚侵略者来阻止他。俄罗斯在德涅斯特河沿岸、伊切里亚、阿布哈兹、克里米亚以及顿涅茨克和卢甘斯克地区的部分地区后并没有停下来,如果它认为可以免受惩罚地发动这场战争,现在也不会停下来。
历史
- 2024 年 2 月 28 日 - 初始版本