Flume1.9.0自定义拦截器

需求

1、在linux日志文件/data/log/moreInfoRes.log中一直会产生如下JSON数据:

{"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"}
{"uid":"861848974414839801","nickname":"mick","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"abc@qq.com","mobile":"","reg_type":"102","last_login_time":"1494344580","reg_time":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"}
{"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"}

 2、需要根据数据中的type字段分目录存储,,并且还要对type字段的值进行一定的处理,最终处理之后的数据需要存储到HDFS上的/moreInfoRes目录中。例如:

  • ​type:video_info 类型的数据需要存储到 /moreInfoRes/videoInfo 目录里面。

  • ​type:user_info 类型的数据需要存储到 /moreInfoRes/userInfo 目录里面。

  • ​type:gift_record 类型的数据需要存储到 /moreInfoRes/giftRecord 目录里面。

3、这边拦截器用 Search and Replace Interceptor + Regex Extractor Interceptor 可以实现,但是这边使用前者的话效率有点低,故采用 自定义Interceptor + Regex Extractor Interceptor 实现。

实现

鉴于此,可以使用 Exec Source + Custom Interceptor + Regex Extractor Interceptor + File Channel + HDFS Sink 来实现。官方文档如下:

Exec Source:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-sourceCustom Interceptor:
可参考其他 Interceptor 的实现Regex Extractor Interceptor:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#regex-extractor-interceptorFile Channel:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channelHDFS Sink:
https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#hdfs-sink

创建工程

引入依赖

主要是 flume-ng-core 和 jackson 依赖,其他可不引入。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flume-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.4</version></dependency>
<!--        <dependency>-->
<!--            <groupId>com.alibaba</groupId>-->
<!--            <artifactId>fastjson</artifactId>-->
<!--            <version>2.0.25</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>cn.hutool</groupId>-->
<!--            <artifactId>hutool-core</artifactId>-->
<!--            <version>5.8.27</version>-->
<!--        </dependency>--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.10</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies></project>

编写 Custom Interceptor 

