65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Azure Event Hubs、Azure Functions 和 Cosmos DB 实现大规模流式处理 第一部分:将流式数据引入 Azure

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2022 年 3 月 23 日

CPOL

7分钟阅读

viewsIcon

5238

将数据流式传输到 Event Hub。

在本系列三篇文章中,我们将创建一个端到端的流式处理解决方案。我们将首先将数据流式传输到 Azure Event Hubs,然后使用 Java 版 Azure Functions 进行处理,并将结果管道传输到 Cosmos DB 数据库。最后,我们将 在 Power BI 中分析数据

在第一篇文章中,我们将通过 WebSocket 订阅数据流。然后,我们将使用 Java 处理该流,将数据推送到 Azure Event Hub。我们将使用一个实时的加密货币行情数据集,该数据集显示比特币相对于美元的价格变化。

完整代码可在 GitHub 上找到。

连接到 Blockchain Exchange WebSocket

要继续学习,请免费注册 Blockchain Exchange。使用 API 文档 设置您的 API 密钥并熟悉 WebSocket 端点。

将数据发送到 Azure Event Hubs 的第一步是连接到我们的源数据流。我们将创建一个简单的 Java 项目来处理数据。

在 Visual Studio Code (VSCode) 中,通过安装 Java Extension Pack,打开**命令面板** (Ctrl + Shift + P),然后选择**创建 Java 项目**,来创建一个样板 Java 应用程序。然后,按照提示操作,并选择 Maven 进行依赖管理。

设置好应用程序后,您将拥有一个带有默认类和 main 函数的 Java 项目。还有一个 pom.xml 文件用于托管 Maven 依赖项。

最初,我们只需要一个依赖项——NV WebSocket Client——即可连接到 WebSocket。因此,请务必将其添加到 pom.xml 文件中。

我们需要在 main() 函数中创建一个 WebSocket 对象

WebSocket ws = new WebSocketFactory().createSocket(URL);

然后,我们需要添加 Blockchain Exchange API 文档中提到的必要标头

// Add required header for connection
ws.addHeader("Origin","https://exchange.blockchain.com");

我们还需要在连接到 WebSocket 之前添加一个监听器。该监听器将覆盖 WebSocketAdapter 接口提供的 onTextMessage() 函数。每次 WebSocket 收到消息时,此函数都会触发,以便我们进行处理。

首先,我们将打印消息以确认其正在工作。

ws.addListener(new WebSocketAdapter() {
   @Override
   public void onTextMessage(WebSocket websocket, String message) throws Exception {
       // Received a text message so lets start by printing it out
       System.out.println(message);
   }
});

然后,我们将连接到 WebSocket 并按照 API 文档的说明发送订阅消息。此消息告诉 Blockchain Exchange 我们希望订阅 BTC-USD 行情频道。

try {
    ws.connect();
    // subscribe to the prices channel for USD-BTC price data
    ws.sendText("{\"token\": \""+authToken+"\", 
    \"action\": \"subscribe\", \"channel\": \"prices\", 
    \"symbol\": \"BTC-USD\",\"granularity\": 60}");
} catch (WebSocketException e) {
    e.printStackTrace();
} 

下面的屏幕截图显示 WebSocket 将消息输出到控制台。因此,我们的监听器正在正确接收消息。

由于数据正在从 WebSocket 流入,我们现在想将该数据传递到 Azure Event Hub。

创建 Azure Event Hub

首先,请确保您拥有 Azure 帐户或 免费创建一个。进入 Azure 控制台后,搜索“**Event Hub**”,然后在**服务**下选择**Event Hubs**。

接下来,在 Event Hubs 服务区域创建一个命名空间。该命名空间将包含我们 Event Hub 项目的所有资源,如下图所示。提供**订阅**和**资源组**。如果您还没有资源组,请单击**创建新资源组**。

填写其余详细信息,提供您的**命名空间名称**,并选择合适的**位置**和**定价层**。目前,我们将 Event Hub 设置为具有最少数量的 吞吐量单位。如果我们希望每秒发送的消息超过 1MB 或 1,000 条,以后可以对其进行扩展。

单击**审阅 + 创建**以完成命名空间。完成后,它将出现在命名空间列表中。

现在我们已经创建了命名空间,可以构建我们的 Event Hub。单击命名空间并**创建 Event Hub**。

Event Hub 需要**名称**、分区数和**消息保留**值。您之后无法更改分区数,因此在选择此值时请小心。分区越多,可能的吞吐量越高,因为消息会分布在这些分区之间。因此,您可以扩展下游的处理器数量以满足需求。您应该选择一个分区数,以反映您的应用程序在其峰值负载下的需求。

本项目需要两个分区。

消息保留值设置消息可用的时间(以天为单位)。它保证消息至少在此期间内存在。但是,这并不一定意味着消息会在过期后立即消失。

