聊聊Flink:Flink的分区机制

一、前言

flink任务在执行过程中,一个流(stream)包含一个或多个分区(Stream partition)。TaskManager中的一个slot的subtask就是一个stream partition(流分区),一个Job的流(stream)分布在多个不同的Slot上执行。每一个算子可以包含一个或多个子任务(subtask),这些subtask执行在不同的分区中,本质是在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。

1.1 Flink数据传输

  • 组件之间的通信消息传输,即Client、JobManager、TaskManager之间的信息传递,采用Akka框架(主要用作组件间的协同,如心跳检测、状态上报、指标统计、作业提交和部署等)。
  • 算子之间的流数据传输
    • 本地线程内的流数据传输(同一个SubTask中):同一个SubTask内的两个Operator(属于同一个OperatorChain)之间的数据传输是方法调用,即上游算子处理完数据后,直接调用下游算子的processElement方法。
    • 本地线程间的流数据传输(同一个TaskManager的不同SubTask中):即同一个TaskManager(JVM进程)中的不同Task(线程,本质上是SubTask)的算子之间的数据传输,通过本地内存进行数据传递,存在数据序列化和反序列过程。
    • 跨网络的流数据传输(不同TaskManager的SubTask中):采用Netty框架,通过Socket传递,也存在数据序列化和反序列过程。

flink中的重分区算子定义上下游subtask之间数据传递的方式,SubTask之间进行数据传递模式有两种,一种是one-to-oneforwarding)模式,另一种是redistributing的模式。

1.2 重分区算子数据传递的两种方式

  • One-to-one:数据不需要重新分布,上游SubTask生产的数据与下游SubTask受到的数据完全一致,数据不需要重分区,也就是数据不需要经过IO,比如下图中source->map的数据传递形式就是One-to-One方式。常见的map、fliter、flatMap等算子的SubTask的数据传递都是one-to-one的对应关系。类似于spark中的窄依赖。
  • Redistributing:数据需要通过shuffle过程重新分区,需要经过IO,比如上图中的map->keyBy。创建的keyBy、broadcast、rebalance、shuffle等算子的SubTask的数据传递都是Redistributing方式,但它们具体数据传递方式是不同的。类似于spark中的宽依赖。

在这里插入图片描述

flink中的重分区算子除了keyBy以外,还有broadcast、rebalance、shuffle、rescale、global、partitionCustom等多种算子,它们的分区方式各不相同。需要注意的是,这些算子中除了keyBy能将DataStream转化为KeyedStream外,其它重分区算子均不会改变Stream的类型。

二、分区策略

数据在算子之间流动需要依靠分区策略(分区器),Flink目前内置了以下几种分区策略和自定义分区策略。已实现的分区策略对应的API为:
在这里插入图片描述
自定义分区策略的API为CustomPartitionerWrapper。

各个API的继承关系如下图所示:
在这里插入图片描述
ChannelSelector是分区策略的顶层接口,其决定了记录应该写入哪个逻辑通道,通道可理解为下游算子的某个实例,或下游并行算子的某个子任务。该接口的定义源码如下:
在这里插入图片描述

抽象类StreamPartitioner实现了ChannelSelector接口,是一个用于流程序的特殊的ChannelSelector,其中定义了一些通用的分区策略方法。Flink中的所有分区策略(分区器)都继承了StreamPartitioner类,并且实现了各自独有的分区规则。

三、内置分区策略

3.1 BinaryHashPartitioner

该分区策略位于Blink的Table API的org.apache.flink.table.runtime.partitioner包中,是一种针对BinaryRowData的哈希分区器。BinaryRowData是RowData的实现,可以显著减少Java对象的序列化/反序列化。RowData用于表示结构化数据类型,运行时通过Table API或SQL管道传递的所有顶级记录都是RowData的实例。关于BinaryHashPartitioner,我们这里不做过多讲解。

3.2 BroadcastPartitioner

广播分区策略将上游数据记录输出到下游算子的每个并行实例中,即下游每个分区都会有上游的所有数据。使用DataStream的broadcast()方法即可设置该DataStream向下游发送数据时使用广播分区策略。

来一段代码演示下:

