65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Kafka 和 Azure HDInsight 进行大规模流式传输(第二部分):将流数据传输到 HDInsight 上的 Kafka

emptyStarIconemptyStarIconemptyStarIconemptyStarIconemptyStarIcon

0/5 (0投票)

2022 年 4 月 28 日

CPOL

4分钟阅读

viewsIcon

3325

在本文中,我们将探讨如何将数据流式传输到 Kafka。

本系列的第一篇文章探讨了在 Azure HDInsight 上设置 Kafka 群集的简单过程。

本文探讨了创建和部署 Kafka 生产者(其将在 Python 生产者脚本中模拟来自工厂传感器的数据流)的过程。模拟流还可以根据需要进行调整和重新提交。

生产者

创建应用程序的第一步是定义机器和传感器信息。之后,将配置 Kafka 代理列表和主题名称。可以使用安全外壳协议 (SSH) 连接到群集来访问 Azure 上的代理名称。有关此群集的 SSH 凭据,请参阅上一篇文章。

或者,可以使用 PowerShell 通过 SSH 连接创建主题。虽然此演示仅概述相关步骤,但 Microsoft 提供了有关使用 PowerShell 进行 Azure 操作的完整教程

要使用此方法,首先需要安装 Az PowerShell 模块,该模块已包含在门户中可访问的云 shell 中。如果从本地计算机使用 SSH,请确保已安装PowerShellAz 模块

首先,连接到群集的第一个主节点,并将 CLUSTERNAME 替换为已创建的 Kafka 群集名称。此外,如果默认名称在设置过程中被更改,请替换 sshuser

ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net

如果尚未安装,请在主节点上安装 JSON 处理器 (jq)。这可能需要安装应用程序所需的任何 Python 库。

sudo apt -y install jq

接下来,设置 Kafka cluster name 环境变量。

read -p "Enter the Kafka on HDInsight cluster name: " CLUSTERNAME

然后,为主机设置 ZOOKEEPER 环境变量。

export KAFKAZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`

现在,输入在群集部署过程中创建的管理密码。

接下来,为主机设置 KAFKA_BROKER 环境变量。

export KAFKA BROKERS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`

然后,获取代理列表。

echo '$KAFKABROKERS='$KAFKABROKERS

这应该会返回类似以下内容的结果:

<brokername1>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,<brokername2>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

接下来,将这些值粘贴到 producer.py 文件中需要的位置。

这篇快速入门文章详细介绍了如何使用上面提供的信息在我们的 Kafka 群集中创建主题。使用以下命令执行此操作:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic SensorReadings --zookeeper $KAFKAZKHOSTS

上传应用程序

虽然有多种上传 Python 文件的方法,但通常最明智的做法是遵循阻力最小的路径。如果尚未下载,请从GitHub下载必要的文件。

在 shell 中,使用 nano 或其他首选编辑器创建文件。然后,复制并粘贴 producer.py 的内容。接下来,将 kafkaBrokers 值更改为上面返回的值。保存文件。

该应用程序使用 kafka-python 库。通过以下方式安装:

pip3 install kafka-python

测试生产者

运行快速测试以确认一切正常,方法是启动 Kafka 命令行使用者脚本。

<PATH>/kafka-console-consumer.sh --topic SensorReadings --from-beginning --bootstrap-server <paste your broker list here>

在撰写本文时,该路径是 /usr/hdp/2.6.5.3033-1/kafka/bin/

然后,使用者等待 Kafka 发送数据。

现在,打开第二个 SSH 控制台并执行生产者:

python3 producer.py

如果一切顺利,生产者会将一小部分数据发送到 Kafka。这些数据在使用者控制台中可见。

{"sensor": "mass", "machine": "mixer", "units": "kg", "time": 1646861863942, "value": 144.174}
{"sensor": "rotation", "machine": "mixer", "units": "rpm", "time": 1646861863942, "value": 65.009}
{"sensor": "fillpressure", "machine": "mixer", "units": "kPa", "time": 1646861863942, "value": 575.347}
{"sensor": "bowltemp", "machine": "mixer", "units": "DegC", "time": 1646861863942, "value": 25.452}
{"sensor": "ovenspeed", "machine": "oven", "units": "m/s", "time": 1646861863942, "value": 0.203}

有五个命令行选项用于以所需形式生成数据:

  • -l--loop_count 后面跟一个整数,指定要生成的周期数。生成的记录总数取决于其他选定的选项。
  • -s--sensor 后面跟一个传感器名称,指定要模拟的传感器。如果使用此选项,则无需使用机器名称选项,因为传感器是唯一的。机器名称纯粹是信息性的。
  • -r--rate 后面跟一个小数,指定每秒要发送的读数数量。这仅仅是每个周期发送数据之前的延迟。它不是非常精确,但足以满足其目的。
  • -m--machine 后面跟一个机器名称,指定要使用的机器。如果未使用 --sensor 选项,则该机器的所有传感器将同时触发。
  • -t--time 后面跟一个整数,指定运行秒数。使用时,该过程将忽略 loop_count 选项。

这些选项可以组合使用 run-scenario.sh shell 脚本作为起点来运行模拟。此脚本在后台模式下多次执行应用程序,模拟连续的数据交付,就像在工作环境中一样。

在脚本中,设置命令行选项以根据场景要求提供输入。

如下执行脚本会产生符合要求的 25 秒数据爆发:

./run-scenario.sh 25

它通过执行以下命令来实现这一点:

python3 producer.py -t 25 -r 0.2 -s mass &
python3 producer.py -t 25 -s rotation &
python3 producer.py -t 25 -r 10 -s FillPressure &
python3 producer.py -t 25 -r 10 -s BowlTemp &
python3 producer.py -t 25 -s OvenSpeed &
python3 producer.py -t 25 -r 1 -s ProveTemp &
python3 producer.py -t 25 -r 5 -s OvenTemp1 &
python3 producer.py -t 25 -r 5 -s OvenTemp2 &
python3 producer.py -t 25 -r 10 -s OvenTemp3 &
python3 producer.py -t 25 -r 2 -s CoolTemp1 &
python3 producer.py -t 25 -r 1 -s CoolTemp2 &
python3 producer.py -t 25 -s PackSpeed &
python3 producer.py -t 25 -r 10 -s PackCounter &

在没有命令行参数的情况下运行脚本会产生 15 秒的数据爆发。任何正整数都可以。

结论

此时,应该很清楚,在 HDInsight 上使用 Kafka 与在本地使用 Kafka 基本相同。无论是在旧的 Ubuntu PC 服务器上作为独立服务器运行,在我们的服务器机房中运行多服务器群集,还是在 Azure HDInsight 上运行,方法都不会改变。

此外,使用此生产者或从数据库流式传输数据之间没有区别。对于本地和 Azure Kafka 实现,过程是相同的。

现在,剩下要做的就是流式传输和分析数据。要探索此过程,请继续阅读本系列的最后一篇文章,其中将使用 PySpark 来完成此任务。

要了解有关 HDInsight 上大规模流式处理以及 Azure HDInsight 上的 Apache Kafka 的更多信息,请查看这些Microsoft Azure 开发人员社区资源

© . All rights reserved.