Apache Flume: 使用 Morphline 和 Kite 将 CSV 转换为 AVRO 和 Parquet





4.00/5 (1投票)
如何使用 Morphline 拦截器和 Kite sink 将 CSV Flume 事件转换为 Avro 和 Parquet。
这篇博客解释了如何使用 Morphline 拦截器和 Kite sink 将 csv Flume 事件转换为 Avro 和 Parquet。
代码可在 github 上找到 这里。请查看这两个文件
- orders.avsc,它描述了输入事件的 avro 模式,以及
- part-m-00000,其中包含我们的 csv 数据。
请记住 avro 模式文件中给定的字段名称 - “orders.avsc”。
先决条件
- 此代码已在我的 Cloudera 设置版本 5.15 上测试过。解释见此 博客。
步骤 1 - 使用 Avro 和 Parquet 格式创建 Kite 数据集
kite-dataset create orders_parquet --schema orders.avsc --format parquet
kite-dataset create orders_avro --schema orders.avsc --format avro
步骤 2 - 创建 Morphline 拦截器配置文件 (morphline.conf)
morphlines : [{
id : morphline1
importCommands : ["org.kitesdk.**"]
commands : [
{ readCSV {
charset : UTF-8
columns : [order_id,order_date,order_customer_id,order_status]
}
}
{ toAvro {
schemaFile : /home/skamalj/learn-flume2/orders.avsc
}
}
{ writeAvroToByteArray {
format : containerlessBinary
}
}
]
}
]
- 在 Morphline 中,命令通过管道传递给彼此,事件从一个命令传递到另一个命令。列表中的第一个命令是
readCSV
- 在这里,我们读取行并为每个字段分配名称,该名称与我们在 avro 模式文件 - orders.avsc 中定义的名称完全相同。 - 下一个命令是使用模式文件将事件转换为 avro。在这里,列标题名称与 avro 模式 1 对 1 映射。
- 接下来,我们需要无容器事件,Kite 将为我们创建容器(默认使用 snappy 压缩)。
- 请确保根据您的设置更改突出显示的路径。这是本地目录路径,而不是 hdfs。
步骤 3 - 创建 Flume 配置文件
仅对代码的一部分进行解释
a1.sources.k1.interceptors = schemaheader morphlineinterceptor
a1.sources.k1.interceptors.schemaheader.type = static
a1.sources.k1.interceptors.schemaheader.key = flume.avro.schema.url
a1.sources.k1.interceptors.schemaheader.value =
hdfs://cloudera-master:8020/user/skamalj/avro-schemas/orders.avsc
a1.sources.k1.interceptors.morphlineinterceptor.type =
org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.k1.interceptors.morphlineinterceptor.morphlineFile =
/home/skamalj/learn-flume2/morphline.conf
a1.sources.k1.interceptors.morphlineinterceptor.morphlineId = morphline1
a1.sinks.s1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.s1.channel = c1
a1.sinks.s1.kite.dataset.uri = dataset:hive://cloudera-master:9083/default/orders_parquet
- 我们需要两个拦截器:
- 一个用于注入“
flume.avro.schema.url
”header 值(第 3-5 行)。Kite sink 使用它来解析 avro 事件。要使用它,请将 orders.avsc 文件加载到 hdfs 并提供 url 作为值。 - 第二个是我们在步骤 2 中创建的 morphline 拦截器(第 7-9 行)。它将事件转换为 avro。
- 一个用于注入“
- 第 11-13 行配置 Kite sink,这是用于 parquet 的。
- 如果您查看 github 中的完整代码,您会看到我们有两个通道和两个 sink - 一组用于 parquet,另一组用于 Avro。
- 请确保根据您的设置更改突出显示的路径。(Morphline 路径是本地目录,而不是 hdfs。)
步骤 4 - 执行
要执行,您需要设置 hive 和 hcat home 变量。此外,还要确保增加 JVM 内存 (-X...),因为默认内存设置不足以与 Morphline 和 Kite 配合使用。
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HCAT_HOME=/opt/cloudera/parcels/CDH/lib/hive-hcatalog
flume-ng agent --name a1 --conf . --conf-file flume_morph_kite.conf -Xms4096m -Xmx8192m
检查截图所示的结果。
完成了!
历史
- 2019 年 1 月 21 日:初始版本