spark 大表与大表join时的Shuffle机制和过程

        在 Spark 中,当处理大表与大表的 JOIN 操作时,通常会涉及到 Shuffle 机制,这是分布式计算中用于重新分布数据的关键步骤。Shuffle 的本质是将数据按照某种方式重新分组,使得相同 key 的数据能够被发送到同一个计算节点进行后续的操作。以下是详细的 Shuffle 机制在大表与大表 JOIN 操作中的工作过程,涵盖底层原理和源代码相关内容。

一、Shuffle 基本原理

    Shuffle 是 Spark 中用于处理需要跨多个分区(节点)计算的任务的关键机制。大体分为以下几个阶段:

  1. Map 阶段:将数据进行分区,并根据 key(用于 JOIN 的 key)进行 hash 分布。
  2. Shuffle 阶段:将 Map 阶段输出的数据发送到不同的 Reduce 任务中。每个 Reduce 任务负责处理特定的 key。
  3. Reduce 阶段:对相同 key 的数据进行操作,完成 JOINGROUP BY 等计算。

        在大表与大表 JOIN 时,数据量大且分布不均的 key 会导致 Shuffle 中的网络IO传输数据量巨大,因此这部分成为性能瓶颈的关键。

二、Shuffle 在 Join 中的工作流程

        对于大表与大表 JOIN 的情况,常见的操作类型是基于 key 的 equi-join(等值连接)。具体的执行过程如下:

  1. 第一步:读入数据
            Spark 会从数据源(如 HDFS、Hive 表等)中读取两个大表的数据,分别分布在不同的分区上。每个分区的数据是局部的,不包含全局的信息。

  2. 第二步:Map 阶段进行数据分区
            在 JOIN 操作中,Spark 会根据 key 值进行数据的哈希分区。每个分区根据 key 进行 hash,然后将相同 hash 值的 key 数据分发到相同的 Reduce 节点。例如,如果两个表都要根据 user_id 进行连接,Spark 会对 user_id 进行 hash 计算。

           在代码中,这一部分对应 RDD 的 partitionBy 操作(对于 DataFrame/Dataset 则是底层物理计划的分区操作)。ShuffledRDD 负责这一逻辑的实现。

    伪代码展示:

    // 对表A和表B的key进行分区
    val partitionedTableA = tableA.partitionBy(new HashPartitioner(numPartitions))
    val partitionedTableB = tableB.partitionBy(new HashPartitioner(numPartitions))
    

  3. 第三步:Shuffle 过程
        Shuffle 是一个将 Map 阶段计算的结果数据从一个计算节点发送到另一个计算节点的过程。对于 JOIN 操作,Shuffle 的目的是确保相同 key 的数据被分发到相同的节点上。

           在 Shuffle 过程中,Spark 会使用 shuffle write 将本地数据写到磁盘或网络中,然后通过网络将这些分区数据发送到目标节点。接着,shuffle read 负责从其他节点上读取相应分区的数据。

       ​​​​​​​ ShuffleMapTask 是负责执行 Shuffle 写阶段的任务类型, ShuffleManager 管理整个 Shuffle 的过程,默认实现为 SortShuffleManager

    伪代码展示:

    // 执行 shuffle,将 A 和 B 按照 key hash 之后分布到不同节点
    partitionedTableA.join(partitionedTableB)
    

    Shuffle 的详细步骤:

    • Shuffle Write: 每个 map 任务计算完局部数据后,会将数据写入本地磁盘的文件系统或存储在内存中。数据以 partition 为单位写出,针对每个分区分别存储。
    • Shuffle Read: Reduce 任务会根据分区信息从其他节点拉取数据,读取与自己分区匹配的数据块进行处理。
  4. 第四步:Reduce 阶段进行 JOIN 计算
            在 Shuffle 结束后,每个节点已经得到了自己负责的分区数据。接下来,Spark 会执行 JOIN 操作。对于 equi-join,Spark 会对每个分区中的数据进行匹配(类似于 merge join 或者 hash join)。因为相同 key 的数据已经被分布到同一个分区,所以可以直接进行连接操作。

            在源码层面,ShuffledRowRDD 是 Shuffle Read 后构造的 RDD,ShuffleRowJoinExec 是执行实际 JOIN 操作的物理计划节点。

  5. 第五步:输出结果
            Reduce 阶段完成 JOIN 操作后,结果会写入到相应的输出位置(如内存、磁盘、或是其他表中)。