本项目将消息保留值设置为 1。

连接到 Event Hub

Event Hub 准备好后,我们需要对其进行配置,以便我们的应用程序可以连接。因此,我们需要创建一个连接。

首先,单击**连接**按钮或在 Event Hub 的**设置**菜单中选择**共享访问策略**。通过命名策略并勾选**管理**、**发送**和**侦听**框来创建策略。现在从列表中选择此策略时,您的凭据将出现,如下图所示。这些凭据允许我们的 Java 应用程序连接到 Event Hub 并发送消息。

现在我们的 Event Hub 已准备好接收消息。接下来,我们将修改我们的 Java 应用程序监听器,通过 HTTPS 将消息从 Blockchain Exchange WebSocket 传递到 Event Hub。您可以使用 Event Hubs 库来完成此操作,但我们将使用 Event Hub REST API

我们只需要修改 onTextMessage 函数,该函数以前将我们的 Blockchain Exchange 消息打印到屏幕。

首先,我们需要认证令牌。Microsoft 提供 一个辅助函数来完成此操作。我们只需提供资源 URI、密钥名称(策略名称)以及策略提供的主要密钥。

String auth = GetSASToken("msblog.servicebus.windows.net", "java-app", eventHubKey);

然后,我们创建一个变量来保存我们将要发布消息的 REST API 端点的 URL。我们还创建一个 JSON 数组对象,用于在发送消息之前对其进行批处理。我们可以单独发送它们,但批处理为我们提供了以后在需要时增加吞吐量的条件。

String URL = 
https://msblog.servicebus.windows.net/blockchain-usd-btc-price/messages?timeout=60&api-version=2014-01;
/ create an array to hold the batch
JSONArray array = new JSONArray();

批处理消息 API 端点需要一个如下所示的 JSON 对象数组

[{"Body":"Message1"},{"Body":"Message2"},{"Body":"Message3"}]

每个 JSON 对象必须有一个 Body 键,后跟消息。我们将为此类接收到的每条消息创建一个此格式的 JSON 对象,并将其添加到数组中。

// Received a text message so let's convert it to a json object and store in an array
JSONObject obj = new JSONObject();
obj.put("Body", message);
array.add(obj);

我们将继续执行此操作,直到我们的 JSON 数组包含 100 条消息。然后,我们将创建一个带有必需标头的 HTTP POST 请求,包括之前的授权密钥。我们的请求体将是一个 JSON 字符串,表示我们的 JSON 数组,它符合 API 的必需格式。

发送消息后,我们将关闭连接并重置 JSON 数组,以便我们可以构建下一批 100 条消息

if (array.size() > 100) {
   // if the batch is over 100 messages, send it and then create a new batch
   CloseableHttpClient httpClient = HttpClientBuilder.create().build();
   try {
       HttpPost request = new HttpPost(URL);
       request.setHeader("Content-Type", "application/vnd.microsoft.servicebus.json");
       request.setHeader("Authorization", auth);
       request.setEntity(new StringEntity(array.toJSONString()));
       CloseableHttpResponse response = httpClient.execute(request);
       response.close();
   }
   finally {
       httpClient.close();
   }
   // reset array
   array = new JSONArray();
}

在 Azure Event Hub 控制台中,我们的 Event Hub 概览页面上的图表显示了请求数以及接收或发送的消息数。

上面的屏幕截图显示了对我们 Event Hub 的请求。它还显示了 101 条收到的消息,这是我们第一批消息的大小。因此,我们的应用程序按预期工作。

后续步骤

本文探讨了如何使用 Azure Event Hubs 接收消息以供下游处理。我们成功连接到 WebSocket 以获取消息,然后使用 HTTPS 将它们转发到 Event Hub。

我们使用了来自 Blockchain Exchange 的示例 WebSocket 来构建概念验证。这些技术在现实世界中可能非常有益,尤其是在您拥有实时数据源时,例如来自电子商务系统的大量交易或社交网络上的交互。

将消息放入 Event Hub 可以使我们的应用程序不必在消息激增时满足需求。相反,消息直接进入 Event Hub。我们可以水平扩展下游处理器以满足需求。

使用此方法可确保我们处理所有消息。如果我们想提高速度,只需添加更多处理器即可。因此,在创建 Event Hub 时仔细选择分区数至关重要。此设置最终决定了您可以扩展的程度以及您可以处理的消息吞吐量。

在本系列的下一篇文章中,我们将 将我们的 Event Hub 连接到 Azure Functions。Azure Functions 将处理我们的消息并将其存储在 Cosmos DB 中。稍后,我们将在 Power BI 中分析和可视化数据。

要了解如何使用 Azure Cosmos DB 更改源来了解用户模式、执行实时数据分析和可视化,请参阅 使用 Azure Cosmos DB 更改源可视化实时数据分析

© . All rights reserved.