65.9K
CodeProject 正在变化。 阅读更多。
Home

在 UP Squared 主板上使用 Amazon Kinesis 和 Amazon Dynamodb

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2017年12月15日

CPOL

10分钟阅读

viewsIcon

7448

本指南演示了使用 UPSquared 和 Gigabyte 网关在 Ubuntu 上与 Amazon Kinesis 和 DynamoDB 进行通信的步骤。

引言

本指南演示了使用 UPSquared* 和 Gigabyte* 网关在 Ubuntu* 上与 Amazon Kinesis* 和 DynamoDB* 进行通信的步骤。UP Squared 推送(发送)信息,网关拉取(接收)信息以供进一步分析。

通过本文示例,您将学会使用 UP Squared 和 Gigabyte 网关 GB-BXTB-3825 平台来

  • 创建 Cognito* 参数
  • 描述 Amazon Kinesis 流
  • Put record(放置记录)、get record(获取记录)并监控 Amazon Kinesis
  • 如何从 Kinesis 流中提取数据并将其存储在 DynamoDB* 云数据库中以供后续分析

关于 UP Squared 主板

UPSquared 平台功耗低、性能高,非常适合物联网(IoT),是基于 Intel Apollo Lake 平台最快的 x86 创客主板。它包含 Intel Celeron® 处理器双核 N3350 和 Intel® Pentium® 品牌四核 N4200 处理器。

操作系统兼容性

UPSquared 平台可以运行 Ubilinux*、Ubuntu*、Windows® 10 IoT Core、Windows® 10、Yocto Project* 或 Android* Marshmallow 操作系统。有关 UPSquared 的更多信息,请访问 http://www.up-board.org/upsquared。有关 Intel® IoT Gateway GB-BXTB-3825 的详细信息,请参阅 http://b2b.gigabyte.com/Embedded-Computing/GB-BXBT-3825-rev-10#ov

 

硬件组件

此项目中使用的硬件组件如下所示

开发板

UP Squared 平台功耗低、性能高,非常适合物联网(IoT),是一款基于 Intel Apollo Lake 平台设计的快速原型开发板。它包含 Intel® Celeron® 双核 N3350 和 Intel® Pentium® 四核 N4200 处理器。

UP Squared 平台

开始之前,应在 UP Squared 平台上安装 Ubuntu* 操作系统。

1. 要确保 Ubuntu 操作系统是最新的并安装了依赖的 Ubuntu 包,请打开命令提示符(终端)并键入以下内容

 

sudo apt-get update

2. 通过在终端中键入以下命令来安装 npm(Node 版本管理器)包

sudo apt-get install npm

3. AWS* 是一个开源代码,它为许多 AWS 服务(包括 Kinesis 流、DynamoDB 等)提供 JavaScript* 库。要在 UP Squared 平台上使用 Node.js* 上的 AWS SDK for JavaScript* 开始使用 AWS,请在 Ubuntu* 终端上安装 AWS SDK JavaScript* 库和文档

npm install aws-sdk

4. Node.js* 是一个开源的、跨平台的 JavaScript* 运行时环境

sudo apt-get install node.js

Gigabyte* 网关 GB-BXTB-3825 平台

请确保 GB-BXTB-3825 网关平台上已安装 Ubuntu*
同样,请确保 Ubuntu 操作系统是最新的,安装依赖项、npm、aws-sdk 和 Node.js* 包。网关从 Kinesis 流中提取包并将其存储在 DynamoDB 表中。

创建 Amazon Kinesis* 流

Amazon Kinesis 流支持大规模实时数据流,并使数据可供消费。流中的数据是数据记录的有序序列。流中的数据记录被分成多个分片。将数据分成多个分片是一种分散负载的技术。
让我们开始登录 Amazon Web Services (AWS)* 管理控制台,打开 Amazon* Kinesis 控制台,选择一个区域,然后选择 **Go** 进入 **Stream console**。

1. 选择 **Create Kinesis** stream(创建 Kinesis 流)

2. 为您的 Kinesis 流选择一个**名称和分片数**。

