工业-实时数据采集

1.编写新的 Flume 配置文件,将数据备份到 HDFS 目录 /user/test/flumebackup 下,要求所有主题
的数据使用同一个 Flume配置文件完成。

1. 配置概览

Flume 的主要任务是从多个来源(如日志文件)读取数据,经过处理后通过接收器(Sink)存储到目标系统(如 HDFS)。在此配置中,主要配置了以下几个部分:

  • 源(Source):从文件中读取数据。
  • 通道(Channel):用于缓存数据,确保数据从源到接收器的传输。
  • 接收器(Sink):负责将数据写入目标系统,这里是 HDFS。

2. 源的配置(r1)

Flume 的源设置了从多个日志文件中读取数据。

  • TAILDIR 类型的源用于实时读取文件内容,类似于 Linux 的 tail -f 命令,它会不断监听文件并读取新增内容。
  • positionFile 指定了一个位置文件,记录了读取进度,以便 Flume 重启时可以从上次停止的地方继续读取。
  • filegroups 列出了多个文件组,每个文件组定义了一类日志文件,这些文件按正则表达式进行匹配,并且有不同的处理方式。

3. 文件组配置

每个文件组代表一种类型的日志文件,它们的路径和处理方式如下:

  • f1 文件组:包括所有路径匹配 /data_log/.*producerecord.csv 的文件。这些文件的标识符是 producerecord
  • f2 文件组:包括所有路径匹配 /data_log/.*changerecord.csv 的文件,标识符为 changerecord
  • f3 文件组:包括所有路径匹配 /data_log/.*environmentdata.csv 的文件,标识符为 environmentdata

4. 文件头设置

配置中的 fileHeader = true 表示每个文件的第一行会作为文件的头部信息。这一行数据用于标识文件的类型或来源,Flume 会根据这些信息进行相应的处理。

5. 接收器的配置(k1)

Flume 将读取的数据写入 HDFS,并通过一系列设置控制文件的存储方式:

  • 存储路径:文件会存储在 HDFS 路径 /user/test/flumebackup/%Y%m%d/%H/%{headerKey1} 下。路径使用了动态占位符,按照日期(年、月、日)、小时进行文件分隔,且每个文件夹还根据文件头(如 producerecord)进行区分。
  • 文件滚动:设置了文件滚动策略,如每小时滚动一次,或者当文件大小达到一定阈值(128MB)时滚动。
  • 其他参数:如 batchSize(每次写入 HDFS 的数据量),useLocalTimeStamp(使用本地时间戳),rollInterval(文件滚动间隔,单位为秒),rollSize(滚动文件的大小阈值)等,进一步控制数据写入 HDFS 的行为。

6. 通道的配置(c1)

Flume 使用一个 内存通道(memory)来缓存从源读取的数据。通道的容量限制为 10,000 个事件,确保 Flume 能够处理高吞吐量的数据流。通道负责在源和接收器之间传输数据。

7. 源、接收器和通道的连接

最后,配置文件将源 r1、接收器 k1 和通道 c1 进行连接。源 r1 读取数据并通过通道 c1 传输给接收器 k1,接收器最终将数据写入 HDFS。

 

 flume脚本代码:

