三年 Sparker 都不一定知道的算子内幕

一、如何在 mapPartitions 中释放资源

mapPartitions是一种对每个分区进行操作的转换操作,于常用的map操作类似,但它处理的是整个分区而不是单个元素。mapPartitions的应用场景适合处理需要在每个分区内批量处理数据的场景,通常用于优化性能和减少计算开销。例如:减少数据库连接、网络连接等。即然涉及到资源的初始化那么必定伴随着资源的释放,这是本节讨论的重点。

以和 mysql 中数据交互为例,下面是一段伪代码

rdd.mapPartitions(iter => {// 初始化数据库连接lazy val connection = initConnection(args)// 迭代数据val result = iter.map(... /*处理逻辑会使用到 connection 对象*/)// 在返回结果之前需要释放资源connection.close()// 返回处理结果result
})

上面的代码在运行阶段之前都是没有问题的(可编译、可打包),不存在语法问题。但是在运行时会报No operations allowed after connection closed,直接分析报错原因是在 map 中使用 connection 获取数据时该连接已经被关闭,直观的感觉是close方法在map之前被调用,真正的原因是什么呢?

众所周知 spark 在调用行动算子之前是不会执行上游算子中的逻辑,在观察 spark rdd 算子链之间传递的对象是 scala 的迭代器,而 scala 的迭代器具有lazy特性的不如 spark 的lazy特性被人“广为流传”

package fun.uhope.practiseobject P2 {def main(args: Array[String]): Unit = {List(1, 2, 3, 4, 5).toIterator.map(x => {println("map被调用了")x})}
}

上面的代码执行后没有任务输出,因为 scala 的迭代器也需要行动算子去触发计算。那么mapPartitions代码的报错原因显然是iter.map(...)只是返回了一个迭代器对象,内部逻辑并没有被执行,随后下一行代码关闭了数据库连接,当 rdd 在后续调用了行动算子其内部也会去触发这个迭代器对象执行对应的内部逻辑,此时数据库连接才会被使用但这个连接早就被关闭了。

对症下药!!!需要在数据库连接关闭之前执行完map逻辑

方案一:强制触发迭代器计算(不推荐)

将迭代器转换为 scala 的集合类型,代码如下

rdd.mapPartitions(iter => {// 初始化数据库连接lazy val connection = initConnection(args)// 迭代数据val result = iter.map(... /*处理逻辑会使用到 connection 对象*/).toList// 在返回结果之前需要释放资源connection.close()// 返回处理结果result.toIterator
})

toList会强制执行迭代器的逻辑,但后果是迭代器中映射的数据会被全部存储在内存中,如果分区的数据过大调用toList可能会发生 OOM。需要慎用

方案二:重写迭代器(推荐)

mapPartitions需要返回一个迭代器,如果这个迭代器可以实现在初始化的时候获取资源连接,在迭代完最后一个元素时释放资源即可。下面是自定义迭代器实现方式

rdd.mapPartitions(iter => {new Iterator[String]{// 初始化数据库连接lazy val connection = initConnection(args)// 判断迭代器是否还有元素override def hasNext: Boolean = {val hasNext = iter.hasNextif (!hasNext) {// 释放资源connection.close()}hasNext}// 获取迭代器元素override def next(): String = {val line = iter.next()... /*处理逻辑会使用到 connection 对象*/}}
})

该方法即保留了迭代器按需摄取数据的能力又实现了资源的及时释放

二、reduceByKey vs groupByKey

word count 入门案例如下

rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)

同时按照 sql 的实现逻辑还可以这么写

rdd.flatMap(_.split(" ")).map((_, 1)).groupByKey().mapValues(_.sum).foreach(println)

虽然groupByKey可以实现相同的结果,但效率较低,因为它会将所有相同key的值拉到一起,可能导致较大的网络传输开销和内存消耗。而reduceByKey默认实现了map端预聚合

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

三、 是全局有序吗

众所周知大数据场景下的全局排序是极其消耗资源的,hive 在执行 order by 时会将全部的数据 shuffle 到一个 reduce 节点上进行排序。spark 也提供了 rdd 的排序算子那么是全局有序还是分区有序?

sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9), numSlices = 3).sortBy(x => x).saveAsTextFile("data/sort result")

rdd的分区数是 3 排序后将结果写入本地文件(3 个)依次查看文件数据

image-20240916213706172