3. 当您看到一条消息通知您流已创建且状态为“ACTIVE”时,表示流已准备就绪。

创建 Amazon DynamoDB* 表

Amazon DynamoDB 是一项完全托管的云服务。其灵活的数据管理使其非常适合物联网应用。使用 Amazon DynamoDB,我们可以存储从 UP Squared 平台广播的数据,并针对特定需求分析这些数据。
1. 要创建 DynamoDB 表,请打开 DynamoDB 控制台并单击 **Create table**(创建表)。下面的示例创建了一个名为“TinkSmartPegTable”的 DynamoDB 表,其中包含两个属性 **PegID** 和 **FiveBytesData**,类型为 String。

2. 单击 **Create table**(创建表)以创建 DynamoDB 表。

3. 导航到您刚刚创建的 TinkSmartPegTable 的 **Items**(项目)选项卡,单击 **Create item**(创建项目)以创建更多项目。

4. 在 **Create item**(创建项目)表单的 more(更多)列表中选择 **Insert**(插入)

5. 从类型下拉列表中选择 **String**。

6. 输入第一个项目的名称和值,输入第二个项目的值,然后选择 **Save**(保存)。

7. TinkSmartPegTable 现在已创建,包含两个项目:PegID 和 FiveBytesData。

创建策略

IAM 策略明确列出了允许的操作以及这些操作适用的资源。
1. 在 IAM 控制台中,选择左侧列中的 Policies(策略),然后选择 **Create Policy**(创建策略)。

2. 在 **Create Policy**(创建策略)表单中,选择 **Policy Generator**(策略生成器)。

3. 填写策略权限如下

  • AWS Service: Amazon Kinesis
  • Actions: 选择 **DescribeStream**、**GetShardIterator**、**GetRecords**、**PutRecord** 和 **PutRecords** 作为允许的操作。
  • Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream,其中 us-east-1 是区域,111548290563 是 AWS 账户 ID,TinkKinesisStream 是 Kinesis 流名称。

要获取 Kinesis 流 ARN,请在 Kinesis 控制台上,选择流名称,然后转到左侧列中的 Stream(流)。复制 Stream ARN。

4. 在 **Edit Permissions**(编辑权限)表单中填写 AWS Service(AWS 服务)、Action(操作)和 ARN。

5. 在 **Edit Permissions**(编辑权限)表单中单击 **Add Statement**(添加语句)。

 

6. 为 Amazon DynamoDB 填写操作和 ARN。

  • AWS Service: Amazon DynamoDB
  • Actions: 选择 **All Actions**(所有操作)作为允许的操作。
  • Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream,其中 us-east-1 是区域,111548290563 是 AWS 账户 ID,TinkKinesisStream 是 DynamoDB 名称。

要获取 DynamoDB ARN,请转到 DynamoDB 控制台,单击左侧列中的 **Tables**(表),选择 DynamoDB 表名 **TinkSmartPegTable**,您应该能在 **Overview**(概览)选项卡中看到 DynamoDB ARN。

7. 在 **Edit Permissions**(编辑权限)表单中填写 AWS Service(AWS 服务)、Action(操作)和 ARN,然后单击 **Add Statement**(添加语句)。

8. 在 **Edit Permissions**(编辑权限)表单中单击 **Add Statement**(添加语句)。

9. 为 Amazon DynamoDB 填写操作和 ARN。

  • AWS Service: Amazon DynamoDB
  • Actions: 选择 **All Actions**(所有操作)作为允许的操作。
  • Amazon Resource Name (ARN): arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream,其中 us-east-1 是区域,111548290563 是 AWS 账户 ID,TinkKinesisStream 是 DynamoDB 名称。

要获取 DynamoDB ARN,请转到 DynamoDB 控制台,单击左侧列中的 **Tables**(表),选择 DynamoDB 表名 **TinkSmartPegTable**,您应该能在 **Overview**(概览)选项卡中看到 DynamoDB ARN。

10. 在 **Edit Permissions**(编辑权限)表单中填写 AWS Service(AWS 服务)、Action(操作)和 ARN,然后单击 **Add Statement**(添加语句)。

11. 继续进行 **Next Step**(下一步)。