# ==============================
# 定义源、接收器和通道
# ==============================
a1.sources = r1
a1.sinks = k1
a1.channels = c1# ==============================
# 源配置 (r1)
# ==============================
# r1: 使用 TAILDIR 源类型,监控文件目录中的文件
a1.sources.r1.type = TAILDIR  # 设置源类型为 TAILDIR
# 存储源读取进度的文件路径
a1.sources.r1.positionFile = /opt/module/flume/tail_dir.json  # 记录文件读取位置的文件
# 定义文件组 f1, f2, f3
a1.sources.r1.filegroups = f1 f2 f3# ==============================
# 文件组配置 (f1, f2, f3)
# ==============================# f1: 监控 producerecord.csv 文件
a1.sources.r1.filegroups.f1 = /data_log/.*producerecord.csv  # 匹配所有 producerecord.csv 文件
# 设置 f1 文件组的头部信息,标识该组文件的类型为 producerecord
a1.sources.r1.headers.f1.headerKey1 = producerecord# f2: 监控 changerecord.csv 文件
a1.sources.r1.filegroups.f2 = /data_log/.*changerecord.csv  # 匹配所有 changerecord.csv 文件
# 设置 f2 文件组的头部信息,标识该组文件的类型为 changerecord
a1.sources.r1.headers.f2.headerKey1 = changerecord# f3: 监控 environmentdata.csv 文件
a1.sources.r1.filegroups.f3 = /data_log/.*environmentdata.csv  # 匹配所有 environmentdata.csv 文件
# 设置 f3 文件组的头部信息,标识该组文件的类型为 environmentdata
a1.sources.r1.headers.f3.headerKey1 = environmentdata# 设置是否包含文件头信息,用于区分文件类型
a1.sources.r1.fileHeader = true  # 如果为 true,将会根据文件头部信息进行处理# ==============================
# 接收器配置 (k1)
# ==============================
# k1: 使用 HDFS 接收器将数据写入 HDFS
a1.sinks.k1.type = hdfs  # 设置接收器类型为 HDFS
# HDFS 路径配置,使用动态路径并根据文件头进行分隔
a1.sinks.k1.hdfs.path = hdfs:///user/test/flumebackup/%Y%m%d/%H/%{headerKey1}  # 目录结构根据日期和文件类型进行分组
# 设置文件前缀
a1.sinks.k1.hdfs.filePrefix = upload-  # 设置上传文件的前缀
# 是否根据时间滚动文件夹
a1.sinks.k1.hdfs.round = true  # 设置为 true 表示会按照时间滚动文件夹
# 设置滚动时间的间隔值
a1.sinks.k1.hdfs.roundValue = 1  # 每 1 个时间单位创建一个新的文件夹
# 设置滚动的时间单位
a1.sinks.k1.hdfs.roundUnit = hour  # 设置滚动的单位为小时
# 是否使用本地时间戳,默认为 UTC 时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true  # 设置为 true 使用本地时间戳# 每积攒多少个 Event 才会刷新到 HDFS
a1.sinks.k1.hdfs.batchSize = 100  # 设置每批次积攒 100 个 Event 才会刷新到 HDFS# 设置文件类型,可以支持压缩
a1.sinks.k1.hdfs.fileType = DataStream  # 设置文件类型为 DataStream# 多少秒后生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60  # 设置文件滚动间隔为 60 秒# 设置每个文件的滚动大小,大概是 128MB
a1.sinks.k1.hdfs.rollSize = 134217700  # 设置文件滚动的大小为 128MB# 文件滚动与 Event 数量无关,设置为 0 即不基于 Event 数量滚动文件
a1.sinks.k1.hdfs.rollCount = 0  # 设置为 0,表示文件滚动不基于 Event 数量# ==============================
# 通道配置 (c1)
# ==============================
# c1: 使用内存通道缓存 Event
a1.channels.c1.type = memory  # 设置通道类型为内存通道
# 设置内存通道的最大容量
a1.channels.c1.capacity = 10000  # 设置通道的最大容量为 10000 个 Event
# 设置事务容量,控制每次事务的最大 Event 数量
a1.channels.c1.transactionCapacity = 10000  # 设置每个事务的最大容量为 10000 个 Event# ==============================
# 源和接收器与通道的绑定
# ==============================
# 将源 r1 绑定到通道 c1
a1.sources.r1.channels = c1  # 源 r1 使用通道 c1
# 将接收器 k1 绑定到通道 c1
a1.sinks.k1.channel = c1  # 接收器 k1 使用通道 c1

       2. 在主节点使用Flume 采集 /data_log 目录下实时日志文件中的数据,将数据存入到 Kafka Topic 中(Topic 名称分别为 ChangeRecord ProduceRecord EnvironmentData ,分区数为 4 ), Flume 采集ChangeRecord。

1. 源 (Source)

  • r1r2r3 都使用了 TAILDIR 类型的源。这种源类型主要用于读取文件中的新增内容(类似于 tail -f 的功能)。

  • 每个源都配置了不同的文件路径和位置文件。位置文件(positionFile)记录了每个文件的读取进度,确保在重新启动时不会丢失数据。

    • r1:读取路径下的 changerecord.csv 文件。
    • r2:读取路径下的 producerecord.csv 文件。
    • r3:读取路径下的 environmentdata.csv 文件。

2. 接收器 (Sink)

  • 每个接收器都被配置为将数据写入 Kafka 集群,分别写入不同的 Kafka topic

    • k1:将 changerecord 数据写入 Kafka topic ChangeRecord
    • k2:将 producerecord 数据写入 Kafka topic ProduceRecord
    • k3:将 environmentdata 数据写入 Kafka topic EnvironmentData
  • 每个接收器的配置包含了 Kafka 集群的地址(kafka.bootstrap.servers)、批量处理大小(flumeBatchSize)和生产者的配置(如写入确认机制 acks 和延迟时间 linger.ms)。

