65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Azure 事件中心、Azure Functions 和 Cosmos DB 进行大规模流式处理 - 第二部分:使用 Java 中的 Azure Functions 处理流式数据

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2022 年 3 月 24 日

CPOL

8分钟阅读

viewsIcon

5075

在 Cosmos DB 中消耗、处理和存储数据

在本系列的第一篇文章中,我们成功订阅了实时比特币兑美元汇率的 WebSocket,然后将消息转发到了 Azure 事件中心。在本文中,我们将使用 Azure Functions 从事件中心消耗这些消息,然后将信息写入 Cosmos DB。

Functions 为 Azure 提供无服务器计算。我们编写代码并将其发布为 Azure Function,然后 Azure 会在后台处理基础结构和扩展。Azure Functions 仅提供满足我们应用程序工作负载所需的资源。这种方法提供了按需计算,可以轻松扩展并节省低需求期间的成本。

Cosmos DB 是一个完全托管的 NoSQL 数据库。这个完全托管的解决方案使我们能够专注于构建应用程序而不是数据库管理,因为它会自动处理更新和补丁。无服务器和自动扩展选项也有助于我们的数据库根据需求进行扩展。

Cosmos DB 支持许多流行的 NoSQL API,包括 MongoDB、Cassandra 和 Gremlin。因此,我们仍然可以使用我们喜欢的 API 来处理数据库,而 Cosmos DB 则负责存储。

让我们开始设置 Cosmos DB 和我们的 Azure Function。完整代码可在 GitHub 上找到。

创建 Cosmos DB 实例

首先,在 Azure 的 Cosmos DB 控制台中创建一个数据库。为此,请在 Azure 中搜索“Cosmos DB”,然后在“服务”下找到它。进入 Cosmos DB 控制台后,单击“创建”。

在本演示中,我们希望创建一个 Core (SQL) 实例。但是,如下图所示,还有其他 API 选项。

填写表单,为您的实例命名,并选择一个合适的位置。我们应该重用我们在第一篇文章中创建的资源组。由于所有这些资源将协同工作,因此将它们放在同一个资源组中有意义。

我们应用免费套餐折扣并限制总账户吞吐量以供演示。当您测试应用程序时,这种方法很有益。它确保您在 Cosmos DB 自动扩展时不会遇到意外费用,例如,如果您不小心将大量数据持续发送到数据库。

下一步是选择如何分发您的实例。托管服务可以极大地改善应用程序的架构和故障转移能力。这些选项在区域和可用性区域之间提供冗余,因此如果某个特定位置的服务出现问题,您的服务就可以从另一个地方照常运行。

对于本演示,我们保持这些设置禁用。但是,如果您认为成本值得避免中断和停机,则应在生产应用程序中启用其中一些选项。

我们将保留其余选项为默认值。但在生产系统中,请注意“网络”和“备份策略”选项卡。这些设置可以锁定您的实例以实现非公共访问,并确保其备份周期符合您的需求。

现在数据库已启动并运行,我们将探索如何使用 Azure Functions 将消息从事件中心传递到数据库。

使用 Azure Functions 处理事件中心消息

在本节中,我们将创建一个 Azure Function。进入事件中心的任何新消息都将触发此 Function。在解析一些属性以使其对 Power BI 等下游应用程序更有用之后,我们将简单地读取信息并将其写入我们的数据库。

转到 Azure 控制台并搜索“Function App”来创建函数。Function App 将封装我们的函数。单击“创建”并填写表单,确保使用与之前相同的资源组。

创建完成后,单击 Function App。在左侧菜单中,单击“Functions”选项。

一条消息将告诉您 Azure 门户不支持编辑 Java Function Apps。仍单击“创建”以获取有关设置本地开发环境以开始构建和测试函数的说明。我们使用 Visual Studio Code (VS Code)。

与 VS Code 集成使本地开发更加直接。您将获得所有必要的文件和配置,以自动将函数链接到第一篇文章的事件中心。

在此之前,我们需要创建一个存储帐户。转到 Azure 控制台中的“存储帐户”部分,然后按“创建”。同样,请确保使用相同的资源组并选择合适的冗余选项。对于本教程,我们选择本地冗余存储。

完成后,返回 Function App。在“Functions”子菜单中单击“创建”,然后按照说明安装 Azure 依赖项。首先,打开 VS Code 并安装 Azure Functions 扩展。在 VS Code 中,单击左侧的“Azure”菜单按钮,然后单击“创建新项目”。

按照说明提供文件夹,选择 Java 作为语言,并提供组 ID 和工件 ID(这些 ID 可以是任何值)。

项目准备好后,同样在 VS Code 的“Azure”菜单中,单击“Functions”下的“创建函数”按钮。

VS Code 会要求您选择一个模板。我们单击“更改模板筛选器”并将其更改为“all”。然后我们选择“EventHubTrigger”。

在接下来的步骤中,选择您在第一篇文章中创建的事件中心和策略以及您刚创建的存储帐户。

完成后,您将拥有一个 Java 项目,可以本地开发 Azure Function。它包含样板代码来运行函数并将任何消息输出到事件中心。

打开并运行第一篇文章的事件中心项目来测试您的应用程序。此操作将一次将 100 条消息的批次发送到我们的事件中心。在运行此程序时,请返回到函数项目并运行它。如果一切连接正确,您应该开始在控制台中看到发送到事件中心的消息,如下图所示。

