65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Kafka 和 Azure HDInsight 进行大规模流式传输(第三部分):使用 PySpark 在 HDInsight 上分析流数据

emptyStarIconemptyStarIconemptyStarIconemptyStarIconemptyStarIcon

0/5 (0投票)

2022 年 4 月 29 日

CPOL

5分钟阅读

viewsIcon

4202

在本篇文章中,我们将使用 PySpark 来流式处理和分析我们的数据。

本系列的第一篇文章介绍了如何在 HDInsight 上构建 Kafka 集群。然后,第二篇文章探讨了如何向示例应用程序添加数据。本文将介绍如何使用 Jupyter Notebook 中的 PySpark 将该数据流式传输到 Spark。此 Notebook 和其他所需文件可在此处找到。

必备组件

与前一篇文章一样,本次演示将通过安全外壳协议 (SSH) 接口获取主机信息。

在浏览器中,导航到集群中的 Jupyter 引擎。输入之前设置的管理员配置文件的登录凭据。

值得注意的是,Notebook 的输出与命令行输出不同。

相同的代码会显示两次。首先,它出现在 Jupyter Notebook kafka_streaming.ipyn 中。然后,它出现在 Python 代码文件 kafka_streaming.py 中。每个文件都执行相同的功能,使用相同的代码。

接下来,将 Python 脚本上传到主 Spark 集群节点,并使用以下命令执行命令行脚本

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 kafka_streaming.py

要访问 Jupyter Notebook,请在浏览器中启动环境。重要的是将 CLUSTERNAME 替换为 Spark 集群名称。HDInsight Spark 上的 Jupyter 设置是自动的,并且在设置时已配置好。URL 如下:

https://CLUSTERNAME.azurehdinsight.net/jupyter

除了下载 Notebook,还可以创建一个新的 PySpark Notebook。

现在,使用以下命令将 Notebook 链接到必要的 Java 软件包

%%configure -f
{
    "Conf":	{"spark.jars.packages":
               "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0"
}

-f 选项会重启 Spark 会话。如果魔术命令在第一个单元格中运行,则可以省略此选项。但是,请注意,如果重新执行此命令,所有预先运行的数据都将被擦除。

在撰写本文时,上述配置工作正常。但是,软件环境变化非常快。因此,下一个单元格包含此命令

sc.version

这可以确定上述正确的 Spark 版本。

有时,可能会出现找不到库(本地和 HDInsight 上)的错误。幸运的是,通常可以通过在命令提示符中使用以下命令来解决。在 HDInsight 上,这需要通过 SSH 连接到 Spark 集群。

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

这似乎在查找和检索库方面更成功。

配置完成后,就可以执行 Notebook 的其余部分了。

首先,设置 kafkaBroker 主机和主题连接数据

kafkaBrokers = "YOUR_MAIN_KAFKA_BROKER"
kafkaTopic = "SensorReadings"
print("Done!")

处理此代码会重启或重新运行单元格。确保只有一个 Spark 会话在执行。

为了避免无意中启动第二个会话,请使用以下函数创建一个全局单例

def getSparkSessionInstance():
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .appName("StructuredStreaming") \
            .master("local[*]") \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

然后,可以随时调用此函数。

以下单元格在必要时重新分配 Spark 会话、上下文和流式处理上下文。为了避免所有 INFOWARN 消息,将上下文设置为仅记录错误。

spark = getSparkSessionInstance()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)

下一个单元格连接到 kafkaBrokers 并将流从 Kafka 读取到数据帧中。但是,直到流写入器的 start 方法触发流式处理,否则什么都不会发生。

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", kafkaTopic) \
    .option("startingOffsets", "earliest") \
    .load()

在 Kafka 流中,键和值都是字节数组。将它们转换为字符串以供以后使用。此外,时间戳是长整数格式,稍后将转换为 Unix 时间戳格式。

目前,将 keyvalue 列转换为字符串在新数据帧中。

df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
                           "timestamp")

假设的传感器数据以 JSON 格式存储,如下所示

{
"sensor": "mass",
"machine": "mixer",
"units": "kg",
"time": 1646861863942,
"value": 144.174
}

下一步是提取必要的数据(返回的值)以供以后使用。为此,请定义一个模式来提取每个流式记录。下面的模式定义了结构。

value_schema = StructType([
        StructField("sensor",StringType(),True),
        StructField("machine",StringType(),True),
        StructField("units",StringType(),True),
        StructField("time", LongType(), True),
        StructField("value", DoubleType(), True)
])

在下一个单元格中,使用该模式将数据提取到另一个数据帧中。

这是一个方便的时候将 Kafka 式时间戳转换为 Unix 式时间戳。但是,只有在使用窗口流式处理时才需要。

首先,将时间戳转换为长整数,以秒为单位表示,然后将其转换为 Unix 时间戳。

Df2 = df1 \
  .withColumn("jsonData", from_json(col("value"), value_schema)) \
  .withColumn("timestamp", (col("timestamp").cast("long")/1000).cast("timestamp")) \
 .select("key", "jsonData.*", "timestamp")

环境使用两种输出形式。

分组形式显示每个传感器的平均值。此选项列出了传感器和所有读数的平均值。随着更多记录的流式传输,该过程会调整平均值。在实际环境中,这不太有用。随着读数数量的增加,异常值对平均值的影响会减小。在使用平均值跟踪机器性能时,这可能是有害的。

相比之下,窗口形式更为现实。根据窗口值,这会显示一长串值。但是,对于在 Jupyter Notebook 或命令行中显示,较短的列表更可取。

outStream = streamingDataFrame. \
    groupBy("key"). \
    avg("value")

对于窗口模式,使用以下行并根据需要调整窗口。

outStream = streamingDataFrame. \
    withWatermark("timestamp", "1 minute"). \
    groupBy(window("timestamp", "10 minutes", "5 minutes"), "key"). \
    groupBy("key"). \
    avg('value')

然后,将结果流式传输到控制台接收器。下面的命令启动流式处理过程。完成后,该命令将等待来自 Kafka 的进一步流式数据。

query = outStream.writeStream. \
    outputMode("complete"). \
    option("numRows", 100). \
    option("truncate", "false"). \
    format("console"). \
    start(). \
    awaitTermination()

当生产者应用程序执行 Kafka 代理时,流式处理开始。

结果

结果有些平淡,但功能清晰明了。这是一个简单的接收器应用程序,处理或分析很少。能够一键设置两个集群并与之交互,这对资源匮乏的开发人员来说意义重大。

控制台应用程序将数据显示为每个传感器的平均值表。使用窗口代码时,输出显示时间窗口。Notebook 以列表和图形格式显示流式处理活动。

此外,执行生产者应用程序会将新批次流式传输到接收器,从而更新输出图。

结论

在 HDInsight 上设置和使用集群使原本繁琐的过程变得简单。集群同样易于预配和删除。所有虚拟机都存在于虚拟网络中。可以在没有本地管理员资源的情况下安装软件和配置服务器。

此外,PySpark Notebook 已准备好与 PySpark 内核一起使用,这已被证明非常方便。

总而言之,使用 HDInsight 进行基于云的 Kafka 开发代表了无缝且直观的下一步。

要了解有关 HDInsight 中的大规模流式处理以及 Azure HDInsight 中的 Apache Kafka 的更多信息,请查看这些Microsoft Azure 开发者社区资源

© . All rights reserved.