65.9K
CodeProject 正在变化。 阅读更多。
Home

Azure Synapse Analytics 入门(第二部分):数据准备与管理

2021 年 6 月 30 日

CPOL

7分钟阅读

viewsIcon

6803

在本文中,我们将探讨 Azure Synapse Analytics 如何帮助进行数据准备和管理,从而无需自定义提取、转换和加载 (ETL) 代码。

数据科学家和商业智能 (BI) 团队的工作涉及理解来自业务流程的数据。然而,要将这些非结构化数据的海洋转化为可供使用的有价值的见解,我们需要一个数据仓库平台和一组编程工具。

构建数据仓库的传统方法是将集成过程连接到多个源。它采用三步法将数据加载到数据仓库中,称为提取、转换和加载 (ETL)。在 ETL 中,我们在加载数据到系统之前,在暂存区域转换数据。

数据科学家和 BI 团队通常在本地使用 ETL,处理关系型和结构化数据。ETL 不支持数据湖操作。将数据加载到 Azure Synapse Analytics 中时,我们应该使用不同的方法。

在本系列的第一篇文章中,我们探讨了 Azure Synapse Analytics 及其一些功能。本系列的第二篇文章探讨了如何在 Azure Synapse Analytics 中提取、转换和加载数据。它还强调了其灵活的语言支持如何帮助开发人员为 BI 团队和数据科学家提供强大的数据解决方案。

ETL 与 ELT

通过提取、加载和转换 (ELT),来自数据源的原始数据直接加载到目标系统,然后才进行转换。ELT 适用于云中的结构化和非结构化数据源。因此,它比 ETL 更适合处理海量数据集并为商业智能和数据分析做准备。

使用 ELT,我们首先将源数据提取为落地 Azure 的文件格式。通常,这种格式是带分隔符的文本文件 (CSV)。

我们可以使用多种加载方法之一,包括批量复制程序 (BCP) 和 SqlBulkCopy API,它们在 SQL Server 开发人员中很受欢迎。然而,在 Azure Synapse Analytics 的大数据和云环境中,PolyBase 外部表和 COPY 语句使我们能够更快地加载数据并实现更高的可伸缩性。

Polybase 有效地允许多个节点并行导入数据。在 Azure Data Factory (ADF) 中,我们可以使用 PolyBase 定义一个管道,将数据从 Azure Synapse Analytics 表传输到 Databricks 数据帧,或从 Databricks 数据帧传输回 Azure Synapse Analytics 表。

我们可以在 Azure Data Factory 中使用 COPY 语句将数据加载到 Azure Synapse Analytics 中。

通过 PolyBase 和 COPY 语句,我们可以通过 T-SQL 语言访问存储在 Azure Blob 存储或 Azure Data Lake Store 中的外部数据。为了在加载时获得最大的灵活性,请使用 COPY 语句。

专用 SQL 池

专用 SQL 池(以前称为 SQL Data Warehouse (DW))是指 Azure Synapse Analytics 中的企业数据仓库功能。开发人员可以利用 T-SQL 的强大功能来构建数据定义语言 (DDL) 和数据建模语言 (DML) 语句,以在其数据仓库中针对专用 SQL 池执行关系数据库操作。

与 SQL Server 一样,CREATE TABLE AS SELECT (CTAS) 语法有助于从 SELECT 语句的结果快速在 Azure Synapse Analytics 的专用 SQL 池中创建新表。CTAS 作为并行操作运行,您可以指定数据分布和表结构类型。

这是一个简单的示例,说明如何使用 CTAS 和循环选项从现有表创建新表,循环选项可以快速将行随机分布到所有分布中

CREATE TABLE [dbo].[FactInvoices_new] 
WITH
( 
DISTRIBUTION = ROUND_ROBIN 
,CLUSTERED COLUMNSTORE INDEX 
) 
AS 
SELECT  * 
FROM    [dbo].[FactInvoices];

之前的 FactInvoices 事实表包含最初在事务系统中创建的数据,然后加载到专用 SQL 池中进行分析。

以下示例使用 CTAS 根据两个表为每个产品创建包含总销售额和折扣的表

CREATE TABLE [dbo].[ShipmentsPerCustomer]  
WITH  
(  
    DISTRIBUTION = ROUND_ROBIN,  
    CLUSTERED COLUMNSTORE INDEX  
)
AS  
SELECT  
c.CustomerID, 
c.Name AS CustomerName,   
SUM(s.Value) as TotalValue, 
SUM(s.Weight) as TotalWeight 
FROM [dbo].[Customer] AS c 
INNER JOIN [dbo].[Shipment] AS s 
ON c.CustomerID = s.CustomerID 
GROUP BY c.CustomerID;

Azure Blob 存储

组织必须处理存储在各种文件中的数据。尽管存储和管理文档、图像、视频、日志或数据库文件具有挑战性,但 Azure Blob 存储可以提供帮助。

Azure Blob 存储中的 Blob 代表二进制大型对象,可以是任何类型的内容。Azure Blob 存储帮助组织以低成本在云中管理多种类型的非结构化文本或二进制文件。它使用 Azure Active Directory (AD) 的用户访问控制来保护这些文件。

有多种方法可以将内容上传到 Azure Blob 存储,包括 AzCopy 和 Azure CLI。

