Flume实战--Flume中的选择器、自动容灾(故障转移)、负载均衡的详解与操作

        本文详细介绍了Apache Flume的关键特性,包括选择器、拦截器、故障转移和负载均衡。选择器负责将数据分发到多个Channel,拦截器用于修改或丢弃Event。故障转移机制能够在Sink故障时自动切换,而负载均衡则在多个Sink间分配负载。文章还提供了自定义拦截器的示例,展示了Flume在复杂数据处理中的灵活性和稳定性。

选择器

        当一个Source连接到多个Channel时,选择器决定了数据如何分发到这些Channel。Flume提供了复制选择器和多路复用选择器,允许我们根据需要选择数据分发策略。

一个Source对应多个channel的情况下,多个Channel中的数据是否相同,取决于我们使用了什么选择器,默认是复制选择器。也可以手动的使用多路选择器。

复制选择器

        复制选择器会将数据复制到所有配置的Channel,适用于需要在多个地方处理相同数据的场景。

 配置示例

编写flume脚本,需要一个source,两个channel,以及两个sink

a1.sources = r1  
a1.channels = c1 c2
a1.sinks = s1 s2
#  avro http  syslogtcp
# avro  avro-client
# http  curl
# syslogtcp  nc 
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 7777#执行选择器类型为复制选择器
a1.sources.r1.selector.type=replicatinga1.channels.c1.type=memory
a1.channels.c2.type=memorya1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

启动这个flume脚本:

flume-ng agent -c ./ -f syslogtcp-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

向bigdata01 中的 7777 端口发送消息:

echo "hello world" | nc bigdata01 7777如果nc命令无法识别,需要安装一下   yum install -y nc

 查看里面的数据发现都一样,说明使用的是复制选择器。

多路复用选择器

        多路复用选择器可以根据数据内容选择性地将数据发送到特定的Channel,适用于根据数据特性进行分流处理的场景。

就是每次发送消息的时候,可以指定发送消息走哪条channel,只有这条channel对应的sink才有数据,其他sink没数据。

 配置示例

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state        #以每个Event的header中的state这个属性的值作为选择channel的依据
a1.sources.r1.selector.mapping.CZ = c1       #如果state=CZ,则选择c1这个channel
a1.sources.r1.selector.mapping.US = c2 c3    #如果state=US,则选择c2 和 c3 这两个channel
a1.sources.r1.selector.default = c4          #默认使用c4这个channel

说明一个小区别:

avro
syslogtcp
http

可以指定一个hostname和端口号

不同的source,我们使用的发送数据的方式是不一样的:
avro-client
nc
curl

curl  是可以模拟发送get 或者 post 请求的。
比如: curl www.baidu.com

 编写脚本:mul.conf

a1.sources = r1  
a1.channels = c1 c2
a1.sinks = s1 s2a1.sources.r1.type= http
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 8888a1.sources.r1.selector.type=multiplexing
# header 跟  mapping 结合在一起,用于发送消息时,指定发送的方向
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
# 发送的消息找不到具体的channel,就走默认的c1
a1.sources.r1.selector.default = c1a1.channels.c1.type=memory
a1.channels.c2.type=memorya1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.useLocalTimeStamp=truea1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=/flume/%Y-%m-%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.useLocalTimeStamp=truea1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2

 启动该脚本:

flume-ng agent -c ./ -f mul.conf -n a1 -Dflume.root.logger=INFO,console

 模拟http请求:

curl -X POST -d '[{"headers":{"state":"USER"},"body":"this my multiplex to c1"}]' http://bigdata01:8888
curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://bigdata01:8888

效果就是,当我发送一条指令的时候,走state=USER的路径,只生成一个文件,走另一条路才会生成另一个不同的文件。

 

自动容灾(故障转移)

        Flume的故障转移机制允许在一个Sink组中的多个Sink之间自动切换,确保数据的持续处理,即使某个Sink出现故障。

多个sink组成一个组,这个组内的sink只有一台工作,假如这一台坏了,另一台自动的工作。

为了演示这个效果,我使用了三个Agent.模型如下:

在bigdata02和bigdata03上安装flume

