RDD 的数据是过程数据
- RDD之间进行的是函数式计算(Transformation的转换),当开始执行后,新RDD生成,代表旧RDD消失。
- RDD的数据是过程数据,只在处理过程中存在,一旦形成完成,就不见了。
这个特性是RDD的内存最大资源利用,老的RDD没用了就从内存清除,给后续计算腾出内存空间

RDD 的数据是过程数据
这种特性也是必须基于血缘关系的,程序才能根据前趋关系再次计算出RDD3
RDD缓存
RDD 缓存概念
RDD 缓存(或持久化)允许用户手动保存一个 RDD 在内存中(或在内存和磁盘上),以便能够有效地重用它在后续的操作中。这特别有用于那些需要被多次读取的数据集,例如在迭代算法或多个不同的操作中。
为什么使用 RDD 缓存?
在没有缓存的情况下,每次触发一个 action 操作(例如 count()
或 collect()
)时,Spark 会从头开始重新计算所有从数据源开始到该 action 操作的所有 transformation。这种计算可以非常耗时,尤其是当 transformations 链很长或者数据集很大时。通过使用缓存,Spark 可以在第一次计算后保存中间结果,从而避免后续重复计算这些结果。
缓存级别
Spark 提供了多种缓存级别来存储 RDD,以适应不同的使用场景:
- MEMORY_ONLY: 默认的存储级别。RDD 存储在 JVM 的堆内存中。如果内存不足以存储所有数据,那么不会缓存部分分区,每次需要这些分区时都会重新计算。
- MEMORY_AND_DISK: RDD 存储在内存中,但如果 RDD 太大不能完全放入内存,则将不适合内存的部分存储到磁盘上,并在需要时读取。
- MEMORY_ONLY_SER: 类似于 MEMORY_ONLY,但是数据会被序列化存储在内存中,占用的空间更少,但是读取速度可能会慢一些。
- MEMORY_AND_DISK_SER: 类似于 MEMORY_AND_DISK,但是数据在内存中是序列化存储的。
- DISK_ONLY: 全部数据只存储在磁盘上。
使用 RDD 缓存的步骤
- 选择级别:根据你的需要选择一个合适的存储级别。
- 调用 persist 或 cache 方法:
val rdd = sc.parallelize(1 to 10000) rdd.persist(StorageLevel.MEMORY_ONLY) // 使用 persist 并选择一个存储级别 rdd.cache() // 等同于 rdd.persist(StorageLevel.MEMORY_ONLY)
- 执行 Action 操作:在执行 Action 操作后,RDD 将会根据指定的存储级别被缓存。
- 释放缓存:如果你的 RDD 占用了大量的资源,你可能需要在不需要它之后手动释放缓存:
rdd.unpersist()
缓存和持久化 API
-
cache()
- 用途: 将 RDD 缓存到默认的存储级别(MEMORY_ONLY)。
- 示例:
val rdd = sc.parallelize(data) rdd.cache()
-
persist()
- 用途: 允许用户为 RDD 指定一个自定义的存储级别。
- 参数:
StorageLevel
—— 指定存储级别。 - 示例:
val rdd = sc.parallelize(data) rdd.persist(StorageLevel.MEMORY_AND_DISK)
StorageLevel
StorageLevel
类定义了不同的存储级别选项,以下是可用的存储级别及其对应的 API:
-
MEMORY_ONLY
- 存储 RDD 的所有分区在内存中,如果内存不足,未被存储的分区将在需要时重新计算。
-
MEMORY_AND_DISK
- RDD 的分区首先尝试存储在内存中。如果内存不足以存储全部数据,将会将数据写到磁盘。
-
MEMORY_ONLY_SER
- 类似于 MEMORY_ONLY,但是数据将被序列化存储在内存中,减少存储空间但增加了反序列化的开销。
-
MEMORY_AND_DISK_SER
- 类似于 MEMORY_AND_DISK,但数据在内存中是序列化的。如果内存不足,序列化的数据将被写到磁盘。
-
DISK_ONLY
- RDD 的所有数据只存储在磁盘上。
-
MEMORY_ONLY_2, MEMORY_AND_DISK_2
- 这些级别与 MEMORY_ONLY 和 MEMORY_AND_DISK 类似,但每个分区有两个副本,提供更高的容错性。
-
OFF_HEAP
- (如果配置了)数据将被存储在非 JVM 堆内存中,这依赖于底层存储的支持。
释放缓存
- unpersist()
- 用途: 用于从内存(和/或磁盘)中移除并释放一个 RDD 的缓存。
- 示例:
rdd.unpersist()
RDD CheckPoint
- CheckPoint技术,也是将RDD的数据,保存起来。
- 仅支持硬盘存储
- 逻辑设计上是安全的,物理存储上不完全是
- 不保留血缘关系

RDD CheckPoint过程
CheckPoint存储RDD数据,是集中收集各个分区数据进行存储的,二缓存是分散存储。
- 代码:
# 设置CheckPoint 第一件事是给予spark快照目录 # 本地模式支持,本地文件系统,集群模式请选择分布式文件系统 sc.setCheckpointDir(hdfs://mycluster/spark/checkpoint) # ...... rdd.checkpoint() # 直接调用算子将RDD快照
CheckPoint 与 缓存对比
- CheckPoint不营分区数量多少,风险是一样的,缓存分区越多,风险越高
- CheckPoint支持写入HDFS,缓存不行,HDFS是高可靠存储,Cheint被认为是安全的
- CheckPoint不支持内存,缓存可以,缓存如果写内存性能比checkPoint要好一些
- CheckPoint因为设计认为是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全,所以保留