一文速通calcite结合flink理解SQL从文本变成执行计划详细过程

文章目录

      • 你可以学到啥
      • 测试代码
      • 背景知识
      • SQL转变流程图
      • 问题

你可以学到啥

  • SQL如何一步步变成执行计划的
  • 有哪些优化器,哪些优化规则
  • calcite 和flink 如何结合的

测试代码

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);Schema schema = Schema.newBuilder().column("count", DataTypes.INT()).column("word", DataTypes.STRING()).build();Schema schema1 = Schema.newBuilder().column("id", DataTypes.INT()).column("name", DataTypes.STRING()).build();tableEnvironment.createTemporaryTable("aa_user", TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/Users/xx/IdeaProjects/flink-demo/data/order.csv").format("csv").build());tableEnvironment.createTemporaryTable("bb_order", TableDescriptor.forConnector("filesystem").schema(schema1).option("path", "/Users/xx/IdeaProjects/flink-demo/data/user.csv").format("csv").build());String cost = tableEnvironment.explainSql("select * from aa_user inner join bb_order on `aa_user`.`count`=`bb_order`.`id`", ExplainDetail.ESTIMATED_COST);System.out.println(cost);

背景知识

需要了解calcite 里的基本知识,如AST,RelNode ,hepPlanner等等。
需要了解Flink 和Flink SQL里的一些知识

SQL转变流程图

SQL经过flink 里注册的每一个优化器,优化后,就能变成物理计划了,不过要变成执行代码,还要再经过代码生成。
在这里插入图片描述

问题

  • 问题1,FlinkBatchProgram
    所有flink优化器都是在这个类里添加的
object FlinkBatchProgram {val SUBQUERY_REWRITE = "subquery_rewrite"val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"val DECORRELATE = "decorrelate"val DEFAULT_REWRITE = "default_rewrite"val PREDICATE_PUSHDOWN = "predicate_pushdown"val JOIN_REORDER = "join_reorder"val JOIN_REWRITE = "join_rewrite"val PROJECT_REWRITE = "project_rewrite"val WINDOW = "window"val LOGICAL = "logical"val LOGICAL_REWRITE = "logical_rewrite"val TIME_INDICATOR = "time_indicator"val PHYSICAL = "physical"val PHYSICAL_REWRITE = "physical_rewrite"val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"val RUNTIME_FILTER = "runtime_filter}
  • 问题2,calcite 优化器和flink 如何结合的
    logical,physical 这两个优化器都是用的VolcanoPlanner,结合规则和代价。
    剩下的优化器HepPlanner,HepPlanner 完全使用规则。

  • 问题3,project_rewrite 后,为啥少了LogicalProject ReNode ?
    因为最后一个操作,logicalproject 这里就是把所有的字段查出来了,所有这一步实际上是不用的

  • 问题4,物理计划如何生成执行代码的?
    BatchPhysicalTableSourceScan 类

class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {val rowCnt = mq.getRowCount(this)if (rowCnt == null) {return null}val cpu = 0val rowSize = mq.getAverageRowSize(this)val size = rowCnt * rowSizeplanner.getCostFactory.makeCost(rowCnt, cpu, size)}// 这里生成的执行代码override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}
  • 问题5,为啥aa_user 表被广播,哪里实现的?

BatchPhysicalHashJoinRule 规则实现的

核心代码

 val leftSize = JoinUtil.binaryRowRelNodeSize(join.getLeft)val rightSize = JoinUtil.binaryRowRelNodeSize(join.getRight)// if it is not with hint, just check size of left and right side by statistic and config// if leftSize or rightSize is unknown, cannot use broadcastif (leftSize == null || rightSize == null) {return (false, false)}val threshold =tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)val rightSizeSmallerThanThreshold = rightSize <= thresholdval leftSizeSmallerThanThreshold = leftSize <= thresholdval leftSmallerThanRight = leftSize < rightSizejoin.getJoinType match {case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)case JoinRelType.FULL => (false, false)case JoinRelType.INNER =>(leftSizeSmallerThanThreshold|| rightSizeSmallerThanThreshold,leftSmallerThanRight)// left side cannot be used as build side in SEMI/ANTI join.case JoinRelType.SEMI | JoinRelType.ANTI =>(rightSizeSmallerThanThreshold, false)}

主要就是实现

  def binaryRowRelNodeSize(relNode: RelNode): JDouble = {val mq = relNode.getCluster.getMetadataQueryval rowCount = mq.getRowCount(relNode)if (rowCount == null) {null} else {rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)}}

最后还是到了FlinkRelMdColumnNullCount 这个类
从这个ts: TableScan 对象里取出来
那ts 对象又是在哪里赋值的,看这个FlinkRecomputeStatisticsProgram 类

class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF/*** Gets the null count of the given column in TableScan.** @param ts*   TableScan RelNode* @param mq*   RelMetadataQuery instance* @param index*   the index of the given column* @return*   the null count of the given column in TableScan*/def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])val relOptTable = ts.getTable.asInstanceOf[FlinkPreparingTableBase]val fieldNames = relOptTable.getRowType.getFieldNamesPreconditions.checkArgument(index >= 0 && index < fieldNames.size())val fieldName = fieldNames.get(index)val statistic = relOptTable.getStatisticval colStats = statistic.getColumnStats(fieldName)if (colStats != null && colStats.getNullCount != null) {colStats.getNullCount.toDouble} else {null}}}