在集群中可以使用脚本
xsync.sh /opt/installs/flume/
xsync.sh /etc/profile
xcall.sh source /etc/profile也可以使用长拷贝命令,例如:
scp -r /opt/installs/flume1.9.0/ root@hadoop11:/opt/installs/
# 因为 /etc/hosts 文件中没有配置映射,所以使用ip代替了
scp -r /opt/installs/flume1.9.0/ root@192.168.52.12:/opt/installs/scp -r /etc/profile root@hadoop11:/etc
scp -r /etc/profile root@192.168.52.12:/etc两个虚拟机需要刷新配置文件
source /etc/profile

在bigdata01上,编写flume脚本:

failover.conf

#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086# channel
a1.channels.c1.type = memory# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
#  此处是设置的权重,权重越多,就工作
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000

启动该脚本:

flume-ng agent -c ../conf -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata02上,编写 failover2.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata02
a1.sources.r1.port = 10087# channel
a1.channels.c1.type = memory# sink
a1.sinks.k1.type = logger

启动flume脚本:

flume-ng agent  -f ./failover2.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata03上,编写 failover3.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1# source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigdata03
a1.sources.r1.port = 10088# channel
a1.channels.c1.type = memory# sink
a1.sinks.k1.type = logger

启动flume脚本:

flume-ng agent  -f ./failover3.conf -n a1 -Dflume.root.logger=INFO,console

bigdata02和03启动无异常,再启动01上的脚本:

flume-ng agent -f ./failover.conf -n a1 -Dflume.root.logger=INFO,console

在bigdata01上,发送消息:

echo "wei,wei,wei" | nc hadoop10 10086

发现bigdata02有反应,出现了消息,而bigdata03无反应。因为bigdata02权重大,需要工作

测试故障转移,将bigdata02停掉,再在bigdata01上发消息,就发现bigdata03收到消息了,故障转移了。

负载均衡

        负载均衡机制允许Flume在多个Sink之间平衡数据负载,提高数据处理的效率和可靠性。

演示一下:

bigdata01中创建balance.conf

#list names
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1# source
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = bigdata01
a1.sources.r1.port = 10086# channel
a1.channels.c1.type = memory# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 10087a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata03
a1.sinks.k2.port = 10088#设置sink组
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random

bigdata02,bigdata03 不变,服务必须是启动的。

使用我们的nc命令发送请求,发现bigdata02和bigdata03随机的处理请求的数据。

  flume轮训是每隔一段时间轮训,而不是每秒轮训一次。所以可能多条在同一时间间隔的events都被一个输出到一个sink端。

练习

需求

编写一个Flume配置,实现以下需求:

 抽取data.json文件中的数据,只保留Warn以及Error级别的日期,并且timestamp只保留4月1日以后的数据。

数据

{"timestamp": "2023-03-01 12:00:00", "level": "INFO", "message": "This is an info message."}
{"timestamp": "2023-03-31 12:01:00", "level": "ERROR", "message": "Error occurred!"}
{"timestamp": "2023-04-01 12:02:00", "level": "WARN", "message": "Warn occurred!"}
{"timestamp": "2023-04-02 12:02:00", "level": "WARN", "message": "Warn occurred!"}
{"timestamp": "2023-04-03 12:02:00", "level": "WARN", "message": "Warn occurred!"}

代码