3. 通道 (Channel)

  • 每个源和接收器都通过内存通道进行连接。内存通道是 Flume 的一种通道类型,适用于对性能要求较高的场景。每个通道配置了最大容量和事务容量。

    • c1:用于 r1 到 k1 之间的连接。
    • c2:用于 r2 到 k2 之间的连接。
    • c3:用于 r3 到 k3 之间的连接。

    通道的容量设置为 10,000,表示通道最多可以缓存 10,000 个事件(数据记录)。事务容量也是 10,000,表示一次事务可以处理 10,000 个事件。

4. 源和接收器的通道绑定

  • 源和接收器通过通道进行连接。每个源都与一个特定的通道绑定,接收器也从该通道获取数据。

    • r1 通过通道 c1 与 k1 连接。
    • r2 通过通道 c2 与 k2 连接。
    • r3 通过通道 c3 与 k3 连接。

flume脚本代码:

# 定义源、接收器和通道
a1.sources = r1 r2 r3
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3# ==============================
# 源配置 (r1, r2, r3)
# ==============================# r1: 读取 change record 文件
a1.sources.r1.type = TAILDIR  # 使用 TAILDIR 来源类型,监控文件变化
a1.sources.r1.positionFile = /opt/module/flume/changerecord/taildir_position.json  # 存储文件读取进度的文件
a1.sources.r1.filegroups = f1  # 定义文件组
a1.sources.r1.filegroups.f1 = /data_log/.*changerecord.csv  # 匹配路径下的 changerecord.csv 文件# r2: 读取 produce record 文件
a1.sources.r2.type = TAILDIR  # 使用 TAILDIR 来源类型
a1.sources.r2.positionFile = /opt/module/flume/producerecord/taildir_position.json  # 存储文件读取进度的文件
a1.sources.r2.filegroups = f1  # 定义文件组
a1.sources.r2.filegroups.f1 = /data_log/.*producerecord.csv  # 匹配路径下的 producerecord.csv 文件# r3: 读取 environment data 文件
a1.sources.r3.type = TAILDIR  # 使用 TAILDIR 来源类型
a1.sources.r3.positionFile = /opt/module/flume/environmentdata/taildir_position.json  # 存储文件读取进度的文件
a1.sources.r3.filegroups = f1  # 定义文件组
a1.sources.r3.filegroups.f1 = /data_log/.*environmentdata.csv  # 匹配路径下的 environmentdata.csv 文件# ==============================
# 接收器配置 (k1, k2, k3)
# ==============================# k1: 将 change record 数据写入 Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  # 使用 KafkaSink 接收器
a1.sinks.k1.kafka.topic = ChangeRecord  # Kafka topic 名称
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092  # Kafka 集群地址
a1.sinks.k1.kafka.flumeBatchSize = 20  # 批量处理的大小
a1.sinks.k1.kafka.producer.acks = 1  # Kafka 写入确认机制,1 表示主分区副本确认
a1.sinks.k1.kafka.producer.linger.ms = 1  # 写入延迟时间,单位毫秒# k2: 将 produce record 数据写入 Kafka
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink  # 使用 KafkaSink 接收器
a1.sinks.k2.kafka.topic = ProduceRecord  # Kafka topic 名称
a1.sinks.k2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092  # Kafka 集群地址
a1.sinks.k2.kafka.flumeBatchSize = 20  # 批量处理的大小
a1.sinks.k2.kafka.producer.acks = 1  # Kafka 写入确认机制
a1.sinks.k2.kafka.producer.linger.ms = 1  # 写入延迟时间,单位毫秒# k3: 将 environment data 数据写入 Kafka
a1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink  # 使用 KafkaSink 接收器
a1.sinks.k3.kafka.topic = EnvironmentData  # Kafka topic 名称
a1.sinks.k3.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092  # Kafka 集群地址
a1.sinks.k3.kafka.flumeBatchSize = 20  # 批量处理的大小
a1.sinks.k3.kafka.producer.acks = 1  # Kafka 写入确认机制
a1.sinks.k3.kafka.producer.linger.ms = 1  # 写入延迟时间,单位毫秒# ==============================
# 通道配置 (c1, c2, c3)
# ==============================# c1: 内存通道,用于 r1 到 k1
a1.channels.c1.type = memory  # 使用内存通道
a1.channels.c1.capacity = 10000  # 通道最大容量
a1.channels.c1.transactionCapacity = 10000  # 事务容量# c2: 内存通道,用于 r2 到 k2
a1.channels.c2.type = memory  # 使用内存通道
a1.channels.c2.capacity = 10000  # 通道最大容量
a1.channels.c2.transactionCapacity = 10000  # 事务容量# c3: 内存通道,用于 r3 到 k3
a1.channels.c3.type = memory  # 使用内存通道
a1.channels.c3.capacity = 10000  # 通道最大容量
a1.channels.c3.transactionCapacity = 10000  # 事务容量# ==============================
# 源和接收器的通道绑定
# ==============================# 将源 r1 绑定到通道 c1,接收器 k1 也使用 c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# 将源 r2 绑定到通道 c2,接收器 k2 也使用 c2
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2# 将源 r3 绑定到通道 c3,接收器 k3 也使用 c3
a1.sources.r3.channels = c3
a1.sinks.k3.channel = c3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/33765.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

