Flink作业执行之 2.算子 StreamOperator

Flink作业执行之 2.算子 StreamOperator

前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。

UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。

// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)

这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。

问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?

1. Flink算子

在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。

一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator

因此Function作为算子的一部分参与后续的数据加工。

算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:

  • 生命周期、状态和容错管理,主要是AbstractStreamOperator抽象类及其子类实现,以及未来的AbstractStreamOperatorV2抽象类。
  • 数据处理,主要是OneInputStreamOperatorTwoInputStreamOperatorMultipleInputStreamOperator接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。

StreamOperator完整的顶层抽象如下。

在这里插入图片描述

  • AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
    包含UDF的算子需继承其AbstractUdfStreamOperator子类
    对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
    将来将会使用AbstractStreamOperatorV2替换该基类
  • OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
  • TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
  • AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
    当前仅仅用于和MultipleInputStreamOperator一起配合使用。

OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。

MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下

// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {// 处理数据void processElement(StreamRecord<IN> element) throws Exception;
}// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {// 处理双流输入中第一个流上的元素void processElement1(StreamRecord<IN1> element) throws Exception;// 处理双流输入中第二个流上的元素void processElement2(StreamRecord<IN2> element) throws Exception;
}// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {List<Input> getInputs();
}

在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {// 封装Functionprotected final F userFunction;// 通过Function实现进行算子的实例化public AbstractUdfStreamOperator(F userFunction) {this.userFunction = requireNonNull(userFunction);checkUdfCheckpointingPreconditions();}// 算子生命周期的相关方法,实际上调用Function的方法@Overridepublic void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, new Configuration());}@Overridepublic void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction<?>) userFunction).finish();}}@Overridepublic void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction);}
}

常用的实现类基本继承自AbstractUdfStreamOperator抽象类。

单流输入,如map、fliter、source、sink等实现类

在这里插入图片描述
sink算子有两个实现类,分别是SinkOperatorStreamSink<IN>。二者的关系为SinkOperatorStreamSink<RowData>的特例。

双流输入,如concat、intervalJoin等实现类

在这里插入图片描述
本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例,该方法如下