现在,当新数据到达事件中心时,我们将触发一个函数。我们可以在将数据存储到 Cosmos DB 之前对其进行处理。该函数会将 JSON 解析为 Java 对象,并将价格数组值拆分为单独的属性。

// convert the message to a JSON Object
JSONObject json = new JSONObject(singleMessage);
// Parse out the price array
JSONArray priceArr = json.getJSONArray("price");
// put each item in the array into its own property
json.put("timestamp",priceArr.get(0));
json.put("openPrice",priceArr.get(1));
json.put("highPrice",priceArr.get(2));
json.put("lowPrice",priceArr.get(3));
json.put("closePrice",priceArr.get(4));
json.put("volume",priceArr.get(5));
// remove the price array
json.remove("price");

首先,我们将消息解析为 JSON 对象, 以便更容易操作。然后,我们可以访问对象内的价格数组。我们将把六个值中的每个值作为单独的属性放入 JSON 对象中。最后,我们将从对象中删除价格数组。

这种方法提供了一个干净的 JSON 对象,其中包含每个字段以及从数组中提取的字段。因此,在 Cosmos DB 中查询此数据更加容易。

接下来,我们将此 JSON 对象转换回 Java 对象,然后再将其与 Cosmos DB 集成。首先,我们创建一个 Java 类来表示我们的对象。它还有助于 Cosmos DB 理解我们属性的数据类型。

public class BlockchainPrice {
   private int sequenceNumber;
   private String event;
   private String channel;
   private String symbol;
   private String timestamp;
   private double openPrice;
   private double highPrice;
   private double lowPrice;
   private double closePrice;
   private double volume;
}

然后,我们使用 Gson 自动将 JSON 字符串转换为 Java 对象。只要我们为 JSON 对象属性命名与 Java 类相同,这种方法就可以工作。

// convert the json object into a Java Object
Gson g = new Gson();
BlockchainPrice price = g.fromJson(json.toString(), BlockchainPrice.class);

接下来,我们将修改默认函数,该函数处理我们的事件中心消息,以与我们的 Cosmos DB 实例交互并存储数据。

将数据存储到 Cosmos DB

我们需要创建一个容器来托管我们的数据库并为存储数据创建数据库。首先,进入 Azure 控制台并单击已创建的 Cosmos DB 实例。然后,单击“创建容器”。

选择“创建新数据库”并为其提供 ID。然后,选择合适的扩展和吞吐量设置。在本演示中,我们将它们保持较低。

接下来,为容器提供一个 ID,并选择一个 分区键 来存储数据。数据库应该存储此数据,最好是在每个记录上。

现在数据库已准备好,我们将获取其连接字符串并将其引入我们的 Function App。在左侧的“密钥”菜单中找到“主连接字符串”。我们需要将连接字符串存储在 Function App 项目的 *local.settings.json* 文件中,该文件在我们使用 VS Code 创建项目时自动创建。

属性名应为 CosmosDBConnectionString,值为我们刚刚从 Azure 门户复制的主连接字符串。

我们必须修改我们的运行函数以提供 Cosmos DB 输出注释和一个用于存储数据的文档。

EventHubTrigger 注释之后添加 CosmosDBOutput 注释,并为其提供一些参数。connectionStringSetting 参数应使用我们添加到本地设置文件中的属性。

@FunctionName("EventHubTriggerProcessMessage")
public void run(
    @EventHubTrigger(name = "message", eventHubName = "blockchain-usd-btc-price", 
    connection = "msblog_javaapp_EVENTHUB", consumerGroup = "$Default", 
    cardinality = Cardinality.MANY) List<String> message,
    @CosmosDBOutput(
        name = "msblogcosmos",
        databaseName = "blockchain-price",
        collectionName = "blockchainPrices",
        connectionStringSetting = "CosmosDBConnectionString")
        OutputBinding<BlockchainPrice> document,
    final ExecutionContext context
)

完成此操作后,我们只需在函数的末尾添加一行代码即可将我们的 Java 对象存储到 Cosmos DB 中。

Gson g = new Gson();
BlockchainPrice price = g.fromJson(json.toString(), BlockchainPrice.class);
//store the price object in CosmosDB
document.setValue(price);

现在我们可以运行我们的事件中心应用程序发送一些消息,然后确认我们可以在 Cosmos DB 中看到数据。

从 Cosmos DB 控制台,选择实例。然后,在左侧菜单中选择“数据资源管理器”。您应该会看到您的容器,如果展开它,您也会看到您的数据库。

选择项目将自动运行默认的 SQL 查询。

您应该会在屏幕上看到返回的数据,如下图所示。选择一个项目会提供更多详细信息。正如屏幕截图所示,我们的文档已正确存储。它已正确地将数组中的项目解析并存储在单独的属性中。

后续步骤

在本文中,我们成功创建了一个 Cosmos DB 实例。我们还使用 Visual Studio Code 在本地开发了一个 Azure Function。该函数处理来自事件中心的数据并将其存储在 Cosmos DB 中。

在我们三部分系列文章的最后一篇文章中,我们将探索 使用 Power BI 进行临时分析和创建强大的仪表板。

要了解如何使用 Azure Cosmos DB 更改源来了解用户模式、执行实时数据分析和可视化,请参阅 使用 Azure Cosmos DB 更改源可视化实时数据分析

© . All rights reserved.