ts是在这里赋值,这里最后会用调用具体的文件系统,找到文件行数

 private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {final RelOptTable scanTable = scan.getTable();if (!(scanTable instanceof TableSourceTable)) {return scan;}FlinkContext context = ShortcutUtils.unwrapContext(scan);TableSourceTable table = (TableSourceTable) scanTable;boolean reportStatEnabled =context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)&& table.tableSource() instanceof SupportsStatisticReport;SourceAbilitySpec[] specs = table.abilitySpecs();PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);TableStats newTableStat =recomputeStatistics(table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);FlinkStatistic newStatistic =FlinkStatistic.builder().statistic(table.getStatistic()).tableStats(newTableStat).build();TableSourceTable newTable = table.copy(newStatistic);return new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/141499.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

非线性规划------ + 案例 + Python源码求解(见文中)

目录 一、非线性代数模型的例子eg1&#xff1a;人口增长模型&#xff08;Logistic Growth Model&#xff09;模型公式Python建模与求解代码运行结果 eg2&#xff1a;化学反应速率模型&#xff08;Michaelis-Menten方程&#xff09;模型公式Python建模与求解代码运行结果 eg3&am…

栈的应用之表达式求值(前缀、中缀、后缀)

目录 引入 1.中缀表达式的求值(两种方式) 方式一:使用两个栈直接求值 方式二:将其转换为后缀表达式求值 ①转换: ②对后缀表达式求值: 2.后缀表达式的求值(1个栈,存放运算符) 3.前缀表达式的求值(1个栈用来存放数字,相对来讲没那么重要) 引入 1.中缀表达式的…

高级 API 性能:着色器

着色器通过使您能够控制渲染过程的各个方面&#xff0c;在图形编程中发挥着关键作用。它们在 GPU 上运行&#xff0c;负责操作顶点、像素和其他数据。 常规着色器计算着色器像素渲染顶点着色器几何体、域和外壳着色器 常规着色器 这些提示适用于所有类型的着色器。 推荐 避…

细说STM32单片机使用通用定时器生成固定占空比和可变占空比PWM波的方法

目录 一、本实例测试的目的 二、硬件和CubeMX项目配置 1、硬件开发板 2、项目配置 &#xff08;1&#xff09;定时器TIM2_CH1 &#xff08;2&#xff09;时钟和Debug &#xff08;3&#xff09; NVIC &#xff08;4&#xff09;GPIO 3、输出固定占空比的PWM波源码 &…

docker进入容器运行命令详细讲解

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言&#xff1a; 在 Docker 中&#xff0c;进入容器并运行命令是常见的操作&#xff0c;尤其是当你想要调试、检查日志或手动运行某些程序时。Docker 提供了几种方式来进入容器和执行命令。 前提条件 确保你的 Docker 容器…

Vulnhub:BlueSky

靶机下载地址 信息收集 主机发现 nmap扫描攻击机同网段存活主机。 nmap 192.168.31.0/24 -Pn -T4 靶机ip&#xff1a;192.168.31.171。 端口扫描 nmap 192.168.31.171 -A -p- -T4 开放端口22,8080。 目录扫描 访问8080端口&#xff0c;如图&#xff0c;是tomcat管理页面…

unity3d入门教程七

unity3d入门教程七 17.1物理系统17.2静态刚体17.3刚体的碰撞17.4刚体的反弹18.1运动学刚体18.2碰撞检测18.3碰撞事件回调18.4目标的识别18.5碰撞的规避 17.1物理系统 在物理系统中的物体具有质量和速度的是刚体 不用写代码就会自由落体运动了 17.2静态刚体 给 ‘地面’ 添…

学习笔记JVM篇(四)

