65.9K
CodeProject 正在变化。 阅读更多。
Home

制作一个简单的数据流水线 - 第 1 部分:ETL 模式

starIconstarIconstarIconstarIconstarIcon

5.00/5 (1投票)

2022年2月4日

CPOL

7分钟阅读

viewsIcon

7120

安排 Python 和 SQL 脚本,以使您的数据集在 Postgres 数据库中保持干净和最新。

想自己试试吗?首先,注册bit.io以立即获得免费的Postgres数据库访问权限。然后克隆GitHub仓库并亲自尝试!

问题所在

公开和私有数据源比比皆是,但同时也存在问题

  1. 源数据可能频繁更新,但在使用前需要大量准备。
  2. 可能存在已准备好的次级数据源,但它们可能过时且缺乏数据溯源
  3. 具有异构来源和格式的多个数据源可能需要为特定应用程序进行集成

幸运的是,有一种通用的计算模式可以缓解这些问题,并将数据放在正确的位置和格式以供使用:**“提取、转换、加载”(ETL)**。

ETL的实现复杂性和健壮性各不相同,从在单台机器上调度简单的Python和Postgres脚本,到工业级的Kubernetes集群、Apache Airflow和Spark的组合。

在这里,我们将介绍一个简单的Python和Postgres实现,您可以快速上手。我们将一起 walkthrough 关键代码片段,完整的实现和文档可在此仓库中找到。

提取、转换、加载

根据维基百科

**提取、转换、加载(ETL)**是将数据从一个或多个源复制到目标系统的通用过程,该目标系统表示数据的不同于源的表示形式,或在与源不同的上下文中表示数据。

换句话说——我们使用ETL从一个或多个源**提取**数据,以便我们可以将其**转换**成另一种表示形式,然后**加载**到单独的目标。

我们不提供假设性的例子,而是直接通过一个实际问题进行演示——在Postgres数据库中保持美国县级COVID病例、死亡和疫苗接种的干净数据集。您可以在我们的公共Postgres数据库中看到最终产品。

计划好的ETL流程有助于我们将准备好的数据加载到通用数据库中,以便我们可以进一步连接、转换和访问数据以用于分析和机器学习等特定用例。

提取

第一个ETL步骤是将数据从一个或多个源**提取**到我们可以在转换步骤中使用的格式。我们将使用pandas进行转换步骤,因此我们的具体目标是将源数据提取到pandas DataFrame中。

我们将处理三个数据源

  1. 《纽约时报》对每日县级COVID病例和死亡人数的汇编(每天更新多次)
  2. CDC对每县疫苗接种统计(每日更新)
  3. 美国人口普查局五年美国社区调查估算的县级人口(每年更新)

前两个来源可通过直接CSV文件下载URL访问。人口普查数据可通过Web应用程序手动访问,也可通过API以编程方式访问。人口普查数据每年只更新一次,因此我们通过Web应用程序将CSV文件手动下载到本地目录(该文件在仓库中提供)。

我们使用下面的代码将这两种类型的CSV文件源提取到pandas DataFrame中。`csv_from_get_request` 使用Python的`requests`包处理URL下载,`csv_from_local` 处理本地CSV文件。

   1   """Provides extraction functions.
   2   Currently only supports GET from URL or local file.
   3   """
   4  
   5   import io
   6   
   7   import pandas as pd
   8   import requests
   9   
  10   
  11   def csv_from_get_request(url):
  12       """Extracts a data text string accessible with a GET request.
  13       Parameters
  14       ----------
  15       url : str
  16           URL for the extraction endpoint, including any query string
  17       Returns
  18       ----------
  19       DataFrame
  20       """
  21       r = requests.get(url, timeout=5)
  22       data = r.content.decode('utf-8')
  23       df =  pd.read_csv(io.StringIO(data), low_memory=False)
  24       return df
  25   
  26   
  27   def csv_from_local(path):
  28      """Extracts a csv from local filesystem.
  29       Parameters
  30       ----------
  31       path : str
  32       Returns
  33       ----------
  34       DataFrame
  35       """
  36       return pd.read_csv(path, low_memory=False)

将数据**提取**到DataFrame后,我们就可以进行数据转换了。

变换

在第二步中,我们将从提取步骤中的pandas DataFrame**转换**为新的DataFrame以供加载步骤使用。

数据转换是一个广泛的过程,可以包括处理缺失值、强制执行类型、过滤到相关子集、重塑表、计算派生变量等等。

与提取和加载步骤相比,由于每个数据源的特殊性,我们不太可能为整个转换步骤重用代码。但是,我们当然可以(也应该)在可能的情况下对常见转换操作进行模块化和重用。

在这个简单的实现中,我们为每个数据源定义了一个转换函数。每个函数都包含一个简短的pandas脚本。下面我们展示了NYT县级COVID病例和死亡数据的转换函数。其他数据源的处理方式类似

"""Provides optional transform functions for different data sources."""

import pandas as pd


def nyt_cases_counties(df):
    """Transforms NYT county-level COVID data"""
    # Cast date as datetime
    df['date'] = pd.to_datetime(df['date'])
    # Store FIPS codes as standard 5 digit strings
    df['fips'] = df['fips'].astype(str).str.extract('(.*)\.', expand=False).str.zfill(5)
    # Drop Puerto Rico due to missing deaths data, cast deaths to int
    df = df.loc[df['state'] != 'Puerto Rico'].copy()
    df['deaths'] = df['deaths'].astype(int)
    return df

# Script truncated for Medium

我们在第9行和第14行强制执行数据类型,提取标准化的FIPS代码(第11行),以支持按县与其他数据源连接,并通过删除波多黎各(第13行)来处理缺失值。

