Flume入门到实践--Flume的安装与基础概念与安装实战

        在当今大数据时代,有效管理和处理海量日志数据对于企业获取洞察和保持运营效率至关重要。为此目的设计的强有力工具之一是 Apache Flume。本文将带您探索Flume,了解其功能、安装方法以及一些实际用例,以展示其在处理日志数据方面的有效性。

Flume简介

        Apache Flume 是一个分布式、可靠且可用的服务,旨在高效地收集、聚合和移动来自各种来源的大量日志数据到集中式数据存储。它在处理管道中扮演着关键角色,充当数据流的导管。

        Flume 常被形象地比喻为“水管”,一端连接着数据源(水源),另一端连接着数据存储(水桶)。它特别适合于实时日志数据的收集与传输。

Flume的核心组件

  1. Agent:Flume的基本工作单元,每个Flume配置文件可以包含一个或多个Agent。
  2. Source:数据的来源,可以是日志文件、网络端口等。
  3. Channel:数据的通道,用于暂存从Source到Sink的数据。
  4. Sink:数据的目的地,可以是HDFS、HBase或其他存储系统。
  5. Event:Flume中数据的基本单位。

Flume的关键特性

  1. 可靠的数据处理: Flume确保在传输过程中不会丢失数据,提供端到端的可靠性。
  2. 持久性: 它使用通道支持持久化存储,确保在故障情况下数据不会丢失。
  3. 灵活性: Flume可以通过各种源、通道和接收器进行定制,以适应特定的数据流需求。
  4. 可扩展性: 它旨在随着数据量的增加而扩展,适合大规模数据处理。

安装和配置

通过网盘分享的文件

apache-flume-1.9.0-bin.tar.gz

安装Flume

Flume的安装相对简单,以Flume 1.9.0版本为例

下载并解压

把安装包拉到/opt/modules目录下

进入/opt/modules目录下

tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs/

进入/opt/installs目录下

mv apache-flume-1.9.0-bin/ flume

配置环境变量

进入/etc/profile配置环境变量

export FLUME_HOME=/opt/installs/flume
export PATH=$PATH:$FLUME_HOME/bin

使用命令配置环境变量

echo 'export FLUME_HOME=/opt/installs/flume' >> /etc/profile
echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile

刷新环境变量

source /etc/profile

修改配置文件

在/opt/installs/flume/conf路径下复制并修改flume-env.sh文件。

cp flume-env.sh.template flume-env.sh

 修改 JAVA_HOME 的路径为自己的 jdk 路径。

export JAVA_HOME=/opt/installs/jdk

Flume的数据模型

Flume支持单一数据流和多数据流模型,可以根据实际需求灵活配置。

单一数据流模型

单一数据流模型包含一个Agent,适用于简单的日志收集场景。

多数据流模型

多数据流模型可以包含多个Agent,Agent之间可以进行数据的流转和处理。

Flume的使用

        Flume的使用主要依赖于配置文件,通过定义Source、Channel和Sink的组件及其关系来实现数据的流动。

编写 conf文件

flume 的使用是编写 conf文件的,运行的时候指定该文件

# 定义组件的名字
<Agent>.sources = <Source>
a1.sources=s1
<Agent>.channels = <Channel1> <Channel2>
a1.channels=c1
<Agent>.sinks = <Sink>
a1.sinks=sink1# 设置source 和 channel 之间的关系
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
a1.sources.s1.channels=c1# 设置sink 和 channel 之间的关系
<Agent>.sinks.<Sink>.channel = <Channel1>
a1.sinks.sink1.channel=c1先定义agent的名字,再定义agent中三大组件的名字
接着定义各个组件之间的关联关系
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.channels = mem-channel-1
agent_foo.sinks = hdfs-sink-1# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

常见的组件

 参考网址:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了

常见的Source 

常见的channel

常见的sink

总结:Kafka 可以充当flume中的各个组件。三个组件可以两两组合在一起,所以使用场景非常的多。

案例展示

1.)Avro+Memory+Logger(Avro + 内存 + 日志)

此设置适用于演示目的。它监听指定端口,通过内存收集数据,并将其记录到控制台。

先找source 中的avro看需要设置什么参数

#编写s1的类型是什么
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.32.128
a1.sources.s1.port = 4141
a1.sources.s1.channels = c1

找到channel中的memory类型,再设置一下

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
#source 或者 sink 每个事务中存取 Event 的操作数量
a1.channels.c1.transactionCapacity = 10000

接着查找sink,sink的类型是logger

