菜单
本页目录

RDD 的数据是过程数据

  • RDD之间进行的是函数式计算(Transformation的转换),当开始执行后,新RDD生成,代表旧RDD消失。
  • RDD的数据是过程数据,只在处理过程中存在,一旦形成完成,就不见了。

这个特性是RDD的内存最大资源利用,老的RDD没用了就从内存清除,给后续计算腾出内存空间

RDD 的数据是过程数据

这种特性也是必须基于血缘关系的,程序才能根据前趋关系再次计算出RDD3

RDD缓存

RDD 缓存概念

RDD 缓存(或持久化)允许用户手动保存一个 RDD 在内存中(或在内存和磁盘上),以便能够有效地重用它在后续的操作中。这特别有用于那些需要被多次读取的数据集,例如在迭代算法或多个不同的操作中。

为什么使用 RDD 缓存?

在没有缓存的情况下,每次触发一个 action 操作(例如 count()collect())时,Spark 会从头开始重新计算所有从数据源开始到该 action 操作的所有 transformation。这种计算可以非常耗时,尤其是当 transformations 链很长或者数据集很大时。通过使用缓存,Spark 可以在第一次计算后保存中间结果,从而避免后续重复计算这些结果。

缓存级别

Spark 提供了多种缓存级别来存储 RDD,以适应不同的使用场景:

  • MEMORY_ONLY: 默认的存储级别。RDD 存储在 JVM 的堆内存中。如果内存不足以存储所有数据,那么不会缓存部分分区,每次需要这些分区时都会重新计算。
  • MEMORY_AND_DISK: RDD 存储在内存中,但如果 RDD 太大不能完全放入内存,则将不适合内存的部分存储到磁盘上,并在需要时读取。
  • MEMORY_ONLY_SER: 类似于 MEMORY_ONLY,但是数据会被序列化存储在内存中,占用的空间更少,但是读取速度可能会慢一些。
  • MEMORY_AND_DISK_SER: 类似于 MEMORY_AND_DISK,但是数据在内存中是序列化存储的。
  • DISK_ONLY: 全部数据只存储在磁盘上。

使用 RDD 缓存的步骤

  1. 选择级别:根据你的需要选择一个合适的存储级别。
  2. 调用 persist 或 cache 方法
    val rdd = sc.parallelize(1 to 10000)
    rdd.persist(StorageLevel.MEMORY_ONLY)  // 使用 persist 并选择一个存储级别
    rdd.cache()  // 等同于 rdd.persist(StorageLevel.MEMORY_ONLY)
    
  3. 执行 Action 操作:在执行 Action 操作后,RDD 将会根据指定的存储级别被缓存。
  4. 释放缓存:如果你的 RDD 占用了大量的资源,你可能需要在不需要它之后手动释放缓存:
    rdd.unpersist()
    

缓存和持久化 API

  1. cache()

    • 用途: 将 RDD 缓存到默认的存储级别(MEMORY_ONLY)。
    • 示例:
      val rdd = sc.parallelize(data)
      rdd.cache()
      
  2. persist()

    • 用途: 允许用户为 RDD 指定一个自定义的存储级别。
    • 参数: StorageLevel —— 指定存储级别。
    • 示例:
      val rdd = sc.parallelize(data)
      rdd.persist(StorageLevel.MEMORY_AND_DISK)
      

StorageLevel

StorageLevel 类定义了不同的存储级别选项,以下是可用的存储级别及其对应的 API:

  1. MEMORY_ONLY

    • 存储 RDD 的所有分区在内存中,如果内存不足,未被存储的分区将在需要时重新计算。
  2. MEMORY_AND_DISK

    • RDD 的分区首先尝试存储在内存中。如果内存不足以存储全部数据,将会将数据写到磁盘。
  3. MEMORY_ONLY_SER

    • 类似于 MEMORY_ONLY,但是数据将被序列化存储在内存中,减少存储空间但增加了反序列化的开销。
  4. MEMORY_AND_DISK_SER

    • 类似于 MEMORY_AND_DISK,但数据在内存中是序列化的。如果内存不足,序列化的数据将被写到磁盘。
  5. DISK_ONLY

    • RDD 的所有数据只存储在磁盘上。
  6. MEMORY_ONLY_2, MEMORY_AND_DISK_2

    • 这些级别与 MEMORY_ONLY 和 MEMORY_AND_DISK 类似,但每个分区有两个副本,提供更高的容错性。
  7. OFF_HEAP

    • (如果配置了)数据将被存储在非 JVM 堆内存中,这依赖于底层存储的支持。

释放缓存

  • unpersist()
    • 用途: 用于从内存(和/或磁盘)中移除并释放一个 RDD 的缓存。
    • 示例:
      rdd.unpersist()
      

RDD CheckPoint

  • CheckPoint技术,也是将RDD的数据,保存起来。
  • 仅支持硬盘存储
  • 逻辑设计上是安全的,物理存储上不完全是
  • 不保留血缘关系

RDD CheckPoint过程

CheckPoint存储RDD数据,是集中收集各个分区数据进行存储的,二缓存是分散存储。

  • 代码:
    # 设置CheckPoint 第一件事是给予spark快照目录
    # 本地模式支持,本地文件系统,集群模式请选择分布式文件系统
    sc.setCheckpointDir(hdfs://mycluster/spark/checkpoint)
    # ......
    rdd.checkpoint() # 直接调用算子将RDD快照
    

CheckPoint 与 缓存对比

  • CheckPoint不营分区数量多少,风险是一样的,缓存分区越多,风险越高
  • CheckPoint支持写入HDFS,缓存不行,HDFS是高可靠存储,Cheint被认为是安全的
  • CheckPoint不支持内存,缓存可以,缓存如果写内存性能比checkPoint要好一些
  • CheckPoint因为设计认为是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全,所以保留