三、代码层面关键类和函数

  1. Shuffle 相关类和接口

    • ShuffleManager: 管理 Shuffle 过程的接口,决定如何进行数据的 Shuffle。默认实现为 SortShuffleManager,其主要负责将数据按 key 排序后写入并读取。
    • ShuffleDependency: 定义了数据 Shuffle 的依赖关系,描述了需要 Shuffle 的 RDD 和其 Partitioner。
    • ShuffleMapTask: 执行 Shuffle 写操作的 Task。
    • ShuffledRowRDD: 负责处理 Shuffle 读取后的数据。
  2. Join 相关类

    • ShuffleExchangeExec: 执行 Shuffle 数据的交换操作,用于分区。
    • BroadcastHashJoinExec: 当 JOIN 其中一张表较小时,可以采用广播机制避免 Shuffle。
    • SortMergeJoinExec: Spark 默认的大表与大表 JOIN 算法,适合排序后的数据。
    • ShuffledHashJoinExec: 基于 Shuffle 后的哈希 Join,适合大数据量。
  3. 关键函数

    • partitionBy: 根据给定的 Partitioning 函数对 RDD 进行重新分区。
    • shuffle: 将 RDD 按 key 进行 shuffle,涉及到数据的写入和读取。
    • join: DataFrame API 中的 join 函数封装了不同的 JOIN 算法,包括 Sort-Merge Join 和 Broadcast Join。

四、优化 Shuffle 的策略

由于大表 JOIN 时的 Shuffle 会产生大量的磁盘 I/O 和网络传输,以下是一些常见的优化策略:

  1. Broadcast Join(广播连接):当一张表很小而另一张表很大时,可以使用广播机制避免 Shuffle,即将小表广播到每个节点。这避免了大表的 Shuffle 操作,极大提高性能。

    通过设置:

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024) // 10MB
    
  2. Partition 数量的调优:合理设置分区数量(spark.sql.shuffle.partitions)可以减少单个分区的数据量过大或过小的问题,进而减小 Shuffle 阶段的网络开销。

  3. 合并小文件:启用 spark.shuffle.file.buffer 和 spark.reducer.maxSizeInFlight 来优化 Shuffle 文件的缓冲区和网络传输时的最大文件大小,以减少磁盘 I/O 的次数。

  4. Skew Join 处理:对于数据倾斜的场景,可以采用 Skew Join(倾斜 Join)的方式,将倾斜的 key 拆分到多个分区进行处理,减小单个 Reduce 任务的压力。

五、总结

        在 Spark 的大表 JOIN 过程中,Shuffle 机制是核心的步骤,其主要职责是重新分发数据使得相同 key 的记录能够分布到同一个节点。Shuffle 的开销主要在于数据的网络传输和磁盘 I/O,因此有效的分区策略、数据倾斜处理以及 JOIN 算法选择都是优化此过程的关键。通过对 Shuffle 源码和物理执行计划的理解,可以帮助开发者更好地调优 Spark 应用的性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1545051.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

HBase DDL操作代码汇总(namespace+table CRUD操作)

HBase DDL操作 DDL操作主要是关于命名空间和表格的内容增删改查。 注:如果出现无法连接到zookeeper等的相关错误,可以将以下代码打jar包,在HMaster节点上执行 错误提示: Exception in thread “main” java.net.SocketTimeoutExc…

DVWA 靶场环境搭建

作者:程序那点事儿 日期:2024/09/15 09:30 什么是DVWA: 是OWSASP官方编写的PHP网站,包含了各种网站常见漏洞(漏洞靶场),可以学习攻击及修复方式。 PHP环境包含了,Windows/Apache/Mysql/Php g…

公安局软件管理平台建设方案和必要性,论文-2-———未来之窗行业应用跨平台架构

一、平台方略 随着gov信息化建设的不断推进,各类ZW软件的应用需求日益增加。为了提高ZW软件的获取便利性、AQ性和规范性,建设一个专门的GOV软件管理平台具有重要意义。 集中提供各类ZW软件,方便工作人员快速获取和安装,减少因软…

开放原子开源基金会OPENATOM

AtomGit_开放原子开源基金会代码托管平台-AtomGit 开放原子开源基金会是致力于推动全球开源事业发展的非营利机构,于 2020 年 6 月在北京成立,由阿里巴巴、百度、华为、浪潮、360、腾讯、招商银行等多家龙头科技企业联合发起。 精选项目: 比…

IDEA:如何设置项目启动的JVM运行内存大小

IDEA版本不一样页面也不一样 -Xms20m -Xmx200m 其实在本地开发调试的时候不需要太大内存,如果测试性能建议放到运算服务器上面去跑~~~

Python 递归函数如何工作?如何防止递归调用过深导致栈溢出