将数据**转换**为新的DataFrame后,我们就可以加载到数据库了。

Load (加载)

在最后一个ETL步骤中,我们将转换后的DataFrame**加载**到一个通用目标,以便进行分析、机器学习和其他用例。在这个简单的实现中,我们将使用bit.io上的PostgreSQL数据库。

bit.io 是即时创建符合标准的Postgres数据库并将数据加载到一个地方的最简单方法(并且对于大多数业余规模用例都是免费的)。您只需注册(无需信用卡),按照提示“创建仓库”(您的私人数据库),然后按照“连接bit.io”文档获取新数据库的Postgres连接字符串。

注册后,您可以在几秒钟内创建一个私有的PostgreSQL数据库,并获取SQLAlchemy的连接字符串。

注意:您可以将以下代码与任何Postgres数据库一起使用,但数据库设置和连接将由您自己负责。

在建立了目标数据库之后,我们就可以开始 walkthrough 加载步骤的代码了。此步骤需要比其他步骤更多的样板代码来处理与数据库的交互。然而,与转换步骤不同的是,这段代码通常可以重用于每个pandas到Postgres的ETL过程。

加载步骤中的主要函数是`to_table`。此函数接收来自转换步骤的DataFrame(`df`),一个完全限定的目标表名(下一节中的示例),以及一个Postgres连接字符串`pg_conn_string`。

第18-12行验证连接字符串,解析完全限定表名中的模式(bit.io“仓库”)和表,并创建一个SQLAlchemy引擎。该引擎是一个对象,用于管理Postgres数据库的连接,支持自定义SQL和pandas SQL API。

第24-28行检查表是否已存在(截断的辅助函数`_table_exists`)。如果表已存在,我们将使用SQLAlchemy执行`_truncate table`(另一个截断的辅助函数),该函数会清除表中的所有现有数据,为新的加载做准备。

最后,在第30-39行,我们打开另一个SQLAlchemy连接,并使用pandas API通过快速的自定义插入方法`_psql_insert_copy`将DataFrame加载到Postgres。

   1   """Load pandas DataFrames to PostgreSQL on bit.io"""
   2   
   3   from sqlalchemy import create_engine
   4   
   5   def to_table(df, destination, pg_conn_string):
   6       """
   7       Loads a pandas DataFrame to a bit.io database.
   8       Parameters
   9       ----------
  10       df : pandas.DataFrame
  11       destination : str
  12           Fully qualified bit.io PostgreSQL table name.
  13       pg_conn_string : str
  14           A bit.io PostgreSQL connection string including credentials.
  15       """
  16       # Validation and setup
  17       if pg_conn_string is None:
  18           raise ValueError("You must specify a PG connection string.")
  19       schema, table = destination.split(".")
  20       engine = create_engine(pg_conn_string)
  21   
  22       # Check if table exists and set load type accordingly
  23       if _table_exists(engine, schema, table):
  24           _truncate_table(engine, schema, table)
  25           if_exists = 'append'
  26       else:
  27           if_exists = 'fail'
  28   
  29       with engine.connect() as conn:
  30           # 10 minute upload limit
  31           conn.execute("SET statement_timeout = 600000;")
  32           df.to_sql(
  33               table,
  34               conn,
  35               schema,
  36               if_exists=if_exists,
  37               index=False,
  38               method=_psql_insert_copy)
  39           
  40   # The following helper methods are truncated here for brevity,
  41   # but are available on github.com/bitdotioinc/simple-pipeline
  42       # _table_exists - returns boolean indicating whether a table already exists
  43       # _truncate_table - deletes all data from existing table to prepare for fresh load
  44       # _psql_insert_copy - implements a fast pandas -> PostgreSQL insert using COPY FROM CSV command

注意:为了简单起见,我们在这里覆盖了整个表,而不是使用增量加载,并且因为其中一些历史数据集会被更新和追加。实现增量加载将更有效,但会稍微复杂一些。

整合各个部分

就是这样!我们完成了所有三个ETL步骤。现在是时候将它们组合成一个计划好的进程了。

我们将在制作简单数据管道 第二部分:自动化ETL中介绍接下来的步骤。

如果您想立即尝试,这个简单方法的完整实现,包括调度,可在此仓库中找到。

对未来的Inner Join出版物和相关的bit.io数据内容感兴趣?请考虑订阅我们的每周通讯

附录

系列概述

本文是关于制作一个简单而有效的ETL管道的四部分系列文章的第一篇。我们尽量减少ETL工具和框架的使用,以保持实现简单并关注基本概念。每一部分都引入了一个新的概念,以构建位于此仓库中的完整管道。

  1. 第一部分:ETL模式
  2. 第二部分:自动化ETL
  3. 第三部分:测试ETL
  4. 第四部分:CI/CD与GitHub Actions

其他考虑因素

本系列旨在通过一个简单易用的实现来说明ETL模式。为了保持这一重点,一些细节被留在了本附录中。

  • 最佳实践——本系列文章忽略了一些关于构建健壮生产管道的重要实践:暂存表、增量加载、容器化/依赖管理、事件消息/警报、错误处理、并行处理、配置文件、数据建模等等。有很好的资源可以学习如何将这些最佳实践添加到您的管道中。
  • ETL vs. ELT vs. ETLT——ETL模式可能带有为每个最终用例加载一个定制ETL进程的含义。在现代数据环境中,大量的转换工作发生在数据仓库加载之后。这就产生了“ELT”这个词,或者令人费解的“ETLT”。简单来说,您可能希望尽量减少加载前的转换(如果有的话),以便在数据仓库内部进行转换迭代。

继续阅读

我们写了一个关于ETL管道的完整系列!在这里查看它们

核心概念和关键技能

关注自动化

ETL实战

© . All rights reserved.