/*** 微信公众号:老周聊架构*/
public class PartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 6);//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号dataStream.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}});//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Integer> dataStreamAfter = dataStream.broadcast();//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Integer, Object>() {@Overridepublic Object map(Integer value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).print();env.execute("PartitionerTest Job");}
}

直接IDEA控制台输出:
在这里插入图片描述

从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述
执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述
对比表发现,广播分区策略将上游每个元素分别发送到了下游算子的所有分区,这种策略会把数据复制多份,向下游算子的每个分区发送一份。
在这里插入图片描述
我们把上面的任务提交到Flink,同样也可以看出前面分区前每个子任务两条数据,分区后每个子任务六条数据。
在这里插入图片描述
在这里插入图片描述
3.3 ForwardPartitioner

转发分区策略只将元素转发给本地运行的下游算子的实例,即将元素发送到与当前算子实例在同一个TaskManager的下游算子实例,而不需要进行网络传输。要求上下游算子并行度一样,这样上下游算子可以同属一个子任务。

这里把上面的代码调整下:

dataStream.forward()

IDEA控制台输出:
在这里插入图片描述
从输出结果可以看出,数据共分为3个分区(编号为0、1、2)。执行分区策略前,每个元素所属的分区:
在这里插入图片描述

执行分区策略后,每个元素所属的分区如下:
在这里插入图片描述

对比发现,转发分区策略将上游同一个分区的元素发送到了下游同一个分区中。使用数据流图表示如下图:

在这里插入图片描述
在上下游的算子没有指定分区策略的情况下,如果上下游的算子并行度一致,则默认使用ForwardPartitioner,否则使用RebalancePartitioner。在StreamGraph类的源码中可以看到该规则:
在这里插入图片描述
对于ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常。
在这里插入图片描述
3.4 GlobalPartitioner

全局分区策略将上游所有元素发送到下游子任务编号等于0的分区算子实例上(下游第一个实例)。

这里把上面的代码调整下:

dataStream.global()

IDEA控制台输出:
在这里插入图片描述
分区前:
在这里插入图片描述

分区后:
在这里插入图片描述

全局分区策略将上游所有分区中的所有元素发送到了下游编号为0的分区中:
在这里插入图片描述

3.5 .KeyGroupStreamPartitioner

Key分区策略根据元素Key的Hash值输出到下游算子指定的实例。keyBy()算子底层正是使用的该分区策略,底层最终会调用KeyGroupStreamPartitioner的selectChannel()方法,计算每个Key对应的通道索引(通道编号,可理解为分区编号),根据通道索引将Key发送到下游相应的分区中。selectChannel()方法源码如下:
在这里插入图片描述
在这里插入图片描述

总的来说,Flink底层计算通道索引(分区编号)的流程如下:

  • 计算Key的HashCode值。
  • 将Key的HashCode值进行特殊的Hash处理,即MathUtils.murmurHash(keyHash),返回一个非负哈希码。
  • 将非负哈希码除以最大并行度取余数,得到keyGroupId,即Key组索引。
  • 使用公式keyGroupId×parallelism/maxParallelism得到分区编号。parallelism为当前算子的并行度,即通道数量;maxParallelism为系统默认支持的最大并行度,即128。

3.6 RebalancePartitioner

平衡分区策略使用循环遍历下游分区的方式,将上游元素均匀分配给下游算子的每个实例。每个下游算子的实例都具有相等的负载。当数据流中的元素存在数据倾斜时,使用该策略对性能有很大的提升。

这里把上面的代码调整下:

dataStream.setParallelism(2);

dataStreamAfter.setParallelism(3);

dataStream.rebalance()

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述
分区后:

在这里插入图片描述

平衡分区策略将上游所有元素均匀发送到了下游算子的所有分区:
在这里插入图片描述

3.7 RescalePartitioner

重新调节分区策略基于上下游算子的并行度,将元素以循环的方式输出到下游算子的每个实例。类似于平衡分区策略,但又与平衡分区策略不同。

上游算子将元素发送到下游哪一个算子实例,取决于上游和下游算子的并行度。例如,如果上游算子的并行度为2,而下游算子的并行度为4,那么一个上游算子实例将把元素均匀分配给两个下游算子实例,而另一个上游算子实例将把元素均匀分配给另外两个下游算子实例。相反,如果下游算子的并行度为2,而上游算子的并行度为4,那么两个上游算子实例将分配给一个下游算子实例,而另外两个上游算子实例将分配给另一个下游算子实例。

