制作一个简单的数据流水线 - 第 1 部分:ETL 模式





5.00/5 (1投票)
安排 Python 和 SQL 脚本,以使您的数据集在 Postgres 数据库中保持干净和最新。
想自己试试吗?首先,注册bit.io以立即获得免费的Postgres数据库访问权限。然后克隆GitHub仓库并亲自尝试!
问题所在
公开和私有数据源比比皆是,但同时也存在问题
幸运的是,有一种通用的计算模式可以缓解这些问题,并将数据放在正确的位置和格式以供使用:**“提取、转换、加载”(ETL)**。
ETL的实现复杂性和健壮性各不相同,从在单台机器上调度简单的Python和Postgres脚本,到工业级的Kubernetes集群、Apache Airflow和Spark的组合。
在这里,我们将介绍一个简单的Python和Postgres实现,您可以快速上手。我们将一起 walkthrough 关键代码片段,完整的实现和文档可在此仓库中找到。
提取、转换、加载
根据维基百科
**提取、转换、加载(ETL)**是将数据从一个或多个源复制到目标系统的通用过程,该目标系统表示数据的不同于源的表示形式,或在与源不同的上下文中表示数据。
换句话说——我们使用ETL从一个或多个源**提取**数据,以便我们可以将其**转换**成另一种表示形式,然后**加载**到单独的目标。
我们不提供假设性的例子,而是直接通过一个实际问题进行演示——在Postgres数据库中保持美国县级COVID病例、死亡和疫苗接种的干净数据集。您可以在我们的公共Postgres数据库中看到最终产品。
提取
第一个ETL步骤是将数据从一个或多个源**提取**到我们可以在转换步骤中使用的格式。我们将使用pandas进行转换步骤,因此我们的具体目标是将源数据提取到pandas DataFrame中。
我们将处理三个数据源
前两个来源可通过直接CSV文件下载URL访问。人口普查数据可通过Web应用程序手动访问,也可通过API以编程方式访问。人口普查数据每年只更新一次,因此我们通过Web应用程序将CSV文件手动下载到本地目录(该文件在仓库中提供)。
我们使用下面的代码将这两种类型的CSV文件源提取到pandas DataFrame中。`csv_from_get_request` 使用Python的`requests`包处理URL下载,`csv_from_local` 处理本地CSV文件。
1 """Provides extraction functions.
2 Currently only supports GET from URL or local file.
3 """
4
5 import io
6
7 import pandas as pd
8 import requests
9
10
11 def csv_from_get_request(url):
12 """Extracts a data text string accessible with a GET request.
13 Parameters
14 ----------
15 url : str
16 URL for the extraction endpoint, including any query string
17 Returns
18 ----------
19 DataFrame
20 """
21 r = requests.get(url, timeout=5)
22 data = r.content.decode('utf-8')
23 df = pd.read_csv(io.StringIO(data), low_memory=False)
24 return df
25
26
27 def csv_from_local(path):
28 """Extracts a csv from local filesystem.
29 Parameters
30 ----------
31 path : str
32 Returns
33 ----------
34 DataFrame
35 """
36 return pd.read_csv(path, low_memory=False)
将数据**提取**到DataFrame后,我们就可以进行数据转换了。
变换
在第二步中,我们将从提取步骤中的pandas DataFrame**转换**为新的DataFrame以供加载步骤使用。
数据转换是一个广泛的过程,可以包括处理缺失值、强制执行类型、过滤到相关子集、重塑表、计算派生变量等等。
与提取和加载步骤相比,由于每个数据源的特殊性,我们不太可能为整个转换步骤重用代码。但是,我们当然可以(也应该)在可能的情况下对常见转换操作进行模块化和重用。
在这个简单的实现中,我们为每个数据源定义了一个转换函数。每个函数都包含一个简短的pandas脚本。下面我们展示了NYT县级COVID病例和死亡数据的转换函数。其他数据源的处理方式类似。
"""Provides optional transform functions for different data sources."""
import pandas as pd
def nyt_cases_counties(df):
"""Transforms NYT county-level COVID data"""
# Cast date as datetime
df['date'] = pd.to_datetime(df['date'])
# Store FIPS codes as standard 5 digit strings
df['fips'] = df['fips'].astype(str).str.extract('(.*)\.', expand=False).str.zfill(5)
# Drop Puerto Rico due to missing deaths data, cast deaths to int
df = df.loc[df['state'] != 'Puerto Rico'].copy()
df['deaths'] = df['deaths'].astype(int)
return df
# Script truncated for Medium
我们在第9行和第14行强制执行数据类型,提取标准化的FIPS代码(第11行),以支持按县与其他数据源连接,并通过删除波多黎各(第13行)来处理缺失值。
将数据**转换**为新的DataFrame后,我们就可以加载到数据库了。
Load (加载)
在最后一个ETL步骤中,我们将转换后的DataFrame**加载**到一个通用目标,以便进行分析、机器学习和其他用例。在这个简单的实现中,我们将使用bit.io上的PostgreSQL数据库。
bit.io 是即时创建符合标准的Postgres数据库并将数据加载到一个地方的最简单方法(并且对于大多数业余规模用例都是免费的)。您只需注册(无需信用卡),按照提示“创建仓库”(您的私人数据库),然后按照“连接bit.io”文档获取新数据库的Postgres连接字符串。
注意:您可以将以下代码与任何Postgres数据库一起使用,但数据库设置和连接将由您自己负责。
在建立了目标数据库之后,我们就可以开始 walkthrough 加载步骤的代码了。此步骤需要比其他步骤更多的样板代码来处理与数据库的交互。然而,与转换步骤不同的是,这段代码通常可以重用于每个pandas到Postgres的ETL过程。
加载步骤中的主要函数是`to_table`。此函数接收来自转换步骤的DataFrame(`df`),一个完全限定的目标表名(下一节中的示例),以及一个Postgres连接字符串`pg_conn_string`。
第18-12行验证连接字符串,解析完全限定表名中的模式(bit.io“仓库”)和表,并创建一个SQLAlchemy引擎。该引擎是一个对象,用于管理Postgres数据库的连接,支持自定义SQL和pandas SQL API。
第24-28行检查表是否已存在(截断的辅助函数`_table_exists`)。如果表已存在,我们将使用SQLAlchemy执行`_truncate table`(另一个截断的辅助函数),该函数会清除表中的所有现有数据,为新的加载做准备。
最后,在第30-39行,我们打开另一个SQLAlchemy连接,并使用pandas API通过快速的自定义插入方法`_psql_insert_copy`将DataFrame加载到Postgres。
1 """Load pandas DataFrames to PostgreSQL on bit.io"""
2
3 from sqlalchemy import create_engine
4
5 def to_table(df, destination, pg_conn_string):
6 """
7 Loads a pandas DataFrame to a bit.io database.
8 Parameters
9 ----------
10 df : pandas.DataFrame
11 destination : str
12 Fully qualified bit.io PostgreSQL table name.
13 pg_conn_string : str
14 A bit.io PostgreSQL connection string including credentials.
15 """
16 # Validation and setup
17 if pg_conn_string is None:
18 raise ValueError("You must specify a PG connection string.")
19 schema, table = destination.split(".")
20 engine = create_engine(pg_conn_string)
21
22 # Check if table exists and set load type accordingly
23 if _table_exists(engine, schema, table):
24 _truncate_table(engine, schema, table)
25 if_exists = 'append'
26 else:
27 if_exists = 'fail'
28
29 with engine.connect() as conn:
30 # 10 minute upload limit
31 conn.execute("SET statement_timeout = 600000;")
32 df.to_sql(
33 table,
34 conn,
35 schema,
36 if_exists=if_exists,
37 index=False,
38 method=_psql_insert_copy)
39
40 # The following helper methods are truncated here for brevity,
41 # but are available on github.com/bitdotioinc/simple-pipeline
42 # _table_exists - returns boolean indicating whether a table already exists
43 # _truncate_table - deletes all data from existing table to prepare for fresh load
44 # _psql_insert_copy - implements a fast pandas -> PostgreSQL insert using COPY FROM CSV command
注意:为了简单起见,我们在这里覆盖了整个表,而不是使用增量加载,并且因为其中一些历史数据集会被更新和追加。实现增量加载将更有效,但会稍微复杂一些。
整合各个部分
就是这样!我们完成了所有三个ETL步骤。现在是时候将它们组合成一个计划好的进程了。
我们将在制作简单数据管道 第二部分:自动化ETL中介绍接下来的步骤。
如果您想立即尝试,这个简单方法的完整实现,包括调度,可在此仓库中找到。
对未来的Inner Join出版物和相关的bit.io数据内容感兴趣?请考虑订阅我们的每周通讯。
附录
系列概述
本文是关于制作一个简单而有效的ETL管道的四部分系列文章的第一篇。我们尽量减少ETL工具和框架的使用,以保持实现简单并关注基本概念。每一部分都引入了一个新的概念,以构建位于此仓库中的完整管道。
其他考虑因素
本系列旨在通过一个简单易用的实现来说明ETL模式。为了保持这一重点,一些细节被留在了本附录中。
- 最佳实践——本系列文章忽略了一些关于构建健壮生产管道的重要实践:暂存表、增量加载、容器化/依赖管理、事件消息/警报、错误处理、并行处理、配置文件、数据建模等等。有很好的资源可以学习如何将这些最佳实践添加到您的管道中。
- ETL vs. ELT vs. ETLT——ETL模式可能带有为每个最终用例加载一个定制ETL进程的含义。在现代数据环境中,大量的转换工作发生在数据仓库加载之后。这就产生了“ELT”这个词,或者令人费解的“ETLT”。简单来说,您可能希望尽量减少加载前的转换(如果有的话),以便在数据仓库内部进行转换迭代。
继续阅读
我们写了一个关于ETL管道的完整系列!在这里查看它们
核心概念和关键技能
- 制作一个简单的数据流水线 - 第 1 部分:ETL 模式
- 制作简单数据管道 第二部分:自动化ETL
- 制作简单数据管道 第三部分:测试ETL
- 制作简单数据管道 第四部分:CI/CD与GitHub Actions