65.9K
CodeProject 正在变化。 阅读更多。
Home

集成 MongoDB 和 Amazon Kinesis,实现智能持久流

emptyStarIconemptyStarIconemptyStarIconemptyStarIconemptyStarIcon

0/5 (0投票)

2018年11月13日

CPOL
viewsIcon

9663

您可以将您的在线运营工作负载构建在 MongoDB 之上,并通过使用 MongoDB Stitch 触发器启动 Amazon Kinesis 流处理操作来实时响应事件

您可以将您的在线运营工作负载构建在 MongoDB 之上,并通过使用 MongoDB Stitch 触发器启动 Amazon Kinesis 流处理操作来实时响应事件。

让我们看一个示例场景,其中数据流是用户在网站上执行操作而生成的。我们将持久存储数据,并同时向 Kinesis 进程馈送数据,以对诸如购物车放弃、产品推荐甚至信用卡欺诈检测等进行流式分析。

我们将通过设置 Stitch 触发器来实现这一点。当 MongoDB 中进行相关数据更新时,触发器将使用 Stitch 函数调用 AWS Kinesis,如您在此架构图中所示

图 1. 架构图

您需要以下内容才能跟着操作

  1. 一个 Atlas 实例
    如果您还没有在 Atlas 上运行的应用程序,您可以在此处按照我们的 Atlas 入门指南进行操作。在此示例中,我们将使用一个名为 streamdata 的数据库,其中包含一个名为 clickdata 的集合,我们将把来自基于网络的电子商务应用程序的数据写入其中。
  2. 一个 AWS 账户和一个 Kinesis 流
    在此示例中,我们将使用 Kinesis 流将数据向下游发送到其他应用程序,例如 Kinesis Analytics。这是我们希望将更新馈送到其中的流。
  3. 一个 Stitch 应用程序
    如果您还没有 Stitch 应用程序,请登录 Atlas,然后单击左侧导航栏中的 Stitch Apps,然后单击 Create New Application

创建集合

第一步是从 Stitch 应用程序控制台创建数据库和集合。单击左侧导航菜单中的 Rules,然后单击 Add Collection 按钮。为数据库键入 streamdata,为集合名称键入 clickdata。选择标有“用户只能读取和写入自己的数据”的模板,并提供一个字段名称,我们将在此处指定用户 ID。

图 2. 创建集合

配置 Stitch 与 AWS 通信

Stitch 允许您配置服务以与外部服务(如 AWS Kinesis)交互。从左侧导航栏中选择 Services,然后单击 Add a Service 按钮,选择 AWS 服务并设置 AWS 访问密钥 ID 和秘密访问密钥

图 3. Stitch 中的服务配置

服务使用规则来指定 Stitch 可以使用服务的哪些方面以及如何使用。通过单击标有 NEW RULE 的按钮,添加一个规则,该规则将使该服务能够与 Kinesis 通信。将规则命名为“kinesis”,因为我们将使用此特定规则来启用与 AWS Kinesis 的通信。在标有 Action 的部分中,选择标有 Kinesis 的 API 并选择 All Actions。

图 4. 添加规则以启用与 Kinesis 的集成

编写一个使用服务将文档流式传输到 Kinesis 的函数

现在我们有了一个可用的 AWS 服务,我们可以用它将记录放入 Kinesis 流中。我们在 Stitch 中通过函数来实现这一点。让我们设置一个 putKinesisRecord 函数。

从左侧菜单中选择“函数”,然后单击“创建新函数”。为函数提供一个名称,并将以下内容粘贴到函数体中。

exports = function(event){
 const awsService = context.services.get('aws');
try{
   awsService.kinesis().PutRecord({
     Data: JSON.stringify(event.fullDocument),
     StreamName: "stitchStream",
     PartitionKey: "1"
      }).then(function(response) {
        return response;
      });
}
catch(error){
  console.log(JSON.parse(error));
}
};
图 5. 示例函数 - putKinesisRecord