12. 编辑策略名称,然后选择 **Create Policy**(创建策略)。

如果您看到“TinkKinesisDynamoDBPolicy has been created”(TinkKinesisDynamoDBPolicy 已创建)的消息,则表示新策略已成功创建。

Amazon Cognito*

Amazon Cognito 是一项 Web 服务,用于为 UP Squared 和 Gateway GB-BXTB-3825 设备指定临时安全凭证,以便访问您的 Kinesis 流和 DynamoDB。Amazon Cognito 参数用于初始化 Cognito Credentials 对象。

1. 导航到 Cognito 控制台,然后选择 **Manage Federated Identities**(管理联合身份)。

2. 输入 **identity pool name**(身份池名称),勾选 **Enable**(启用)对未经身份验证的身份的访问。

3. 选择 **Allow**(允许)。

4. 在 **Getting started with Amazon Cognito**(Amazon Cognito 入门)表单中,单击右上角的 **Edit identity pool**(编辑身份池)。

5. 单击 **Cognito Stream**(Cognito 流)以展开。

6. 为流名称选择 TinkKinesisStream,启用流状态,然后选择 **Save Changes**(保存更改)。

附加策略

1. 打开 IAM 控制台,选择 Roles(角色),然后选择 Cognito_TinkPegIdentityPoolUnauth_Role。

2. 单击 **Attach**(附加)策略。

3. 选择 TinkKinesisStreamPolicy,然后单击 Attach policy(附加策略)。

4. 为 Cognito_TinkPegIdentityUnauth_Role 授予完全的 DynamoDB 访问权限,以便 Cognito_TinkPegIdentityUnauth_Role 能够执行 dynamodb:ListTables。再次单击 **Attach policy**(附加策略)以附加到 AmazonDynamoDBFullAccess。

5. 选择 **AmazonDynamoDBFullAccess**,然后单击 **Attach policy**(附加策略)。

6. 现在 Cognito_TinkPegIdentityPoolUnauth_Role 已附加到 TinkKinesisDynamoDBPolicy 和 AmazonDynamoDBFullAccess。

 

示例代码

为 Amazon Kinesis* 流创建 Cognito* 参数

Amazon Cognito* 是一项 Web 服务,用于为移动设备指定临时安全凭证。Amazon Cognito 参数用于初始化 Cognito Credentials 对象。

1. 按照以下步骤为 Amazon Kinesis 流创建 Cognito 参数

  • AccountId: 从右上角的 **Account Name**(账户名称)转到 **My Account**(我的账户)。
  • IdentityPoolId: 要获取用于代码 cognitoParams 的身份池 ID,请导航到 Cognito 控制台,然后单击 **Manage Federated Identities**(管理联合身份),选择 TinkPegIdentityPool,然后单击右上角的 **Edit identity pool**(编辑身份池)。**Identity pool ID**(身份池 ID)位于 **Edit identity pool**(编辑身份池)表单中。

 

2. 选择 **TinkPegIdentityPool**。

3. 单击 **Edit identity pool**(编辑身份池)。

var awsRegion = "us-east-1";
var streamName = TinkKinesisStream; 		// Kinesis stream
var tableName = " TinkSmartPegTable";

// Cognito parameters for Kinesis Stream
var cognitoParams = {
    AccountId: "111548290563",
    IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};

代码示例 1:Amazon Kinesis 流的 Cognito 参数

创建凭证对象

现在,您可以使用 Amazon Cognito Identify* 服务编写一个示例应用程序来创建凭证对象。

var AWS = require('aws-sdk');
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
    if (!err) {
        console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
    }
    else {
        console.log("Error in Cognito identiy credentials.");
        console.log(err);
    }
});

代码示例 2:创建新的凭证对象

DescribeStream

DescribeStream 返回流的当前状态、Amazon 资源名称(ARN)、分片对象数组,并指示是否有更多分片可用。下面的示例描述了 Amazon Kinesis 流以获取可用分片。

kinesis.describeStream(describeParams, function(err, data) {
  if (err) {
      console.log(err, err.stack); // an error occurred
  }
  else {
        console.log("describeStream data");
        console.log(data.StreamDescription.Shards[2].ShardId);
  }
})

