使用 Kafka 和 Azure HDInsight 进行大规模流式传输(第三部分):使用 PySpark 在 HDInsight 上分析流数据





0/5 (0投票)
在本篇文章中,我们将使用 PySpark 来流式处理和分析我们的数据。
本系列的第一篇文章介绍了如何在 HDInsight 上构建 Kafka 集群。然后,第二篇文章探讨了如何向示例应用程序添加数据。本文将介绍如何使用 Jupyter Notebook 中的 PySpark 将该数据流式传输到 Spark。此 Notebook 和其他所需文件可在此处找到。
必备组件
与前一篇文章一样,本次演示将通过安全外壳协议 (SSH) 接口获取主机信息。
在浏览器中,导航到集群中的 Jupyter 引擎。输入之前设置的管理员配置文件的登录凭据。
值得注意的是,Notebook 的输出与命令行输出不同。
相同的代码会显示两次。首先,它出现在 Jupyter Notebook kafka_streaming.ipyn 中。然后,它出现在 Python 代码文件 kafka_streaming.py 中。每个文件都执行相同的功能,使用相同的代码。
接下来,将 Python 脚本上传到主 Spark 集群节点,并使用以下命令执行命令行脚本
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 kafka_streaming.py
要访问 Jupyter Notebook,请在浏览器中启动环境。重要的是将 CLUSTERNAME
替换为 Spark 集群名称。HDInsight Spark 上的 Jupyter 设置是自动的,并且在设置时已配置好。URL 如下:
https://CLUSTERNAME.azurehdinsight.net/jupyter
除了下载 Notebook,还可以创建一个新的 PySpark Notebook。
现在,使用以下命令将 Notebook 链接到必要的 Java 软件包
%%configure -f { "Conf": {"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0" }
-f
选项会重启 Spark 会话。如果魔术命令在第一个单元格中运行,则可以省略此选项。但是,请注意,如果重新执行此命令,所有预先运行的数据都将被擦除。
在撰写本文时,上述配置工作正常。但是,软件环境变化非常快。因此,下一个单元格包含此命令
sc.version
这可以确定上述正确的 Spark 版本。
有时,可能会出现找不到库(本地和 HDInsight 上)的错误。幸运的是,通常可以通过在命令提示符中使用以下命令来解决。在 HDInsight 上,这需要通过 SSH 连接到 Spark 集群。
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
这似乎在查找和检索库方面更成功。
配置完成后,就可以执行 Notebook 的其余部分了。
首先,设置 kafkaBroker
主机和主题连接数据
kafkaBrokers = "YOUR_MAIN_KAFKA_BROKER" kafkaTopic = "SensorReadings" print("Done!")
处理此代码会重启或重新运行单元格。确保只有一个 Spark 会话在执行。
为了避免无意中启动第二个会话,请使用以下函数创建一个全局单例
def getSparkSessionInstance():
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.appName("StructuredStreaming") \
.master("local[*]") \
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
然后,可以随时调用此函数。
以下单元格在必要时重新分配 Spark 会话、上下文和流式处理上下文。为了避免所有 INFO
和 WARN
消息,将上下文设置为仅记录错误。
spark = getSparkSessionInstance()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
下一个单元格连接到 kafkaBrokers
并将流从 Kafka 读取到数据帧中。但是,直到流写入器的 start
方法触发流式处理,否则什么都不会发生。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafkaBrokers) \
.option("subscribe", kafkaTopic) \
.option("startingOffsets", "earliest") \
.load()
在 Kafka 流中,键和值都是字节数组。将它们转换为字符串以供以后使用。此外,时间戳是长整数格式,稍后将转换为 Unix 时间戳格式。
目前,将 key
和 value
列转换为字符串在新数据帧中。
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
"timestamp")
假设的传感器数据以 JSON 格式存储,如下所示
{
"sensor": "mass",
"machine": "mixer",
"units": "kg",
"time": 1646861863942,
"value": 144.174
}
下一步是提取必要的数据(返回的值)以供以后使用。为此,请定义一个模式来提取每个流式记录。下面的模式定义了结构。
value_schema = StructType([
StructField("sensor",StringType(),True),
StructField("machine",StringType(),True),
StructField("units",StringType(),True),
StructField("time", LongType(), True),
StructField("value", DoubleType(), True)
])
在下一个单元格中,使用该模式将数据提取到另一个数据帧中。
这是一个方便的时候将 Kafka 式时间戳转换为 Unix 式时间戳。但是,只有在使用窗口流式处理时才需要。
首先,将时间戳转换为长整数,以秒为单位表示,然后将其转换为 Unix 时间戳。
Df2 = df1 \ .withColumn("jsonData", from_json(col("value"), value_schema)) \ .withColumn("timestamp", (col("timestamp").cast("long")/1000).cast("timestamp")) \ .select("key", "jsonData.*", "timestamp")
环境使用两种输出形式。
分组形式显示每个传感器的平均值。此选项列出了传感器和所有读数的平均值。随着更多记录的流式传输,该过程会调整平均值。在实际环境中,这不太有用。随着读数数量的增加,异常值对平均值的影响会减小。在使用平均值跟踪机器性能时,这可能是有害的。
相比之下,窗口形式更为现实。根据窗口值,这会显示一长串值。但是,对于在 Jupyter Notebook 或命令行中显示,较短的列表更可取。
outStream = streamingDataFrame. \
groupBy("key"). \
avg("value")
对于窗口模式,使用以下行并根据需要调整窗口。
outStream = streamingDataFrame. \ withWatermark("timestamp", "1 minute"). \ groupBy(window("timestamp", "10 minutes", "5 minutes"), "key"). \ groupBy("key"). \ avg('value')
然后,将结果流式传输到控制台接收器。下面的命令启动流式处理过程。完成后,该命令将等待来自 Kafka 的进一步流式数据。
query = outStream.writeStream. \
outputMode("complete"). \
option("numRows", 100). \
option("truncate", "false"). \
format("console"). \
start(). \
awaitTermination()
当生产者应用程序执行 Kafka 代理时,流式处理开始。
结果
结果有些平淡,但功能清晰明了。这是一个简单的接收器应用程序,处理或分析很少。能够一键设置两个集群并与之交互,这对资源匮乏的开发人员来说意义重大。
控制台应用程序将数据显示为每个传感器的平均值表。使用窗口代码时,输出显示时间窗口。Notebook 以列表和图形格式显示流式处理活动。
此外,执行生产者应用程序会将新批次流式传输到接收器,从而更新输出图。
结论
在 HDInsight 上设置和使用集群使原本繁琐的过程变得简单。集群同样易于预配和删除。所有虚拟机都存在于虚拟网络中。可以在没有本地管理员资源的情况下安装软件和配置服务器。
此外,PySpark Notebook 已准备好与 PySpark 内核一起使用,这已被证明非常方便。
总而言之,使用 HDInsight 进行基于云的 Kafka 开发代表了无缝且直观的下一步。
要了解有关 HDInsight 中的大规模流式处理以及 Azure HDInsight 中的 Apache Kafka 的更多信息,请查看这些Microsoft Azure 开发者社区资源。