spark shuffle写操作——SortShuffleWriter


写入的简单流程:
1.生成ExternalSorter对象
2.将消息都是插入ExternalSorter对象中
3.获取到mapOutputWriter,将中间产生的临时文件合并到一个临时文件
4.生成最后的data文件和index文件
可以看到写入的重点类是ExternalSorter对象
image.png

ExternalSorter

基本功能:对(k,v)进行排序,中间可能存在合并操作,最后生成(k,c)。

  1. 使用partitioner对key进行分区
  2. 在每个分区中使用Comparator进行排序
  3. 输出一个单独的文件,每个分区对应这个文件中的一段范围。

如果禁用了合并操作,类型C必须等于V
这个类的工作流程如下:

  • 使用数据,反复填充内存缓冲区。如果是可以合并的数据,则使用PartitionedAppendOnlyMap;如果不合并,则使用PartitionedPairBuffer。在这些缓冲区中,我们首先按分区ID对元素进行排序,然后可能还会按键进行排序。为了避免对每个键多次调用分区器,我们将分区ID与每条记录一同存储。
  • 当每个缓冲区达到内存限制时,会将其spill到文件中。这个文件首先按分区ID排序,如果需要做聚合操作,其次可能按键或键的哈希码排序。对于每个文件,跟踪每个分区在内存中的对象数量,因此不必为每个元素都写出分区ID。
  • 当用户请求迭代器或文件输出时,溢写的文件会与任何剩余的内存数据一起被合并,使用上述定义的相同排序顺序(除非排序和聚合都被禁用)。如果需要按键进行聚合,我们或者从ordering参数中使用全序,或者读取具有相同哈希码的键并相互比较它们的相等性来合并值。
  • 用户在结束时应调用stop()方法来删除所有中间文件。

缓存buffer:PartitionedAppendOnlyMap、PartitionedPairBuffer
关键方法:insertAll、maybeSpillCollection、spill、writePartitionedMapOutput
image.png
image.png

PartitionedPairBuffer

capacity 容量
curSize 当前放入的数据量
data 数组,存储的数据,(k,v)占用数组的两个位置
image.png

insert

如果容量达到瓶颈就进行扩容。
先存key,再存value。再调用afterUpdate
image.png

afterUpdate

numUpdates数据插入/更新次数
nextSampleNum下一次采样的次数
更新numUpdates,如果达到采样次数,执行采样takeSample
image.png

takeSample

samples中只存两个样品数据,用来计算每次更新的差值。
采样的时候要移除多余的数据。更新下一次采样的数据量。
image.png

estimateSize

预估大小。
最后一个样品的lastSize+bytesPerUpdate*新增的更新次数。
image.png

resetSamples

重新进行采样。
image.png

growArray

扩容2倍容量,迁移数据,重启采样
image.png

partitionedDestructiveSortedIterator

生成比较器comparator,调用sort对缓存的数据进行排序。
image.png
sorter是使用TimSort进行排序的。
TimSort介绍: https://zhuanlan.zhihu.com/p/695042849
image.png

iterator

用pos计算剩余量。
data(2 * pos)为key,data(2 * pos+1) 为value
image.png

PartitionedAppendOnlyMap

存储数据用的数组data,里面的元素是key0, value0, key1, value1, key2, value2…
image.png

changeValue

PartitionedAppendOnlyMap插入数据不再是追加,而是有一个相同key合并值的过程。

  1. key是null,返回null,不进行存储
  2. key首次插入,更新data中的对应的kv值
  3. key非首次插入,更新data中合并的的新value
  4. key发生哈希冲突,就向后加1,直到不冲突

image.png

update

跟changeValue类似。
image.png

growTable

比较简单,就是容量扩大两倍,将旧的kv值重新计算hash插入到新的数组中,如果发生hash冲突就不断向后移动一位。
image.png

iterator

核心方法是nextValue,在nextValue中,遍历data数组的对应key值,要求不是null,表明这个位置是有值的。
如果有key为null,要求pos=-1且haveNullValue=true
image.png

partitionedDestructiveSortedIterator

调用destructiveSortedIterator方法
image.png

destructiveSortedIterator

data数组中元素是分散的,首先将数组中的元素都集中到数组的前面。后面就跟PartitionedPairBuffer的partitionedDestructiveSortedIterator方法一样使用TimeSort进行排序。
image.png