mmdet 加载预训练模型多卡训练过程中,存在显卡占用显存不均匀

1. 问题描述 基于mmdet https://github.com/open-mmlab/mmdetection代码仓库,修改了自己的检测代码,加载了预训练模型,进行分布式训练。 在训练过程中,出现了显卡的占用显存不均匀的问题。 如图所示,可以看到显卡2 占…

使用ALB将HTTP访问重定向至HTTPS

HTTPS是加密数据传输协议,安全性高。当企业进行HTTPS安全改造后,为了方便用户访问,可以使用ALB在用户无感知的情况下将HTTP访问重定向至HTTPS。 前提条件 您已创建ALB实例,并为该实例添加了HTTP监听和监听端口为443的HTTPS监听。…

力扣92.反转链表Ⅱ

题目描述 题目链接92. 反转链表 II 给你单链表的头指针 head 和两个整数 left 和 right &#xff0c;其中 left < right 。请你反转从位置 left 到位置 right 的链表节点&#xff0c;返回 反转后的链表 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], left …

Java版-速通ETL工具中简单的DAG执行实现

DAG作用 在ETL工具中&#xff0c;一般使用DAG图来进行任务的配置&#xff0c;将任务配置在有向无环图中&#xff0c;执行时候从首层节点&#xff0c;依次往下&#xff0c;下层节点的执行依赖于父节点是否执行完毕的状态&#xff0c;当最后一层的节点执行完成之后&#xff0c;整…

Web安全深度剖析

1.Web安全简介 ​ 攻击者想要对计算机进行渗透&#xff0c;有一个条件是必须的&#xff0c;就是攻击者的计算机与服务器必须能够正常通信&#xff0c;服务器与客户端进行通信依靠的就是端口。 ​ 如今的web应该称之为web应用程序&#xff0c;功能强大&#xff0c;离不开四个要…

策略模式的理解和实践

在软件开发中&#xff0c;我们经常遇到需要在不同算法之间进行选择的情况。这些算法可能实现相同的功能&#xff0c;但使用不同的方法或逻辑。为了增强代码的可维护性和可扩展性&#xff0c;我们可以使用设计模式来优化这些算法的实现和管理。策略模式&#xff08;Strategy Pat…

在 Linux 环境下搭建 OpenLab Web 网站并实现 HTTPS 和访问控制

实验要求 综合练习&#xff1a;请给openlab搭建web网站 ​ 网站需求&#xff1a; ​ 1.基于域名[www.openlab.com](http://www.openlab.com)可以访问网站内容为 welcome to openlab!!! ​ 2.给该公司创建三个子界面分别显示学生信息&#xff0c;教学资料和缴费网站&#xff0c…

Java开发利器:IDEA的安装与使用(下)

文章目录 8. 快捷键的使用8.1 常用快捷键8.2 查看快捷键8.3 自定义快捷键8.4 使用其它平台快捷键 9. IDEA断点调试(Debug)9.1 为什么需要Debug9.2 Debug的步骤9.3 多种Debug情况介绍9.3.1 行断点9.3.2 方法断点9.3.3 字段断点9.3.4 条件断点9.3.5 异常断点9.3.6 线程调试9.3.7 …

非对称任意进制转换器(安卓)

除了正常进制转换&#xff0c;还可以输入、输出使用不同的数字符号&#xff0c;达成对数值进行加密的效果 点我下载APK安装包 使用unity开发。新建一个c#代码文件&#xff0c;把代码覆盖进去&#xff0c;再把代码文件添加给main camera即可。 using System.Collections; usin…

神经网络入门实战:(十四)pytorch 官网内置的 CIFAR10 数据集,及其网络模型