a1.sinks.s2.channel = c1
a1.sinks.s2.type = logger

最终合并起来的文件就是:

配置

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件

进入后创建  avro-memory-log.conf

将配置文件的内容拷贝进去

  启动命令

flume-ng agent -c ../ -f avro-memory-log.conf -n a1 -Dflume.root.logger=INFO,console

 -c  后面跟上 配置文件的路径
-f  跟上自己编写的conf文件
-n  agent的名字
-Dflume.root.logger=INFO,console   INFO 日志输出级别  Debug,INFO,warn,error 等

接着向端口中发送数据:

flume-ng avro-client -c /opt/installs/flume/conf/ -H bigdata01 -p 4141 -F /home/hivedata/arr1.txt

给avro发消息,使用avro-client

flume是没有运行结束时间的,它一直监听某个Ip的端口,有消息就处理,没消息,就等着,反正不可能运行结束。

如果想停止,可以使用ctrl + c 终止flume。

2)Exec + Memory + HDFS(执行命令 + 内存 + HDFS)

此配置用于持续监控日志文件,并将它们存储在HDFS中。

配置

以下版本演示的是没有时间语义的案例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hivedata/arr1.txt
a1.sources.r1.channels = c1a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/event/

接着我们演示hdfs文件中含有时间转义字符怎么办?

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txta1.sources.r1.channels = c1a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地时间,否则会报时间戳是null的异常
a1.sinks.k1.hdfs.useLocalTimeStamp=true

在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件

进入后创建  exec-memory-hdfs.conf

将配置文件的内容拷贝进去

 启动命令

flume-ng agent -c ./ -f exec-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

假如hdfs中有时间转义字符,必须给定时间,给定时间有两种方式,要么通过header 传递一个时间,要么使用本地时间。

假如hdfs中使用了时间转义字符,配置文件中必须二选一:
1)useLocalTimeStamp=true
2)使用时间戳拦截器

时间需要转义,没有时间无法翻译为具体的值  %d 就无法翻译为 日期

实现不断的向a.txt中存入数据的效果

echo "Hello World" >> a.txt

3)Spool +File + HDFS(Spooldir + 文件 + HDFS)

此设置非常适合处理包含多个不断更新的日志文件的目录。

配置

a1.channels = ch-1
a1.sources = src-1a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /home/scripts/
a1.sources.src-1.fileHeader = truea1.channels.ch-1.type = filea1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = ch-1
a1.sinks.k1.hdfs.path = /flume/

在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件

进入后创建  Spool_File_hdfs.conf

将配置文件的内容拷贝进去

 启动命令

flume-ng agent -c ./ -f Spool_File_hdfs.conf -n a1 -Dflume.root.logger=INFO,console

数据采集的意思:不管是一个文件还是一个文件夹,数据都是不断产生的,特别是生产环境下更是如此。

以上的采集只能采集到文件夹中是否有新的文件产生,不能采集变化的文件。

抽取一个文件夹中的所有文件,子文件夹中的文件是不抽取的,抽取过的文件,数据发生了变化,也不会再抽取一次。

假如你的channel是 file类型,必定会有临时文件产生,产生的文件在

总结:

channel 一般常用的只要两个,一个是memory,一个是file ,最高效的是memory ,也是最常用的。

 

4)TailDir+Memory+HDFS(TailDir+日志+HDFS)

这个案例演示了如何监控文件内容的变化,将变化的内容实时上传至HDFS。

配置

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
# . 代表的意思是一个任意字符   * 代表前面的字符出现0到多次
a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume3/logs

在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件

进入后创建  taildir_memory_hdfs.conf

将配置文件的内容拷贝进去

启动命令

flume-ng agent -c ./ -f taildir_memory_hdfs.conf -n a1 -Dflume.root.logger=INFO,console

 数据不断的发生变化,每发生一次变化,就传递一次,源源不断的抽取出来。

刚才为什么抽取了那么多个文件还没有抽取完?
由于我们刚才的一些数据非常的大,根据如下参数可以疯狂创建文件:
hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize  1024 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)

再次抽取,发现不抽了,原因是有一个文件记录了以前抽取过的内容,将其删除:
rm -rf /root/.flume/taildir_position.json

修改1.txt 里面的内容,flume会再次抽取数据
echo "hello" >> 1.txt

视频链接

01-flume的介绍_哔哩哔哩_bilibili

02-flume的安装_哔哩哔哩_bilibili

03-flume的数据模型_哔哩哔哩_bilibili

04-flume中的conf的语法_哔哩哔哩_bilibili

