使用 Azure Synapse Analytics 进行无缝流式分析(第 3 部分):创建数据流






4.64/5 (3投票s)
在本文中,我们将学习如何设置流式数据源来填充专用 SQL 池。
在讨论了 Azure Synapse Analytics 的功能并创建了工作区之后,我们又探索了创建专用 SQL 池。现在,我们需要用数据填充这个池。
Azure Synapse Analytics 最突出的功能之一是它对 Spark 的支持。这种支持使得使用 Python、Scala 和 SQL 等流行语言处理实时数据和数据流成为可能。
在 Azure Synapse Analytics 中,有许多方法可以摄取和处理流式数据,但在这里我们将重点关注 Spark Structured Streaming。我们将探讨它是什么,创建 Spark 池,准备数据,创建流式管道和复制数据管道,并探索我们的纽约出租车数据。
对于本教程,您应该拥有一个 Azure 帐户,一个工作区(我们在第一篇文章中创建),以及一个专用 SQL 池(我们在第二篇文章中创建)。熟悉 PySpark 会有所帮助,但我们会尽量保持简单。
什么是 Spark Structured Streaming?
Spark Structured Streaming 是一个构建在 Spark SQL 引擎之上的流处理引擎。它具有可伸缩性和容错性。它能够从各种数据源(如存储文件、Azure 事件中心等)摄取实时数据。在底层,它使用可自定义的微批处理来处理延迟低至 1 毫秒的流式数据。
使用 Spark Structured Streaming,我们可以维护日志并跟踪数据源中的任何更改。它还使我们能够使用最少的代码构建和部署完整的提取、转换和加载 (ETL) 管道。Spark Structured Streaming 的另一个值得注意的特性是,它的 DataFrame API 允许用户应用传统的 SQL 转换。
Spark Structured Streaming 可与 Azure Synapse Spark 池配合使用。在开始构建数据流之前,让我们快速熟悉一下 Spark 池。
什么是 Spark 池?
Azure Synapse Analytics 中的 Spark 支持已证明是其数据探索功能的一个绝佳补充。我们可以使用 Python、Scala、.NET、R 等语言来探索和处理 Azure Synapse Analytics 存储中的数据。
要充分利用 Spark,我们需要创建一个 Spark 池。
创建 Spark 池
我们可以通过 Azure 门户或 Azure Synapse Studio 创建 Spark 池。在这里,我们将从 Synapse Studio 内构建我们的 Spark 池。
在 Azure Synapse Studio 中,我们转到 **管理** 选项卡,然后选择 **Apache Spark 池**。目前没有可用的池,所以我们只需创建一个。该过程与我们之前创建 SQL 池的过程类似。首先,单击 **新建**。
接下来,我们为 Spark 池命名(TaxiFare)并选择基本设置。我们选择节点大小(小、中、大、特大或超特大)、启用或禁用自动缩放,以及选择节点数量。我们必须在预算和性能之间进行权衡。
接下来,我们转到 **其他设置**。
在此页面上,我们可以进一步自定义 Spark 池。我们可以启用或禁用自动暂停,并指定在没有活动作业时 Spark 应等待多长时间才能关闭群集。
我们还可以选择要使用的 Apache Spark 版本。在撰写本教程时,Apache Spark 3.0 仍处于预览状态。
我们还可以使用配置文件配置 Apache Spark,并指定任何其他其他池属性。
为简单起见,我们将对其他字段使用默认设置。在查看设置后,我们单击 **创建**。
在 Spark 池成功部署后,我们就能在 **Apache Spark 池** 下看到它。
有了 Spark 池,现在让我们开始构建我们的数据流。
准备示例数据
如前所述,Spark Structured Streaming 允许用户以流式方式从指定的存储文件读取数据。在这里,我们将使用 Microsoft 的开源纽约出租车和豪华轿车委员会 - 黄色出租车行程记录生成一些数据。我们将构建一个流式管道来加载和处理数据,然后将其写回另一个存储位置。
本文使用 PySpark,因此熟悉 PySpark 开发环境是有益的。不过,我们将保持简单易懂。
创建 PySpark Notebook 并加载数据
要开始准备我们的示例数据,我们首先创建一个 PySpark Notebook。我们转到 Azure Synapse Studio,转到 **开发** 选项卡,然后单击 **+** 和 **Notebook** 来创建一个新的 PySpark Notebook。
在我们运行任何 Notebook 单元格之前,必须将 Notebook 附加到 Spark 池。我们还必须确保选择 PySpark(Python)作为语言。
接下来,我们将 Notebook 附加到我们之前创建的 Spark 池。现在,我们已准备好继续开发。
让我们先导入所需的函数和数据集。我们将从 `NycTlcYellow` 库创建数据,并显示 DataFrame 的前十条记录
# importing dataset and functions from azureml.opendatasets import NycTlcYellow from pyspark.sql.functions import * data = NycTlcYellow() # converting the data to data frame df = data.to_spark_dataframe() # Display 10 rows display(df.limit(10))
让我们也看一下它的模式
# Display schema Df.printSchema()
这里,了解条目的总数很有帮助。我们使用此信息来决定是否可以将此数据集用于流式传输目的。
df.count()
数据有近 5300 万行,数据量足够。将其写入另一个位置需要一些时间,因此创建数据流是更好的选择。让我们玩转这些数据,看看能否对其进行一些转换。
转换数据
数据中包含许多我们不关心的不必要行。我们只想知道行程的持续时间和费用。为此,我们只需要几列:`tpepPickupDateTime`、`tpepDropoffDateTime` 和 `fareAmount`。
我们可以删除其余列,或者仅选择我们需要的列,如下所示
df = df.select(col("vendorID"),col("tpepPickupDateTime"),col("tpepDropoffDateTime"),col("fareAmount"))
如果我们想知道特定日期的行程费用是多少?由于我们已经有了上车和下车的时间和日期,我们可以利用这些信息来获取日期。让我们看看如何做到这一点。
首先,如下创建日期列
df = df.withColumn("Date", (col("tpepPickupDateTime").cast("date")))
在上面的代码行中,我们将 `tpepPickupDateTime` 列强制转换为日期数据类型。
在下一步中,我们想从该列派生新列(年和月),因为这将使我们更容易稍后进行聚合
# Derive different columns from casted date column aggdf = df.withColumn("year", year(col("date"))) .withColumn("month", month(col("date"))) .withColumn("day", dayofmonth(col("date"))) .withColumn("hour", hour(col("date")))
我们可以将此数据保存到任何底层存储
# Saving data as csv file df.repartition(1).write.option("header","true").csv("/Synapse/Streaming/csv/processsed.csv")
除了 CSV 和 Parquet 等传统文件存储格式外,Azure Synapse Analytics 还支持 Delta 格式的数据存储。我们可以选择任何这些格式来存储数据,但 Delta 提供了更多的灵活性,并增加了一层高级功能。
使用此代码将转换后的数据以 Delta 格式存储
delta_path='/Synapse/Streaming/Processed/NyTaxiFare' df.write.format('delta').save(delta_path)
构建流式管道
我们对流式传输感兴趣,然后从存储位置读取数据。当应用程序读取数据时,我们可以处理和转换数据,然后将其写回另一个存储位置。
下面的代码以流式方式读取我们刚刚存储的数据,重命名一个列,计算行程持续时间,然后将选定的列保存回指定的存储空间。
# path to store back the data delta_processed='/Synapse/Streaming/Processed/NYTaxiFare' # path to store the checkpoint to keep the logs delta_checkpoint='/Synapse/Streaming/Processed/NYTaxiFareCheckpoint' # read the previously stored data in streaming manner streaming_data=(spark.readStream.format('delta').load(delta_path) # rename the column .withColumnRenamed('fareAmount','tripCost') # Calculate the trip duration in minutes using drop off and pickup time .withColumn('TripDuration',round((col('tpepDropoffDatetime').cast('float')-col('tpepPickupDatetime').cast('float'))/60)) .selectExpr('Date','TripDuration','tripCost') .writeStream.format('delta') .option('mode','overwrite') .option('checkpointLocation',delta_checkpoint) .start(delta_processed))
尽管我们的数据包含数百万行,但查询执行速度很快,因为 Spark 会在后台异步运行流。有几种方法可以检查我们的流是否仍在运行
# isActive should return true if stream is running print(streaming_data.isActive)
# Checking the status of the data stream. print(streaming_data.status)
Azure Synapse Analytics 还允许我们在流式数据之上创建 Delta Lake 表。我们使用以下代码将数据保存到 Delta Lake 表
%%sql CREATE TABLE NewyorkTaxiFaree USING DELTA LOCATION '/Synapse/Streaming/Processed/NYTaxiFare'
在这里,我们可以充分利用 SQL 命令来查询表中的数据
%%sql select * from NewyorkTaxiFaree limit 100
%%sql select * from NewyorkTaxiFaree WHERE tripCost < 10
我们还可以将 Spark SQL 表数据加载到 DataFrame 中并显示它以确认已加载
# Load the data into SQL Pool result = spark.sql("SELECT TripDuration, tripCost FROM NewyorkTaxiFare") # display the loaded data display(result)
创建复制数据管道
我们的数据已就位,但如果我们想将数据复制到 SQL 池怎么办?Azure Synapse Analytics 已满足您的需求。它允许我们创建一个数据管道,该管道可以将数据从底层存储复制到 SQL 池。
要创建复制管道,我们首先转到 Synapse Studio。我们转到 **集成** 选项卡,然后单击 **+** ** 复制数据工具** 来创建复制数据管道。
在下一个屏幕上,我们将管道命名为“CopyDataToSQLPool”,如果需要,还可以添加任务描述。我们还可以选择一次性运行任务、计划运行任务或设置滚动窗口触发器。对于本文,我们将运行一次。
我们将数据存储在工作区的默认存储中,因此我们选择它作为数据源。
在下一个选项卡中,我们选择要加载的文件。在这里,我们可以选择 CSV、Parquet 或任何其他支持的文件存储格式。我们将上传处理后的 Delta 文件。
在下一个选项卡中,我们选择目标数据存储(NYTaxiFare)。
我们想将数据移动到专用 SQL 池,因此我们将选择它。
现在我们单击 **使用现有表**,选择我们之前创建的表,然后移至下一个选项卡。
接下来,我们从选项中选择要加载的表。
在下一个选项卡中,我们将找到表的列映射。我们需要正确映射列,因此请仔细审查映射。在运行管道之前,我们必须修复任何问题,否则我们将不得不处理后续的混乱。
现在,我们选择要应用的任何其他设置,例如容错或日志记录,然后单击 **下一步 >**。
接下来,我们查看并完成管道,然后单击 **下一步 >**。
现在我们部署管道。部署成功后,我们就准备好完成并运行管道。为此,我们单击 **完成**。
当管道成功运行时,数据将加载到相应的表中。
我们可以查询 SQL 池来验证管道是否已成功传输数据。
请注意,我们应确保拥有必要的读写权限。否则,我们很可能会遇到 FailedDbOperation 错误
Operation on target Copy_vw4 failed: ErrorCode=FailedDbOperation,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Please make sure SQL DW has access to ADLS Gen2
此外,我们的防火墙应允许 Azure 服务和资源访问 Synapse 工作区。我们可以在 Azure 门户中编辑防火墙设置。从我们的工作区,我们单击 **显示防火墙设置**。
在那里,我们打开权限。
后续步骤
在本系列文章中,我们向您介绍了 Azure Synapse Analytics 及其一些最佳功能。我们重点介绍了使用 Azure Synapse Analytics 进行流式分析,从创建 Azure Synapse 工作区到创建流式数据管道以及设置复制管道将流式数据保存到专用 SQL 池。
我们学习了如何利用 Azure Synapse Analytics 来完成所有这些工作。但这只是一个开始。一旦我们的数据进入专用 SQL 池,我们就可以创建交互式仪表板来分析数据。我们还可以将数据与 Azure Machine Learning、Microsoft Power BI 以及许多第三方服务和集成连接起来,以帮助我们进行大数据分析。使用您的 Azure 订阅和免费积分来探索这些功能。
要了解更多信息,请立即注册以观看 Azure Synapse Analytics 实战培训系列。