假设上游算子并行度为2,分区编号为A和B,下游算子并行度为4,分区编号为1、2、3、4,那么A将把数据循环发送给1和2,B则把数据循环发送给3和4。假设上游算子并行度为4,编号为A、B、C、D,下游算子并行度为2,编号为1、2,那么A和B把数据发送给1,C和D则把数据发送给2。

这里把上面的代码调整下:

dataStream.rescale()

同时将第一个map算子的并行度设置为2,第二个map算子的并行度设置为4。

IDEA控制台输出:
在这里插入图片描述

分区前:
在这里插入图片描述

分区后:
在这里插入图片描述
在这里插入图片描述

接下来改变map算子的并行度,将第一个map算子的并行度设置为4,第二个map算子的并行度设置为2。

在这里插入图片描述

如果想将元素均匀地输出到下游算子的每个实例,以实现负载均衡,同时又不希望使用平衡分区策略的全局负载均衡,则可以使用重新调节分区策略。该策略会尽可能避免数据在网络间传输,而能否避免还取决于TaskManager的Task Slot数量、上下游算子的并行度等。

3.8 ShufflePartitioner

随机分区策略将上游算子元素输出到下游算子的随机实例中。元素会被均匀分配到下游算子的每个实例。这种策略可以实现计算任务的负载均衡。

这里把上面的代码调整下:

dataStream.shuffle()

这里就不做过多演示了。我们下面来看下自定义分区策略。

四、自定义分区策略

自定义分区策略的API为CustomPartitionerWrapper。该策略允许开发者自定义规则将上游算子元素发送到下游指定的算子实例中。

4.1 新建自定义分区器

新建分区器类MyCustomPartitioner并实现接口Partitioner(Object表示分区Key的数据类型),实现其中未实现的方法partition(),在该方法中添加相应的分区逻辑。

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class MyCustomPartitioner implements Partitioner {@Overridepublic int partition(Object key, int numPartitions) {if (key.equals("chinese")) {return 0;} else if (key.equals("math")) {return 1;} else {return 2;}}
}

上述代码通过partition()方法取得分区编号,将Key值等于chinese的元素分配到编号为0的分区,将Key值等于math的元素分配到编号为1的分区,其余元素分配到编号为2的分区。

4.2 使用自定义分区器

调用DataStream的partitionCustom()方法传入自定义分区器类MyCustomPartitioner的实例,可以对DataStream按照自定义规则进行重新分区,代码如下:

/*** 自定义分区策略* 微信公众号:老周聊架构*/
public class CustomPartitionerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);DataStream<String> dataStream = env.fromElements("chinese,98", "math,88", "english,96");//1.分区策略前的操作//输出dataStream每个元素及所属的子任务编号SingleOutputStreamOperator<Map<String, Integer>> dataStreamBefore =dataStream.map(new RichMapFunction<String, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(String value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略前,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));Map<String, Integer> map = new HashMap<>();map.put(value.split(",")[0], Integer.parseInt(value.split(",")[1]));return map;}}).setParallelism(2);//2.设置分区策略//设置DataStream向下游发送数据时使用的策略DataStream<Map<String, Integer>> dataStreamAfter = dataStreamBefore.partitionCustom(new MyCustomPartitioner(), value -> value);//3.分区策略后的操作dataStreamAfter.map(new RichMapFunction<Map<String, Integer>, Map<String, Integer>>() {@Overridepublic Map<String, Integer> map(Map<String, Integer> value) throws Exception {System.out.println(String.format("元素值: %s, 分区策略后,子任务编号: %s", value,getRuntimeContext().getIndexOfThisSubtask()));return value;}}).setParallelism(3).print();env.execute("CustomPartitionerTest Job");}
}

分区前:
在这里插入图片描述

分区后:

在这里插入图片描述
自定义分区策略将上游所有元素按照自定义的规则发送到了下游的3个分区中。

把任务给到Flink上去跑,发现:
在这里插入图片描述

这是因为泛型擦除,下面的DataStream泛型需要指定类型,不能
在这里插入图片描述

小知识:

在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦除,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的Java类型擦除

类型加好以后,再跑一下任务,会出现任务成功。

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/16657.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

VRRP HSRP GLBP 三者区别