public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {if (operator == null) {return null;} else if (operator instanceof StreamSource&& ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {// 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);} else if (operator instanceof StreamSink&& ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {// 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);} else if (operator instanceof AbstractUdfStreamOperator) {return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);} else {return new SimpleOperatorFactory<>(operator);}
}

得到SimpleOperatorFactory实例后,在实际执行时,通过其createStreamOperator方法得到StreamOperator实例。

1.1. 算子生成示例

上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation的过程

// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 获取StreamMap对应的StreamOperatorFactory工厂类return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {OneInputTransformation<T, R> resultTransform =new OneInputTransformation<>(this.transformation,operatorName,// 将StreamOperatorFactory工厂实例,传入到Transformation中operatorFactory,outTypeInfo,environment.getParallelism());@SuppressWarnings({"unchecked", "rawtypes"})SingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator(environment, resultTransform);getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)的逻辑。

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>implements OneInputStreamOperator<IN, OUT> {// 以下3个属性从父类继承// 函数实例protected final F userFunction;// 结果输出protected transient Output<StreamRecord<OUT>> output;// 默认算子链生成策略protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;public StreamMap(MapFunction<IN, OUT> mapper) {super(mapper);// 实例化StreamMap时,指定ALWAYS的算子链生成策略chainingStrategy = ChainingStrategy.ALWAYS;}@Overridepublic void processElement(StreamRecord<IN> element) throws Exception {// userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。output.collect(element.replace(userFunction.map(element.getValue())));}
}

要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。

2. 算子链

Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。

算子的合并策略在ChainingStrateg枚举类中定义,详情如下

/*** StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 */
public enum ChainingStrategy {// 尽可能的将和上游算子链接到一起,大多数算子的默认值ALWAYS,// 当前算子不会上下游算子链接到一起NEVER,// 不会上游算子连接到一起,但是可以和下游算子链接到一起HEAD,// 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。HEAD_WITH_SOURCES;public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1451204.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Java——LinkedList

1、链表 1.1 链表的概念及结构 链表在逻辑层面上是连续的&#xff0c;在物理层面上不一定是连续的 链表结构可分为&#xff0c;单向或双向、带头或不带头、循环或非循环&#xff0c;组合共计8种 重点&#xff1a;无头单向非循环链表、无头双向链表 1.2 模拟实现无头单向非…

【代码随想录】【算法训练营】【第35天】[134]加油站 [135]分发糖果 [860]柠檬水找零 [406]根据身高重建队列

前言 思路及算法思维&#xff0c;指路 代码随想录。 题目来自 LeetCode。 day 35&#xff0c;连休两天~ 题目详情 [134] 加油站 题目描述 134 加油站 解题思路 前提&#xff1a;数组 思路&#xff1a;全局贪心算法&#xff1a;最小累加剩余汽油为负数&#xff0c;说明…

面向对象编程重载

系列文章目录 文章目录 系列文章目录前言一、重载&#xff08;overload&#xff09; 前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站&#xff0c;这篇文章男女通用&#xff0c;看懂了…

Antd 自定义列表全选功能

背景 需要为List组件自定义全选功能&#xff0c;如下图所示&#xff1a; 全选checkbox需要与下面每一项的checkbox联动&#xff1b;当从第一页翻页到第二页的时候&#xff0c;第一页已选的内容保持&#xff0c;可以对第二页勾选&#xff0c;同时保证全选checkbox的状态是正确的…

在自己的电脑上搭建我的世界Java版服务器

很多朋友&#xff0c;喜欢玩Minecraft&#xff0c;也希望搭建一个服务器&#xff0c;用于和小伙伴联机&#xff1b; 并且&#xff0c;拥有服务器后&#xff0c;即使所有玩家都下线&#xff0c;“世界”依旧在运行&#xff0c;玩家可以随时参与其中&#xff0c;说不定一上线&am…

【Da-SimaRPN】《Distractor-aware Siamese Networks for Visual Object Tracking》

ECCV-2018 中科大 文章目录 1 Background and Motivation2 Related Work3 Advantages / Contributions4 Method4.1 Features and Drawbacks in Traditional Siamese Networks4.2 Distractor-aware Training4.3 Distractor-aware Incremental Learning4.4 DaSiamRPN for Long-t…

mmap引起的内存泄漏分析

最近遇到一个内存泄漏问题&#xff0c;由于问题出现在客户端&#xff0c;只能通过客户提供的Log来分析。 根据客户提供的/proc/meminfo数据发现&#xff0c;MemAvailable 由294072kB减小至18128kB&#xff0c;减小约269MB&#xff0c;引起该变化的最直接原因是PageTables由614…

49.Chome浏览器有三种清缓存方式

49.Chome浏览器有三种清缓存方式&#xff1a;正常重新加载、硬件重新加载、清空缓存并硬性重新加载 1、【正常重新加载】 触发方式&#xff1a;①F5  ②CtrlR  ③在地址栏上回车  ④点击链接 如果缓存不过期会使用缓存。这样浏览器可以避免重新下载JavaScript文件、图像、…

kettle从入门到精通 第六十九课 ETL之kettle kettle cdc mysql,轻松实现增量同步

1、之前kettle cdc mysql的时候使用的方案是canalkafkakettle&#xff0c;今天我们一起学习下使用kettle的插件Debezium直接cdc mysql。 注&#xff1a;CDC (Change Data Capture) 是一种技术&#xff0c;用于捕获和同步数据库中的更改。 1&#xff09;Debezium步骤解析mysql b…

反贿赂管理体系认证:提升企业诚信与防范风险的双重利器

反贿赂管理体系认证在当今商业环境中发挥着至关重要的作用。这一认证不仅有助于提高企业的道德标准和社会责任感&#xff0c;还能有效防范商业风险&#xff0c;并提升内部管理水平和工作效率。 反贿赂管理体系认证要求企业制定和执行严格的反贿赂政策和程序&#xff0c;从而在…

计算机网络原理实验(7):分析IP报文结构

一、实验名称 分析IP报文结构 二、实验目的&#xff1a; 1.掌握使用Wireshark分析俘获trace文件的基本技能&#xff1b; 2.深刻理解IP报文结构和工作原理。 三、实验内容和要求 1.分析俘获的分组&#xff1b; 2.分析IP报文结构。 3.记录每一字段的值&#xff0c;分析它的作…

FL Studio(FL 21)软件下载-详细安装教程视频

​fl studio 编曲软件即,简称FL,是音乐人熟知的水果编曲软件.可以完成完整的音乐制作环境或数字音频工作站(DAW)就是大家熟悉的水果 编曲软件&#xff0c;一个全能的音乐制作软件&#xff0c;包括编曲、录音、剪辑和混音等诸多功能&#xff0c;让你的电脑编程一个全能的录音室。…

基坑监测:关键环节与深入剖析,保障施工安全与质量新标准

在建筑工程中&#xff0c;基坑监测是一项至关重要的工作&#xff0c;它涉及对基坑施工现场的实时监测数据进行分析和评估&#xff0c;以确保基坑施工活动的稳定、安全和高效进行。基坑监测涵盖地质勘探、基坑开挖、加固、支护、周边环境以及工程质量验收等多个环节&#xff0c;…

通信原理抽样定理和PAM调制解调硬件实验

一、实验目的 1. 加深理解抽样定理&#xff1b; 2. 加深理解脉冲幅度调制的原理。 二、实验内容 1. 观测PAM平顶抽样波形&#xff1b; 2. 观测PAM自然抽样波形及解码后波形。 三、实验器材 1. 双踪示波器&#xff1b; 2. 通信原理实验箱信号源模块、①号模块。 四、实…

文章解读与仿真程序复现思路——电工技术学报EI\CSCD\北大核心《基于广义目标级联法的多牵引变电站 光伏-储能协同规划配置》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

java写一个验证码

生成验证码 内容&#xff1a;可以是小写字母&#xff0c;也可以是大写字母&#xff0c;还可以是数字 规则 长度为5 内容中是四位字母&#xff0c;1位数字。 其中数字只有1位&#xff0c;但是可以出现在任意的位置。 package User;import java.util.ArrayList; import jav…

FlashDB的TS数据库的标准ANSI C移植验证

本文目录 1、引言2、环境准备3、修改驱动4、验证 文章对应视频教程&#xff1a; 暂无&#xff0c;可以关注我的B站账号等待更新。 点击图片或链接访问我的B站主页~~~ 1、引言 在当今数据驱动的时代&#xff0c;高效可靠的数据存储与管理对于嵌入式系统及物联网(IoT)应用至关重…

【Unity每日一记】FairyGUI为什么能自动生成代码,它的好处是什么

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

kali中安装zsteg教程

1、下载文件 git clone http://www.github.com/zed-0xff/zsteg 2、第一步需要保证虚拟机是有网络的&#xff0c;不然无法克隆 3、可以将网络设置成如下后重启&#xff0c;访问百度看看能不能访问&#xff0c;若可以访问&#xff0c;则进行下一步 4、查看源&#xff0c;删除源&…