递归是编程中的一个重要概念,尤其在 Python 中,递归函数可以使某些问题的解决变得更加简洁和优雅。尽管递归具有强大的表达能力,但如果不加以控制,递归调用过深可能会导致栈溢出。本文将深入探讨递归函数的工作原理,如…

android和ios双端应用性能的测试工具

1.工具介绍 基于日常工作的需要,开发了一款新的android和ios端应用性能测试工具,本工具在数据测试方面与所流行的工具没有区别。欢迎下载使用体验。 本工具为筋斗云,工具说明 本工具无侵入,不需要root,低延迟…

二叉树的基本概念(上)

文章目录 🍊自我介绍🍊简介🍊树的定义树中的专业术语树的分类 🍊二叉树的特性讲解 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以:点赞关注评论收藏(一键四连)哦~ 🍊自我介…

VisualStudio如何卸载Resharper插件?

本来按理说,卸载插件应该就是在扩展下的已安装插件中,找到该插件,点一下就会出现卸载的按钮的。 没想到这个Resharper这么吊,卸载按钮居然是个灰色的,意思就是此路不通,有特权的。 那么这种情况下&#x…

第68期 | GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区,集成了生成预训练Transformer(GPT)、人工智能生成内容(AIGC)以及大语言模型(LLM)等安全领域应用的知识。在这里,您可以找…

Android studio安装问题及解决方案

Android studio安装问题及解决方案 gradle已经安装好了,但是每次就是找不到gradle的位置,每次要重新下载,很慢,每次都不成功 我尝试用安装android studio时自带的卸载程序,卸载android studio,然后重新下…

php发送邮箱教程:如何实现邮件发送功能?

php发送邮箱性能优化策略?怎么使用PHPMail发送邮箱? 无论是用户注册验证、密码重置,还是系统通知,邮件发送都是不可或缺的一部分。AokSend将详细介绍如何使用PHP实现邮件发送功能,帮助开发者快速掌握这一技能。 php发…

LeetCode从入门到超凡(三)回溯算法

引言 大家好,我是GISer Liu😁,一名热爱AI技术的GIS开发者。本系列文章是我跟随DataWhale 2024年9月学习赛的LeetCode学习总结文档;本文主要讲解回溯算法。💕💕😊 介绍 回溯算法(Back…

使用 Nuxt Kit 的构建器 API 来扩展配置

title: 使用 Nuxt Kit 的构建器 API 来扩展配置 date: 2024/9/24 updated: 2024/9/24 author: cmdragon excerpt: 摘要:本文详细介绍了如何使用 Nuxt Kit 的构建器 API 来扩展和定制 Nuxt 3 项目的 webpack 和 Vite 构建配置,包括扩展Webpack和Vite配置、添加自定义插件、…

MySQL Performance Schema 详解及运行时配置优化

引言 MySQL 的 Performance Schema 是一套性能监控与诊断工具,帮助开发者和数据库管理员收集、分析 MySQL 实例的运行状态,找出性能瓶颈并进行优化。通过 Performance Schema,我们能够监控不同的内部事件、线程、会话、语句执行等关键性能指…

[单master节点k8s部署]24.构建EFK日志收集平台(三)

Kibana Kibana是elasticsearch的可视化界面。 首先创建kibana的服务,yaml文件如下。k8s里的服务分为四种,clusterIP为仅仅为pod分配k8s集群内部的一个虚拟ip,用于集群内的pod通信,而不对外暴露。elasticsearch的服务就是cluster…

编译原理3——词法分析

3.1词法分析器的作用 词法分析是编译的第一阶段。词法分析器的主要任务是读入源程序的输入字符、将它们组成词素,生成并输出一个词法单元序列,每个词法单元对应于一个词素。 但在这个过程中,词法分析器还要和语法分析器进行交互。交互&…

jupyter安装与使用——Ubuntu服务器

jupyter安装与使用——Ubuntu服务器 一、安装miniconda3/anaconda31. 下载miniconda32. 安装miniconda33. 切换到bin文件夹4. 输入pwd获取路径5. 打开用户环境编辑页面6. 重新加载用户环境变量7. 初始化conda8.验证是否安装成功9.conda配置 二、安装jupyter2.1 conda安装2.2 配…

kali-linux-2023.4 安装与配置

kali官网 作者:程序那点事儿 日期:2024/01/15 21:34 进入kali官网,点到下载页面 选择安装方式(本次私用虚拟机安装)。裸机安装是指,先要安装虚拟机(例如:CentOS7&#xff09…

html TAB切换按钮变色、自动生成table

<!DOCTYPE html> <head> <meta charset"UTF-8"> <title>Dynamic Tabs with Table Data</title> <style> /* 简单的样式 */ .tab-content { display: none; border: 1px solid #ccc; padding: 1px; marg…