(一) pytorch 官网内置的网络模型 图像处理&#xff1a; Models and pre-trained weights — Torchvision 0.20 documentation (二) CIFAR10数据集的分类网络模型&#xff08;仅前向传播&#xff09;&#xff1a; 下方的网络模型图片有误&#xff0c;已做修改&#xff0c;具…

linux 系列服务器 高并发下ulimit优化文档

系统输入 ulimit -a 结果如下 解除 Linux 系统的最大进程数 要解除或提高 Linux 系统的最大进程数&#xff0c;可以修改 ulimit 设置和 /etc/security/limits.conf 文件中的限制。 临时修改 ulimit 设置 可以使用 ulimit 命令来查看和修改当前会话的最大进程数&#xff1a; 查…

Elasticsearch数据迁移(快照)

1. 数据条件 一台原始es服务器&#xff08;192.168.xx.xx&#xff09;&#xff0c;数据迁移后的目标服务器&#xff08;10.2.xx.xx&#xff09;。 2台服务器所处环境&#xff1a; centos7操作系统&#xff0c; elasticsearch-7.3.0。 2. 为原始es服务器数据创建快照 修改elas…

基于 SpringBoot 构建校园失物招领智能平台:优化校园失物处理流程

4系统设计 4.1系统概要设计 本文通过B/S结构(Browser/Server,浏览器/服务器结构)开发的该校园失物招领系统&#xff0c;B/S结构的优点很多&#xff0c;例如&#xff1a;开发容易、强的共享性、便于维护等&#xff0c;只要有网络&#xff0c;用户可以随时随地进行使用。 系统工作…

图解SSL/TLS 建立加密通道的过程

众所周知&#xff0c;HTTPS 是 HTTP 安全版&#xff0c;HTTP 的数据以明文形式传输&#xff0c;而 HTTPS 使用 SSL/TLS 协议对数据进行加密&#xff0c;确保数据在传输过程中的安全。 那么&#xff0c;HTTPS 是如何做到数据加密的呢&#xff1f;这就需要了解 SSL/TLS 协议了。 …

HTTP协议图--HTTP 工作过程

HTTP请求响应模型 HTTP通信机制是在一次完整的 HTTP 通信过程中&#xff0c;客户端与服务器之间将完成下列7个步骤&#xff1a; 建立 TCP 连接 在HTTP工作开始之前&#xff0c;客户端首先要通过网络与服务器建立连接&#xff0c;该连接是通过 TCP 来完成的&#xff0c;该协议…

BurpSuite工具-Proxy代理用法(抓包、改包、放包)

一、Burp Suite 项目管理 二、Proxy&#xff08;代理抓包模块&#xff09; 1. 简要说明 1.1. Intercept&#xff08;拦截&#xff09; 1.2. HTTP History&#xff08;HTTP 历史&#xff09; 1.3. WebSockets History&#xff08;WebSocket 历史&#xff09; 1.4. Options…

前端测试框架 jasmine 的使用

最近的项目在使用AngulaJs,对JS代码的测试问题就摆在了面前。通过对比我们选择了 Karma jasmine ,使用 Jasmine做单元测试 &#xff0c;Karma 自动化完成&#xff0c;当然了如果使用 Karma jasmine 前提是必须安装 Nodejs。 安装好 Nodejs &#xff0c;使用 npm 安装好必要…

Blender均匀放缩模型

解决办法&#xff1a; 首先选中模型&#xff0c;按下“s”键&#xff0c;如下图所示&#xff0c;此时模型根据鼠标的移动放缩 或者在按下“s”后输入数值&#xff0c;再按回车键Enter&#xff0c;模型会根据你该数值进行均匀放缩 指定放大2倍结果——

TCP/IP 协议图--计算机网络体系结构分层

计算机网络体系结构分层 计算机网络体系结构分层 不难看出&#xff0c;TCP/IP 与 OSI 在分层模块上稍有区别。OSI 参考模型注重“通信协议必要的功能是什么”&#xff0c;而 TCP/IP 则更强调“在计算机上实现协议应该开发哪种程序”

hive 行转列

行转列的常规做法是&#xff0c;group bysum(if())【或count(if())】 建表: CREATE TABLE table2 (year INT,month INT,amount DOUBLE );INSERT INTO table2 (year, month, amount) VALUES(1991, 2, 1.2),(1991, 3, 1.3),(1991, 4, 1.4),(1992, 1, 2.1),(1992, 2, 2.2),(1992…