针对 Kafka 开发者的 Azure 事件中心大规模流式传输(第 1 部分):将流数据引入 Azure
在本文中,我们创建了一个 Event Hubs 实例和一个演示应用程序,该应用程序使用 Kafka API 发送消息。
每天都会生成越来越多的数据,这需要能够大规模处理数据流的系统。Apache Kafka 一直是流处理和大规模实现发布-订阅模式的稳定系统。
Azure Event Hubs 是一项完全托管的发布-订阅服务,每秒允许数百万个事件的流式处理,而无需管理服务器和集群。Event Hubs 还提供与 Apache Kafka 兼容的终结点,从而可以使用 Event Hubs 与现有应用程序集成。
本系列将探讨开发人员如何使用 Event Hubs、Python 和 Cosmos DB 创建端到端的流式处理解决方案,而无需创建或管理基础设施。它还将连接 Power BI,根据发送到 Cosmos DB 的数据生成仪表板。本文以一个允许用户通过将详细信息发送到 Event Hub 来预订航班的 Web 应用程序为例。为了模仿此应用程序,本文使用了一个使用 Faker 库生成数据的模拟 Kafka 生产者。
在开始之前,请确保您已设置 Azure 帐户。
设置 Azure Event Hubs
在设置发送消息的生产者之前,开发人员必须设置一个 Event Hub 来接收和发送消息给使用者。
打开 Azure 门户,单击“创建资源”图标,然后创建一个新的资源组来存放所有应用程序组件。
现在我们有了一个用于存放所有组件的资源组,我们可以创建一个新的 Event Hub 命名空间,它类似于 Kafka 集群,但无需管理基础设施。
再次单击“创建资源”图标,然后找到“Event Hubs”。选择我们创建的资源组,并为命名空间指定一个名称。另外,请务必选择至少标准层,以确保 Kafka 服务已启用。完成后,单击“审阅+创建”以完成新命名空间的创建。
Event Hub 命名空间准备就绪后,打开它,然后从左侧的菜单列表中选择“Event Hubs”选项。我们将创建一个名为 airplane_bookings
的新 Event Hub,用作我们的 Kafka 主题以承载消息。如果您在此服务上运行大型生产负载,可以增加分区数和消息保留期,但在此教程中我们使用默认值。
为了从我们的 Kafka 客户端访问此 Event Hub,我们必须通过一种使用安全身份验证和安全层(SASL)的方法传递连接字符串。
为此,请打开新主题,然后向下滚动到“共享访问策略”部分。为我们的 Web 应用程序添加一个新的共享访问策略,给它命名,然后选择“发送”权限,因为我们的应用程序必须将消息发送到管道。完成后,打开新策略并复制主连接字符串,我们需要它来配置我们的数据生产者。
数据生产者
通过利用 Event Hubs 的 Kafka API,我们可以连接现有的应用程序或设备,而无需重新开发任何最终用户设备。我们配置了一个模拟数据生产者,它将发送飞机预订信息,就像生产 Web 应用程序一样。为此,我们创建了一个带有 Ubuntu 的 Linux 虚拟机(VM),并修改了示例 Python 客户端以使用 Faker Python 库。
通过在我们的资源文件夹中单击“创建资源”并选择“Ubuntu Server 20.04”选项来创建 VM。为服务器指定名称和大小,并使用 SSH 密钥配置管理员帐户。打开一个端口以便您可以远程登录,然后单击“创建”。请务必下载密钥,并在服务器构建完成后,使用 PuTTY 等终端登录到远程服务器。
现在,我们将通过运行以下命令配置一个安装了 Faker 的简单 Python Kafka 客户端
sudo pip install confluent-kafka faker faker_airtravel
这会安装 Confluent Kafka 客户端、Faker 库以及 air travel 社区附加组件。安装完这些 Python 库后,我们将创建一个名为 producer.py 的 Python 文件,使用此 GitHub 存储库中的代码。将代码粘贴到名为 producer.py 的新文件中,或使用 git clone
命令下载 GitHub 存储库。
如果我们查看代码,会发现我们传递了以下五个参数
- Event Hub 命名空间
- Event Hub 的连接字符串
- 主题名称
- 我们想要发送的消息数量
- 消息之间的最大毫秒延迟
这会将我们的脚本配置为按照要求向 Event Hub 发送随机消息。
由于我们使用的是标准的 Confluent Kafka 客户端,因此我们可以通过查看第 20 行的 conf
设置来了解如何为实际示例进行配置。在此配置中,我们设置了以下内容
- Kafka 引导服务器为
<your_namespace>.servicebus.windows.net:9093
- 协议为
SASL_SSL
- 证书位置为 Ubuntu 的默认位置(/usr/lib/ssl/certs/ca-certificates.crt)
- SASL 机制为
Plain
- 用户名设置为变量
$ConnectionString
- 密码设置为 SAS 策略的主连接字符串
- 将客户端 ID 设置为
Kafka-example-producer
建立此配置后,我们的脚本的其余部分将根据我们指定的次数进行迭代,随机等待时间在 0 到 Max wait time
之间。Faker 生成的 JSON 有效负载包含 UID、客户姓名、电话号码、城市、国家和航班详细信息。
我们现在可以通过以下命令运行脚本来生成消息:python3 producer.py. --namespace '<your_namespace>' --connection '<your_connection_string>' --topic '<your_topic>' --messages '<message_amount>' --time '<wait_time>'
。
我们可以看到已生成并发送到 Event Hub 的消息的 JSON 结构。如果我们打开 Event Hub 并查看概览屏幕,应该会在请求和消息图表中看到消息激增。
这表明我们现在正在使用标准的 Kafka 应用程序将消息推送到 Event Hubs。在我们的 Event Hub 中,目前没有任何方法可以从管道检索消息或以任何方式处理它们。一种潜在的方法是使用左侧菜单中的捕获功能,使我们能够在存储帐户上保存和查看消息。相反,我们在生产场景中创建了几个使用者应用程序来处理消息。
后续步骤
虽然此应用程序仅创建和发送人为构造的数据,但它演示了修改几乎任何使用 Kafka 协议的应用程序的简便性。通常,只需更改配置文件以使用 SASL 并指向 Event Hubs 实例即可。
目前,虽然我们的应用程序会将消息发送到管道,但我们没有消费它们。在 下一篇文章中,我们将开发几个 Python Azure Functions 来处理我们收到的有效负载,并将数据保存到 Cosmos DB 实例。
如果您想了解更多关于 Azure Cosmos DB 的信息,或分享您所知道的关于 Azure Cosmos DB 的信息,请查看 Azure Cosmos DB Conf;这是一个与 Azure Cosmos DB 社区合作举办的免费在线虚拟开发者活动。
要了解如何加速基础应用程序开发,请查看开发人员的低代码应用程序开发 7 步指南。