Flink作业执行之 1.DataStream和Transformation

Flink作业执行之 1.DataStream和Transformation

1. 滥觞

在使用Flink完成业务功能之余,有必要了解下我们的任务是如何跑起来的。知其然,知其所以然。

既然重点是学习应用程序如何跑起来,那么应用程序的内容不重要,越简单越好。
WordCount示例作为学习数据引擎时hello word程序,再合适不过。接下来便以任务执行顺序为线索开启对源码逐步学习。

public class WordCount {public static void main(String[] args) throws Exception {// 初始化执行环境Configuration configuration = new Configuration();configuration.setString("rest.port", "9091");StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);// 业务逻辑转换DataStream<String> text = env.fromCollection(Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan")).name("zl-source");DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1).name("counter");counts.print().name("print-sink");// 执行应用程序env.execute("WordCount");}
}

为了使示例代码足够纯粹(直接复制粘贴后即可跑起来的那种),因此在示例中直接使用List数据作为Source。

最后,计划将自己学习的过程以系列文档的形式作为记录。同时作为自己学习过程的记录,可能存在错误或片面理解,欢迎一起讨论。

2. 头疼的“角色”

在学习源码或查阅资料的同时,以下单词(但不限于)一定会频繁出现,它们或者直接对应flink源码中的接口、类名,或者是一些概念名称。初次看到难免让人抓狂。现在先对这些单词混个脸熟。

Client
JobManager/JobMaster
TaskManager/TaskExecutor
Transformation
StreamOperator
StreamGraph
JobGraph
ExecutionGraph
Task
StreamTask
……

3. 宏观视角

当任务开始执行后,便可以在WebUI上查看其对应的物理执行拓扑,即Task DAG。从我们编写的应用程序代码到Task DAG势必经历了复杂的解析转换操作,这个过程大体如下所示。

在这里插入图片描述

我们编写的应用程序代码首先会转化为Transformation,该实例将作为Flink世界中的起点,开启了之后一系列“旅程”。

4. env.execute()方法做了什么?

在使用DataStream API编写应用程序时,无论业务逻辑如何如何的复杂,但整体结构大致由三部分构成,即

// 1.初始化执行环境
StreamExecutionEnvironment env = ;
// 2.业务逻辑转换,即一系列的DataStream转化
DataStream source = ;
// 3.env.execute()
env.execute();

既然最后必须执行 env.execute()方法,那么首先了解下execute都执行了那些操作。

基于1.16版本的源码,并只保留了源码中的关键逻辑。

// 方法1
public JobExecutionResult execute(String jobName) throws Exception {final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);// 生成StreamGraph,最终调用方法4,通过StreamGraphGenerator生成StreamGraphStreamGraph streamGraph = getStreamGraph();// ...try {// 调用方法2return execute(streamGraph);} catch (Throwable t) {// ...}
}
// 方法2
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {// 调用方法3,通过StreamGraph最终得到JobClientfinal JobClient jobClient = executeAsync(streamGraph);try {final JobExecutionResult jobExecutionResult;// ...jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));return jobExecutionResult;} catch (Throwable t) {// ...}
}
// 方法3
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {// 根据启动环境,得到对应环境的Executor实现// 如miniCluster环境则对应LocalExecutorfinal PipelineExecutor executor = getPipelineExecutor();// 在具体的executor.execute方法中,将StreamGraph先转化成JobGraph,在将JobGraph提交到JobManager中CompletableFuture<JobClient> jobClientFuture =executor.execute(streamGraph, configuration, userClassloader);try {JobClient jobClient = jobClientFuture.get();jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));collectIterators.clear();return jobClient;} catch (ExecutionException executionException) {// ...}
}
// 方法4
private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {synchronizeClusterDatasetStatus();// 根据Transformation生成StreamGraphreturn getStreamGraphGenerator(transformations).generate();
}

通过上述源码调用链可以,完成从DataStream API->Transformation->StreamGraph->JobGraph的转化。最后将JobGraph提交到了JobManager中并,执行后续操作。

从上述方法4getStreamGraph(List<Transformation<?>> transformations)可知,StreamGraph由Transformation演变而来,此处不禁会产生一个新的疑问,Transformation又从何而来?

