在 UP Squared 主板上使用 Amazon Kinesis 和 Amazon Dynamodb
本指南演示了使用 UPSquared 和 Gigabyte 网关在 Ubuntu 上与 Amazon Kinesis 和 DynamoDB 进行通信的步骤。
引言
本指南演示了使用 UPSquared* 和 Gigabyte* 网关在 Ubuntu* 上与 Amazon Kinesis* 和 DynamoDB* 进行通信的步骤。UP Squared 推送(发送)信息,网关拉取(接收)信息以供进一步分析。
通过本文示例,您将学会使用 UP Squared 和 Gigabyte 网关 GB-BXTB-3825 平台来
- 创建 Cognito* 参数
- 描述 Amazon Kinesis 流
- Put record(放置记录)、get record(获取记录)并监控 Amazon Kinesis
- 如何从 Kinesis 流中提取数据并将其存储在 DynamoDB* 云数据库中以供后续分析
关于 UP Squared 主板
UPSquared 平台功耗低、性能高,非常适合物联网(IoT),是基于 Intel Apollo Lake 平台最快的 x86 创客主板。它包含 Intel Celeron® 处理器双核 N3350 和 Intel® Pentium® 品牌四核 N4200 处理器。
操作系统兼容性
UPSquared 平台可以运行 Ubilinux*、Ubuntu*、Windows® 10 IoT Core、Windows® 10、Yocto Project* 或 Android* Marshmallow 操作系统。有关 UPSquared 的更多信息,请访问 http://www.up-board.org/upsquared。有关 Intel® IoT Gateway GB-BXTB-3825 的详细信息,请参阅 http://b2b.gigabyte.com/Embedded-Computing/GB-BXBT-3825-rev-10#ov。
硬件组件
此项目中使用的硬件组件如下所示
- UPSquared
- UPSquared 电源 5V@6A
- Intel® IoT 网关 GB-BXTB-3825
- 带 HDMI* 接口的显示器
- HDMI线
- 通过 Internet 连接或 UP* WiFi 套件进行网络连接
- USB 键盘和鼠标
开发板
UP Squared 平台功耗低、性能高,非常适合物联网(IoT),是一款基于 Intel Apollo Lake 平台设计的快速原型开发板。它包含 Intel® Celeron® 双核 N3350 和 Intel® Pentium® 四核 N4200 处理器。
UP Squared 平台
开始之前,应在 UP Squared 平台上安装 Ubuntu* 操作系统。
1. 要确保 Ubuntu 操作系统是最新的并安装了依赖的 Ubuntu 包,请打开命令提示符(终端)并键入以下内容
sudo apt-get update
2. 通过在终端中键入以下命令来安装 npm(Node 版本管理器)包
sudo apt-get install npm
3. AWS* 是一个开源代码,它为许多 AWS 服务(包括 Kinesis 流、DynamoDB 等)提供 JavaScript* 库。要在 UP Squared 平台上使用 Node.js* 上的 AWS SDK for JavaScript* 开始使用 AWS,请在 Ubuntu* 终端上安装 AWS SDK JavaScript* 库和文档
npm install aws-sdk
4. Node.js* 是一个开源的、跨平台的 JavaScript* 运行时环境
sudo apt-get install node.js
Gigabyte* 网关 GB-BXTB-3825 平台
请确保 GB-BXTB-3825 网关平台上已安装 Ubuntu*。
同样,请确保 Ubuntu 操作系统是最新的,安装依赖项、npm、aws-sdk 和 Node.js* 包。网关从 Kinesis 流中提取包并将其存储在 DynamoDB 表中。
创建 Amazon Kinesis* 流
Amazon Kinesis 流支持大规模实时数据流,并使数据可供消费。流中的数据是数据记录的有序序列。流中的数据记录被分成多个分片。将数据分成多个分片是一种分散负载的技术。
让我们开始登录 Amazon Web Services (AWS)* 管理控制台,打开 Amazon* Kinesis 控制台,选择一个区域,然后选择 **Go** 进入 **Stream console**。
1. 选择 **Create Kinesis** stream(创建 Kinesis 流)
2. 为您的 Kinesis 流选择一个**名称和分片数**。
3. 当您看到一条消息通知您流已创建且状态为“ACTIVE”时,表示流已准备就绪。
创建 Amazon DynamoDB* 表
Amazon DynamoDB 是一项完全托管的云服务。其灵活的数据管理使其非常适合物联网应用。使用 Amazon DynamoDB,我们可以存储从 UP Squared 平台广播的数据,并针对特定需求分析这些数据。
1. 要创建 DynamoDB 表,请打开 DynamoDB 控制台并单击 **Create table**(创建表)。下面的示例创建了一个名为“TinkSmartPegTable”的 DynamoDB 表,其中包含两个属性 **PegID** 和 **FiveBytesData**,类型为 String。
2. 单击 **Create table**(创建表)以创建 DynamoDB 表。
3. 导航到您刚刚创建的 TinkSmartPegTable 的 **Items**(项目)选项卡,单击 **Create item**(创建项目)以创建更多项目。
4. 在 **Create item**(创建项目)表单的 more(更多)列表中选择 **Insert**(插入)
5. 从类型下拉列表中选择 **String**。
6. 输入第一个项目的名称和值,输入第二个项目的值,然后选择 **Save**(保存)。
7. TinkSmartPegTable 现在已创建,包含两个项目:PegID 和 FiveBytesData。
创建策略
IAM 策略明确列出了允许的操作以及这些操作适用的资源。
1. 在 IAM 控制台中,选择左侧列中的 Policies(策略),然后选择 **Create Policy**(创建策略)。
2. 在 **Create Policy**(创建策略)表单中,选择 **Policy Generator**(策略生成器)。
3. 填写策略权限如下
- AWS Service: Amazon Kinesis
- Actions: 选择 **DescribeStream**、**GetShardIterator**、**GetRecords**、**PutRecord** 和 **PutRecords** 作为允许的操作。
- Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream,其中 us-east-1 是区域,111548290563 是 AWS 账户 ID,TinkKinesisStream 是 Kinesis 流名称。
要获取 Kinesis 流 ARN,请在 Kinesis 控制台上,选择流名称,然后转到左侧列中的 Stream(流)。复制 Stream ARN。
4. 在 **Edit Permissions**(编辑权限)表单中填写 AWS Service(AWS 服务)、Action(操作)和 ARN。
5. 在 **Edit Permissions**(编辑权限)表单中单击 **Add Statement**(添加语句)。
6. 为 Amazon DynamoDB 填写操作和 ARN。
- AWS Service: Amazon DynamoDB
- Actions: 选择 **All Actions**(所有操作)作为允许的操作。
- Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream,其中 us-east-1 是区域,111548290563 是 AWS 账户 ID,TinkKinesisStream 是 DynamoDB 名称。
要获取 DynamoDB ARN,请转到 DynamoDB 控制台,单击左侧列中的 **Tables**(表),选择 DynamoDB 表名 **TinkSmartPegTable**,您应该能在 **Overview**(概览)选项卡中看到 DynamoDB ARN。
7. 在 **Edit Permissions**(编辑权限)表单中填写 AWS Service(AWS 服务)、Action(操作)和 ARN,然后单击 **Add Statement**(添加语句)。
8. 在 **Edit Permissions**(编辑权限)表单中单击 **Add Statement**(添加语句)。
9. 为 Amazon DynamoDB 填写操作和 ARN。
- AWS Service: Amazon DynamoDB
- Actions: 选择 **All Actions**(所有操作)作为允许的操作。
- Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream,其中 us-east-1 是区域,111548290563 是 AWS 账户 ID,TinkKinesisStream 是 DynamoDB 名称。
要获取 DynamoDB ARN,请转到 DynamoDB 控制台,单击左侧列中的 **Tables**(表),选择 DynamoDB 表名 **TinkSmartPegTable**,您应该能在 **Overview**(概览)选项卡中看到 DynamoDB ARN。
10. 在 **Edit Permissions**(编辑权限)表单中填写 AWS Service(AWS 服务)、Action(操作)和 ARN,然后单击 **Add Statement**(添加语句)。
11. 继续进行 **Next Step**(下一步)。
12. 编辑策略名称,然后选择 **Create Policy**(创建策略)。
如果您看到“TinkKinesisDynamoDBPolicy has been created”(TinkKinesisDynamoDBPolicy 已创建)的消息,则表示新策略已成功创建。
Amazon Cognito*
Amazon Cognito 是一项 Web 服务,用于为 UP Squared 和 Gateway GB-BXTB-3825 设备指定临时安全凭证,以便访问您的 Kinesis 流和 DynamoDB。Amazon Cognito 参数用于初始化 Cognito Credentials 对象。
1. 导航到 Cognito 控制台,然后选择 **Manage Federated Identities**(管理联合身份)。
2. 输入 **identity pool name**(身份池名称),勾选 **Enable**(启用)对未经身份验证的身份的访问。
3. 选择 **Allow**(允许)。
4. 在 **Getting started with Amazon Cognito**(Amazon Cognito 入门)表单中,单击右上角的 **Edit identity pool**(编辑身份池)。
5. 单击 **Cognito Stream**(Cognito 流)以展开。
6. 为流名称选择 TinkKinesisStream,启用流状态,然后选择 **Save Changes**(保存更改)。
附加策略
1. 打开 IAM 控制台,选择 Roles(角色),然后选择 Cognito_TinkPegIdentityPoolUnauth_Role。
2. 单击 **Attach**(附加)策略。
3. 选择 TinkKinesisStreamPolicy,然后单击 Attach policy(附加策略)。
4. 为 Cognito_TinkPegIdentityUnauth_Role 授予完全的 DynamoDB 访问权限,以便 Cognito_TinkPegIdentityUnauth_Role 能够执行 dynamodb:ListTables。再次单击 **Attach policy**(附加策略)以附加到 AmazonDynamoDBFullAccess。
5. 选择 **AmazonDynamoDBFullAccess**,然后单击 **Attach policy**(附加策略)。
6. 现在 Cognito_TinkPegIdentityPoolUnauth_Role 已附加到 TinkKinesisDynamoDBPolicy 和 AmazonDynamoDBFullAccess。
示例代码
为 Amazon Kinesis* 流创建 Cognito* 参数
Amazon Cognito* 是一项 Web 服务,用于为移动设备指定临时安全凭证。Amazon Cognito 参数用于初始化 Cognito Credentials 对象。
1. 按照以下步骤为 Amazon Kinesis 流创建 Cognito 参数
- AccountId: 从右上角的 **Account Name**(账户名称)转到 **My Account**(我的账户)。
- IdentityPoolId: 要获取用于代码 cognitoParams 的身份池 ID,请导航到 Cognito 控制台,然后单击 **Manage Federated Identities**(管理联合身份),选择 TinkPegIdentityPool,然后单击右上角的 **Edit identity pool**(编辑身份池)。**Identity pool ID**(身份池 ID)位于 **Edit identity pool**(编辑身份池)表单中。
2. 选择 **TinkPegIdentityPool**。
3. 单击 **Edit identity pool**(编辑身份池)。
var awsRegion = "us-east-1";
var streamName = TinkKinesisStream; // Kinesis stream
var tableName = " TinkSmartPegTable";
// Cognito parameters for Kinesis Stream
var cognitoParams = {
AccountId: "111548290563",
IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};
代码示例 1:Amazon Kinesis 流的 Cognito 参数
创建凭证对象
现在,您可以使用 Amazon Cognito Identify* 服务编写一个示例应用程序来创建凭证对象。
var AWS = require('aws-sdk');
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
if (!err) {
console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
}
else {
console.log("Error in Cognito identiy credentials.");
console.log(err);
}
});
代码示例 2:创建新的凭证对象
DescribeStream
DescribeStream
返回流的当前状态、Amazon 资源名称(ARN)、分片对象数组,并指示是否有更多分片可用。下面的示例描述了 Amazon Kinesis 流以获取可用分片。
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack); // an error occurred
}
else {
console.log("describeStream data");
console.log(data.StreamDescription.Shards[2].ShardId);
}
})
代码示例 3:DescribeStream
PutRecord
下面的示例代码将 JSON* 格式的记录放入 Amazon Kinesis 流 'TinkKinesisStream'
。记录包含两个字符串类型的数据:MAC 地址和五字节值。有关 PutRecord 的更多信息,请访问 http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html。
function putRecordsToKinesisStream (){
var json = {
"PegID": "FF.FF.FF.FF.FF",
"FiveBytesData": "0x1212121212",
};
var params = {
Data: JSON.stringify(json),
PartitionKey: partitionKey,
StreamName: streamName
};
kinesis.putRecord(params, function(err, putData) {
if (err) {
console.log(err, err.stack);
} else {
try {
sharId.value = putData.ShardId;
} catch(err) {
console.log("Error in Kinesis.putRecord()");
console.log(err);
}
}
});
};
代码示例 4:putRecord
getRecord
在调用 getRecords
之前,我们调用 getShardIterator
来指定 Kinesis 流的名称、如何读取数据记录以及在分片中的哪个位置开始顺序读取记录。有关 getShardIterator
的详细说明,请参阅 http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html。
// Describe stream parameters
var describeParams = {
StreamName: streamName
};
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack); // an error occurred
}
else {
var getParams = {
ShardId: data.StreamDescription.Shards[2].ShardId,
ShardIteratorType: "TRIM_HORIZON", // get oldest package
StreamName: streamName,
};
kinesis.getShardIterator(getParams, function(err, result) {
if (err) {
console.log("Error in getShardIterator()");
console.log(err);
} else {
// Get records from the Kinesis stream
getRecord(result.ShardIterator);
}
});
}
});
代码示例 5:getShardIterator
现在我们可以从 Amazon Kinesis 流 TinkKinesisStream
中 GetRecords
。有关 GetRecords
的更多信息,请参阅 http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html。
function getRecord(shard_iterator) {
var getRecParams = {
ShardIterator: shard_iterator
};
kinesis.getRecords(getRecParams, function(err, result) {
if (err) {
console.log("Error in getRecords() from the Kinesis stream.");
console.log(err);
} else {
try {
var StringDecoder = require('string_decoder').StringDecoder;
var decoder = new StringDecoder('utf8');
// If there are packages from getRecords()
if(result.Records.length > 0) {
// Loop through all the packages
…
}
} catch(err) {
console.log("Error parsing the package.");
console.log(err);
}
if (result.NextShardIterator) {
getRecord(result.NextShardIterator);
}
}
});
}
代码示例 6:getRecords
Amazon DynamoDB*
编辑策略
IAM 策略允许我们指定任何支持 IAM 的服务的任何 API 操作。以下策略指定了文档中示例代码使用的 Amazon Kinesis 流和 DynamoDB 的 API 操作。编辑策略以添加或删除 Kinesis 流或 DynamoDB 上允许的操作。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1505944340000",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": [
"arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream"
]
},
{
"Sid": "Stmt1505944397000",
"Effect": "Allow",
"Action": [
"dynamodb:*"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:111548290563:table/TinkSmartPegTable"
]
}
]
}
图 7:Kinesis 流和 DynamoDB 的策略
调用 listTable
listTable 返回先前创建的 Amazon DynamoDB 表名的数组。我们调用 listTable 来获取可用的 Amazon DynamoDB 表名。
var db = new AWS.DynamoDB();
var dbParams = {};
// If the DynamoDB table exists, get the DynamoDB table name.
db.listTables(dbParams , function(err, data) {
if (err) {
console.log("Error in dynamoDb.listTables(): ");
console.log(err, err.stack); // an error occurred
return;
}
else {
tableStr.value = data.TableNames;
console.log(tableStr.value.length);
}
});
代码示例 7:listTables
调用 putItem
下面的函数接收来自 Amazon Kinesis 流的包并将其存储在 Amazon DynamoDB 表中。
putItem = function(tableName, pegId, fiveBytes) {
// The item has two attributes: PegID type string and FiveBytesData type String
var item = {
'PegID': { 'S': pegId },
'FiveBytesData': { 'S': fiveBytes }
};
var putItemParams = {
TableName: tableName,
Item: item
};
// Store the package in the DynamoDB
db.putItem(putItemParams, function(err, data) {
if (err) {
console.log("Got error: ");
console.log(err, err.stack); // an error occurred
} else {
console.log(JSON.stringify(data, null, 2));
}
});
};
代码示例 8:putItem
示例草图
以下示例草图展示了如何重复读取 Kinesis 流,从 Amazon Kinesis 流中获取记录并将其存储在 Amazon DynamoDB 中。下面的两个示例都是在 UP Squared 和 Gigabyte Gateway GB-BXTB-3825 平台上开发和执行的。执行前请确保两个平台都已连接到网络。
Kinesis 流 putRecord 示例
var awsRegion = "us-east-1";
var streamName = 'TinkKinesisStream'; // Kinesis stream
var tableName = "TinkSmartPegTable";
var cognitoParams = {
AccountId: "111548290563",
IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};
var AWS = require('aws-sdk'); //package for AWS
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
if (!err) {
console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
}
else {
console.log(err);
}
});
var kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});
function sharIdObj() {
this.value = "";
}
var sharId = new sharIdObj();
function gateWayPackageObj() {
this.value = "";
}
var package = new gateWayPackageObj();
var describeParams = {
StreamName: streamName
};
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack); // an error occurred
}
else {
console.log("describeStream data");
console.log(data); // successful response
}
})
setInterval(putRecordsToKinesisStream, 5000);
////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: putRecordsToKinesisStream()
// This function puts the package into the Kinesis stream. The Kinesis stream is pre-created.
// Parameters: NA
////////////////////////////////////////////////////////////////////////////////////////////////
function putRecordsToKinesisStream (){
var json = {
"PegID": "FF.FF.FF.FF.FF",
"FiveBytesData": "0x1212121212",
};
var params = {
Data: JSON.stringify(json),
PartitionKey: partitionKey,
StreamName: streamName
};
kinesis.putRecord(params, function(err, putData) {
if (err) {
console.log(err, err.stack);
} else {
try {
sharId.value = putData.ShardId;
} catch(err) {
console.log("Error in Kinesis.putRecord()");
console.log(err);
}
}
});
};
代码示例 9:TinkGatewayToCloud.js 示例
连续读取 Kinesis 流示例
var awsRegion = "us-east-1";
var streamName = 'TinkKinesisStream'; // Kinesis stream
var tableName = "TinkSmartPegTable";
// Cognito parameters for Kinesis Stream and DynamoDB
var cognitoParams = {
AccountId: "111548290563",
IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};
var AWS = require('aws-sdk'); //package for AWS
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
if (!err) {
console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
}
else {
console.log("Error in Cognito identiy credentials.");
console.log(err);
}
});
var kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});
var db = new AWS.DynamoDB();
var dbParams = {};
// Object for the DynamoDB table name
function tableObj() {
this.value = "";
}
var tableStr = new tableObj();
// If the DynamoDB table exists, get the DynamoDB table name.
db.listTables(dbParams , function(err, data) {
if (err) {
console.log("Error in dynamoDb.listTables(): ");
console.log(err, err.stack); // an error occurred
return;
}
else {
tableStr.value = data.TableNames;
console.log(tableStr.value.length);
}
});
// Describe stream parameters
var describeParams = {
StreamName: streamName
};
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack); // an error occurred
}
else {
var getParams = {
ShardId: data.StreamDescription.Shards[4].ShardId,
ShardIteratorType: "TRIM_HORIZON", // get oldest package
StreamName: streamName,
};
kinesis.getShardIterator(getParams, function(err, result) {
if (err) {
console.log("Error in getShardIterator()");
console.log(err);
} else {
// Get records from the Kinesis stream
getRecord(result.ShardIterator);
}
});
}
});
////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: getRecord
// This function gets records from the kinesis stream and then puts the item into the
// DynamoDB.
//
// Parameters:
// - shard_iterator: The shard position from which to start reading data records
// sequentiallly.
////////////////////////////////////////////////////////////////////////////////////////////////
function getRecord(shard_iterator) {
var getRecParams = {
ShardIterator: shard_iterator
};
kinesis.getRecords(getRecParams, function(err, result) {
if (err) {
console.log("Error in getRecords() from the Kinesis stream.");
console.log(err);
} else {
try {
var StringDecoder = require('string_decoder').StringDecoder;
var decoder = new StringDecoder('utf8');
// If there are packages from getRecords()
if(result.Records.length > 0) {
// Loop through all the packages
for(var i = 0; i < result.Records.length; i++) {
if(result.Records[i] != undefined) {
var getData = JSON.parse(decoder.write(result.Records[i].Data));
console.log("PegId =");
console.log(getData.PegID);
// Put the package into the DynamoDB table
putItem(tableStr.value[0], getData.PegID, getData.FiveBytesData);
}
}
}
} catch(err) {
console.log("Error parsing the package.");
console.log(err);
}
if (result.NextShardIterator) {
getRecord(result.NextShardIterator);
}
}
});
}
////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: putItem
// This function
// - Creates an item using the pass in parameters: pegId and fiveBytes
// - Calls dynamoDb.putItem() to put the item into the tableName
// Parameters:
// - tableName: The name of the DynamoDB
// - pegId: UUID of the peg
// - fiveBytes: The stock information of the peg
////////////////////////////////////////////////////////////////////////////////////////////////
putItem = function(tableName, pegId, fiveBytes) {
// The item has two attributes: PegID type string and FiveBytesData type String
var item = {
'PegID': { 'S': pegId },
'FiveBytesData': { 'S': fiveBytes }
};
var putItemParams = {
TableName: tableName,
Item: item
};
// Store the package in the DynamoDB
db.putItem(putItemParams, function(err, data) {
if (err) {
console.log("Got error: ");
console.log(err, err.stack); // an error occurred
} else {
console.log(JSON.stringify(data, null, 2));
}
});
};
代码示例 10:TinkGetPackageFromCloud.js 示例
UP Squared 将持续将包推送到 Kinesis 流。
网关将持续从 Kinesis 流中拉取包,并将其存储在 DynamoDB 表中。
摘要
我们已经介绍了如何在 UP Squared 和 Gigabyte 网关 GB-BXTB-3825 平台上使用 Ubuntu* 操作系统上的 JavaScript* 来使用 Amazon Kinesis 流和 DynamoDB*。UP Squared 会持续将包推送到 Kinesis 流,Gateway GB-BXTB-3825 会持续从 Kinesis 流中拉取包,然后将其存储在云中以供进一步分析。我们建议您尝试在这些平台上使用不同的 Amazon Web Services。