在 Spark 中,RDD 的分区数对于并行计算的效率非常重要,SparkCore 读取 HDFS 文件时 RDD 分区数的确定受多方面因素的影响。本文将从源码的角度分析 Spark 如何确定 RDD 分区数,并通过代码示例和案例帮助理解分区策略。
Spark RDD 分区数确定的源码解析
Spark 读取 HDFS 文件时,分区数主要由 文件块大小(block size)、分片大小(split size)、期望分区数(spark.default.parallelism)等参数共同决定。
1. splitSize
的确定
Spark 读取 HDFS 文件时,会根据文件的总大小和分区期望数来计算每个分区的大小(splitSize
)。源码如下:
val goalSize = totalSize / math.max(minPartitions, 1)
val splitSize = Math.max(minSize, Math.min(goalSize, blockSize))
goalSize
:每个分区的目标大小,由总文件大小除以分区数(minPartitions
)计算得出。splitSize
:最终的分区大小,取goalSize
与 HDFSblockSize
之间的较小值,确保每个分区数据量不会超过一个 HDFS 块的大小。
2. 代码示例:分区数计算
假设一个文件的大小为 1 GB,块大小为 128 MB,期望分区数(spark.default.parallelism
)为 8。则每个分区的目标大小 goalSize
为 128 MB(1 GB / 8),最终的 splitSize
为 128 MB(和块大小相同)。这时文件会被分为 8 个分区。
3. 示例代码:RDD 分区数确定
import org.apache.spark.{SparkConf, SparkContext}object HDFSPartitionExample {def main(args: Array[String]): Unit = {// 创建 SparkContextval conf = new SparkConf().setAppName("HDFS Partition Example").setMaster("local")val sc = new SparkContext(conf)// 读取 HDFS 文件val filePath = "hdfs://path/to/file"val rdd = sc.textFile(filePath, minPartitions = 8) // 设置最小分区数为 8println(s"分区数: ${rdd.getNumPartitions}")// 查看每个分区的数据量val partitionSizes = rdd.mapPartitionsWithIndex { (idx, iter) =>Iterator((idx, iter.size))}.collect()partitionSizes.foreach { case (index, size) =>println(s"分区 $index: 数据量 $size 条记录")}sc.stop()}
}
4. 实验结果分析
- 1 GB 文件,128 MB 块大小,8 个期望分区:生成 8 个分区,每个分区 128 MB。
- 1 GB 文件,64 MB 块大小,10 个期望分区:由于
goalSize
为 100 MB,实际每个分区大小取 64 MB(块大小)。生成 16 个分区,每个分区 64 MB。 - 1 GB 文件,256 MB 块大小,4 个期望分区:
goalSize
为 250 MB,splitSize
为 250 MB,生成 4 个分区,每个分区 250 MB。
总结
- Spark 通过
goalSize
和blockSize
来平衡分区数量与块大小。 - 分区数会随着文件大小、块大小、期望分区数等参数变化。
- 分区数设定不合理会影响性能,例如分区数过多会导致任务调度开销增加,分区数过少则可能导致计算资源未充分利用。