05-avro+mem+log的演示_哔哩哔哩_bilibili

06-vi和touch创建文件后有何不同_哔哩哔哩_bilibili

07-exec+mem+hdfs的演示_哔哩哔哩_bilibili

08-spool+file+hdfs_哔哩哔哩_bilibili

09-taildir+mem+hdfs_哔哩哔哩_bilibili

结论

        Apache Flume是一个多功能工具,它简化了收集、聚合和移动大量日志数据的过程。其在配置上的灵活性和数据处理的稳健性使其成为处理大数据的理想选择。无论您是在实时监控日志文件还是在多个源中聚合数据,Flume都提供了一个可扩展且可靠的解决方案。本文介绍了Flume的基础概念、安装配置以及几个典型的使用案例,希望能够帮助读者更好地理解和使用Flume。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1548846.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

逻辑回归(中):数学公式学习笔记 LaTeX 版

背景 闲来无事翻了一下之前买的一个机器学习课程及之前记录的网络笔记&#xff0c;发现遇到公式都是截图&#xff0c;甚至是在纸上用笔推导的。重新整理一遍之前逻辑回归函数的学习笔记&#xff0c;主要是为了玩一下 LaTex 语法&#xff0c;写公式挺有意思的。 整理之前三篇笔…

机器学习-SVM

线性感知机分类 支持向量机 线性感知机&#xff08;Perceptron&#xff09; 感知机是线性二值分类器。 注意&#xff1a;什么是线性&#xff1f;线性分割面就是&#xff0c;就是在分割面中&#xff0c;任意两个的连线也在分割面中&#xff0c;这个分割面&#xff0c;就是线…

debian linux 只安装mysql client

查询系统版本 执行cat /etc/os-release 可以看到是debian11 rootservice-headquarters-hg-self-data-report-844ccf78b-6ls7t:/mysql#cat /etc/os-release PRETTY_NAME"Debian GNU/Linux 11 (bullseye)" NAME"Debian GNU/Linux" VERSION_ID"11&quo…

DOM元素导出图片与PDF:多种方案对比与实现

背景 在日常前端开发中&#xff0c;经常会有把页面的 DOM 元素作为 PNG 或者 PDF 下载到本地的需求。例如海报功能&#xff0c;简历导出功能等等。在我们自家的产品「代码小抄」中&#xff0c;就使用了 html2canvas 来实现代码片段导出为图片&#xff1a; 是不是还行&#xff…

【STM32】SPI回顾

一、定义 SPI是Motorola首先提出的全双工四线同步串行外围接口&#xff0c;采用主从模式&#xff08;Master-Slave&#xff09;架构。 二、单机与多机通信 4线SPI器件有四个信号&#xff1a;时钟(SPI CLK, SCLK)、主机输出从机输入(MOSI)、主机输入从机输出(MISO)、片选(CS/N…

简单理解C++在C的基础上的改变

1.C语言的一些不足 我们首先看下面用C语言实现栈 #include<stdio.h> #include<assert.h> #include<stdlib.h> typedef int StackDateType; typedef struct Stack {StackDateType* _ps;size_t _size;size_t _capacity; }Stack; void StackInit(Stack* ps) {…

Qt_网络编程

目录 1、Qt的UDP Socket 1.1 用Udp实现服务器 1.2 用Udp实现客户端 2、Qt的TCP Socket 2.1 用Tcp实现服务器 2.2 用Tcp实现客户端 3、Qt的HTTP 3.1使用Qt的HTTP 结语 前言&#xff1a; 网络协议是每个平台都必须遵守的&#xff0c;只是不同的平台所提供的网络API不…

工业缺陷检测——Windows 10本地部署AnomalyGPT工业缺陷检测大模型

0. 引言 在缺陷检测中&#xff0c;由于真实世界样本中的缺陷数据极为稀少&#xff0c;有时在几千甚至几万个样品中才会出现一个缺陷数据。因此&#xff0c;以往的模型只需在正常样本上进行训练&#xff0c;学习正常样品的数据分布。在测试时&#xff0c;需要手动指定阈值来区分…

实现语音合成的三种方法:HTML5 Web Speech 、speak-tts、百度语音合成

1. 使用HTML5 Web Speech API 1.1 使用方法 window.speechSynthesis 是HTML5 Web Speech API的一部分&#xff0c;是浏览器原生提供的文本转语音功能。它允许开发者在网页上通过JavaScript调用&#xff0c;将文本转换为语音进行播放。 https://developer.mozilla.org/zh-CN/d…

渗透测试--文件上传常用绕过方式