代码示例 3:DescribeStream

PutRecord

下面的示例代码将 JSON* 格式的记录放入 Amazon Kinesis 流 'TinkKinesisStream'。记录包含两个字符串类型的数据:MAC 地址和五字节值。有关 PutRecord 的更多信息,请访问 http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html。

function putRecordsToKinesisStream (){
    var json = {
        "PegID": "FF.FF.FF.FF.FF",
        "FiveBytesData": "0x1212121212",
    };

    var params = {
        Data: JSON.stringify(json),
        PartitionKey: partitionKey,
        StreamName: streamName
    };

    kinesis.putRecord(params, function(err, putData) {
        if (err) {
            console.log(err, err.stack);
        } else {
            try {
                sharId.value = putData.ShardId;
            } catch(err) {
                console.log("Error in Kinesis.putRecord()");
      	        console.log(err);
            }
        }
    });
};

代码示例 4:putRecord

getRecord

在调用 getRecords 之前,我们调用 getShardIterator 来指定 Kinesis 流的名称、如何读取数据记录以及在分片中的哪个位置开始顺序读取记录。有关 getShardIterator 的详细说明,请参阅 http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html。

// Describe stream parameters
var describeParams = {
    StreamName: streamName
};

kinesis.describeStream(describeParams, function(err, data) {
    if (err) {
      console.log(err, err.stack); // an error occurred
    }
    else {
        var getParams = {
            ShardId: data.StreamDescription.Shards[2].ShardId,
            ShardIteratorType: "TRIM_HORIZON",   // get oldest package
            StreamName: streamName,
        };

        kinesis.getShardIterator(getParams, function(err, result) {
            if (err) {
                 console.log("Error in getShardIterator()");
                 console.log(err);
            } else {
                // Get records from the Kinesis stream
                getRecord(result.ShardIterator);
            }
        });
    }
});

代码示例 5:getShardIterator

现在我们可以从 Amazon Kinesis 流 TinkKinesisStreamGetRecords。有关 GetRecords 的更多信息,请参阅 http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html。

function getRecord(shard_iterator) {

    var getRecParams = {
        ShardIterator: shard_iterator
    };

    kinesis.getRecords(getRecParams, function(err, result) {
        if (err) {
            console.log("Error in getRecords() from the Kinesis stream.");
            console.log(err);
        } else {
            try {
                var StringDecoder = require('string_decoder').StringDecoder;
                var decoder = new StringDecoder('utf8');

                // If there are packages from getRecords()
                if(result.Records.length > 0) {
                    // Loop through all the packages
                    …
                }
            } catch(err) {
                console.log("Error parsing the package.");
                console.log(err);
            }

            if (result.NextShardIterator) {
                getRecord(result.NextShardIterator);
            }
        }
    });
}

代码示例 6:getRecords

Amazon DynamoDB*

编辑策略

IAM 策略允许我们指定任何支持 IAM 的服务的任何 API 操作。以下策略指定了文档中示例代码使用的 Amazon Kinesis 流和 DynamoDB 的 API 操作。编辑策略以添加或删除 Kinesis 流或 DynamoDB 上允许的操作。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1505944340000",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:111548290563:stream/TinkKinesisStream"
            ]
        },
        {
            "Sid": "Stmt1505944397000",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:111548290563:table/TinkSmartPegTable"
            ]
        }
    ]
}

图 7:Kinesis 流和 DynamoDB 的策略

调用 listTable

listTable 返回先前创建的 Amazon DynamoDB 表名的数组。我们调用 listTable 来获取可用的 Amazon DynamoDB 表名。

var db = new AWS.DynamoDB();
var dbParams = {};

// If the DynamoDB table exists, get the DynamoDB table name.
db.listTables(dbParams , function(err, data) {
    if (err) {
        console.log("Error in dynamoDb.listTables(): ");
        console.log(err, err.stack); // an error occurred
        return;
    }
    else {
        tableStr.value = data.TableNames;
        console.log(tableStr.value.length);
    }
});