采样相关方法

跟上面的PartitionedPairBuffer的采样相关方法一样。

spill相关方法

入口方法是maybeSpillCollection
image.png

maybeSpillCollection

不论使用的数据结构是buffer还是map,都是计算消耗的容量,再调用maybeSpill方法,最后重新初始化化对应数据结构。可以想到maybeSpill中就将缓存的数据放到了本地。
image.png

maybeSpill

每32条数据就进行一次内存使用情况判断。如果当前使用内存超过了限制,就先申请新的内存,按照两倍的内存使用量申请,不一定申请到足量的内存。申请后还是内存使用超过了限制,就进行spill,调用spill方法,同时调用releaseMemory释放内存。
image.png

releaseMemory

image.png

spill

调用destructiveSortedWritablePartitionedIterator方法返回排好序的分区迭代器。
调用spillMemoryIteratorToDisk将数据溢写到磁盘上
最后将生成的文件记录到spills中
image.png

destructiveSortedWritablePartitionedIterator

调用对应数据结构的partitionedDestructiveSortedIterator方法返回排序的迭代器。
就是上面的PartitionedPairBuffer和PartitionedAppendOnlyMap的partitionedDestructiveSortedIterator方法。
image.png

spillMemoryIteratorToDisk

创建临时文件,生成对应的writer
image.png
遍历将数据写入的文件中,每10000条进行一次flush。
如果失败了,调用revertPartialWritesAndClose进行回滚。
image.png

revertPartialWritesAndClose

如果这次写入出现问题,使用这个方法。回滚写入,只保留截止到上一次写入的内容。
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1475099.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

包装类的128陷阱

我们先看一段代码 Integer num1100; Integer num2100; System.out.println(num1 num2); //trueInteger num3128; Integer num4128; System.out.println(num3 num4); //false 在执行之后,我们会发现num1num2是true,而num3num4却是false,…

Thingsboard 系列之通过 ESP8266+MQTT 模拟设备上报数据到平台

前置工作 Thingsboard平台ESP 8266 NodeMCU 开发板IDE: Arduino 或 VScode 均可 服务端具体对接流程 系统管理员账号通过 Thingsboard 控制面板创建租户等信息并以租户账号登录 实体 —> 设备维护具体设备信息 创建完成后通过管理凭据修改或直接复制访问令牌…

实力上榜!Coremail连续5年入选中国网络安全市场全景图

7月4日,网络安全行业研究机构数说安全正式发布《2024年中国网络安全市场全景图》(以下简称“全景图”),这是自2018年开始,数说安全发布的第七版全景图。 作为国内头部电子邮件产品与解决方案提供商,Coremai…

Linux基础: 二. Linux的目录和文件

文章目录 二. Linux的目录和文件1.1 目录概要1.2 目录详细说明 二. Linux的目录和文件 1.1 目录概要 command:ls / Linux的文件系统像一棵树一样,树干是根目录(/),树枝是子目录,树叶是文件; …

Qt 网络编程实战

一.获取主机的网络信息 需要添加network模块 QT core gui network主要涉及的类分析 QHostInfo类 QHostInfo::localHostName() 获取本地的主机名QHostInfo::fromName(const QString &) 获取指定主机的主机信息 addresses接口 QNetworkInterface类 QNetworkInterfac…

AI绘画Stable Diffusion:超强InstantID插件—面部特征一致性风格保持与迁移,轻松搞定私人写真摄影,SDWebUI使用指南