1. VRRP&#xff08;Virtual Router Redundancy Protocol&#xff0c;虚拟路由冗余协议&#xff09; 标准协议&#xff1a;VRRP 是一种开放标准协议&#xff08;RFC 5798&#xff09;&#xff0c;因此支持的厂商较多&#xff0c;通常用于多种网络设备中。主备模式&#xff1a;…

VMware 17虚拟Ubuntu 22.04设置共享目录

VMware 17虚拟Ubuntu 22.04设置共享目录 共享文件夹挂载命令&#xff01;&#xff01;&#xff01;<font colorred>配置启动自动挂载Chapter1 VMware 17虚拟Ubuntu 22.04设置共享目录一、卸载老版本二、安装open-vm-tools<font colorred>三、配置启动自动挂载四、添…

二叉树Golang

二叉树 前言 完全二叉树 最底层节点按顺序从左到右排列。 满二叉树 一颗二叉树只有0度和2度的节点。 二叉搜索树 左子树上的所有节点的值均小于根节点的值。右子树上的所有节点的值均大于根节点的值。 平衡二叉搜索树 左右两个子树的高度差的绝对值不超过1 。 二叉树的存储…

【鸿蒙开发】第十三章 ArkTS基础类库-容器(数据结构)

目录 1 容器简述 2 线性容器 2.1 ArrayList 2.2 Vector 2.3 List 2.4 LinkedList 2.5 Deque 2.6 Queue 2.7 Stack 2.8 线性容器的使用 3 非线性容器 3.1 HashMap 3.2 HashSet 3.3 TreeMap 3.4 TreeSet 3.5 LightWeightMap 3.6 LightWeightSet 3.7 PlainArray…

3D电子商务是什么?如何利用3D技术提升销售转化?

在数字化浪潮席卷全球的今天&#xff0c;网上购物已成为消费者日常生活中不可或缺的一部分。然而&#xff0c;尽管其便捷性无可比拟&#xff0c;但传统电商模式中的“看不见、摸不着”问题始终困扰着消费者与商家。商品是否符合期望、尺寸是否合适、颜色是否真实……这些不确定…

腾讯地图GL JS点标识监听:无dragend事件的经纬度获取方案

引入腾讯地图SDK <!--腾讯地图 API--><script charset"utf-8" src"https://map.qq.com/api/gljs?librariestools&v1.exp&key***"></script>构建地图容器 <div class"layui-card"><div class"layui-car…

基于SpringBoot+RabbitMQ完成应⽤通信

前言&#xff1a; 经过上面俩章学习&#xff0c;我们已经知道Rabbit的使用方式RabbitMQ 七种工作模式介绍_rabbitmq 工作模式-CSDN博客 RabbitMQ的工作队列在Spring Boot中实现&#xff08;详解常⽤的⼯作模式&#xff09;-CSDN博客作为⼀个消息队列,RabbitMQ也可以⽤作应⽤程…

3. Spring Cloud Eureka 服务注册与发现(超详细说明及使用)

3. Spring Cloud Eureka 服务注册与发现(超详细说明及使用) 文章目录 3. Spring Cloud Eureka 服务注册与发现(超详细说明及使用)前言1. Spring Cloud Eureka 的概述1.1 服务治理概述1.2 服务注册与发现 2. 实践&#xff1a;创建单机 Eureka Server 注册中心2.1 需求说明 图解…

2024年11月第2个交易周收盘总结

计划自己的交易&#xff0c;交易自己的计划! 跟随市场而情绪波动&#xff0c;最终一定会导向失败&#xff01;连续、平稳、冷静地惯彻交易计划&#xff0c;比什么都重要&#xff01; 交易本身是极其简单和清楚的&#xff0c;让事情变复杂的原因不是行情走势和交易本身&#x…

一种时间戳对齐的方法(离线)

这段代码的主要功能是: 读取指定目录下的 pcd 文件和 jpg 文件。对于每个 pcd 文件,在 jpg 目录中找到时间戳最接近的 jpg 文件。将找到的 jpg 文件复制到对应的输出目录,实现时间戳对齐。 这种时间戳对齐的操作在多传感器数据融合中非常常见,它确保了不同传感器采集的数据在时…

【数据分享】全国农产品成本收益资料汇编(1953-2024)