代码示例 7:listTables

调用 putItem

下面的函数接收来自 Amazon Kinesis 流的包并将其存储在 Amazon DynamoDB 表中。

putItem = function(tableName, pegId, fiveBytes) {
    // The item has two attributes: PegID type string and FiveBytesData type String
    var item = {
        'PegID': { 'S': pegId },
        'FiveBytesData': { 'S': fiveBytes }
    };

    var putItemParams = {
        TableName: tableName,
        Item: item
    };

    // Store the package in the DynamoDB
    db.putItem(putItemParams, function(err, data) {
    if (err) {
        console.log("Got error: ");
        console.log(err, err.stack); // an error occurred
    } else {
        console.log(JSON.stringify(data, null, 2));
    }
  });
};

代码示例 8:putItem

示例草图

以下示例草图展示了如何重复读取 Kinesis 流,从 Amazon Kinesis 流中获取记录并将其存储在 Amazon DynamoDB 中。下面的两个示例都是在 UP Squared 和 Gigabyte Gateway GB-BXTB-3825 平台上开发和执行的。执行前请确保两个平台都已连接到网络。

Kinesis 流 putRecord 示例

var awsRegion = "us-east-1";
var streamName = 'TinkKinesisStream'; // Kinesis stream
var tableName = "TinkSmartPegTable";
var cognitoParams = {
    AccountId: "111548290563",
    IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};

var AWS = require('aws-sdk'); //package for AWS
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
    if (!err) {
        console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
    }
    else {
        console.log(err);
    }
});

var kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});

function sharIdObj() {
    this.value = "";
}
var sharId = new sharIdObj();

function gateWayPackageObj() {
    this.value = "";
}
var package = new gateWayPackageObj();
var describeParams = {
        StreamName: streamName
    };

kinesis.describeStream(describeParams, function(err, data) {
  if (err) {
      console.log(err, err.stack); // an error occurred
  }
  else {
        console.log("describeStream data");
        console.log(data);           // successful response
  }
})

setInterval(putRecordsToKinesisStream, 5000);

////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: putRecordsToKinesisStream()
// This function puts the package into the Kinesis stream. The Kinesis stream is pre-created.
// Parameters: NA
////////////////////////////////////////////////////////////////////////////////////////////////

function putRecordsToKinesisStream (){

    var json = {
        "PegID": "FF.FF.FF.FF.FF",
        "FiveBytesData": "0x1212121212",
    };

    var params = {
        Data: JSON.stringify(json),
        PartitionKey: partitionKey,
        StreamName: streamName
    };

    kinesis.putRecord(params, function(err, putData) {
        if (err) {
            console.log(err, err.stack);
        } else {
            try {
                sharId.value = putData.ShardId;
            } catch(err) {
                console.log("Error in Kinesis.putRecord()");
      	        console.log(err);
            }
        }
    });
};

代码示例 9:TinkGatewayToCloud.js 示例

连续读取 Kinesis 流示例

var awsRegion = "us-east-1";
var streamName = 'TinkKinesisStream'; // Kinesis stream
var tableName = "TinkSmartPegTable";

// Cognito parameters for Kinesis Stream and DynamoDB
var cognitoParams = {
    AccountId: "111548290563",
    IdentityPoolId: "us-east-1:ab55e2b3-36b6-4e88-aa9a-38f6d437d89e"
};
var AWS = require('aws-sdk');       			//package for AWS
AWS.config.region = awsRegion;
AWS.config.credentials = new AWS.CognitoIdentityCredentials(cognitoParams);
AWS.config.credentials.get(function(err) {
    if (!err) {
        console.log("Cognito Identity Id: " + AWS.config.credentials.identityId);
    }
    else {
        console.log("Error in Cognito identiy credentials.");
        console.log(err);
    }
});

var kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});
var db = new AWS.DynamoDB();
var dbParams = {};

// Object for the DynamoDB table name
function tableObj() {
    this.value = "";
}
var tableStr = new tableObj();