可以看出sortBy居然实现了全局有序,下面一探究竟 spark 是如何在大数据集下进行全局排序。

def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {this.keyBy[K](f).sortByKey(ascending, numPartitions).values
}def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope
{val part = new RangePartitioner(numPartitions, self, ascending)new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

从调用链来看关键是使用了RangePartitioner分区器,是一种基于范围的分区器。通过随机采样的方式近似估计分区键的分布情况结合分区数(假定为 n)将 rdd 的数据分为 n 段,随后在每个分区中进行局部排序。因为是基于范围的分区,分区之间本身就具备顺序性当每个分区的局部排序完成之后全局排序便自动完成。

四、多种 rePartition

spark 中提供两种方法进行重分区coalescerepartition。从调用链分析二者的关系

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)
}

理解coalesce的关键是 shuffle 选项,

从是否 shuffle 的角度分析

  1. 分区增加一定需要 shuffle,至少存在一个分区数据需要分发给多个分区
  2. 分区减少可以不需要 shuffle,将若干个分区全部分发给一个分区

从分区变化和是否 shuffle 角度分析

  1. 是否 shuffle 对分区减少没有必然联系
  2. 不 shuffle 且增加分区时无效

因此

package fun.uhope.transformimport fun.uhope.util.InitSparkContextobject RePartition {def main(args: Array[String]): Unit = {// 重分区val sc = InitSparkContext.withLocal()val sourceRDD = sc.parallelize(Nil)println(s"原始分区数 ${sourceRDD.partitions.length}")// coalesce 可以减少分区也可以增加分区// 减少分区时,可以不发生 shuffle// 增加分区时,shuffle 一定要设置为 true,否则分区数不发生变化val rdd1 = sourceRDD.coalesce(numPartitions = 4, shuffle = false)println(s"变成 4 分区 shuffle false ${rdd1.partitions.length}")val rdd2 = sourceRDD.coalesce(numPartitions = 16, shuffle = false)println(s"变成 16 分区 shuffle false ${rdd2.partitions.length}")val rdd3 = sourceRDD.coalesce(numPartitions = 16, shuffle = true)println(s"变成 16 分区 shuffle true ${rdd3.partitions.length}")// repartition 底层是 coalesce 且一定会发生 shuffleval rdd4 = sourceRDD.repartition(32)println(s"变成 32 分区的 repartition ${rdd4.partitions.length}")val rdd5 = sourceRDD.repartition(4)println(s"变成 32 分区的 repartition ${rdd5.partitions.length}")sc.stop()}
}

结论:coalesce相对repartition更加底层且灵活,但需要理解分区与shuflle的底层逻辑。repartitioncoaleace的一种特殊情况,它总是执行shuffle

Tips: 在数据分布不均的情况下减少分区建议使用shuffle这样可以让最终分区的数据变的更加均衡虽然会带来一定的资源消耗

五、广播变量的多种实现方式

Spark 中的广播变量(Broadcast Variables)是一种优化技术,主要用于在集群中高效分发只读数据。通过广播变量,Spark 可以将数据在各个节点上缓存,从而避免在每个任务中重复发送相同的数据,减少网络传输开销和提高性能。通常的使用场景如下:

  1. 小型只读数据集的共享
  2. mapjoin
  3. 机器学习模型广播
  4. 重复数据缓存

只考虑技术实现通常有:类 scala 闭包变量引用、spark 广播变量、临时文件

类 scala 闭包变量应用

val config = new HashMap[String, String]()
rdd.map(x => config.getOrElse(x, 'Nil')).foreach(println)

从语法上这是 scala 的闭包实现,但 spark 作为分布式计算框架变量 config 的初始化在Driver端完成,但 map 算子的逻辑在Executor端进行。因此类闭包的实现 spark 会将 config 对象进行序列化后通过网络发送到每个Executor的 JVM 中,至于在Executor中会被反序列化几份需要结合广播的变量类型

  1. 如果是 object 对象,具备单例每个 JVM(Executor) 只有一份
  2. 如果是 class 对象,每个 task 一份

Tip: 因为需要序列化,因此被广播的变量一定可以被序列化(继承Serializable)。同时因为内置的序列化协议会附带很多其它无用信息在广播大变量时不建议使用

spark 广播变量

val map = new HashMap[String, String]()
val config = sc.broadcast(map)
rdd.map(x => config.value.getOrElse(x, 'Nil')).foreach(println)

对比类闭包的实现,spark 提供的广播变量有以下优点

  1. 每个Executor保存一份
  2. 使用BitTorrent协议数据分块分发机制,使得数据可以从多个节点分别获取,有效减少数据传输延迟和带宽消耗加速广播过程
  3. 可以使用kryo序列化协议,相比 java 内置的序列化性能更高、序列化后的数据包更小

临时文件

在 MapReduce 编程框架中要实现广播(或mapjoin)通常是在 Job 中调用addCacheFile()将文件分发到集群的各个 Mapper 节点上,这个每个 Mapper都可以在本地文件中访问数据副本。Spark 同样支持

sc.addFile("hdfs://user/spark/jobxxx/config.txt")

之后的算子就可以像访问本地文件一样访问数据副本,但这种方式需要自己维护数据读取和解析在使用上的便捷性不如spark 提供的广播变量。这种方式不推荐使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/142414.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

AJAX 进阶 day4

目录 1.同步代码和异步代码 2.回调函数地狱和 Promise 链式调用 2.1 回调函数地狱 2.2 Promise - 链式调用 2.3 Promise 链式应用 3.async 和 await 使用 3.1 async函数和await 3.2 async函数和await_捕获错误 4.事件循环-EventLoop 4.1 事件循环 4.2 宏任务与微任务…

【有啥问啥】对比学习(Contrastive Learning,CL)的原理与前沿应用详解

对比学习(Contrastive Learning,CL)的原理与前沿应用详解 对比学习(Contrastive Learning)是自监督学习领域的关键方法之一,近年来因其在图像、文本和跨模态任务上的优越表现,受到了学术界和工…

【重学 MySQL】三十、数值类型的函数

【重学 MySQL】三十、数值类型的函数 基本函数角度与弧度互换函数三角函数指数与对数进制间的转换示例 基本函数 MySQL提供了一系列基本的数值函数,用于处理数学运算和数值转换。以下是一些常用的基本函数及其用法: 函数用法ABS(x)返回x的绝对值。SIGN…

Linux下文件下载中文乱码问题

最近做的一个项目中,本地打包到线上后,发现生成的文件中出现中文乱码,但在本地运行正常。经排查,文件输入输出流都指定了utf-8的编码格式,IDE的File Encodings也都是utf-8,Linux编码格式也是utf-8&#xff…

商务人士必备的精准翻译工具盘点

网易翻译是一款我外出游玩时候必备的翻译工具,最近没出去玩但是有更多的翻译需求了,为了方便在电脑上的操作我也找了不少翻译工具,这次一起分享给大家,看看哪款更得你的眼缘。 1.福昕在线翻译 链接直达:https://fany…

回归预测|基于灰狼优化正则化极限学习机的数据回归预测Matlab程序GWO-RELM 多特征输入单输出

回归预测|基于灰狼优化正则化极限学习机的数据回归预测Matlab程序GWO-RELM 多特征输入单输出 文章目录 一、基本原理1. 极限学习机(ELM)模型2. 灰狼优化算法(GWO)3. GWO-RELM回归预测流程总结 二、实验结果三、核心代码四、代码获…

C++——多线程编程(从入门到放弃)

进程:运行中的程序 线程:进程中的进程 线程的最大数量取决于CPU的核心数 一、将两个函数添加到不同线程中 demo:两个函数test01()和test02(),实现将用户输入的参数进行打印输出1000次 将这两个函数均放到独立的线程t1和t2中&…

【优化器】Optimizer——深度学习中的优化器是什么作用呢?

【优化器】Optimizer——深度学习中的优化器是什么作用呢? 【优化器】Optimizer——深度学习中的优化器是什么作用呢? 文章目录 【优化器】Optimizer——深度学习中的优化器是什么作用呢?1.什么是优化器?梯度下降法3. 常见的优化…

在typescript浏览器端中调用C++编写的函数,WebAssembly传递指针类型的参数,以及处理指针类型的返回值。

首先要在Cmake工程中的cmakelists.txt文件中引入Emscripten工具链&#xff1a; set(CMAKE_TOOLCHAIN_FILE "D:/CppPkg/emsdk/upstream/emscripten/cmake/Modules/Platform/Emscripten.cmake")直接看C代码&#xff1a; #include <emscripten/emscripten.h> #i…

鸿蒙开发之ArkTS 基础六 对象

什么是对象的呢&#xff1f;就是描述物体的特征和行为&#xff0c;是可以存储多种数据的容器 对象的定义和使用 let 对象名称: 对象结构类型 值 通过interface 关键字来约定对象结构类型,语法结构如下&#xff1a; interface 对象名{ 属性1&#xff1a;类型 属性2&#…

11.01类的定义和对象的使用(练习)

类的定义 类名&#xff1a;手机(Phone) 成员变量&#xff1a;品牌(brand&#xff09;&#xff0c;价格&#xff08;price&#xff09; 成员方法&#xff1a;打电话(calL)&#xff0c;发短信&#xff08;sendMessage&#xff09; 调用类变量和方法

基于SpringBoot+Vue+MySQL的高校心理教育辅导系统

系统展示 用户前台界面 管理员后台界面 系统背景 随着社会的快速发展&#xff0c;大学生群体面临着日益复杂的学习、生活及就业压力&#xff0c;心理健康问题日益凸显。传统的面对面心理咨询方式因时间、空间等限制&#xff0c;难以满足学生多样化的需求。因此&#xff0c;利用…

基于spring的ssm整合

目录 基于spring的ssm整合 Spring 框架 SpringMVC 框架 MyBatis 框架 1.创建项目 2.导入依赖 3.导入sql 4.创建jdbc.propries文件 1&#xff09;mysql8以下 2&#xff09;mysql8以上的 5.创建mybatis-config.xml配置文件 6.创建spring-Config.xml文件 7.创建项目所需包和类 1&a…

2024-1.2.12-Android-Studio配置

本地博客: https://k1t0111.github.io/ K1T0 最近在做一些app方向的移动技术开发学习&#xff0c;但是由于AS的配置问题&#xff0c;市面上找不到最新的2024版本的AS的相关配置。笔者也是踩了很多坑&#xff0c;因此想写一篇文章记录一下最新的AS 2024 1.2.12的对应java环境的一…

Html css样式总结

1.Html css样式总结 1.1. 定位position 布局是html中非常重要的一部分&#xff0c;而定位在页面布局中也是使用频率很高的方法&#xff0c;本章节为定位在布局中的使用技巧和注意事项。   position定位有4个属性&#xff0c;分别是static(默认&#xff09;&#xff0c;absol…

CLIP论文中关键信息记录

由于clip论文过长&#xff0c;一直无法完整的阅读该论文&#xff0c;故而抽取论文中的关键信息进行记录。主要记录clip是如何实现的的&#xff08;提出背景、训练数据、设计模式、训练超参数、prompt的作用&#xff09;&#xff0c;clip的能力&#xff08;clip的模型版本、clip…

感知器神经网络

1、原理 感知器是一种前馈人工神经网络&#xff0c;是人工神经网络中的一种典型结构。感知器具有分层结构&#xff0c;信息从输入层进入网络&#xff0c;逐层向前传递至输出层。根据感知器神经元变换函数、隐层数以及权值调整规则的不同&#xff0c;可以形成具有各种功能特点的…

学习笔记 韩顺平 零基础30天学会Java(2024.9.15)

P557 泛型应用实例 P558 泛型使用细节1 P560 泛型使用细节2 P560 泛型课堂练习 代码见Exceise P561 自定义泛型类 对于第二点&#xff0c;因为不知道类型&#xff0c;所以不知道开辟多少空间&#xff0c;因此不能初始化 第三点&#xff0c;静态方法与类相关的&#xff0c;在类…

LeetCode[中等] 189.轮转数组

给定一个整数数组 nums&#xff0c;将数组中的元素向右轮转 k 个位置&#xff0c;其中 k 是非负数。 思路 创建一个新数组&#xff0c;存储原数组旋转后的元素&#xff0c;然后将新数组中的元素复制回原数组。 public class Solution {public void Rotate(int[] nums, int k)…

【docker】阿里云使用docker,2024各种采坑

▒ 目录 ▒ &#x1f6eb; 导读需求开发环境 1️⃣ dial tcp: lookup on 8.8.8.8:53: no such host失败属于DNS问题 2️⃣ docker镜像配置配置最新镜像源 3️⃣ 【重点】阿里云专用获取自己的镜像加速器地址配置镜像地址 &#x1f6ec; 文章小结&#x1f4d6; 参考资料 &#x…