数据介绍 一、《全国农产品成本收益资料汇编 2024》收录了我国2023年主要农产品生产成本和收益资料及 2018年以来六年的成本收益简明数据。其中全国性数据均未包括香港、澳门特别行政区和台湾省数据。 二、本汇编共分七个部分,即:第一部分,综合;第二部分,各地区粮食、油料;第…

SQL 处理数列

在关系模型的数据结构中&#xff0c;并没有“顺序”这一概念。因此&#xff0c;基于它实现的关系数据库中的表和视图的行和列也必然没有顺序。 1 处理数列 1.1 实践 1.1.1 生成连续编号 图 t_num 数据库源与目标视图v_seq 需求&#xff1a;根据0~9 这10个数&#xff0c;生成…

【云原生系列--Longhorn的部署】

Longhorn部署手册 1.部署longhorn longhorn架构图&#xff1a; 1.1部署环境要求 kubernetes版本要大于v1.21 每个节点都必须装open-iscsi &#xff0c;Longhorn依赖于 iscsiadm主机为 Kubernetes 提供持久卷。 apt-get install -y open-iscsiRWX 支持要求每个节点都安装 N…

编写情绪K线指标(附带源码下载)

编写需求&#xff1a; 很多交易者抱怨&#xff0c;传统的跟踪类指标常常存在滞后的问题&#xff0c;而预测类指标又常常不够可靠。那么&#xff0c;是否存在一种指标&#xff0c;能够精准地反映当前K线的强弱变化&#xff0c;并且具备高度的时效性呢&#xff1f; 效果展示&am…

16、pxe自动装机

pxe自动装机的组成 pxe&#xff1a;自动安装系统必要的运行环境 无人值守&#xff1a;为系统定制化的安装需要的软件 pxe的优点 规模化&#xff1a;同时装配多台服务器&#xff08;20-30&#xff09; 自动化&#xff1a;系统安装和服务配置不需要人工干预 远程实现&#x…

H.265流媒体播放器EasyPlayer.js网页直播/点播播放器WebGL: CONTEXT_LOST_WEBGL错误引发的原因

EasyPlayer无插件直播流媒体音视频播放器属于一款高效、精炼、稳定且免费的流媒体播放器&#xff0c;可支持多种流媒体协议播放&#xff0c;无须安装任何插件&#xff0c;起播快、延迟低、兼容性强&#xff0c;使用非常便捷。 EasyPlayer.js能够同时支持HTTP、HTTP-FLV、HLS&a…

Javaweb开发核⼼心之玩转Servlet4(笔记)

javaweb开发核⼼心之玩转Servlet4.0 简介&#xff1a;什么是Servlet-开发你的第⼀一个动态⽹网站 什么是Servlet 简介&#xff1a;是JavaServlet的简称&#xff0c;⽤用Java编写的运⾏行行在Web服务器器或应⽤用服务器器上的程序,具有独⽴立于平台和协议的特性, 主要功能在于交…

VUE实现通话:边录边转发送语言消息、 播放pcm 音频

文章目录 引言I 音频协议音频格式:音频协议:II 实现协议创建ws对象初始化边录边转发送语言消息 setupPCM按下通话按钮时开始讲话,松开后停止讲话播放pcm 音频III 第三库recorderplayer调试引言 需求:电台通讯网(电台远程遥控软件-超短波)该系统通过网络、超短波终端等无线…

无人机遥控器基础讲解——CKESC电调小课堂08

无人机遥控器是控制无人机飞行的重要设备&#xff0c;以下是对其的详细介绍&#xff1a; CKESC-专业级电调研发生产供应商http://www.ckesc.com 一、外观与布局 1. 通常由两个摇杆、多个功能按钮、一个显示屏和天线组成。 2. 摇杆一般位于遥控器的中央位置&#xff0c;用于控…

谷歌新作:Unbounded开放世界RPG,AI定义无限游戏新纪元

在开放世界和角色扮演游戏的领域里&#xff0c;玩家们总是渴望着那种无拘无束的自由体验。他们梦想着一个没有空气墙阻隔&#xff0c;没有剧情杀限制&#xff0c;没有任何交互限制的游戏世界。现在&#xff0c;这个梦想可能即将成真。谷歌联合北卡罗来纳大学教堂山分校推出的Un…