package com.bigdata;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;public class ETLInterceptor implements Interceptor {/*** {"timestamp": "2023-03-01 12:00:00", "level": "INFO", "message": "This is an info message."}* {"timestamp": "2023-03-31 12:01:00", "level": "ERROR", "message": "Error occurred!"}* {"timestamp": "2023-04-01 12:02:00", "level": "WARN", "message": "Warn occurred!"}** 抽取data.json文件中的数据,只保留Warn以及Error级别的日期,并且timestamp只保留4月1日以后的数据。*/@Overridepublic void initialize() {}// 这个方法是核心方法,可以处理一条,就可以循环处理多条@Overridepublic Event intercept(Event event) {// byte[] --> StringString json = new String(event.getBody());/*JSONObject jsonObject = JSON.parseObject(json);String timestamp = jsonObject.getString("timestamp");String level = jsonObject.getString("level");String message = jsonObject.getString("message");*/Log log = JSON.parseObject(json, Log.class);System.out.println(log);// 日期比较  after  before   --> Calendar 、Date、DateLocal// String --> DateSimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");try {Date date = dateFormat.parse(log.getTimestamp());Date date2 = dateFormat.parse("2023-03-31");if((log.getLevel().equals("WARN") || log.getLevel().equals("ERROR"))  &&  date.after(date2)){System.out.println("符合条件的数据......");return event;}} catch (ParseException e) {throw new RuntimeException(e);}return null;}@Overridepublic List<Event> intercept(List<Event> list) {ArrayList<Event> events = new ArrayList<>();// 任何一个list ,都不能边循环,边添加或者删除,否则报错!!!// 如何过滤掉不符合条件的数据,先将其置为null,然后在循环中,将null的数据不添加到集合中,就过滤掉了for (Event oldEvent : list) {Event newEvent = intercept(oldEvent);if(newEvent != null){events.add(newEvent);}}return events;}@Overridepublic void close() {}public static class EventBuilder implements Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}

 视频链接

15-flume中的选择器_哔哩哔哩_bilibili

16-flume的故障转移_哔哩哔哩_bilibili

17-flume中的负载均衡_哔哩哔哩_bilibili

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1550949.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

CANoe_DBC能够打开但是无法使用“BusType”

解决DBC文件在CAPL中调用问题&#xff1a;从CANdb到CAPL的顺畅过渡 在汽车电子和嵌入式系统开发中&#xff0c;DBC&#xff08;Database CAN&#xff09;文件作为描述CAN&#xff08;Controller Area Network&#xff09;通信协议的重要工具&#xff0c;广泛应用于网络设计、测…

工作日志:ruoyi-vue-plus echarts根据窗口大小变化

1、echarts根据窗口大小变化。 onMounted(() > {// 折线图type EChartsOption echarts.EChartsOption;var chartDom document.getElementById(chartDom)!;var myChart echarts.init(chartDom);var option: EChartsOption;option {grid: {left: 35,top: 10,bottom: 30,r…

jenkins部署Maven和NodeJS项目

在 Java 项目开发中&#xff0c;项目的编译、测试、打包等是比较繁琐的&#xff0c;属于重复劳动的工作&#xff0c;浪费人力和时间成本。以往开发项目时&#xff0c;程序员往往需要花较多的精力在引用 jar 包搭建项目环境上&#xff0c;跨部门甚至跨人员之间的项目结构都有可能…

1.8 软件业务测试

欢迎大家订阅【软件测试】 专栏&#xff0c;开启你的软件测试学习之旅&#xff01; 文章目录 前言1 概述2 方法3 测试策略4 案例分析 前言 在软件开发生命周期中&#xff0c;业务测试扮演着至关重要的角色。本文详细讲解了业务测试的定义、目的、方法以及测试策略。 本篇文章参…

C++队列、双向队列

前言 C算法与数据结构 打开打包代码的方法兼述单元测试 队列 队列&#xff08;Queue&#xff09;是一种基本的线性数据结构&#xff0c;它遵循先进先出&#xff08;First In First Out, FIFO&#xff09;的原则。这意味着最先被添加到队列中的元素将会是最先被移除的。和生活…

命令回显echo

命令回显 通常&#xff0c;make在执行命令行之前会把要执行的命令行进行输出。我们称之为“回显”&#xff0c;就好像我们输入命令执行一样。 如果要执行的命令行以字符“”开始&#xff0c;则make在执行时这个命令就不会被回显。典型的用法是我们在使用“echo”命令输出一些信…

Github 2024-09-29 php开源项目日报 Top10

根据Github Trendings的统计,今日(2024-09-29统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量PHP项目10Blade项目1Java项目1ASP项目1Coolify: 开源自助云平台 创建周期:1112 天开发语言:PHP, Blade协议类型:Apache License 2.0Star数量…

Java多线程几个哈希表的区别

HashMap 首先HashMap肯定是不行的,并没有加解锁操作,一旦多线程同时写的话,直接就会发生覆盖之类的操作 排除HashMap先,主要对比HashTable和ConcurrentHashMap HashTable vs ConcurrentHashMap 1. 加锁粒度不同 HashTable HashTable是对整个哈希表进行加锁操作,任何增删改查操…

数据结构串的kmp相关(求next和nextval)

傻瓜版&#xff0c;用来演示手算过程&#xff0c;个人理解用的&#xff0c;仅供参考。

CICD Jenkins实现Pipline

一、安装 1、由于 Jenkins 是基于 Java 的&#xff0c;首先需要确保你的系统中安装了 Java。推荐使用 OpenJDK 11。可以通过以下命令安装&#xff1a; apt update apt install openjdk-11-jdk2、在安装 Jenkins 之前&#xff0c;你需要将其仓库添加到你的系统中。首先&#x…

DotNetty ChannelRead接收数据为null

问题&#xff1a;C#使用Dotnetty和Java netty服务器通讯&#xff0c;结果能正确发送数据到服务器&#xff0c;却始终接收不到服务器返回的数据。 解决&#xff1a;一定一定要注意服务器和客户端使用的编码一定要完全一样才行 我先前在客户端添加了StringDecoder,服务器却没有…

【Spring Boot 入门一】构建你的第一个Spring Boot应用

一、引言 在当今的软件开发领域&#xff0c;Java一直占据着重要的地位。而Spring Boot作为Spring框架的延伸&#xff0c;为Java开发者提供了一种更加便捷、高效的开发方式。它简化了Spring应用的搭建和配置过程&#xff0c;让开发者能够专注于业务逻辑的实现。无论是构建小型的…

8.12 矢量图层面要素单一符号使用八(随机标记填充)

8.12 矢量图层面要素单一符号使用八(随机标记填充)_qgis随机填充-CSDN博客 目录 前言 随机标记填充&#xff08;Random Marker Fill&#xff09; QGis设置面符号为随机标记填充&#xff08;Random Marker Fill&#xff09; 二次开发代码实现随机标记填充&#xff08;Rando…

《低空经济:文旅行业的新引擎 》

《低空经济&#xff1a;文旅行业的新引擎 》 一、低空经济与文旅行业的融合态势 低空经济作为新兴经济形态&#xff0c;正与文旅行业深度融合&#xff0c;为文旅发展带来新机遇。 近年来&#xff0c;随着科技的不断进步和人们对旅游体验的不断追求&#xff0c;低空经济与文旅…

Java面试常见问题总结

Java基础 Java 中的⼏种基本数据类型是什么&#xff1f;对应的包装类型是什么&#xff1f;各⾃占⽤多少字节呢&#xff1f; Java 中有 8 种基本数据类型&#xff0c;分别为&#xff1a; 6 种数字类型&#xff1a; 4 种整数型&#xff1a;byte、short、int、long2 种浮点型&a…

elasticsearch基础知识、go如何操作elasticsearch

【单元目标】 什么是elasticsearch&#xff1f; elasticsearch Analysis(分词器)概念及使用 go实现elasticsearch 搜索封装 【教学内容】 1. 什么是elasticsearch&#xff1f; Elasticsearch 是一个实时的分布式存储、搜索、分析的引擎。 Elasticsearch is a real-time, …

在Windows上使用谷歌浏览器的安全支付功能

在使用谷歌浏览器进行在线支付时&#xff0c;确保您的交易安全至关重要。本文将为您提供详细的步骤&#xff0c;帮助您在Windows系统上启用和使用谷歌浏览器的安全支付功能。 &#xff08;本文由https://www.chromexz.com.cn/站点的作者进行编写&#xff0c;转载时请进行标注。…

Unity 代码裁剪(Strip Engine Code)

文章目录 0.IL2CPP 打包运行闪退问题1.什么是代码裁剪2.为什么要使用代码裁剪3.代码裁剪设置与级别4.强制保留代码4.1 使用[Preserve]标签4.2 使用Link.xml文件 5.Strip中遇到的问题及解决方法6.注意事项 0.IL2CPP 打包运行闪退问题 Google Play要求从2019年8月1日起apk必须支…

《后端程序猿 · Spring事务失效场景》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

【2025】springboot基于微信小程序记账本的设计与实现(源码+文档+调试+答疑)

文章目录 前言一、主要技术&#xff1f;二、项目内容1.整体介绍&#xff08;示范&#xff09;2.运行截图3.系统测试 总结更多项目 前言 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;记账本小…