大家好,我是设计师阿威 最近经常有同学问到AI绘画Stable Diffusion 关于风格迁移IPA、Instant ID等安装和使用方法,内容虽基础但也属常用工具。因此,本文将核心介绍Instant ID 使用和墨幽人造人XL体验(这是一款可出图商用的写实摄…

uniapp微信小程序端实现微信登录

从网上看了一些,好像说是要先调用uni.getUserProfile 或者uni.getUserInfo获取用户信息后,然后再调用uni.login才行,但是我看着uni.getUserProfile和uni.getUserInfo 返回的都是一些匿名信息(nickName: "微信用户", 头像…

经济寒冬:竞品凶猛,你的产品如何求生?

那些年曾被竞品干掉的产品 1997年到2010年左右是国内互联网行业的快速发展和多元化发展的时期,这一时期涌现出来一大批优秀的产品,市场竞争越来越激烈。苹果 在20 世纪 80 年代,乔布斯的苹果电脑,在当时可是PC行业的老大&#xf…

tessy 单元测试:小白入门指导手册

目录 1,创建单元测试工程目录 2,导入单元测试源文件 一:创建测试文件夹(最好和代码目录一一对应,方便查找) 二:选择测试环境 三:添加源文件 四:分析源文件 3,编写单元测试用例 一:设置函数参数的传输方向 二:添加单元测试用例 三:编辑单元测试用例数据 …

Kafka抛弃Zookeeper后如何启动?

Kafaka如何下载 官网地址 目前Kafka最新的版本就是3.7.1 我们可以看到下面这两个版本信息?什么意思呢? Scala 2.12 - kafka_2.12-3.7.1.tgz (asc, sha512)Scala 2.13 - kafka_2.13-3.7.1.tgz (asc, sha512) 我们应该知道,一个完整的Kafka实…

互联网十万个为什么之什么是数据备份?

数据备份是按照一定的备份频率创建数据副本的过程,将重要的数据复制到其它位置或者存储介质,并对生成的副本保留一定的时长。备份通常储存在不同的物理介质或云端,以确保数据的连续性和完整性。有效的备份策略至关重要,以防止数据…

c#类型转换和常见集合类型

目录 1. 整数转换,整数和字符串,字符串和整数之间的转换怎么实现? 2. 日期转换,获取当前日期,字符串转日期,日期转字符串怎么实现? 3. 举例一维、二维、三维数组 4. 需求:有个88…

07浅谈大语言模型可调节参数tempreture

浅谈temperature 什么是temperature? temperature是大预言模型生成文本时常用的两个重要参数。它的作用体现在控制模型输出的确定性和多样性: 控制确定性: temperature参数可以控制模型生成文本的确定性,大部分模型中temperatur…

《C++20设计模式》命令模式思考

文章目录 一、前言二、分析 拆解1、经典命令模式2、撤销操作3、关于Invoker类 三、实现 一、前言 哎!只要是书上写的和经典设计模式不同,我就会很伤脑筋。😩 命令模式到底是干什么的? 答:命令的发送者和接收者完全解…

SQLServer的系统数据库用别的服务器上的系统数据库替换后做跨服务器连接时出现凭证、非对称金钥或私密金钥的资料无效

出错作业背景: 公司的某个sqlserver服务器要做迁移,由于该sqlserver服务器上数据库很多,并且做了很多的job和维护计划,重新安装的sqlserver这些都是空的,于是就想到了把系统4个系统数据库进行替换,然后也把…

应用在灯带Type-C接口上的PD SINK协议芯片ECP5701/ECP5702获取充电器的5V、9V、12V、15V、20V供电

方案背景 近日,欧盟就统一充电器接口的提案达成了一项政治协议,其中规定了在欧盟地区销售的所有手机或其他便携式中小型电子设备必须采用统一的USB Type-C接口。这项决定意味着未来将会有更多的产品强制性地使用TYPE-C充电接口。 在这个背景下&#xf…

算法设计与分析 实验5 并查集法求图论桥问题

目录 一、实验目的 二、问题描述 三、实验要求 四、实验内容 (一)基准算法 (二)高效算法 五、实验结论 一、实验目的 1. 掌握图的连通性。 2. 掌握并查集的基本原理和应用。 二、问题描述 在图论中,一条边被称…

剑指西门子ABB施耐德,中国自动化公司杀入全球市场,业绩增长90%

导语 大家好,我是社长,老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》 更多的海量【智能制造】相关资料,请到智能制造online知识星球自行下载。 一提到工业自动化领域,西门子、施耐德、ABB这些…

vue-使用Worker实现多标签页共享一个WebSocket

文章目录 前言一、SharedWorker 是什么SharedWorker 是什么SharedWorker 的使用方式SharedWorker 标识与独占 二、Demo使用三、使用SharedWorker实现WebSocket共享 前言 最近有一个需求,需要实现用户系统消息时时提醒功能。第一时间就是想用WebSocket进行长连接。但…

植物神经紊乱小救星来啦!放松小技巧get√

哈喽,小伙伴们!今天给大家带来一些超级实用的放松小技巧,特别适合那些时常感到植物神经紊乱,心情紧绷的亲人们哦!👸 🍀首先,深呼吸大法!每次感到紧张或者焦虑的时候&…