1.编写新的 Flume 配置文件,将数据备份到 HDFS 目录 /user/test/flumebackup 下,要求所有主题的数据使用同一个 Flume配置文件完成。
1. 配置概览
Flume 的主要任务是从多个来源(如日志文件)读取数据,经过处理后通过接收器(Sink)存储到目标系统(如 HDFS)。在此配置中,主要配置了以下几个部分:
- 源(Source):从文件中读取数据。
- 通道(Channel):用于缓存数据,确保数据从源到接收器的传输。
- 接收器(Sink):负责将数据写入目标系统,这里是 HDFS。
2. 源的配置(r1)
Flume 的源设置了从多个日志文件中读取数据。
TAILDIR
类型的源用于实时读取文件内容,类似于 Linux 的tail -f
命令,它会不断监听文件并读取新增内容。positionFile
指定了一个位置文件,记录了读取进度,以便 Flume 重启时可以从上次停止的地方继续读取。filegroups
列出了多个文件组,每个文件组定义了一类日志文件,这些文件按正则表达式进行匹配,并且有不同的处理方式。
3. 文件组配置
每个文件组代表一种类型的日志文件,它们的路径和处理方式如下:
- f1 文件组:包括所有路径匹配
/data_log/.*producerecord.csv
的文件。这些文件的标识符是producerecord
。 - f2 文件组:包括所有路径匹配
/data_log/.*changerecord.csv
的文件,标识符为changerecord
。 - f3 文件组:包括所有路径匹配
/data_log/.*environmentdata.csv
的文件,标识符为environmentdata
。
4. 文件头设置
配置中的 fileHeader = true
表示每个文件的第一行会作为文件的头部信息。这一行数据用于标识文件的类型或来源,Flume 会根据这些信息进行相应的处理。
5. 接收器的配置(k1)
Flume 将读取的数据写入 HDFS,并通过一系列设置控制文件的存储方式:
- 存储路径:文件会存储在 HDFS 路径
/user/test/flumebackup/%Y%m%d/%H/%{headerKey1}
下。路径使用了动态占位符,按照日期(年、月、日)、小时进行文件分隔,且每个文件夹还根据文件头(如producerecord
)进行区分。 - 文件滚动:设置了文件滚动策略,如每小时滚动一次,或者当文件大小达到一定阈值(128MB)时滚动。
- 其他参数:如
batchSize
(每次写入 HDFS 的数据量),useLocalTimeStamp
(使用本地时间戳),rollInterval
(文件滚动间隔,单位为秒),rollSize
(滚动文件的大小阈值)等,进一步控制数据写入 HDFS 的行为。
6. 通道的配置(c1)
Flume 使用一个 内存通道(memory)来缓存从源读取的数据。通道的容量限制为 10,000 个事件,确保 Flume 能够处理高吞吐量的数据流。通道负责在源和接收器之间传输数据。
7. 源、接收器和通道的连接
最后,配置文件将源 r1
、接收器 k1
和通道 c1
进行连接。源 r1
读取数据并通过通道 c1
传输给接收器 k1
,接收器最终将数据写入 HDFS。
flume脚本代码:
# ==============================
# 定义源、接收器和通道
# ==============================
a1.sources = r1
a1.sinks = k1
a1.channels = c1# ==============================
# 源配置 (r1)
# ==============================
# r1: 使用 TAILDIR 源类型,监控文件目录中的文件
a1.sources.r1.type = TAILDIR # 设置源类型为 TAILDIR
# 存储源读取进度的文件路径
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json # 记录文件读取位置的文件
# 定义文件组 f1, f2, f3
a1.sources.r1.filegroups = f1 f2 f3# ==============================
# 文件组配置 (f1, f2, f3)
# ==============================# f1: 监控 producerecord.csv 文件
a1.sources.r1.filegroups.f1 = /data_log/.*producerecord.csv # 匹配所有 producerecord.csv 文件
# 设置 f1 文件组的头部信息,标识该组文件的类型为 producerecord
a1.sources.r1.headers.f1.headerKey1 = producerecord# f2: 监控 changerecord.csv 文件
a1.sources.r1.filegroups.f2 = /data_log/.*changerecord.csv # 匹配所有 changerecord.csv 文件
# 设置 f2 文件组的头部信息,标识该组文件的类型为 changerecord
a1.sources.r1.headers.f2.headerKey1 = changerecord# f3: 监控 environmentdata.csv 文件
a1.sources.r1.filegroups.f3 = /data_log/.*environmentdata.csv # 匹配所有 environmentdata.csv 文件
# 设置 f3 文件组的头部信息,标识该组文件的类型为 environmentdata
a1.sources.r1.headers.f3.headerKey1 = environmentdata# 设置是否包含文件头信息,用于区分文件类型
a1.sources.r1.fileHeader = true # 如果为 true,将会根据文件头部信息进行处理# ==============================
# 接收器配置 (k1)
# ==============================
# k1: 使用 HDFS 接收器将数据写入 HDFS
a1.sinks.k1.type = hdfs # 设置接收器类型为 HDFS
# HDFS 路径配置,使用动态路径并根据文件头进行分隔
a1.sinks.k1.hdfs.path = hdfs:///user/test/flumebackup/%Y%m%d/%H/%{headerKey1} # 目录结构根据日期和文件类型进行分组
# 设置文件前缀
a1.sinks.k1.hdfs.filePrefix = upload- # 设置上传文件的前缀
# 是否根据时间滚动文件夹
a1.sinks.k1.hdfs.round = true # 设置为 true 表示会按照时间滚动文件夹
# 设置滚动时间的间隔值
a1.sinks.k1.hdfs.roundValue = 1 # 每 1 个时间单位创建一个新的文件夹
# 设置滚动的时间单位
a1.sinks.k1.hdfs.roundUnit = hour # 设置滚动的单位为小时
# 是否使用本地时间戳,默认为 UTC 时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true # 设置为 true 使用本地时间戳# 每积攒多少个 Event 才会刷新到 HDFS
a1.sinks.k1.hdfs.batchSize = 100 # 设置每批次积攒 100 个 Event 才会刷新到 HDFS# 设置文件类型,可以支持压缩
a1.sinks.k1.hdfs.fileType = DataStream # 设置文件类型为 DataStream# 多少秒后生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60 # 设置文件滚动间隔为 60 秒# 设置每个文件的滚动大小,大概是 128MB
a1.sinks.k1.hdfs.rollSize = 134217700 # 设置文件滚动的大小为 128MB# 文件滚动与 Event 数量无关,设置为 0 即不基于 Event 数量滚动文件
a1.sinks.k1.hdfs.rollCount = 0 # 设置为 0,表示文件滚动不基于 Event 数量# ==============================
# 通道配置 (c1)
# ==============================
# c1: 使用内存通道缓存 Event
a1.channels.c1.type = memory # 设置通道类型为内存通道
# 设置内存通道的最大容量
a1.channels.c1.capacity = 10000 # 设置通道的最大容量为 10000 个 Event
# 设置事务容量,控制每次事务的最大 Event 数量
a1.channels.c1.transactionCapacity = 10000 # 设置每个事务的最大容量为 10000 个 Event# ==============================
# 源和接收器与通道的绑定
# ==============================
# 将源 r1 绑定到通道 c1
a1.sources.r1.channels = c1 # 源 r1 使用通道 c1
# 将接收器 k1 绑定到通道 c1
a1.sinks.k1.channel = c1 # 接收器 k1 使用通道 c1
2. 在主节点使用Flume 采集 /data_log 目录下实时日志文件中的数据,将数据存入到 Kafka 的 Topic 中(Topic 名称分别为 ChangeRecord 、 ProduceRecord 和 EnvironmentData ,分区数为 4 ), Flume 采集ChangeRecord。
1. 源 (Source)
-
r1、r2 和 r3 都使用了
TAILDIR
类型的源。这种源类型主要用于读取文件中的新增内容(类似于tail -f
的功能)。 -
每个源都配置了不同的文件路径和位置文件。位置文件(
positionFile
)记录了每个文件的读取进度,确保在重新启动时不会丢失数据。r1
:读取路径下的changerecord.csv
文件。r2
:读取路径下的producerecord.csv
文件。r3
:读取路径下的environmentdata.csv
文件。
2. 接收器 (Sink)
-
每个接收器都被配置为将数据写入 Kafka 集群,分别写入不同的 Kafka topic。
- k1:将
changerecord
数据写入 Kafka topicChangeRecord
。 - k2:将
producerecord
数据写入 Kafka topicProduceRecord
。 - k3:将
environmentdata
数据写入 Kafka topicEnvironmentData
。
- k1:将
-
每个接收器的配置包含了 Kafka 集群的地址(
kafka.bootstrap.servers
)、批量处理大小(flumeBatchSize
)和生产者的配置(如写入确认机制acks
和延迟时间linger.ms
)。
3. 通道 (Channel)
-
每个源和接收器都通过内存通道进行连接。内存通道是 Flume 的一种通道类型,适用于对性能要求较高的场景。每个通道配置了最大容量和事务容量。
- c1:用于
r1
到k1
之间的连接。 - c2:用于
r2
到k2
之间的连接。 - c3:用于
r3
到k3
之间的连接。
通道的容量设置为 10,000,表示通道最多可以缓存 10,000 个事件(数据记录)。事务容量也是 10,000,表示一次事务可以处理 10,000 个事件。
- c1:用于
4. 源和接收器的通道绑定
-
源和接收器通过通道进行连接。每个源都与一个特定的通道绑定,接收器也从该通道获取数据。
r1
通过通道c1
与k1
连接。r2
通过通道c2
与k2
连接。r3
通过通道c3
与k3
连接。
flume脚本代码:
# 定义源、接收器和通道
a1.sources = r1 r2 r3
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3# ==============================
# 源配置 (r1, r2, r3)
# ==============================# r1: 读取 change record 文件
a1.sources.r1.type = TAILDIR # 使用 TAILDIR 来源类型,监控文件变化
a1.sources.r1.positionFile = /opt/module/flume/changerecord/taildir_position.json # 存储文件读取进度的文件
a1.sources.r1.filegroups = f1 # 定义文件组
a1.sources.r1.filegroups.f1 = /data_log/.*changerecord.csv # 匹配路径下的 changerecord.csv 文件# r2: 读取 produce record 文件
a1.sources.r2.type = TAILDIR # 使用 TAILDIR 来源类型
a1.sources.r2.positionFile = /opt/module/flume/producerecord/taildir_position.json # 存储文件读取进度的文件
a1.sources.r2.filegroups = f1 # 定义文件组
a1.sources.r2.filegroups.f1 = /data_log/.*producerecord.csv # 匹配路径下的 producerecord.csv 文件# r3: 读取 environment data 文件
a1.sources.r3.type = TAILDIR # 使用 TAILDIR 来源类型
a1.sources.r3.positionFile = /opt/module/flume/environmentdata/taildir_position.json # 存储文件读取进度的文件
a1.sources.r3.filegroups = f1 # 定义文件组
a1.sources.r3.filegroups.f1 = /data_log/.*environmentdata.csv # 匹配路径下的 environmentdata.csv 文件# ==============================
# 接收器配置 (k1, k2, k3)
# ==============================# k1: 将 change record 数据写入 Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # 使用 KafkaSink 接收器
a1.sinks.k1.kafka.topic = ChangeRecord # Kafka topic 名称
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 # Kafka 集群地址
a1.sinks.k1.kafka.flumeBatchSize = 20 # 批量处理的大小
a1.sinks.k1.kafka.producer.acks = 1 # Kafka 写入确认机制,1 表示主分区副本确认
a1.sinks.k1.kafka.producer.linger.ms = 1 # 写入延迟时间,单位毫秒# k2: 将 produce record 数据写入 Kafka
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink # 使用 KafkaSink 接收器
a1.sinks.k2.kafka.topic = ProduceRecord # Kafka topic 名称
a1.sinks.k2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 # Kafka 集群地址
a1.sinks.k2.kafka.flumeBatchSize = 20 # 批量处理的大小
a1.sinks.k2.kafka.producer.acks = 1 # Kafka 写入确认机制
a1.sinks.k2.kafka.producer.linger.ms = 1 # 写入延迟时间,单位毫秒# k3: 将 environment data 数据写入 Kafka
a1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink # 使用 KafkaSink 接收器
a1.sinks.k3.kafka.topic = EnvironmentData # Kafka topic 名称
a1.sinks.k3.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092 # Kafka 集群地址
a1.sinks.k3.kafka.flumeBatchSize = 20 # 批量处理的大小
a1.sinks.k3.kafka.producer.acks = 1 # Kafka 写入确认机制
a1.sinks.k3.kafka.producer.linger.ms = 1 # 写入延迟时间,单位毫秒# ==============================
# 通道配置 (c1, c2, c3)
# ==============================# c1: 内存通道,用于 r1 到 k1
a1.channels.c1.type = memory # 使用内存通道
a1.channels.c1.capacity = 10000 # 通道最大容量
a1.channels.c1.transactionCapacity = 10000 # 事务容量# c2: 内存通道,用于 r2 到 k2
a1.channels.c2.type = memory # 使用内存通道
a1.channels.c2.capacity = 10000 # 通道最大容量
a1.channels.c2.transactionCapacity = 10000 # 事务容量# c3: 内存通道,用于 r3 到 k3
a1.channels.c3.type = memory # 使用内存通道
a1.channels.c3.capacity = 10000 # 通道最大容量
a1.channels.c3.transactionCapacity = 10000 # 事务容量# ==============================
# 源和接收器的通道绑定
# ==============================# 将源 r1 绑定到通道 c1,接收器 k1 也使用 c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 将源 r2 绑定到通道 c2,接收器 k2 也使用 c2
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2# 将源 r3 绑定到通道 c3,接收器 k3 也使用 c3
a1.sources.r3.channels = c3
a1.sinks.k3.channel = c3