ChunJun: 自定义插件

序言

Chunjun的版本兼容可能会有问题,在我们了解了自定义插件后,在修改源码以应对不同的场景就会得心应手了,针对Chunjun1.12.Release版本说明cuiyaonan2000@163.com 

自定义插件整体流程

从数据流的角度来看ChunJun,可以理解为不同数据源的数据流通过对应的ChunJun插件处理,变成符合ChunJun数据规范的数据流;脏数据的处理可以理解为脏水流通过污水处理厂,变成符合标准,可以使用的水流,而对不能处理的水流收集起来。----总的来说跟Flink的数据处理一样,只是增加了一个插件的概念用于处理不同的数据源,并生成对应的Flink任务cuiyaonan2000@163.com

插件开发不需要关注任务具体如何调度,只需要关注关键问题:

  1. 数据源本身读写数据的正确性;
  2. 如何合理且正确地使用框架;
  3. 配置文件的规范,每个插件都应有对应的配置文件;

每个插件应当有以下目录:

  1. conf:存放插件配置类的包。
  2. converter:存放插件数据类型转换规则类的包。
  3. source:存放插件数据源读取逻辑有关类的包。
  4. sink:存放插件数据源写入逻辑有关类的包。
  5. table:存放插件数据源sql模式有关类的包。  -----这个应该不是我们的重点,flink的sql并不好cuiyaonan2000@163.com
  6. util:存放插件工具类的包,chunjun已经封装了一些常用工具类在chunjun-core模块中,如果还需编写插件工具类的请放在该插件目录中的util包

以Stream插件为例子,他的插件结构如下图所示:

调试

Debug调试

(1)本地调试

在chunjun-local-test模块中,官方已经写好了本地测试的LocalTest类,只需更改脚本文件路径,在代码处打上断点即可调试。

(2)远程调试

如果需要远程调试,那么需要在 flink-conf.yaml 中增加 Flink 的远程调试配置,然后在 idea 中配置”JVM Remote“,在代码块中打断点(这种方法还能调试 Flink 本身的代码)

 

env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006

只需要修改标记的这两个地方,如果是 HA 集群,需要根据日志修改怎么看日志,怎么修改,自行查资料

至此,任务 idea 调试流程就这些内容。

任务类型

从Chunjun的配置文件Json中可以看到任务的分类

  • sync:同步任务,同理有同步任务的读插件和写插件,即sync(reader),sync(writer)
  • sql:计算任务,,同理有计算任务的读插件和写插件,即sync(reader),sync(writer)

reader

开发流程

以Stream插件为例

插件数据源读取逻辑需要继承BaseRichInputFormat类,BaseRichInputFormat是具体的输入数据的操作,包括open、nextRecord、close,每个插件具体操作自己的数据,InputFormat公共内容都在BaseRichInputFormat,不要随意修改。

创建StreamInputFormat类继承BaseRichInputFormat类,重写其中的必要方法。

public class StreamInputFormat extends BaseRichInputFormat {//创建数据分片@Overridepublic InputSplit[] createInputSplitsInternal(int minNumSplits) {......}//打开数据连接@Overridepublic void openInternal(InputSplit inputSplit) {......}//读取一条数据@Overridepublic RowData nextRecordInternal(RowData rowData) throws ReadRecordException {......}//判断数据是否读取完毕@Overridepublic boolean reachedEnd() {......}//关闭数据连接@Overrideprotected void closeInternal() {......}
}

由此可见StreamInputFormat 具体的实施类,但是在调用实现类的方法前还有引导类的创建,具体流程是:StreamSourceFactory-->StreamInputFormatBuilder-->StreamInputFormat 中间会引用StreamConf和StreamColumnConverter  至此一个source就完成了cuiyaonan2000@163.com

业务流程

1 com.dtstack.chunjun.Main是启动类,首先判断是计算任务,还是同步任务