// If the DynamoDB table exists, get the DynamoDB table name.
db.listTables(dbParams , function(err, data) {
    if (err) {
        console.log("Error in dynamoDb.listTables(): ");
        console.log(err, err.stack);			 // an error occurred
        return;
    }
    else {
        tableStr.value = data.TableNames;
        console.log(tableStr.value.length);
    }
});

// Describe stream parameters
var describeParams = {
    StreamName: streamName
};

kinesis.describeStream(describeParams, function(err, data) {
    if (err) {
      console.log(err, err.stack);			 // an error occurred
    }
    else {
        var getParams = {
            ShardId: data.StreamDescription.Shards[4].ShardId,
            ShardIteratorType: "TRIM_HORIZON",   		// get oldest package
            StreamName: streamName,
        };

        kinesis.getShardIterator(getParams, function(err, result) {
            if (err) {
                 console.log("Error in getShardIterator()");
                 console.log(err);
            } else {
                // Get records from the Kinesis stream
                getRecord(result.ShardIterator);
            }
        });
    }
});

////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: getRecord
// This function gets records from the kinesis stream and then puts the item into the
// DynamoDB.
//
// Parameters:
//     - shard_iterator: The shard position from which to start reading data records
//                                    sequentiallly.
////////////////////////////////////////////////////////////////////////////////////////////////
function getRecord(shard_iterator) {
    var getRecParams = {
        ShardIterator: shard_iterator
    };

    kinesis.getRecords(getRecParams, function(err, result) {
        if (err) {
            console.log("Error in getRecords() from the Kinesis stream.");
            console.log(err);
        } else {
            try {
                var StringDecoder = require('string_decoder').StringDecoder;
                var decoder = new StringDecoder('utf8');

                // If there are packages from getRecords()
                if(result.Records.length > 0) {
                    // Loop through all the packages
                    for(var i = 0; i < result.Records.length; i++) {
                        if(result.Records[i] != undefined) {
                            var getData = JSON.parse(decoder.write(result.Records[i].Data));

                            console.log("PegId =");
                            console.log(getData.PegID);

                            // Put the package into the DynamoDB table
                            putItem(tableStr.value[0], getData.PegID, getData.FiveBytesData);
                        }
                    }
                }
            } catch(err) {
                console.log("Error parsing the package.");
                console.log(err);
            }

            if (result.NextShardIterator) {
                getRecord(result.NextShardIterator);
            }
        }
    });
}

////////////////////////////////////////////////////////////////////////////////////////////////
// Function Name: putItem
// This function
//     - Creates an item using the pass in parameters: pegId and fiveBytes
//     - Calls dynamoDb.putItem() to put the item into the tableName
// Parameters:
//     - tableName: The name of the DynamoDB
//     - pegId:     UUID of the peg
//     - fiveBytes: The stock information of the peg
////////////////////////////////////////////////////////////////////////////////////////////////
putItem = function(tableName, pegId, fiveBytes) {
    // The item has two attributes: PegID type string and FiveBytesData type String
    var item = {
        'PegID': { 'S': pegId },
        'FiveBytesData': { 'S': fiveBytes }
    };

    var putItemParams = {
        TableName: tableName,
        Item: item
    };

    // Store the package in the DynamoDB
    db.putItem(putItemParams, function(err, data) {
    if (err) {
        console.log("Got error: ");
        console.log(err, err.stack); // an error occurred
    } else {
        console.log(JSON.stringify(data, null, 2));
    }
  });
};

代码示例 10:TinkGetPackageFromCloud.js 示例

UP Squared 将持续将包推送到 Kinesis 流。

网关将持续从 Kinesis 流中拉取包,并将其存储在 DynamoDB 表中。

摘要

我们已经介绍了如何在 UP Squared 和 Gigabyte 网关 GB-BXTB-3825 平台上使用 Ubuntu* 操作系统上的 JavaScript* 来使用 Amazon Kinesis 流和 DynamoDB*。UP Squared 会持续将包推送到 Kinesis 流,Gateway GB-BXTB-3825 会持续从 Kinesis 流中拉取包,然后将其存储在云中以供进一步分析。我们建议您尝试在这些平台上使用不同的 Amazon Web Services。

参考文献

© . All rights reserved.