package com.example.flumedemo.interceptor;import com.example.flumedemo.constant.OptType;
import com.example.flumedemo.util.JsonUtil;
import com.example.flumedemo.util.NamingCaseUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.SearchAndReplaceInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Map;/*** 自定义拦截器,* 将字段的值按照指定操作转换,得到的结果替换原来的值。** @author liaorj* @date 2024/11/13*/
public class MyInterceptor implements Interceptor {private static final Logger logger = LoggerFactory.getLogger(SearchAndReplaceInterceptor.class);/*** json中需要处理的字段*/private final String jsonField;/*** 需要对字段的值进行什么操作*/private final String optType;private final Charset charset;public MyInterceptor(String jsonField, String optType, Charset charset) {this.jsonField = jsonField;this.optType = optType;this.charset = charset;}@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {try {logger.info("----event={}", JsonUtil.toJson(event));if (null == event || ArrayUtils.isEmpty(event.getBody())) {logger.info("----event or body is null");return event;}//将body转为map对象Map<String, Object> jsonObject;jsonObject = JsonUtil.toBean(event.getBody(), new TypeReference<Map<String, Object>>() {});logger.info("----jsonObject={}", jsonObject);//获取jsonField的值logger.info("----jsonField={}", this.jsonField);Object value = jsonObject.get(this.jsonField);logger.info("----jsonFieldValue={}", value);if (jsonObject.containsKey(this.jsonField)) {logger.info("----containsKey");} else {logger.info("----not containsKey");}logger.info("----jsonObject.keySet={}", jsonObject.keySet());if (jsonObject.keySet().contains(this.jsonField)) {logger.info("----keySet containsKey");} else {logger.info("----keySet not containsKey");}//如果含有下划线if (null != value) {String newValue = null;logger.info("-----opt={},code={}", this.optType, OptType.toCamelCase.getCode());logger.info("----opt equals={}", OptType.toCamelCase.getCode().equals(this.optType));if (OptType.toCamelCase.getCode().equals(this.optType)) {//将下划线字符串转为驼峰newValue = NamingCaseUtil.toCamelCase(value.toString());logger.info("----newValue={}", newValue);} else if (OptType.toKebabCase.getCode().equals(this.optType)) {//hutool和fastjson的类本地跑可以,上环境却用不了,执行到相关类就没有日志了,可能包冲突了,暂不用。
//                    newValue = NamingCase.toKebabCase(value.toString());} else if (OptType.toPascalCase.getCode().equals(this.optType)) {
//                    newValue = NamingCase.toPascalCase(value.toString());} else if (OptType.toUnderlineCase.getCode().equals(this.optType)) {newValue = NamingCaseUtil.toUnderlineCase2(value.toString());} else {newValue = value.toString();}//替换原来的值logger.info("----newValue2={}", newValue);jsonObject.put(this.jsonField, newValue);logger.info("----jsonObject2={}", jsonObject);event.setBody(JsonUtil.toJson(jsonObject).getBytes(charset));}} catch (Exception e) {throw new RuntimeException(e);}return event;}@Overridepublic List<Event> intercept(List<Event> events) {Iterator var2 = events.iterator();while (var2.hasNext()) {Event event = (Event) var2.next();this.intercept(event);}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {private String jsonField;private String optType;private Charset charset;public Builder() {this.charset = Charsets.UTF_8;}@Overridepublic void configure(Context context) {String jsonObjField = context.getString("jsonField");Preconditions.checkArgument(!StringUtils.isEmpty(jsonObjField), "Must supply a valid jsonField (may not be empty)");this.jsonField = jsonObjField;String optType = context.getString("optType");Preconditions.checkArgument(!StringUtils.isEmpty(optType), "Must supply a valid opt (may not be empty)");this.optType = optType;if (context.containsKey("charset")) {// May throw IllegalArgumentException for unsupported charsets.charset = Charset.forName(context.getString("charset"));}}@Overridepublic Interceptor build() {Preconditions.checkNotNull(this.jsonField, "jsonField required");Preconditions.checkNotNull(this.optType, "opt required");return new MyInterceptor(this.jsonField, this.optType, this.charset);}}/*public static void main(String[] args) {String str = "{\"send_id\":\"834688818270961664\",\"good_id\":\"223\",\"video_id\":\"14943443045138661356\",\"gold\":\"10\",\"timestamp\":1494344574,\"type\":\"gift_record\"}";MyInterceptor myInterceptor = new MyInterceptor("type", "toCamelCase", Charsets.UTF_8);Event event = new SimpleEvent();event.setBody(str.getBytes(StandardCharsets.UTF_8));Event result = myInterceptor.intercept(event);System.out.println(JsonUtil.toJson(JsonUtil.toBean(result.getBody(), new TypeReference<Map<String, Object>>() {})));}*/
}
package com.example.flumedemo.constant;/*** @author liaorj* @date 2024/11/13*/
public enum OptType {//将下划线方式命名的字符串转换为驼峰式。toCamelCase("toCamelCase"),//将驼峰式命名的字符串转换为短横连接方式。toKebabCase("toKebabCase"),//将下划线方式命名的字符串转换为帕斯卡式。toPascalCase("toPascalCase"),//将驼峰式命名的字符串转换为下划线方式toUnderlineCase("toUnderlineCase");private String code;OptType(String code) {this.code = code;}public String getCode() {return code;}
}
package com.example.flumedemo.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.FilterProvider;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;import java.io.IOException;/*** @author liaorj* @date 2024/10/24*/
public class JsonUtil {private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();public static byte[] toBytes(Object object) {try {return OBJECT_MAPPER.writeValueAsBytes(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static String toJson(Object object) {try {return OBJECT_MAPPER.writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}/*** 排除字段,敏感字段或太长的字段不显示:身份证、手机号、邮箱、密码等* 参考:https://www.baeldung-cn.com/jackson-ignore-properties-on-serialization** @param object* @param excludeProperties* @return*/public static String toJson(Object object, String[] excludeProperties) {try {SimpleBeanPropertyFilter theFilter = SimpleBeanPropertyFilter.serializeAllExcept(excludeProperties);FilterProvider filterProvider = new SimpleFilterProvider().addFilter("myFilter", theFilter);OBJECT_MAPPER.setFilterProvider(filterProvider);return OBJECT_MAPPER.writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static String toPrettyJson(Object object) {try {return OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static <T> T toBean(String json, Class<T> clazz) {try {return OBJECT_MAPPER.readValue(json, clazz);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static <T> T toBean(byte[] json, Class<T> clazz) {try {return OBJECT_MAPPER.readValue(json, clazz);} catch (IOException e) {throw new RuntimeException(e);}}public static <T> T toBean(byte[] json, TypeReference<T> typeReference) {try {return OBJECT_MAPPER.readValue(json, typeReference);} catch (IOException e) {throw new RuntimeException(e);}}
}
package com.example.flumedemo.util;import java.util.regex.Matcher;
import java.util.regex.Pattern;/*** 字符串转化工具类* @author liaorj* @date 2024/11/14*/
public class NamingCaseUtil {private static Pattern linePattern = Pattern.compile("_(\\w)");/*** 下划线转驼峰*/public static String toCamelCase(String str) {str = str.toLowerCase();Matcher matcher = linePattern.matcher(str);StringBuffer sb = new StringBuffer();while (matcher.find()) {matcher.appendReplacement(sb, matcher.group(1).toUpperCase());}matcher.appendTail(sb);return sb.toString();}/*** 驼峰转下划线(简单写法,效率低于{@link #toUnderlineCase2(String)})*/public static String toUnderlineCase(String str) {return str.replaceAll("[A-Z]", "_$0").toLowerCase();}private static Pattern humpPattern = Pattern.compile("[A-Z]");/*** 驼峰转下划线,效率比上面高*/public static String toUnderlineCase2(String str) {Matcher matcher = humpPattern.matcher(str);StringBuffer sb = new StringBuffer();while (matcher.find()) {matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());}matcher.appendTail(sb);return sb.toString();}
}

打包

打包前可以使用MyInterceptor类的main函数测试下。
mvn clean
mvn package
打包好后,需要把当前jar包上传到linux上的flume目录下的lib目录中。

配置文件

然后在flume目录下的conf目录下创建配置文件:file-to-hdfs-customInterceptor.conf,内容如下,注意自定义拦截器所在包和HDFS主机ip要修改成自己的。

# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/moreInfoRes.log# Describe/configure the source interceptors
a1.sources.r1.interceptors = i1 i2 
a1.sources.r1.interceptors.i1.type = com.example.flumedemo.interceptor.MyInterceptor$Builder
a1.sources.r1.interceptors.i1.jsonField = type
a1.sources.r1.interceptors.i1.optType = toCamelCasea1.sources.r1.interceptors.i2.type = regex_extractor
a1.sources.r1.interceptors.i2.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i2.serializers = s1
a1.sources.r1.interceptors.i2.serializers.s1.name = logType# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.163.130:9000/moreInfoRes/%{logType}
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/moreInfoRes/checkpointDir
a1.channels.c1.dataDirs = /data/moreInfoRes/dataDirs# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

切换到flume目录,执行:

bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-hdfs-customInterceptor.conf -Dflume.root.logger=INFO,console

测试结果

执行 hdfs dfs -ls -R / 命令查看 HDFS上 的 /moreInfoRes 目录文件信息,可以看到处理成功了:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/16710.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

RadSystems 自定义页面全攻略:个性化任务管理系统的实战设计

系列文章目录 探索RadSystems&#xff1a;低代码开发的新选择&#xff08;一&#xff09;&#x1f6aa; 探索RadSystems&#xff1a;低代码开发的新选择&#xff08;二&#xff09;&#x1f6aa; 探索RadSystems&#xff1a;低代码开发的新选择&#xff08;三&#xff09;&…

【开发基础】语义化版本控制

语义化版本控制 基础三级结构主版本号次版本号修正版本号 思维导图在node包管理中的特殊规则 参考文件 基础 语义化版本控制是一套通用的包/库的版本管理规范。在各类语言的包管理中都有用到&#xff0c;一般以x.x.x的形式出现在包的命名中。 三级结构 在语义化版本控制中&a…

IDC 报告:百度智能云 VectorDB 优势数量 TOP 1

近日&#xff0c;IDC 发布了《RAG 与向量数据库市场前景预测》报告&#xff0c;深入剖析了检索增强生成&#xff08;RAG&#xff09;技术和向量数据库市场的发展趋势。报告不仅绘制了 RAG 技术的发展蓝图&#xff0c;还评估了市场上的主要厂商。在这一评估中&#xff0c;百度智…

操作系统——同步

笔记内容及图片整理自XJTUSE “操作系统” 课程ppt&#xff0c;仅供学习交流使用&#xff0c;谢谢。 背景 解决有界缓冲区问题的共享内存方法在并发变量上存在竞争条件&#xff0c;即多个并发进程访问和操作同一个共享数据&#xff0c;从而其执行结果与特定访问次序有关。这种…

IAR调试时输出文本数据到电脑(未使用串口)

说明 因为板子没引出串口引脚&#xff0c;没法接USB转TTL&#xff0c;又想要长时间运行程序并保存某些特定数据&#xff0c;所以找到了这个办法。为了写数据到本机&#xff0c;所以板子必须运行在IAR的debug模式下。 参考&#xff1a;IAR环境下变量导出方法&#xff1a;https…

基于Java Springboot美食食谱推荐系统

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA 数据库&…

Java集合(Collection+Map)

Java集合&#xff08;CollectionMap&#xff09; 为什么要使用集合&#xff1f;泛型 <>集合框架单列集合CollectionCollection遍历方式List&#xff1a;有序、可重复、有索引ArrayListLinkedListVector&#xff08;已经淘汰&#xff0c;不会再用&#xff09; Set&#xf…

vue 项目使用 nginx 部署

前言 记录下使用element-admin-template 改造项目踩过的坑及打包部署过程 一、根据权限增加动态路由不生效 原因是Sidebar中路由取的 this.$router.options.routes,需要在计算路由 permission.js 增加如下代码 // generate accessible routes map based on roles const acce…

vue3 中直接使用 JSX ( lang=“tsx“ 的用法)

1. 安装依赖 npm i vitejs/plugin-vue-jsx2. 添加配置 vite.config.ts 中 import vueJsx from vitejs/plugin-vue-jsxplugins 中添加 vueJsx()3. 页面使用 <!-- 注意 lang 的值为 tsx --> <script setup lang"tsx"> const isDark ref(false)// 此处…

uniapp如何i18n国际化

1、正常情况下项目在代码生成的时候就已经有i18n的相关依赖&#xff0c;如果没有可以自行使用如下命令下载&#xff1a; npm install vue-i18n --save 2、创建相关文件 en文件下&#xff1a; zh文件下&#xff1a; index文件下&#xff1a; 3、在main.js中注册&#xff1a…

【视觉SLAM】4b-特征点法估计相机运动之PnP 3D-2D

文章目录 1 问题引入2 求解P3P 1 问题引入 透视n点&#xff08;Perspective-n-Point&#xff0c;PnP&#xff09;问题是计算机视觉领域的经典问题&#xff0c;用于求解3D-2D的点运动。换句话说&#xff0c;当知道n个3D空间点坐标以及它们在图像上的投影点坐标时&#xff0c;可…

wsl2配置文件.wslconfig不生效

问题 今天在使用wsl时&#xff0c;通过以下配置关闭swap内存&#xff0c;但是发现重启虚拟机之后也不会生效。 [wsl2] swap0 memory16GB后来在微软官方文档里看到&#xff0c;只有wsl2才支持通过.wslconfig文件配置&#xff0c;于是通过wsl -l -v查看当前wsl版本&#xff0c;…

借助Excel实现Word表格快速排序

实例需求&#xff1a;Word中的表格如下图所示&#xff0c;为了强化记忆&#xff0c;希望能够将表格内容随机排序&#xff0c;表格第一列仍然按照顺序编号&#xff0c;即编号不跟随表格行内容调整。 乱序之后的效果如下图所示&#xff08;每次运行代码的结果都不一定相同&#x…

系统架构设计师:系统架构设计基础知识

从第一个程序被划分成模块开始&#xff0c;软件系统就有了架构。 现在&#xff0c;有效的软件架构及其明确的描述和设计&#xff0c;已经成为软件工程领域中重要的主题。 由于不同人对Software Architecture (简称SA) 的翻译不尽相同&#xff0c;企业界喜欢叫”软件架构“&am…

T6识别好莱坞明星

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 导入基础的包 from tensorflow import keras from tensorflow.keras import layers,models import os, PIL, pathlib import matplotlib.pyplot as pl…

MybatisPlus的基础使用

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言1、基础crud增加insert()方法&#xff1a; 删除修改查询 2、分页查询配置分页拦截器使用分页查询功能开启MP日志在yml配置文件中配置日志查看日志 3、条件查询条…

基于stm32的智能变频电冰箱系统

基于stm32的智能变频电冰箱系统 持续更新&#xff0c;欢迎关注!!! 基于stm32的智能变频电冰箱系统 随着集成电路技术的发展&#xff0c;单片微型计算机的功能也不断增强&#xff0c;许多高性能的新型机种不断涌现出来。单片机以其功能强、体积小、可靠性高、造价低和开发周期短…

【提高篇】3.3 GPIO(三,工作模式详解 上)

目录 一,工作模式介绍 二,输入浮空 三,输入上拉 一,工作模式介绍 GPIO有八种工作模式,参考下面列表,我们先有一个简单的认识。 二,输入浮空 在输入浮空模式下,上拉/下拉电阻为断开状态,施密特触发器打开,输出被禁止。输入浮空模式下,IO口的电平完全是由外部电路…

代码训练营 day66|Floyd 算法、A * 算法、最短路算法总结

前言 这里记录一下陈菜菜的刷题记录&#xff0c;主要应对25秋招、春招 个人背景 211CS本CUHK计算机相关硕&#xff0c;一年车企软件开发经验 代码能力&#xff1a;有待提高 常用语言&#xff1a;C 系列文章目录 第66天 &#xff1a;第十一章&#xff1a;图论part11 文章目录…

Vue中template模板报错

直接<v出现如下模板&#xff0c;出现如下错误 注意两个地方&#xff1a; 1.template里面加一个div标签 2.要写name值 如下图