2 以exeSyncJob为例进入可以看到,这里就是根据我们传入的Json文件内容生成环境变量

3 .将上面解析生成的SyncConf,然后通过反射加载具体的插件调用createSource方法生成DataStream,  注意这里就是重点了根据 我们的json文件的内容,来获取StreamSourceFactory ,然后创建的数据内容是DataStream----从这里开始就是重点了

4 createSource方法中会构建inputformat对象,然后调用createInput方法,将inputformat对象封装至DtInputFormatSourceFunction中。

未完待续~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/141718.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

单元测试 —— JUnit 5 参数化测试

JUnit 5参数化测试 目录 设置我们的第一个参数化测试参数来源 ValueSourceNullSource & EmptySourceMethodSourceCsvSourceCsvFileSourceEnumSourceArgumentsSource参数转换参数聚合奖励总结 如果您正在阅读这篇文章,说明您已经熟悉了JUnit。让我为您概括一下…

使用原生html<table>构造复杂table表

<table border data-sort"sortDisabled" align"center" class"table"><tbody><tr class"textarea1"><td rowspan"1" colspan"2" class"background-gray"><label>日期<…

Fiddler抓取Https请求配置

官网&#xff1a;https://www.telerik.com/fiddler 配置抓取https包 1.Tools->Options->Https&#xff0c;勾选下面。 2.Actions ->Trust Root Certificate.安装证书到本地 3.在手机端设置代理&#xff1a;本机ip如&#xff1a;192.168.1.168 端口号:8888。 4.手机…

Vue中的自定义指令详解

文章目录 自定义指令自定义指令-指令的值&#xff08;给自定义指令传参数&#xff09;自定义指令- v-loading指令封装 自定义指令 自定义指令&#xff1a;自己定义的指令&#xff0c;可以封装一些dom 操作&#xff0c;扩展额外功能&#xff08;自动聚焦&#xff0c;自动加载&a…

@DateTimeFormat 和 @JsonFormat 的详细研究

关于这两个时间转化注解&#xff0c;先说结论 一、介绍 1、DateTimeFormat DateTimeFormat 并不会根据得到其属性 pattern 把前端传入的数据转换成自己想要的格式&#xff0c;而是将前端的String类型数据封装到Date类型&#xff1b;其次它的 pattern 属性是用来规范前端传入…

el-select 下拉框全选、多选的几种方式组件

组件一、基础多选 适用性较广的基础多选&#xff0c;用 Tag 展示已选项 <template><el-select v-model"value1" multiple placeholder"请选择"><el-optionv-for"item in options":key"item.value":label"item.la…

深入理解C#中委托的使用及不同类型委托的应用示例

在C#中&#xff0c;委托是一种强大而灵活的机制&#xff0c;可以引用一个或多个方法&#xff0c;并允许以类似函数指针的方式进行调用。委托在事件处理、回调函数和多线程编程等场景中非常有用。本文将深入探讨C#中委托的使用&#xff0c;并介绍不同类型委托的应用示例。 目录…

基于改进莱维飞行和混沌映射的粒子群优化BP神经网络预测股票价格研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

基于Java+SpringBoot+Vue3+Uniapp前后端分离考试学习一体机设计与实现2.0版本(视频讲解,已发布上线)

博主介绍&#xff1a;✌全网粉丝4W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

ChatGPT降温背后:大模型发展迎来真正转折点?

作为目前AI领域的“神级产品”&#xff0c;ChatGPT的诞生&#xff0c;即吹响了AI革命的号角&#xff0c;随后包括谷歌、微软、Meta在内的国外科技公司&#xff0c;以及百度、华为、阿里、商汤科技、360、科大讯飞等在内的国内大厂&#xff0c;纷纷在短时间内推出了自家大模型产…

LLM-TAP随笔——大语言模型基础【深度学习】【PyTorch】【LLM】

