Spark-ShuffleWriter-UnsafeShuffleWriter-钨丝内存分配

一、上下文

《Spark-ShuffleWriter-UnsafeShuffleWriter》中提到在进行Page内存分配时,调用了一行代码

MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);

 这里就会走MemoryManager的钨丝内存分配,下面我们来详细看下

二、模式设定

  final val tungstenMemoryMode: MemoryMode = {//spark.memory.offHeap.enabled  默认 false  即:使用堆上分配if (conf.get(MEMORY_OFFHEAP_ENABLED)) {//spark.memory.offHeap.size 默认 0 //堆外分配的绝对内存量//此设置对堆内存使用没有影响,因此,如果执行器的总内存消耗必须符合某个硬限制,那么一定要相应地缩小JVM堆大小。require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")//当运行JVM时,其中有sun的Unsafe包可用,并且底层系统具有未对齐的访问能力,则为true。require(Platform.unaligned(),"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")MemoryMode.OFF_HEAP} else {MemoryMode.ON_HEAP}}

如果我们想使用堆外内存分配,必须满足3个条件

1、将spark.memory.offHeap.enabled设置为true

2、将spark.memory.offHeap.size设置为一个正数

3、运行JVM时,其中有sun的Unsafe包可用,并且底层系统具有未对齐的访问能力

三、堆上堆外区别

堆是什么?

为了严谨我们看看官网给出的解释:

https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.5.3

Java虚拟机有一个堆,在所有Java虚拟机线程之间共享。堆是运行时数据区域,从中为所有类实例和数组分配内存。

堆是在虚拟机启动时创建的。对象的堆存储由自动存储管理系统(称为垃圾收集器)回收;对象永远不会被显式释放。Java虚拟机不采用特定类型的自动存储管理系统,可以根据实现者的系统要求选择存储管理技术。堆的大小可以是固定的,也可以根据计算的需要进行扩展,如果不需要更大的堆,则可以进行收缩。堆的内存不需要是连续的。

Java虚拟机实现可以为程序员或用户提供对堆初始大小的控制,以及如果堆可以动态扩展或收缩,则可以控制最大和最小堆大小。(-Xms,-Xmx等参数)

以下异常情况与堆有关:

如果计算需要比自动存储管理系统可用的堆更多的堆,Java虚拟机会抛出OutOfMemoryError。

堆上:ON_HEAP

Java分配的非空对象都是由JVM的gc管理的,这一部分称堆上内存分配。JVM会定期对垃圾内存进行回收,在某些特定的时间点,它会进行一次彻底的回收(full gc)。彻底回收时,垃圾收集器会对所有分配的堆内内存进行完整的扫描,这意味着会对Java应用造成性能影响。

堆上存储的都是对象,对象存储包含三个方面:

  1. 对象头‌:包含对象的元信息,如哈希码、锁信息等。在64位JVM上,对象头通常占用16字节的空间。
  2. 实例数据‌:存储对象的属性值。每个实例变量占用一定的空间,具体大小取决于变量类型和对齐要求。
  3. 对齐填充‌:为了保证对象在内存中的地址是8字节对齐的,可能会添加一些额外的填充字节。

在Spark的MemoryManager负责堆上分配的对象是HeapMemoryAllocator

堆外:OFF_HEAP

不受垃圾收集器管理的内存,受操作系统直接管理。因此只能存字节型数据,相比堆上分配更节省空间,寻址也更快。

在Spark的MemoryManager负责堆外分配的对象是UnsafeMemoryAllocator

四、HeapMemoryAllocator

1、allocate

  //WeakReference 是 Java 中用于实现弱引用的类。//当你希望引用一个对象,但是不希望这个对象被 JVM 的垃圾回收器(GC)视为垃圾回收的重要依据时,你可以使用弱引用。//弱引用所引用的对象一旦被垃圾回收器标记为可回收的对象,就会被自动清除(即使垃圾回收器在运行时还没有进行实际的回收动作)。//使用场景://    1、缓存对象:当你需要缓存一些对象,并且希望在内存紧张的时候能够释放这些对象,那么弱引用可以很好地满足这种需求。//    2、监听器和事件处理:在事件监听器中,如果你希望监听器能够被垃圾回收器回收,但是又不希望在监听器不再使用的时候手动去移除监听器,那么弱引用可以很好地满足这种需求。//很明显我们属于第1种场景private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();//1Mprivate static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;private boolean shouldPool(long size) {// 非常小的分配不太可能从池中受益。return size >= POOLING_THRESHOLD_BYTES;}public MemoryBlock allocate(long size) throws OutOfMemoryError {//多少个字,申请的内存都是 1 Byte 的倍数int numWords = (int) ((size + 7) / 8);//校准后的内存大小 单位 Bytelong alignedSize = numWords * 8L;assert (alignedSize >= size);//是否满足池机制 即申请的内存 >= 1/8 M 即 128 KBif (shouldPool(alignedSize)) {synchronized (this) {//bufferPoolsBySize种存的是已经释放的内存,如果正好有这部分内存,可以直接拿来用final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);if (pool != null) {//循环这个缓存池 poolwhile (!pool.isEmpty()) {//取出一个 数组引用final WeakReference<long[]> arrayReference = pool.pop();final long[] array = arrayReference.get();if (array != null) {assert (array.length * 8L >= size);//重新封装成 MemoryBlock  即 Page 三个参数//    1、Long[]//    2、去除头的偏移量//    3、实际数据长度//这样就方便程序直接操作数据的那块内存//Platform 是对 Unsafe 的封装MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);//spark.memory.debugFill 默认  falseif (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);}return memory;}}//将alignedSize 移除,表示其已经被用了bufferPoolsBySize.remove(alignedSize);}}}//重新申请内存long[] array = new long[numWords];MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);}return memory;}

 最终申请的内存如图所示:

 MemoryBlock是管理内存中的这一页数据的对象

 MemoryBlock

public class MemoryBlock extends MemoryLocation {//未由TaskMemoryManagers分配的页面的特殊“pageNumber”值public static final int NO_PAGE_NUMBER = -1;//用于标记TaskMemoryManager中已释放页面的特殊“pageNumber”值public static final int FREED_IN_TMM_PAGE_NUMBER = -2;//MemoryAllocator(内存分配器释)放的页面的特殊“pageNumber”值。这使我们能够检测到双重释放。public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;private final long length;//可选页码;当此内存块表示由TaskMemoryManager分配的页面时使用。//此字段是公共的,因此可以由位于不同包中的TaskMemoryManager进行修改public int pageNumber = NO_PAGE_NUMBER;public MemoryBlock(@Nullable Object obj, long offset, long length) {super(obj, offset);  // 就是一个 long [] this.length = length;  //数据真实需要的大小}//返回内存块的大小public long size() {return length;}//创建指向 long [] 使用的内存的内存块public static MemoryBlock fromLongArray(final long[] array) {return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);}//用指定的字节值填充内存块public void fill(byte value) {Platform.setMemory(obj, offset, length, value);}}

2、free

 public void free(MemoryBlock memory) {//如果要释放内存,必须满足以下几个条件//    1、之前申请过内存,也就是 long []  是存在的assert (memory.obj != null) :"baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";//    2、内存之前没有被释放assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :"page has already been freed";//    3、TMM分配的页面必须首先通过TMM.freePage()释放,而不是直接在分配器free()中释放assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :"TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +"free()";//申请的内存大小final long size = memory.size();//spark.memory.debugFill 默认 false//是否分别用0xa5和0x5a字节填充新分配和释放的内存。这有助于发现未初始化或已释放内存的误用,但会带来一些开销。if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);}// 将页面标记为已释放(这样我们就可以检测到双重释放)memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;// 作为防止在释放bug后使用的额外防御层,我们对MemoryBlock进行了修改,以清空其对long[]数组的引用long[] array = (long[]) memory.obj;//将 页中的 long[] 置为null 且偏移量 置为 0memory.setObjAndOffset(null, 0);//校准后的内存分配大小 long alignedSize = ((size + 7) / 8) * 8;//判断释放满足最小分配大小 即:128 KB ,如果小于它也是不会申请页成功的if (shouldPool(alignedSize)) {synchronized (this) {LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);if (pool == null) {pool = new LinkedList<>();bufferPoolsBySize.put(alignedSize, pool);}//如果下次还有alignedSize的内存申请可以直接用pool.add(new WeakReference<>(array));}} else {// Do nothing}}

五、UnsafeMemoryAllocator

1、allocate

  public MemoryBlock allocate(long size) throws OutOfMemoryError {//使用Unsafe来从堆外分配怎么大的内存,不用校准long address = Platform.allocateMemory(size);//同样构建一个MemoryBlock 只是将obj 设置成了 null 且 address 设置成了直接地址MemoryBlock memory = new MemoryBlock(null, address, size);//spark.memory.debugFill 默认 falseif (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);}return memory;}

2、free

  public void free(MemoryBlock memory) {//释放内存前先检查 和堆上一样assert (memory.obj == null) :"baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :"page has already been freed";assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)|| (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :"TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);}//调用Unsafe来释放内存Platform.freeMemory(memory.offset);//修改了MemoryBlock以重置其指针。memory.offset = 0;// 将页面标记为已释放(这样我们就可以检测到双重释放)。memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/143242.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL高阶1831-每天的最大交易

题目 编写一个解决方案&#xff0c;报告每天交易金额 amount 最大 的交易 ID 。如果一天中有多个这样的交易&#xff0c;返回这些交易的 ID 。 返回结果根据 transaction_id 升序排列。 准备数据 Create table If Not Exists Transactions (transaction_id int, day date, …

python筛选出不合格密码的用户

有如下数据&#xff1a;筛选出不合格密码的用户&#xff0c;对出现至少四个连续数值为不合格密码&#xff0c;例如"1234"、"8765"为不合格密码 用户名密码X12345678Y87654321O10293847P39485726Q28475639R19283746S91827364T56473829U83746592V28374659W7…

新手教学系列——基于统一页面的管理后台设计(一)

在现代企业级应用中,后台管理系统往往是核心组成部分,特别是随着业务规模的扩展,如何在多个后端服务模块的基础上实现统一的登录验证、权限控制和页面管理,成为许多开发者面对的挑战。本文将以实际项目为例,详细讲解如何设计一个多模块的后台管理系统,满足不同服务模块的…

superset 解决在 mac 电脑上发送 slack 通知的问题

参考文档: https://superset.apache.org/docs/configuration/alerts-reports/ 核心配置: FROM apache/superset:3.1.0USER rootRUN apt-get update && \apt-get install --no-install-recommends -y firefox-esrENV GECKODRIVER_VERSION0.29.0 RUN wget -q https://g…

leetcode:字符串中的第一个唯一字符

#include <unordered_map> class Solution { public:int firstUniqChar(string s) {unordered_map<char, int> HashMap;string::iterator it s.begin();int i 0;//标记元素下标while (it ! s.end())//初始化哈希表{if (HashMap.count(*it) > 0)//原先hash表中…

saltstack远程执行

一、saltstack远程执行 一、saltstack远程执行&#xff1a;目标-targeting 详解见&#xff1a;https://www.cnblogs.com/phennry/p/5416408.html 1、查看认证主机情况 2、具体匹配 Globing :   *, 正则:      指定-E参数&#xff0c;正则表达式匹配多个 List&#xff1a…

我的AI工具箱Tauri版-FasterWhisper音频转文本

本教程基于自研的AI工具箱Tauri版进行FasterWhisper音频转文本服务。 进入软件后可以直接搜索 FasterWhisper 或者依次点击 Python音频技术/音频tools 进入该模块。 进入目录后需要进行一些基础配置&#xff0c;参数是默认的可以根据自己的机器进行一些简单的参数操作。 使用方…

新的 MathWorks 硬件支持包支持从 MATLAB 和 Simulink 模型到高通 Hexagon 神经处理单元架构的自动化代码生成

MathWorks 今天宣布&#xff0c;推出针对 Qualcomm Hexagon™ 神经处理单元&#xff08;NPU&#xff09;的硬件支持包。该处理单元嵌入在 Snapdragon 系列处理器中。MathWorks 硬件支持包&#xff0c;则专门针对 Qualcomm Technologies 的 Hexagon NPU 架构进行优化&#xff0c…

NISP 一级 | 7.2 信息安全风险管理

关注这个证书的其他相关笔记&#xff1a;NISP 一级 —— 考证笔记合集-CSDN博客 0x01&#xff1a;信息安全风险 信息系统不可能达到绝对安全&#xff0c;但可以通过安全风险&#xff08;以下简称“风险”&#xff09;控制来实现符合个人或单位目标的一定程度的安全。信息安全管…

Axure PR 9 标签 设计交互

大家好&#xff0c;我是大明同学。 这期内容&#xff0c;我们将深入探讨Axure中可编辑标签元件设计与交互技巧。 可移除标签元件 创建可移除标签所需的元件 1.打开一个新的 RP 文件并在画布上打开 Page 1。 2.在元件库中拖出一个文本框元件。 3.选中文本框元件&#xff0c…

Java安全(加密+HTTPS+WEB安全)

Java加密 单向加密 接收一段明文&#xff0c;然后以一种不可逆的方式将它转换成一段密文 ①、MD5&#xff0c;将无论多长的数据最后编码128位数据&#xff0c;常用文件校验、密码加密、散列数据 byte[] data ...;//明文数据 MessageDigest md5 MessageDigest.getInstance…

构建未来教育:智慧校园的功能与特色

智慧校园是一种利用先进科技手段将传统学校建设与管理进行智能化升级的教育发展模式。其主要功能和特点旨在提供更高效、便捷、安全的教育环境&#xff0c;致力于推动教育教学质量和校园管理水平向更智能化方向发展。 主要功能 1.智能安防系统: 监控摄像头、门禁系统等安防设…

【技术文章】ArcGIS Pro如何批量导出符号和工程样式?

目录 1.确定Pro软件版本 2.共享工程样式 3.管理和调用项目样式 制作好的地图&#xff0c;如何快速分享地图中的符号样式用于其它地图的制作&#xff1f; 在ArcMap软件中&#xff0c;可以通过命令一键批量导出所有符号。ArcGIS Pro软件是否也可以批量导出符号用于其它地图…

OceanBase 中 schema 的定义与应用

背景 经常在OceanBase 的问答社区 里看到一些关于 “schema 是什么” 的提问。 先纠正一些同学的误解&#xff0c; OceanBase 中的 Schema 并不简单的等同于 Database&#xff0c;本次分享将探讨 OceanBase 中的Schema是什么&#xff0c;及一些大家经常遇到的问题。 具体而…

从Profinet到Ethernet IP网关技术重塑工业网络,数据传输更流畅

Profinet转Ethernet IP网关在未来工业领域可能产生以下重要影响并发挥关键作用&#xff1a;促进工业设备集成与互操作性&#xff1a;打破协议壁垒&#xff1a;在工业场景中&#xff0c;存在多种不同的工业以太网协议&#xff0c;设备往往因协议差异而难以直接通信。 Profinet转…

C语言实现汉诺塔

这是一个古典的数学问题&#xff0c;是一个只有用递归方法解决的问题。问题是这样的&#xff1a;古代有一个梵塔&#xff0c;塔内有3个座A&#xff0c;B&#xff0c;C&#xff0c;开始时A座上有64个盘子&#xff0c;盘子大小不等&#xff0c;大的在下&#xff0c;小的在上。有一…

MybatisPlus:多条件 or()的使用

default List<ErpProductDO> selectByOE(String oe1, String oe2){return selectList(new LambdaUpdateWrapper<ErpProductDO>().eq(ErpProductDO::getOe,oe1).or().eq(ErpProductDO::getOe,oe2)); } 对应SQL为&#xff1a;

SpringBoot 整合docker,执行容器服务

我使用以下文章的镜像作为演示镜像,读者有自己的镜像可以使用自己的 TencentARC/GFPGAN人脸恢复Ubuntu-22.04搭建(附带Docker镜像)_tencentarc gfpgan-CSDN博客 1. 封装springboot 启动docker容器的方法 public String runDockerCommand(String[] command) {StringBuilder res…

如何使用ssm实现基于WEB的文学网的设计与实现+vue

TOC ssm626基于WEB的文学网的设计与实现vue 第一章 绪论 1.1研究背景与意义 在科学技术水平还比较低下的时期&#xff0c;相关行业通常采用人工登记的方式对相关的文学信息进行记录&#xff0c;而后对这些信息记录进行管理和控制。这种采用纸质存储信息的管理模式&#xff…

rocky9.2的lvs的NAT模式下的基本使用的详细示例

文章目录 前言什么是LVS?&#xff08;Linux Virtual Server&#xff09;LVS的组成1. 负载均衡器&#xff08;Load Balancer&#xff09;2. 后端服务器池&#xff08;Real Servers&#xff09;3. IPVS&#xff08;IP Virtual Server&#xff09;4. 调度算法&#xff08;Schedul…