WordCount示例代码中并没有与Transformation直接相关的代码。通过查看getSreamGraph方法的完成调用链可知其入参直接来自是StreamExecutionEnvironment类中的transformations成员属性值。在应用程序第一步便生成了StreamExecutionEnvironment的实例,接下来通过env得到DataStream并进行了一系列的转化操作,而在最后的execute方法中便已直接使用transformations属性值了,那么该属性中一定是前面2个过程中实际赋值的。

protected final List<Transformation<?>> transformations = new ArrayList<>();

5. Transformation何时生成?

从StreamExecutionEnvironment的源码中可知,transformations属性只有addOperator方法会执行集合的add操作,其余地方均为集合的get操作。
然而addOperator方法有诸多调用方,且均为其他类中的调用,继续往上查看调用方有些困难,因此这里暂时记下addOperator方法唯一对transformations集合中执行add操作的结论。

// 该方法不适合用户使用。创建operator的api方法必须调用此方法
@Internal
public void addOperator(Transformation<?> transformation) {Preconditions.checkNotNull(transformation, "transformation must not be null.");this.transformations.add(transformation);
}

通过查看StreamExecutionEnvironment实例的创建过程,可以发现在创建过程中并无transformations的add操作,因此是在DataStream转换操作中对transformations执行了add操作。

5.1. DataStream

在Flink中使用DataStream表示数据流。其仅用于表达业务转化逻辑,实际上并没有真正的存储数据。

DataSteam是顶层封装类,其子类如下

在这里插入图片描述
DataStream类中只有两个成员属性,分别是StreamExecutionEnvironment和Transformation,并在构造方法中对其进行初始化。因此实例化DataStream的同时除执行环境外,还必须传入Transformation的实例。

