FlinkPipelineComposer 详解

FlinkPipelineComposer 详解

原文

背景

在flink-cdc 3.0中引入了pipeline机制,提供了除Datastream api/flink sql以外的一种方式定义flink 任务

通过提供一个yaml文件,描述source sink transform等主要信息

由FlinkPipelineComposer解析,自动调用DataStream api进行构建

官方样例

 source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2

目前可以通过source配置的源只有mysql 和 values

values是调试用的,所以可以说当前这个功能是专门为“mysql同步数据到各个sink”的场景使用的

目前可以使用的sink有

  1. doris
  2. elasticsearch
  3. kafka
  4. paimon
  5. starrocks
  6. values

FlinkPipelineComposer

我们以mysql -> values来观察 FlinkPipelineComposer 是如何通过读取yaml文件的定义来构建DataStream的

values会将mysql产生的cdc消息打印到stdout上

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: x.x.x.xport: 3306username: usernamepassword: passwordtables: test.t1server-id: 5400-5404server-time-zone: UTC+8sink:type: valuesname: values Sinkpipeline:name: Sync Mysql Database to Valuesparallelism: 2

首先来观察一下这个任务提交到flink集群后具体的链路构成

在这里插入图片描述

结合官方给出的架构

在这里插入图片描述

可以看出,“一个source,一个sink”的yaml定义,最终会生成5个operator

  1. Souce: Flink CDC Event Source: mysql
  2. SchemaOperator
  3. PrePartition

-------------- shuffle --------------

  1. PostPartion
  2. Sink Writer: values Sink

Souce: Flink CDC Event Source: mysql

负责

  1. 创建枚举器
  2. 创建reader
  3. 枚举split分发给reader
  4. reader读取数据生成事件

SchemaOperator

负责和JobMaster上的coodinator沟通,执行schema evolution 相关逻辑,见Flink CDC Schema Evolution 详解

PrePartition

负责

  1. 广播FlushEvent
  2. 广播SchemaChangeEvent
  3. shuffle普通消息到下游

PostPartion

Sink Writer: values Sink

写入下游,values sink当前到实现是打印到stdout

源码解析

接下来分析,FlinkPipelineComposer 读取 yaml 构造DataStream的细节

CliFrontend#main

CliFrontend.java:54

args

在这里插入图片描述

createExecutor 创建 executor CliFrontend.java:76

调用CliExecutor#run CliExecutor.java:70

看一下解析得到的pipelineDef
在这里插入图片描述

这里已经从yaml文件中解析出了source和sink的配置了

composer.compose 调用compose方法开始使用DataStream api进行构建

FlinkPipelineComposer.java:92 FlinkPipelineComposer#compose

声明了5个translator,其中第一个sourceTranslator会生成DataStream<Event> stream,而其他的translator基于这个stream作为input,调用transform方法,放入对应阶段的operator

DataSourceTranslator sourceTranslator = new DataSourceTranslator();
...
TransformTranslator transformTranslator = new TransformTranslator();
...
SchemaOperatorTranslator schemaOperatorTranslator =...
...
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
...
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
...

translate的调用顺序如下