文章目录 2.大语言模型基础2.1、编码器和解码器架构2.2、注意力机制2.2.1、注意力机制&#xff08;Attention&#xff09;2.2.2、自注意力机制&#xff08;Self-attention&#xff09;2.2.3、多头自注意力&#xff08;Multi-headed Self-attention&#xff09; 2.3、transforme…

Crypto:一眼就解密

题目 根据题目给出的信息可知&#xff0c;flag的为base64编码&#xff0c;数字后面的可以知道为base64编码&#xff0c;解码可得

oracle 根据分号分割为多个列

oracle 没有split 函数&#xff0c;因此没法直接使用&#xff0c;但是时间上会遇到需要分割的时候&#xff0c;可以使用正则表达式 SELECT REGEXP_SUBSTR(administration, [^;], 1, 1) AS SKILL1, REGEXP_SUBSTR(administration, [^;], 1, 2) AS SKILL2, REGEXP_SUBSTR(admini…

Cpp/Qt-day050921Qt

目录 实现使用数据库的登录注册功能 头文件&#xff1a; registrwidget.h: widget.h: 源文件&#xff1a; registrwidget.c: widget.h: 效果图&#xff1a; 思维导图 实现使用数据库的登录注册功能 头文件&#xff1a; registrwidget.h: #ifndef REGISTRWIDGET_H #de…

【Kafka系列】(二)Kafka的基本使用

有的时候博客内容会有变动&#xff0c;首发博客是最新的&#xff0c;其他博客地址可能会未同步,认准https://blog.zysicyj.top 首发博客地址[1] 文章更新计划[2] 系列文章地址[3] Kafka 线上集群部署方案怎么做 操作系统 先说结论&#xff0c;Kafka 部署在 Linux 上要比 Window…

[网鼎杯 2020 朱雀组]Nmap 通过nmap写入木马 argcmd过滤实现逃逸

这道题也很好玩 啊 原本以为是ssrf 或者会不会是rce 结果是通过nmap写入木马 我们来玩一下 传入木马 映入眼帘是nmap 我们首先就要了解nmap的指令 Nmap 相关参数-iL 读取文件内容&#xff0c;以文件内容作为搜索目标 -o 输出到文件-oN 标准保存-oX XML保存-oG Grep保存-oA…

北工大汇编——综合题(2)

题目要求 编写一个比赛得分程序。共有7 个评委&#xff0c;按百分制打分&#xff0c;计分 原则是去掉一个最高分和一个最低分&#xff0c;求平均值。要求&#xff1a; 评委的打分以十进制从键盘输入。成绩以十进制给出&#xff0c;并保留 1位小数。输入输出时屏幕上要有相应提…

分治算法求解:逆序对,Max Sum,棋盘覆盖,a-Good String——中山大学软件工程学院算法第四次实验课 必做+选做题

写英文注释不是要“秀英文”&#xff0c;而是因为鄙人正在准备雅思&#xff0c;顺手练习 逆序对 题目描述 完整代码 #include<iostream> using namespace std; int num[500010]; // input numbers int tmp[500010]; // sequence after merging left and right part lon…

鼠标不动了怎么办?3招解决问题!

“这是怎么回事呢&#xff1f;我的鼠标怎么会用着用着就突然不动了呢&#xff1f;现在有一些比较重要的工作要处理。请问有什么方法可以快速解决这个问题吗&#xff1f;” 随着电脑在我们日常生活和工作中的广泛应用&#xff0c;鼠标是我们操作电脑不可或缺的工具之一。但是&am…

WebGL 绘制圆形的点

目录 前言 如何实现圆形的点&#xff1f; 片元着色器内置变量&#xff08;gl_FragCoord、gl_PointCoord&#xff09; gl_PointCoord的含义 示例程序&#xff08;RoundedPoint.js&#xff09; 代码详解 前言 本文将讨论示例程序RoundedPoint&#xff0c;该程序绘制了圆…