public class DataStream<T> {protected final StreamExecutionEnvironment environment;protected final Transformation<T> transformation;public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {this.environment =Preconditions.checkNotNull(environment, "Execution Environment must not be null.");this.transformation =Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");}// ...
}

回到WordCount示例代码中,从集合到DataStream的过程,封装示意如下。

在这里插入图片描述

注意,Transformation中并不是直接持有了AbstractUdfStreamOperator的引用,而是对应的工厂。

源码中关键步骤如下

// 步骤1,从List到Function
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {// ...// 创建SourceFunction实例,SourceFunction是Function的实现SourceFunction<OUT> function = new FromElementsFunction<>(data);return addSource(function, "Collection Source", typeInfo, Boundedness.BOUNDED).setParallelism(1);
}// 步骤2,从Function到StreamOperator
private <OUT> DataStreamSource<OUT> addSource(final SourceFunction<OUT> function,final String sourceName,@Nullable final TypeInformation<OUT> typeInfo,final Boundedness boundedness) {// ...// 创建StreamSource实例,StreamSource是AbstractUdfStreamOperator的子类,Flink中算子的表示final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}// 步骤3,从StreamOperator到Transformation,再到DataStream
public DataStreamSource(StreamExecutionEnvironment environment,TypeInformation<T> outTypeInfo,StreamSource<T, ?> operator,boolean isParallel,String sourceName,Boundedness boundedness) {super(environment,// 创建Transformation实例,Transformation是PhysicalTransformation的子类new LegacySourceTransformation<>(sourceName,// 将StreamSource封装到Transformation中operator,outTypeInfo,environment.getParallelism(),boundedness));// ...
}

继续查看DataStream的map操作可以可以发现,核心流程和上述由集合创建DataStream的过程基本一致:

  • 首先创建Function实例
  • 其次由Function实例创建AbstractUdfStreamOperator实例
  • 然后将AbstractUdfStreamOperator实例封装到Transformation实例中
  • 最后由Transformation和StreamExecutionEnvironment实例创建DataStream实例

不同之处在于,map操作最后将得到的PhysicalTransformation实例添加到StreamExecutionEnvironment实例中的transformations集合中去了。这点差异其实和Transformation实例表示的含义有关,放在文章末尾解释。

protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {// ...OneInputTransformation<T, R> resultTransform =new OneInputTransformation<>(this.transformation,operatorName,operatorFactory,outTypeInfo,environment.getParallelism());SingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator(environment, resultTransform);// 区别:添加Transformation到StreamExecutionEnvironment中getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

但并不是全部的DataStream转化操作都需要经历上述将Function实例封装成AbstractUdfStreamOperator实例,然后将AbstractUdfStreamOperator实例封装到PhysicalTransformation实例的过程。如示例代码中的keyBy和sum操作。其中keyBy并未直接涉及Function,而sum操作直接将得到的SumAggregator函数实例封装到了ReduceTransformation实例中,然后由ReduceTransformation实例得到DataStream实例。

5.2. Transformation

DataStream面向开发者,而Transformation面向flink内核。
每个DataStream实例中都包含一个Transformation实例,表示当前Datastream从上游的DataStream使用该Transformation而来。而所有DataStream中Transformation又都添加到了StreamExecutionEnvironment实例中的transformations集合中去,用于接下来的StreamGraph实例的生成。
Transformation中记录了上游的数据来源,但其并关心数据的物理来源、序列化、转发等问题。

Transformatio是顶层抽象类,有众多的子类,涵盖了DataStream的所有转换,其直接子类如下,可以分为两大类

  • PhysicalTransformation,将会转换成后续graph中节点信息
  • 非PhysicalTransformation,将会转换成后续graph中的边信息

在这里插入图片描述
Transformation中属性如下所示,其中Optional<SlotSharingGroup>表示共享槽位信息,只有开启了允许共享槽位后,该属性才会被设置值。

其构造方法如下,除name外还需要输出类型和并行度两个参数。

public Transformation(String name, TypeInformation<T> outputType, int parallelism) {this.id = getNewNodeId();this.name = Preconditions.checkNotNull(name);this.outputType = outputType;this.parallelism = parallelism;this.slotSharingGroup = Optional.empty();
}

PhysicalTransformation仅在其父类的基础上增加了设置ChainingStrategy的方法,用于表示生成算子链的策略。

@Internal
public abstract class PhysicalTransformation<T> extends Transformation<T> {PhysicalTransformation(String name, TypeInformation<T> outputType, int parallelism) {super(name, outputType, parallelism);}/** Sets the chaining strategy of this {@code Transformation}. */public abstract void setChainingStrategy(ChainingStrategy strategy);
}

PhysicalTransformation中有众多的实现子类,全部子类继承关系如下。

在这里插入图片描述
其中以下几个子类出场频率相对更高一些,其他子类只有我们的业务逻辑比较复杂时才会用到。

  • LegacySourceTransformation 表示Source的Transformation
  • LegacySinkTransformation 表示Sink的Transformation
  • SourceTransformation
  • SinkTransformation
  • OneInputTransformation 表示单个输入流的Transformation,如常见的map、flatMap、fliter等
  • TwoInputTransformation 表示两个输入流的Transformation,如concat

疑问:为什么Source和Sink都各自分别有两个Transformation子类?
通过名称也可以看出一些端倪,新老两种实现。
在1.14版本之前,分别通过env.addSource(SourceFunction)DataStream.addSink(SinkFunction)方法生成source和sink
从1.14版本开始新增了env.fromSource(Source)DataStream.sinkTo(Sink)的方式生成source和sink。
新旧方法中入参类型不同,因此导致了两种不同的Transformation实现,从各自的实现类中也可以体现这一点,如下所示。

public class LegacySourceTransformation<T> extends PhysicalTransformation<T>implements WithBoundedness {// sourceFunction的引用private final StreamOperatorFactory<T> operatorFactory;// ...
}public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>extends PhysicalTransformation<OUT> implements WithBoundedness {// source的引用private final Source<OUT, SplitT, EnumChkT> source;// ...
}public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {private final Transformation<T> input;// sinkFunction的引用private final StreamOperatorFactory<Object> operatorFactory;// ...
}public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<OutputT> {private final DataStream<InputT> inputStream;// sink的引用private final Sink<InputT> sink;private final Transformation<InputT> input;// ...
}

Source作为整个数据流的头部,不存在上游,因此其Transformation实现中没有上游Transformation的引用,除此之外其余的Transformation子类中,均持有一个表示上游Transformation的引用,如上述sink中的input属性。

最后解释下,前面提到的为什么没有将表示Source的DataStream中的Transformation加入到env中表示Transformation的集合中,而接下来的转化中,将对应的Transformation加入到了env中。因为Source作为数据源的头部,不会存在上游,而Source作为其他DataSteam的上游,一定会加入到其Transformation的input中,因此没必要单独将Source的transformation加入到env中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1452586.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Linux基础命令[29]-chown

文章目录 1. chown 命令说明2. chown 命令语法3. chown 命令示例3.1 修改属主3.2 修改属组3.3 修改属主和属组3.4 修改文件夹所属 4. 总结 1. chown 命令说明 chown&#xff1a;更改文件的用户或用户组&#xff0c;需要 root 用户或 sudo 权限的用户执行该命令。基本信息如下&…

【电路笔记】-共集极放大器

共集极放大器 文章目录 共集极放大器1、概述2、等效电路3、电压增益4、偏置方法5、输入阻抗6、输出阻抗7、电流增益8、示例:共集电极放大器的电压、电流和功率增益9、达林顿对10、总结1、概述 本文介绍另一种用于放大信号的双极晶体管架构,通常称为共集电极放大器 (CCA)。 C…

python如何对list求和

如何在Python中对多个list的对应元素求和&#xff0c;前提是每个list的长度一样。比如&#xff1a;a[1&#xff0c;2&#xff0c;3]&#xff0c;b[2&#xff0c;3&#xff0c;4]&#xff0c;c[3&#xff0c;4&#xff0c;5]&#xff0c;对a&#xff0c;b&#xff0c;c的对应元素…

Java中List流式转换为Map的终极指南

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 在Java编程中&#xff0c;经常需要将一个List对象转换为另一个Map对象。这可能是因为需要根据List中的元素的某些属性来创建一个新的键值对集合。在本文中&#xff0c;我将向您展示如何使用Java 中的流式API轻松地实…

IP地址、子网掩码、网段、网关

前面相同就是在同一个网段 如果子网掩码和网络号相与的结果是一样的&#xff0c;那么他们就在同一个子网 IP地址、子网掩码、网络号、主机号、网络地址、主机地址以及ip段/数字-如192.168.0.1/24是什么意思?_掩码248可以用几个ip-CSDN博客

uni-app 怎么在tabbar使用阿里图标库

提示&#xff1a;微信小图标不支持使用字体图标的方式&#xff0c;只能下载png 方法一&#xff1a;直接下载png图片 我们首选打开阿里矢量图标库 链接在下方 &#x1f447; iconfont-阿里巴巴矢量图标库iconfont-国内功能很强大且图标内容很丰富的矢量图标库&#xff0c;提供矢…

板凳------56.Linux/Unix 系统编程手册(下) -- SOCKET 介绍

56.1.概述 socket 是一种IPC方法&#xff0c;允许位于同一主机或使用网络连接起来的不同主机上的应用程序之间交换数据。 UNIX 允许位于同一主机系统上的应用程序之间通信 Internet domain IPv4 and IPV6 // socket 通信方式 1.各个应用程序创建一个socket&#xff0c;socket是…

GEO ISP图像调试-PFC(蓝紫边校正)

目录 1、简单介绍 2、调试策略 3、输出结果 1、简单介绍 GEO中中调整图像蓝紫边可分为两步&#xff0c;第一步&#xff1a;调整蓝紫边检测区域&#xff0c;第二步&#xff1a;设置去蓝紫边强度。 2、调试策略 图1 该图像蓝紫边较严重 主要原因是由于蓝紫边检测不准导致的&…

GStreamer 源码编译,在 Clion 下搭建调试环境

前言 最近在学习 GStreamer&#xff0c;官方提供了一些教程&#xff0c;本人希望能够断点调试&#xff0c;以便学习代码逻辑。本文记录如何在 Clion 搭建 GStreamer 源码编译、调试环境 步骤 下载源码 git clone https://gitlab.freedesktop.org/gstreamer/gstreamer.gitCl…

Orange_Pi_AIpro运行蜂鸟RISC-V仿真

Orange_Pi_AIpro运行蜂鸟RISC-V仿真 突发奇想&#xff0c;试一试Orange Pi AIpro上运行蜂鸟RISC-V的仿真。 准备 默认已经有一个Orange Pi AIpro&#xff0c;并且对设备进行一定的初始化配置&#xff0c;可以参考上一篇博文开源硬件初识——Orange Pi AIpro&#xff08;8T&a…

第2章 Rust初体验3/8:使用Result进行错误处理:编译时错误检查增强代码安全性:猜骰子冷热游戏

讲动人的故事,写懂人的代码 2.3.9 类型的关联函数:简化对象创建和初始化 席双嘉:“那个String::new(),毫无疑问,它确实像C++中的静态成员函数。” 贾克强:“哈哈!是的,两个冒号确实让人联想到一些东西,对吧?” “这其实是Rust中的关联函数(associated function,详…

非线性规划解决工资分配问题

来源&#xff1a;河北工业职业技术大学 安彤彤 彭金杉 张家硕 题目 薪资发放问题 一般公司给职员发放薪金&#xff0c;通常按每月等额发放。某公司即将改进薪金发放方案&#xff0c;允许任职5年以上的职员向公司财务部门申请工资每月可变额度发放&#xff0c;每月工资发放额…

HAL库开发--串口

知不足而奋进 望远山而前行 目录 文章目录 前言 学习目标 学习内容 开发流程 串口功能配置 串口功能开启 串口中断配置 串口参数配置 查询配置结果 发送功能测试 中断接收功能测试 printf配置 DMA收发 配置 DMA发送 DMA接收(方式1) DMA接收(方式2) 总结 前言…

关于二分法的理解(以JS为例)

算法介绍 基本概念 二分查找算法&#xff0c;又称折半查找算法&#xff0c;是一种在有序数组中查找特定元素的高效方法。它的核心思想是将数组分成两半&#xff0c;然后根据目标值与中间元素的比较结果来决定是继续在左半部分还是右半部分进行搜索。 工作原理 初始化&#…

【漏洞复现】东胜物流软件 GetProParentModuTreeList SQL注入漏洞

0x01 产品简介 东胜物流软件是青岛东胜伟业软件有限公司-款集订单管理、仓库管理、运输管理等多种功能于一体的物流管理软件。该公司初创于2004年11月(前身为青岛景宏物流信息技术有限公司)&#xff0c;专注于航运物流相关环节的产品和服务。东胜物流信息管理系统货代版采用MS…

计算机组成原理历年考研真题对应知识点(计算机系统层次结构)

目录 1.2计算机系统层次结构 1.2.2计算机硬件 【命题追踪——冯诺依曼计算机的特点(2019)】 【命题追踪——MAR 和 MDR 位数的概念和计算(2010、2011)】 1.2.3计算机软件 【命题追踪——三种机器语言的特点(2015)】 【命题追踪——各种翻译程序的概念(2016)】 1.2.5计算…

网络协议,OSI,简单通信,IP和mac地址

认识协议 1.讲故事 2004年&#xff0c;小明因为给他爹打电话&#xff08;座机&#xff09;费用太贵&#xff0c;所以约定一种信号&#xff0c;响一次是报平安&#xff0c;响两次是要钱&#xff0c;响三次才需要接通。 2.概念 协议&#xff1a;是一种约定&#xff0c;这种约…

【odoo15】前端自定义模态弹窗

概要 在odoo15或者在15之前&#xff0c;odoo前端的owl框架还没完全替换当前前端框架的时候&#xff0c;我们很多时候都是用js或者jq来直接操作dom&#xff0c;那么我们如果需要在前端用到一个模态弹窗&#xff0c;可以怎么解决呢&#xff1f; 方法1 直接用js原生的模态弹窗&am…

Zadig vs. Jenkins 详细比较

01、Zadig vs. Jenkins&#xff1a;关于时代的选择 最近官方公众号发布了一篇名为 《是时候和 Jenkins 说再见了》的文章&#xff0c;引起了社区的广泛关注和讨论。作为曾经最被广泛使用的持续构建交付工具&#xff0c;Jenkins 的江湖地位似乎被挑战了。评论中有一条被高度点赞…

和鲸科技执行总裁殷自强:面向空间数据协同分析场景的模型生命周期管理方法

导读&#xff1a; 由 ACM SIGSPATIAL 中国分会主办的第五届空间数据智能学术会议&#xff08;SpatialDI 2024&#xff09;于 2024 年 4 月 25 日- 27 日在南京圆满召开&#xff0c;主题为“ AGI 时代下的空间数据智能”&#xff0c;旨在深入推动空间数据智能研究的理论进步与应…