文件上传常用绕过方式 1.前端代码&#xff0c;限制只允许上传图片。修改png为php即可绕过前端校验。 2.后端校验Content-Type 校验文件格式 前端修改&#xff0c;抓取上传数据包&#xff0c;并且修改 Content-Type 3.服务端检测&#xff08;目录路径检测&#xff09; 对目…

LMDeploy 量化部署实践

任务 使用结合W4A16量化与kv cache量化的internlm2_5-1_8b-chat模型封装本地API并与大模型进行一次对话 复现过程 按照教材安装环境。https://github.com/InternLM/Tutorial/blob/camp3/docs/L2/LMDeploy/readme.md 使用LMDeploy部署原版的1.8b大模型&#xff0c;占用显存2…

Centos怎么执行脚本

方法一&#xff1a;切换到shell脚本所在的目录&#xff08;此时&#xff0c;称为工作目录&#xff09;执行shell脚本 cd /data/shell ./hello.sh 方法二&#xff1a;以绝对路径的方式去执行bash shell脚本 /data/shell/hello.sh 方法三&#xff1a;直接使用bash 或sh 来执行…

Kubernetes深入详解(一)

目录 第一部分 K8s概念和架构 1、k8s概述和特性 2、K8s架构组件 3、k8s核心概念 第二部分 从零搭建k8s集群 1、搭建k8s环境平台规划 2、服务器硬件配置要求 3、搭建k8s集群部署方式 (1) 基于客户端工具kubeadm 1、安装Docker 2、添加阿里云YUM软件源 3、安 装kubea…

代码随想录Day 58|拓扑排序、dijkstra算法精讲,题目:软件构建、参加科学大会

提示&#xff1a;DDU&#xff0c;供自己复习使用。欢迎大家前来讨论~ 文章目录 图论part08**拓扑排序精讲**题目&#xff1a;117. 软件构建拓扑排序的背景解题思路&#xff1a;模拟过程 **dijkstra&#xff08;朴素版&#xff09;精讲**题目&#xff1a;47. 参加科学大会解题思…

腾讯特效 SDK

腾讯云视立方腾讯特效 SDK&#xff08;Tencent Effect&#xff09;是音视频终端 SDK &#xff08;腾讯云视立方&#xff09;的子产品 SDK 之一&#xff0c;提供美颜特效功能。基于优图精准的 AI 能力和天天 P 图丰富的实时特效处理&#xff0c;为各类视频处理场景提供丰富的产品…

SpringCloud-Netflix第一代微服务快速入门

1.springCloud常用组件 Netflix Eureka 当我们的微服务过多的时候&#xff0c;管理服务的通信地址是一个非常麻烦的事情&#xff0c;Eureka就是用来管理微服务的通信地址清单的&#xff0c;有了Eureka之后我们通过服务的名字就能实现服务的调用。 Netflix Ribbon\Feign : 客…

卫星导航定位原理学习(三)

GNSS信号体制及其性能分析 GNSS信号体制直接影响卫星导航系统的性能&#xff0c;是卫星导航系统设计的重要内容。卫星导航信号体制主要包括信号频率、信号结构、导航电文3部分。其中信号结构又包括调制波形、频率带宽、扩频码码长、码速率、码结构、信号功率等内容。导航电文设…

8086介绍

内部结构 执行部件EU&#xff08;Execution Unit&#xff09; 包含运算器、通用寄存器组、EU控制单元。 只负责控制&#xff0c;不和外部总线打交道 总线接口部件BIU&#xff08;Bus Interface Unit&#xff09; 包含指令队列缓冲器、16位指令指针寄存器IP、16位段寄存器&am…

【L波段差分干涉SAR卫星(陆地探测一号01组)】

L波段差分干涉SAR卫星&#xff08;陆地探测一号01组&#xff09; L波段差分干涉SAR卫星&#xff08;陆地探测一号01组&#xff09;是我国自主研发的重要卫星系统&#xff0c;以下是对该卫星的详细介绍&#xff1a; 一、基本信息 卫星组成&#xff1a;陆地探测一号01组由A星…

全网最适合入门的面向对象编程教程:53 Python字符串与序列化-字符串与字符编码

全网最适合入门的面向对象编程教程&#xff1a;53 Python 字符串与序列化-字符串与字符编码 摘要&#xff1a; 在 Python 中&#xff0c;字符串是文本的表示&#xff0c;默认使用 Unicode 编码&#xff0c;这允许你处理各种字符集&#xff0c;字符编码是将字符转换为字节的规则…