测试函数

让我们通过手动调用该函数来确保一切正常。从 Function Editor 中,单击 Console 以查看 Stitch 的交互式 javascript 控制台。

从触发器调用的函数需要一个事件。为了测试我们函数的执行,我们需要向函数传递一个虚拟事件。在 Stitch 中从控制台创建变量很简单。只需将变量的值设置为 JSON 文档即可。对于我们的简单示例,请使用以下内容

event = {
   "operationType": "replace",
   "fullDocument": {
       "color": "black",
       "inventory": {
           "$numberInt": "1"
       },
       "overview": "test document",
       "price": {
           "$numberDecimal": "123"
       },
       "type": "backpack"
   },
   "ns": {
       "db": "streamdata",
       "coll": "clickdata"
   }
}
exports(event);

将上述内容粘贴到控制台中,然后单击标有 Run Function As 的按钮。选择一个用户,函数将执行。

嗒哒!

使用 Stitch 触发器整合起来

我们已经将 MongoDB 集合部署在 Atlas 中,并接收来自我们 Web 应用程序的事件。我们的 Kinesis 流已准备好接收数据。我们有一个 Stitch 函数可以将数据放入 Kinesis 流中。

配置 Stitch 触发器非常简单,几乎没有高潮。从左侧导航栏中单击 Triggers,命名您的触发器,提供数据库和集合上下文,并选择 Stitch 将通过执行函数来响应的数据库事件。

对于数据库和集合,使用第一步中的名称。现在我们将设置我们希望触发器监视的操作。(有些触发器可能关心所有操作——插入、更新、删除和替换——而另一些触发器可能更高效,因为它们逻辑上只对其中一些操作有意义。)在我们的例子中,我们将监视插入、更新和替换操作。

现在我们将 putKinesisRecord 函数指定为链接函数,我们完成了。

图 6. Stitch 中的触发器配置

作为触发器执行的一部分,Stitch 将转发与触发器事件相关的详细信息,包括事件中涉及的完整文档(即集合中新插入、更新或删除的文档)。我们可以在此处评估传入文档的某些条件或属性,并决定是否将记录放入流中。

测试触发器!

Amazon 提供了一个仪表板,您可以在其中查看与传入流数据相关的详细信息。

图 7. Kinesis 流监控

当您从 Stitch 内部执行该函数时,您将开始看到数据进入 Kinesis 流。

构建更多功能

到目前为止,我们的触发器非常基本——它监视一个集合,当发生任何更新或插入时,它将整个文档馈送到我们的 Kinesis 流中。从这里我们可以构建一些更智能的功能。为了结束这篇文章,让我们看看数据一旦持久存储在 MongoDB 中并放入流中后,我们可以用它做些什么。

一旦记录进入 Kinesis 流,您可以配置下游的其他服务来处理数据。一个常见的用例是结合 Amazon Kinesis Data Analytics 对流数据执行分析。Amazon Kinesis Data Analytics 提供预配置的模板来完成异常检测、简单警报、聚合等任务。

例如,我们的数据流将包含来自购买的订单。这些订单可能来自销售点系统,也可能来自我们基于网络的电子商务应用程序。Kinesis Analytics 可用于创建处理传入数据流的应用程序。例如,我们可以构建一个机器学习算法来检测数据中的异常,或者从我们流中的滑动翻转数据窗口创建产品性能排行榜。

图 8. Amazon Data Analytics - 异常检测示例

总结

现在您可以将 MongoDB 连接到 Kinesis。从这里,您可以利用 Amazon Web Services 提供的众多服务中的任何一个来构建您的应用程序。在本系列的下一篇文章中,我们将重点讨论如何将数据从 Kinesis 重新导入 MongoDB。在此期间,请告诉我们您正在使用 Atlas、Stitch 和 Kinesis 构建什么!

资源

MongoDB Atlas

MongoDB Stitch

Amazon Kinesis

© . All rights reserved.