垃圾回收器 说完垃圾回收算法接下来就需要对应的垃圾回收器去回垃圾回收器。接下来介绍几种垃圾回收器 1、Serial 串行回收器&#xff0c;是单线程版本&#xff0c;暂停所有的应用。在单CPU的情况下效率是很高的&#xff0c;因为不涉及线程的上下文切换。适用于小型程序和客…

【C语言】分支和循环(下)

分支和循环&#xff08;下&#xff09; 5、练习&#xff1a;判断年份是否为闰年6、短路7、switch语句7.1 if语句和switch语句的对比7.2switch语句中的break语句7.3switch语句中的default7.4 switch语句中的case和default的顺序问题 8、while循环8.1 if和while的对比8.2 while语…

C++_20_多态

多继承会造成 菱形继承** 使用虚继承来解决 不是给爷爷类加 也不是给子类加 是给父类加 虚基指针和虚基表 多态 概念&#xff1a; 概念&#xff1a; 一个事物的多种形态&#xff0c;简称多态 如&#xff1a; 对象的多态 ​ 张三 ​ 在对象面前 怂 ​ 在朋友面前 谄媚 ​ 在父…

搜索二叉树BSTree的原理及实现

目录 一、简介 二、功能的实现 节点的实现 这里为什么模板参数采用的是K而不是T呢&#xff1f; 树体的实现 非递归版本 Insert函数 Find函数 Erase函数 递归版本 中序遍历 FindR InsertR EraseR 构造函数 析构函数 拷贝构造 赋值重载 一、简介 BSTree&#x…

【CS110L】Rust语言 Lecture3-4 笔记

文章目录 第三讲 所有权:移动与借用&例1例2例3 错误处理&#xff08;开头&#xff09;为什么空指针如此危险&#xff0c;我们能做什么以应对&#xff1f;— 引出Optionis_none()函数unwrap_or()函数常见用法 第四讲 代码实践:链表Box节点和链表的定义节点和链表的构造函数判…

Hack the 21LTR: Scene 1 靶机

靶机配置 kali配置 虚拟网络适配器配置 不行的时候关闭虚拟机&#xff0c;多点几次生成 主机发现和端口扫描 主机发现 arp-scan -l 端口扫描 端口扫描发现21&#xff0c;22&#xff0c;80端口开放 nmap -sV -A -T4 192.168.2.120 访问80端口 http://192.168.2.120/ 查看页…

SOMEIP_ETS_108: SD_Deregister_from_Eventgroup

测试目的&#xff1a; 验证DUT在接收到StopSubscribeEventgroup消息并取消订阅后&#xff0c;不会响应TestEventUINT8触发的事件。 描述 本测试用例旨在确保DUT在取消对事件组的订阅后&#xff0c;不会对随后的事件触发做出响应。 测试拓扑&#xff1a; 具体步骤&#xff1…

.NET内网实战:通过命令行解密Web.config

01阅读须知 此文所节选自小报童《.NET 内网实战攻防》专栏&#xff0c;主要内容有.NET在各个内网渗透阶段与Windows系统交互的方式和技巧&#xff0c;对内网和后渗透感兴趣的朋友们可以订阅该电子报刊&#xff0c;解锁更多的报刊内容。 02基本介绍 本文内容部分节选自小报童…

Spring Boot集成Akka Cluster快速入门Demo

1.什么是Akka Cluster&#xff1f; Akka Cluster将多个JVM连接整合在一起&#xff0c;实现消息地址的透明化和统一化使用管理&#xff0c;集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序&#xff0c;部署到很多JVM上去实现程序的分布式并行运算&#xf…

编译原理之预处理

目录 生成预处理文件的的命令 预处理做了什么 实验 --------------------------------------------------------------------------------------------------------------------------------- 本篇文章主要是带着大家一起看看预处理阶段编译器都做了些什么 --------------…

十四,在Spring Boot当中对应“ Tomcat 服务器的相关配置”和“服务器的切换”的详细说明

十四&#xff0c;在Spring Boot当中对应“ Tomcat 服务器的相关配置”和“服务器的切换”的详细说明 文章目录 十四&#xff0c;在Spring Boot当中对应“ Tomcat 服务器的相关配置”和“服务器的切换”的详细说明1. 基本介绍2. 准备工作&#xff1a;3. 内置 Tomcat 的配置3.1 第…

Git项目管理工具

分布式版本控制系统

62. 不同路径、64. 最小路径和

思路 dp&#xff1a;代表到达当前位置的总方式 初始化&#xff1a;第一行的位置dp[0][j]&#xff1a;当前位置只能由左边的位置向右移动得到 所以只有1种方式 d[0][j]1, d[0][0]1 第一列的位置 dp[i][0]&#xff1a;当前位置只能由上一个位置向下移动得到 除此之外的位置可以由…