65.9K
CodeProject 正在变化。 阅读更多。
Home

Apache Flume: 使用 Morphline 和 Kite 将 CSV 转换为 AVRO 和 Parquet

starIconstarIconstarIconstarIconemptyStarIcon

4.00/5 (1投票)

2019 年 1 月 21 日

CPOL

2分钟阅读

viewsIcon

4429

如何使用 Morphline 拦截器和 Kite sink 将 CSV Flume 事件转换为 Avro 和 Parquet。

这篇博客解释了如何使用 Morphline 拦截器和 Kite sink 将 csv Flume 事件转换为 Avro 和 Parquet。

代码可在 github 上找到 这里。请查看这两个文件

  1. orders.avsc,它描述了输入事件的 avro 模式,以及
  2. part-m-00000,其中包含我们的 csv 数据。

请记住 avro 模式文件中给定的字段名称 - “orders.avsc”。

先决条件

  • 此代码已在我的 Cloudera 设置版本 5.15 上测试过。解释见此 博客

步骤 1 - 使用 Avro 和 Parquet 格式创建 Kite 数据集

kite-dataset create orders_parquet --schema orders.avsc --format parquet
kite-dataset create orders_avro --schema orders.avsc --format avro

步骤 2 - 创建 Morphline 拦截器配置文件 (morphline.conf)

morphlines : [{
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      { readCSV {
          charset : UTF-8
          columns : [order_id,order_date,order_customer_id,order_status]
        }
      }
      { toAvro {
           schemaFile : /home/skamalj/learn-flume2/orders.avsc
        }
      }
      { writeAvroToByteArray {
          format : containerlessBinary
        }
      }
   ]
  }
]
  • 在 Morphline 中,命令通过管道传递给彼此,事件从一个命令传递到另一个命令。列表中的第一个命令是 readCSV - 在这里,我们读取行并为每个字段分配名称,该名称与我们在 avro 模式文件 - orders.avsc 中定义的名称完全相同。
  • 下一个命令是使用模式文件将事件转换为 avro。在这里,列标题名称与 avro 模式 1 对 1 映射。
  • 接下来,我们需要无容器事件,Kite 将为我们创建容器(默认使用 snappy 压缩)。
  • 请确保根据您的设置更改突出显示的路径。这是本地目录路径,而不是 hdfs。

步骤 3 - 创建 Flume 配置文件

仅对代码的一部分进行解释

a1.sources.k1.interceptors = schemaheader morphlineinterceptor

a1.sources.k1.interceptors.schemaheader.type = static
a1.sources.k1.interceptors.schemaheader.key = flume.avro.schema.url
a1.sources.k1.interceptors.schemaheader.value = 
    hdfs://cloudera-master:8020/user/skamalj/avro-schemas/orders.avsc

a1.sources.k1.interceptors.morphlineinterceptor.type = 
    org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.k1.interceptors.morphlineinterceptor.morphlineFile = 
    /home/skamalj/learn-flume2/morphline.conf
a1.sources.k1.interceptors.morphlineinterceptor.morphlineId = morphline1

a1.sinks.s1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.s1.channel = c1
a1.sinks.s1.kite.dataset.uri = dataset:hive://cloudera-master:9083/default/orders_parquet
  • 我们需要两个拦截器:
    • 一个用于注入“flume.avro.schema.url”header 值(第 3-5 行)。Kite sink 使用它来解析 avro 事件。要使用它,请将 orders.avsc 文件加载到 hdfs 并提供 url 作为值。
    • 第二个是我们在步骤 2 中创建的 morphline 拦截器(第 7-9 行)。它将事件转换为 avro。
  • 第 11-13 行配置 Kite sink,这是用于 parquet 的。
  • 如果您查看 github 中的完整代码,您会看到我们有两个通道和两个 sink - 一组用于 parquet,另一组用于 Avro。
  • 请确保根据您的设置更改突出显示的路径。(Morphline 路径是本地目录,而不是 hdfs。)

步骤 4 - 执行

要执行,您需要设置 hive 和 hcat home 变量。此外,还要确保增加 JVM 内存 (-X...),因为默认内存设置不足以与 Morphline 和 Kite 配合使用。

export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HCAT_HOME=/opt/cloudera/parcels/CDH/lib/hive-hcatalog
flume-ng agent --name a1 --conf . --conf-file flume_morph_kite.conf -Xms4096m -Xmx8192m

检查截图所示的结果。

完成了!

历史

  • 2019 年 1 月 21 日:初始版本
© . All rights reserved.