65.9K
CodeProject 正在变化。 阅读更多。
Home

Apache Flume: 带有事件合并的条件多路复用流

starIconstarIconstarIconstarIconemptyStarIcon

4.00/5 (1投票)

2019 年 1 月 21 日

CPOL

4分钟阅读

viewsIcon

3911

如何使用拦截器和通道选择器在 Flume 中设置多路复用流, 以及与 HDFS sink 合并事件。

本博客详细介绍了如何在 Flume 中使用 Interceptor 和 Channel Selector 设置多路复用流。此外,还解释了使用 HDFS sink 进行事件整合。代码(conf 文件)可在 github 上获取。

先决条件

  • 此代码已在我的 Cloudera 设置版本 5.15 上进行了测试。 详情请参阅此 博客
  • 它使用 Cloudera 提供的 retail_db 中的 orders 表的数据。 您可以从我的 github 仓库获取此数据 - Datasets

场景

在本练习中,我们读取 orders 数据,该数据包含 4 个字段,行的最后一个字段是 status 列。 本练习的目的是读取数据并将状态为“COMPLETE”的所有订单发送到一个 HDFS 文件夹,而所有其他订单发送到另一个文件夹。

输入数据如下所示,它分布在多个文件中,并且有大约 68000 条记录。

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT

代码解释

初始化

  • 我们创建一个源来读取输入数据
  • 两个通道 - 一个用于流式传输“COMPLETE”订单,另一个用于所有其他订单
  • 两个 Sink - 用于分别存储来自两个通道的数据
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT

创建 Interceptor

顾名思义,拦截器会拦截从源读取的每个输入事件并对其进行修改。 我们使用 regex 拦截器来解析每一行,读取该行的状态值,然后创建一个 header 字段“status = {value}”,其中 value 是订单的实际状态。 例如,如果状态为 COMPLETE,则注入的 header 将为“status = COMPLETE”。

###
# Configure the spooldir source and create interceptor to insert header named "status"
###
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/skamalj/learn-flume2/orders_csv/
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
# Capture status , i.e last field in the  comma separated input line
a1.sources.r1.interceptors.i1.regex = ^.*,([^,]+)$
#Captured data value is stored in header as "status=>s1". 
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = status
  • 在第 4-5 行中,我们设置源以读取输出目录的输入数据
  • 在第 7 行中,我们将拦截器设置为 regex 类型,然后在第 8 行中,我们告诉它提取逗号后的最后一个字段($ 依赖于行尾)。
  • 在第 11-12 行中,我们为提取的值设置名称,即 status
    • 提示捕获多个值:如果您有一个 regex 来提取两个字段,例如 (.*),(.*),那么您需要将 serializer 设置为两个字段“s1 s2”,然后还要为每个字段设置名称。

设置 Channel Selector

查看下面的图片以了解为什么我们需要 selector

源读取我们的 csv 输入文件并将每一行作为事件传递给拦截器。 拦截器读取该行,并根据最后一列的值,创建一个名为“status”的 header,其中包含订单状态值。 由于我们有两个通道和 sink,因此我们需要 selector 来决定哪个事件应该进入哪个通道。 因此,selector 是我们的“事件路由器”。

###
# Bind the source to the channels. Based on value fo "status" 
# which is either 1 or 2 channel is selected. Any other value goes to channel 1
###
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = status
###
#If the status field value is COMPLETE then send the input to c1 else to c2.
###
a1.sources.r1.selector.mapping.COMPLETE = c1
a1.sources.r1.selector.default = c2
  • 在上面的第 6 行中,我们告诉 selector 查看 header 字段中的“status”值。
  • 在第 10 行中,我们说如果 status header 的值为“COMPLETE”,则将该事件发送到“c1”。
  • 第 11 行说,对于其他所有内容,将事件发送到“c2”。

提示。 您可以修改练习以拥有 3 个通道,其中 c1 接收“COMPLETE”订单,c2 接收“PENDING”,c3 接收所有其他订单,或者您可以进行更简单的修改,将状态为 COMPLETEPENDING 的事件发送到 c1,其余事件发送到 c2。(在第 10 行下方添加行 a1.sources.r1.selector.mapping.PENDING = c1)。

HDFS Sink 和事件整合

在这里,我们为每个通道设置 hdfs sink,下面仅显示一个 sink 的代码。 每个 sink 每个文件整合 10000 个事件。

###
# Describe the sinks to receive the feed from channels
###
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/skamalj/learn-flume/orders/complete
a1.sinks.k1.hdfs.rollCount = 10000
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
  • 在这里,如果必须整合事件,则第 6-8 行非常重要。 整合可以基于事件数量 (rollCount)、创建新文件之前要等待的时间间隔 (rollInterval) 或文件大小 (rollSize) 进行。
  • 所有 3 个选项一起使用,具体来说,将其他值设置为“0”以禁用其他条件。 因此,在我们的示例中,当我们有 10000 个事件时,我们将滚动到文件。 同时,我们禁用了基于 IntervalSize 的滚动。

其余代码只是设置另一个 sink,将通道链接到 sink 并配置通道。

执行

从放置 example.conf 的目录运行以下代码,并在下面的屏幕截图中查看输出。

请确保您的输入路径已根据您的系统进行配置,并且 hdfs 路径也在 conf 文件中进行了修改。

flume-ng agent --name a1 --conf . --conf-file example.conf

因此,在这里,您可以看到它创建了两个输出目录,一个用于“COMPLETE”订单,另一个用于剩余订单。 每个文件包含 10000 条记录,但最后一个文件显然包含最后剩下的任何数字。

全部完成,希望对您有所帮助。

历史

  • 2019 年 1 月 21 日:初始版本
© . All rights reserved.