DataStream<Event> stream =sourceTranslator.translate(...
stream =transformTranslator.translatePreTransform(...
stream =transformTranslator.translatePostTransform(...
stream =schemaOperatorTranslator.translate(...
stream =partitioningTranslator.translate(...
sinkTranslator.translate(pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());return new FlinkPipelineExecution(env...)...

逐一说明

  1. sourceTranslator.translate 通过source名字获取sourceProvider,关联到stream中
  • sourceProvider.getSource ->
    • MysqlSource ->
      • createReader
      • createEnumerator
  1. stream = transformTranslator.translatePreTransform
if (transforms.isEmpty()) {return input;
}

由于有如上代码,我们的yaml中没有涉及,所以忽略这个transform

  1. stream = transformTranslator.translatePostTransform
  • 同上
  1. stream = schemaOperatorTranslator.translate
  • 插入一个schemaOperator节点,在收到schemaChangeEvent的时候
    1. 停住当前流
    2. 上报coodinator
    3. flush下游数据,让sink消耗完已有数据
    4. sink 通知coodinator flush完成
    5. coodinator调用sink注册的MetaApplier完成schema变更,变更完成后通知schemaOperator
    6. schemaOperator重新放通数据
  1. stream = partitioningTranslator.translate
  • 构建prePartition postPartition节点
  1. sinkTranslator.translate
  • 构建sink节点
  1. FlinkPipelineExecution 中的 execute 方法调用 env.executeAsync(jobName)

总结

flink-cdc 3.0 提供的pipeline模式,通过定义yaml,自动构建了一条cdc pipeline,避免手动调用datastream api,并且支持schema evolution

构建的主要逻辑集中在 FlinkPipelineComposer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/11853.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

10款音频剪辑推荐!!你的剪辑好帮手!!

在如今的数据化浪潮中&#xff0c;工作已经采用了线上线下相结合。我的工作就需要借助一些剪辑工具&#xff0c;来实现我对音频工具的剪辑。我初次接触到音频剪辑也是因为工作需求&#xff0c;从起初我只是一个音频剪辑的小白&#xff0c;这些工具的协助。吸引着我。对于这些工…

智能检测技术与传感器(热电传感器四个定律)

热电传感器&#xff1a; 两种不同的导体两端相互紧密地连接在一起&#xff0c;组成一个闭合回路。当两接点温度不等时&#xff08;设 &#xff09;&#xff0c;回路中就会产生大小和方向与导体材料及两接点的温度有关的电动势&#xff0c;从而形成电流&#xff0c;这种现象称为…

Ubuntu 20.04配置ollama并下载安装调用本地大语言模型

Ubuntu 20.04配置ollama并下载安装调用本地大语言模型 ollama 介绍(来自ChatGPT)主要特点 ollama开发环境预配置ollama在ubuntu下的安装直接安装压缩包安装创建开机ollama的脚本启动ollama ollama在ubuntu下的运行 ollama 介绍(来自ChatGPT) Ollama 是一种新的本地语言模型管理…

多点支撑:滚珠导轨的均匀分布优势!

滚珠导轨的滚珠稳定性可以有效保持滚珠导轨的稳定运行&#xff0c;减少滚珠脱落的风险&#xff0c;确保设备的长期稳定性和可靠性。事实上&#xff0c;滚珠导轨的滚珠稳定性主要依赖于以下几个方面&#xff1a; 1、精密的制造工艺&#xff1a;滚珠导轨的导轨和滑块通常采用高精…

轻松搭建在线文档管理系统:BookStack的Docker部署与远程访问指南

前言 本文将介绍如何在Linux系统上利用Docker本地部署在线文档管理系统BookStack&#xff0c;并通过cpolar内网穿透工具实现异地远程访问&#xff0c;无需公网IP或复杂的路由器设置。 BookStack是一个开源的知识管理平台&#xff0c;基于Laravel Vue.js构建。它提供了一个简…

【代码及应用】10个最常用的Python包!

世界上有超过200,000个Python程序包&#xff08;这只是基于官方的Python程序包索引PyPI托管的程序包&#xff09;这就引出了一个问题&#xff1a;拥有这么多的软件包&#xff0c;每个Python程序员都需要学习哪些软件包是最重要的&#xff1f; 包含编程资料、学习路线图、源代码…

Java面试要点01- 基本数据类型与包装类详解

本文目录 一、引言二、基本数据类型详解2.1 数值类型2.2 代码示例 三、包装类详解3.1 包装类介绍3.2 包装类的主要用途3.3 代码示例 四、注意事项和最佳实践4.1 数值计算注意事项4.2 包装类使用建议 五、面试重点详解5.1 基本类型和包装类的区别5.2 自动装箱和拆箱的原理5.3 In…

铠侠代理商 | KIOXIA SLC闪存选型和应用

一、铠侠&#xff08;KIOXIA&#xff09;的SLC闪存系列 铠侠SLC NAND可以高速写入大量数据&#xff0c;具有高的擦写次数耐久性和可靠性的1位/单元非易失性存储器。铠侠SLC NAND闪存产品系列具有多种容量和封装形式的选择&#xff0c;可满足嵌入式市场的不同需求。 铠侠的SLC…

ts定义接口返回写法

接口&#xff08;未进行ts定义&#xff09; export async function UserList(params: {// keyword?: string;current?: number;pageSize?: number;},// options?: { [key: string]: any }, ) {return request<API1.UserList>(http://geek.itheima.net/v1_0/mp/artic…

#多语言爬取京东价格信息 python 比价api接入指南

以下是使用 Python 接入京东价格信息比价 API 的一般指南&#xff1a; 寻找合适的比价 API 服务&#xff1a; 市面上有一些第三方数据服务提供商提供京东比价 API。这些服务通常需要你注册账号并申请 API Key 和 API Secret 等凭证&#xff0c;以便进行接口调用。你可以根据自己…

超详细:三大范式和反范式设计详解

目录 1、三大范式 第一范式&#xff1a; 列不可再分 。 第二范式&#xff1a; 行可以唯一区分 第三范式&#xff1a;确保数据的完整性、减少数据冗余和避免更新异常。 反方式模式 实验数据&#xff1a;模拟两张百万量级的数据表 反范式优化实验对比 反范式存在的问题 &am…

新标准大学英语综合教程1课后习题答案PDF第三版

《新标准大学英语&#xff08;第三版&#xff09;综合教程1 》是“新标准大学英语&#xff08;第三版&#xff09;”系列教材之一。本书共包含6个单元&#xff0c;从难度和话题上贴近大一上学生的认知和语言水平&#xff0c;包括与学生个人生活领域和社会文化等相关内容&#x…

Llama 3.2-Vision 多模态大语言模型

1. 引言 Llama 3.2-Vision多模态大型语言模型(文本 图像)是一个图像推理生成模型&#xff0c;按照官方的说法&#xff0c;在常见行业基准测试上&#xff0c;其性能优于许多可用的开源和闭源多模态模型。Llama 3.2-Vision有两个版本&#xff0c;一个是11B (7.9G)&#xff0c;另…

OpenObserve云原生平台指南:在Ubuntu上快速部署与远程观测

文章目录 前言1. 安装Docker2. Docker镜像源添加方法3. 创建并启动OpenObserve容器4. 本地访问测试5. 公网访问本地部署的OpenObserve5.1 内网穿透工具安装5.2 创建公网地址 6. 配置固定公网地址 前言 本文主要介绍如何在Linux系统使用Docker快速本地化部署OpenObserve云原生可…

隧道论文阅读2-采用无人融合扫描数据的基于深度学习的垂直型隧道三维数字损伤图

目前存在的问题&#xff1a; 需要开发新的无人测量系统测量垂直隧道图像数据量巨大&#xff0c;基于深度学习完成损伤评估跟踪获取图像位置的困难&#xff0c;对大型基础设施感兴趣区域(roi)的2d和3d地图建立进行了研究&#xff0c;对整个目标结构的损伤定位仍然具有挑战性。为…

【从VAE到LDM】Variational Auto Encoder原理以及关于Latent Diffusion的思考

论文链接&#xff1a;High-Resolution Image Synthesis with Latent Diffusion Models 官方实现&#xff1a;CompVis/latent-diffusion、CompVis/stable-diffusion 视频讲解&#xff1a;一个视频看懂VAE的原理以及关于latent diffusion的思考 前言 目前的扩散模型范式基本上都…

1111fxh,MYSQL加锁规则

怎么查看一个事务中对索引的加锁情况 -- 这条语句可以看到事务执行过程中加了哪些锁 select * from performance_schema.data_locks X是next-key lock 混合锁 X,REC_NOT_GAP就是行锁 X,GAP是间隙锁 可重复读下的加锁规则 2原则2优化 1.加锁的基本单位是next-key lock,即行…

mysql中数据不存在却查询到记录?

前言 首先看下面的查询语种 select * from AudioKnowledgeChatInfo where AudioId297795550566600706; 查询结果如下 看到上面的查询结果&#xff0c;是不是一脸懵&#xff1f;这audioId明显不对啊&#xff0c;怎么查询到了&#xff1f; 原因剖析 首先我们来看看数据库表…

拿不下总统之位,那就用热加载拿下验证码识别与爆破好了!

大家好&#xff0c;这里是在总统选举中惜败的超级牛 虽然没能拿下阿美利卡总统之位 但是牛牛的热加载功能&#xff0c;却能轻松拿下验证码的识别与爆破 验证码一般会在注册、登录等功能&#xff0c;用来防止自动化工具的攻击。一般的验证码生成过程如下图所示&#xff1a; …

闯关leetcode——202. Happy Number

大纲 题目地址内容 解题代码地址 题目 地址 https://leetcode.com/problems/happy-number/description/ 内容 Write an algorithm to determine if a number n is happy. A happy number is a number defined by the following process: Starting with any positive inte…