1.RDD(Resilient Distributed Dataset)基本概念
RDD是 Spark core的底层核心,表示一个不可变的、可分区、可并行计算的弹性分布式数据集
2.RDD 的特点:
不可变性
- 一旦创建,RDD 的数据不可改变。所有对 RDD 的操作(如
map
,filter
,reduce
等)都会生成一个新的 RDD,而不会修改原始 RDD。- 这种不可变性使得 RDD 在分布式计算环境下非常稳定,避免了并发冲突。
分区
- RDD可以多个分区。分区是 Spark 作业并行计算的基本单位,每个分区可以独立处理。
- 这种分区结构使得 Spark 可以有效地进行数据并行处理,提高计算效率。
分布式存储
- RDD 可以存储在分布式文件系统中(如 HDFS、S3、HBase 等),并可以在多个节点上进行操作。数据的分布和存储位置是由 Spark 集群管理的。
弹性
- 容错的弹性:数据丢失可以自动恢复
- 存储的弹性:内存与磁盘的自动切换
- 计算的弹性:计算出错重试机制
- 分片的弹性:可根据需要重新分片
懒加载
- RDD 的转换操作是惰性执行的。即当用户对 RDD 执行操作时,这些操作并不会立刻执行,Spark 会首先构建一个 DAG(有向无环图),描述所有的转换步骤。实际的计算只有在执行 Action 操作时才会启动。
3.RDD 的属性:
【Partitions】分区列表:
- RDD 是由多个分区组成的,每个分区都在一个计算节点上执行。分区数可以通过创建 RDD 时指定,或者根据数据源的大小和集群的资源来自动确定。
【Partitioner】RDD分片函数:
- 基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)。它是默认值
- 基于范围的RangePartitioner。
- 只有对于key-value的RDD,并且产生shuffle,才会有Partitioner;
- 非key-value的RDD的Parititioner的值是None。
【compute】每个分区都有一个计算函数:
- RDD 包含血统信息,记录了该 RDD 如何通过一系列转换操作从原始数据中构建出来。血统信息用于恢复丢失的数据(例如,某个节点失败后,Spark
可以利用血统信息重新计算该节点的数据)。- 血统信息是 RDD 的关键属性之一,它帮助 Spark 实现容错性。
【Dependencies】依赖关系:
- 每个 RDD 都有一个或多个依赖关系,表示它是如何由其它 RDD 转换得到的。RDD 之间的依赖关系有两种:
- 窄依赖(Narrow Dependency):每个父 RDD 的分区对应一个子 RDD 的分区。例如,
map
、filter
、union
等操作。- 宽依赖(Wide Dependency):父 RDD 的一个分区可能会被多个子 RDD 的分区使用。例如,
groupBy()
、reduceByKey()
等操作。【优先位置】:
- 移动数据不如移动计算,Spark在任务调度时,会尽可能将计算任务分配到所要处理数据块的存储位置
4.RDD 的常见操作:
1. 转换操作(Transformation)
- RDD 的转换操作是惰性执行的,返回一个新的 RDD,原始 RDD 不会改变。
- 常见的转换操作包括:
map()
: 对 RDD 中的每个元素应用一个函数,生成一个新的 RDD。filter()
: 筛选符合条件的元素,生成一个新的 RDD。flatMap()
: 类似于map()
,但每个元素可以映射为多个输出元素。union()
: 合并两个 RDD。groupBy()
: 按照给定的条件将 RDD 中的数据分组。2. 行动操作(Action)
- 行动操作触发实际的计算并返回结果。行动操作会执行并返回计算结果。
- 常见的行动操作包括:
collect()
: 将 RDD 中的所有元素收集到本地。count()
: 计算 RDD 中元素的数量。reduce()
: 对 RDD 中的元素进行聚合操作。saveAsTextFile()
: 将 RDD 中的内容保存到外部文件系统(如 HDFS)。
5.RDD 的缓存级别:
MEMORY_ONLY
:
- 说明:将RDD数据仅缓存到内存中,如果内存不足,Spark会丢弃那些无法缓存的数据。
- 特点:速度最快,但会受到内存限制。如果内存不够,计算会重新执行。
- 适用场景:内存充足,且不需要持久化数据。
MEMORY_AND_DISK
:
- 说明:如果内存不足,RDD数据会溢写到磁盘上。
- 特点:比
MEMORY_ONLY
更可靠,因为当内存不够时,数据会自动保存在磁盘上,但磁盘的读取速度会较慢。- 适用场景:内存不够用时需要保证数据可用,磁盘读取速度不会成为瓶颈。
MEMORY_ONLY_SER
(序列化存储):
- 说明:将RDD数据以序列化格式缓存到内存中,序列化后通常可以节省内存空间。
- 特点:相比
MEMORY_ONLY
,节省内存,但存储和读取速度较慢。- 适用场景:内存不足且需要节省内存空间时,适用于存储简单的数据类型。
MEMORY_AND_DISK_SER
(序列化存储):
- 说明:类似于
MEMORY_AND_DISK
,但数据会以序列化的形式缓存到内存和磁盘。- 特点:节省内存,但会增加序列化和反序列化的开销。
- 适用场景:适用于内存有限且需要减少内存使用的情况。
DISK_ONLY
:
- 说明:将RDD数据仅存储到磁盘上,不使用内存。
- 特点:性能较慢,最好在内存不足时可以使用。
- 适用场景:数据非常大,无法完全存储在内存中时,可以使用磁盘缓存。
OFF_HEAP
:
- 说明:将RDD数据存储到外部内存中。
- 特点:支持堆外内存缓存,避免了JVM堆内内存的管理开销。
- 适用场景:在需要高效管理大规模数据集时,且不希望受到JVM堆内存限制的情况。
例如:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
rdd.cache() // 默认使用 MEMORY_ONLY 缓存级别,等价于persist(StorageLevel.MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_AND_DISK) // 指定缓存级别