Azure Blob 存储与其他 Azure 平台产品集成良好。例如,我们可以将 Azure Blob 存储与 Azure 内容分发网络 (CDN) 结合使用,将内容从最近的数据中心定向到用户。这可以减少加载时间并提高响应能力。此外,为了提高应用程序性能和增强用户体验,我们可以配置 Azure 搜索来索引 Azure Blob 存储中的多种文件类型。

我们可以通过 Azure 存储 REST API 从任何可以发出 HTTP 请求的设备访问 Azure Blob 存储中的数据。我们还可以通过 Azure 门户以及使用 Azure PowerShell 和 Azure CLI 从命令行访问 Azure Blob 存储。

对于 .NET、Java、Python 和 Node.js 等平台,我们可以使用 Azure 存储客户端库,它封装了 REST API 调用,可以从我们的应用程序访问 Azure Blob 存储。

以下 Java 代码示例创建了一个存储容器,并将本地文件上传到此存储容器。由于 Azure Blob 存储针对存储大量非结构化数据进行了优化,我们可以使用此代码来移动包含图像、视频、音频、日志和其他类型的非结构化数据的文件

String sasToken = "<your-sas-token>"; 
/* Create a new BlobServiceClient with a SAS Token */ 
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder() 
    .endpoint("https://your-storage-account-url.storage.windows.net") 
    .sasToken(sasToken) 
    .buildClient(); 
  
/* Create a new container client */ 
try { 
    containerClient = blobServiceClient.createBlobContainer("sample-container"); 
} catch (BlobStorageException ex) { 
    // The container may already exist, so don't throw an error 
    if (!ex.getErrorCode().equals(BlobErrorCode.CONTAINER_ALREADY_EXISTS)) { 
        throw ex; 
    } 
} 
  
/* Upload the file to the container */ 
BlobClient blobClient = containerClient.getBlobClient("sample-blob"); 
blobClient.uploadFromFile("sample-file");

Azure Data Lake Store Gen2

Azure Data Lake Storage Gen2 是一个高效的数据湖,专门用于在 Azure 上运行大数据分析,同时管理海量数据。Azure Data Lake Storage Gen2 构建在 Azure Blob 存储之上,融合了其所有优点,包括文件系统语义、文件级安全性和可扩展性。此外,它还具有低成本、高可用性和弹性功能。这些附加功能进一步降低了在 Azure 上运行大数据分析的总拥有成本。

在 Azure Synapse Analytics 和 Azure Databricks 之间读取、转换和移动数据

为了弥合数据湖和数据仓库之间的差距,Azure 有自己的 Databricks 实现,称为 Azure Databricks。这个云工具使 Azure Synapse Analytics 能够探索、准备、训练和转换数据。

为了演示概念、获取见解并验证我们的想法,让我们使用 Azure Synapse Studio 笔记本进行数据实验。笔记本是一种可读的、人性化的文档,我们可以通过自由添加文本块和代码片段来创建它。Synapse 笔记本支持四种 Apache Spark 语言:pySpark (Python)、Spark (Scala)、SparkSQL 和用于 Apache Spark 的 .NET (C#)。

让我们探索使用 Scala 语言访问 Azure Blob 存储,读取表,执行 SQL 查询,并将数据写回另一个表。

首先,我们在笔记本会话配置中设置 Azure Blob 存储帐户访问密钥

spark.conf.set( 
  "fs.azure.account.key.<storage-account-name>.blob.core.windows.net", 
  "<storage-account-access-key>")

接下来,我们设置 Java 数据库连接 (JDBC) 连接字符串和 BLOB 连接字符串,从 Azure Synapse 查询中获取一些数据,并使用 spark.read 函数将结果放入 Spark DataFrame

val df: DataFrame = spark.read 
  .format("com.databricks.spark.sqldw") 
  .option("url", "jdbc:sqlserver://<connection-string>") 
  .option("tempDir", "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name>") 
  .option("forwardSparkAzureStorageCredentials", "true") 
  .option("query", "select OrderId, count(*) as itemCount from FactOrderDetails group by OrderId") 
  .load()

最后,我们使用 spark.write 函数将转换后的数据写回 Azure Synapse 表

df.write
  .format("com.databricks.spark.sqldw") 
  .option("url", "jdbc:sqlserver://<connection-string>") 
  .option("forwardSparkAzureStorageCredentials", "true") 
  .option("dbTable", "<table-name>") 
  .option("tempDir", "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name>") 
  .save()

结论

我们已经探讨了 Azure Synapse Analytics 如何使用 ELT 作为 ETL 的替代方案来提取、加载和转换数据。它提前执行加载步骤,并允许数据转换在数据仓库内部进行。

我们可以在 Azure Synapse Analytics 中使用无代码或支持的语言(包括 T-SQL、Java、Scala、Python 和 C#)来摄取、准备和管理数据。Azure Synapse Analytics 灵活的语言支持使开发人员能够利用其编程技能为 BI 和数据科学团队提供数据解决方案,以回答业务问题。

要了解更多信息,请继续阅读本系列的第三篇也是最后一篇文章,探讨数据科学和商业智能团队如何使用 Azure Synapse Analytics 数据来深入了解业务流程。

此外,您还可以通过参加 Microsoft 的Azure Synapse Analytics 动手培训系列,了解更多关于如何使用 Azure Synapse Analytics 推动商业智能和机器学习的信息。由 Azure Synapse 工程团队成员主讲,每集都提供 Azure Synapse 动手培训。

© . All rights reserved.