内存迭代计算
Spark 内存迭代计算的核心特性
-
RDD持久化(Persistence/Caching): Spark通过将中间计算结果持久化到内存(或磁盘)中,避免了每次迭代都重新从头计算整个数据集。这对机器学习算法、图计算(如PageRank)等需要反复操作同一数据集的应用尤其重要。
- 持久化操作:
RDD.persist()
或RDD.cache()
用于将RDD缓存到内存中。 - persist():持久化可以选择多种存储级别,比如:
MEMORY_ONLY
:将数据缓存到内存中。MEMORY_AND_DISK
:如果内存不足,数据会被存储到磁盘。MEMORY_ONLY_SER
:将数据序列化后再缓存到内存中,以节省内存空间。DISK_ONLY
:直接将数据缓存到磁盘中,不存储到内存。
示例:
val rdd = sc.textFile("hdfs://path/to/file") val cachedRDD = rdd.map(line => line.split(" ")).cache() // 进行多次迭代计算 val count1 = cachedRDD.filter(words => words.length > 3).count() val count2 = cachedRDD.filter(words => words.contains("Spark")).count()
在这个例子中,
cachedRDD
被缓存到内存中,因此后续对cachedRDD
的计算将会非常快速,不需要重复读取文件或重新计算中间结果。 - 持久化操作:
-
DAG和懒惰计算: Spark的DAG(有向无环图)调度机制意味着Transformation操作是懒惰执行的,只有在遇到Action操作(如
count()
或collect()
)时,Spark才会真正开始执行DAG。这种设计允许Spark在迭代计算时灵活构建DAG并避免不必要的中间结果计算。持久化会将数据的中间状态存储到内存中,因此下一次迭代时无需从最初的原始数据重新构建DAG,减少了重复计算的开销。
-
迭代计算中的Shuffle与内存优化:
- 在迭代计算过程中,如果需要在多个节点之间进行数据传输,Spark会进行shuffle操作(如
groupByKey()
、reduceByKey()
等)。Shuffle操作通常会涉及大量的数据网络传输以及磁盘I/O操作。 - Spark提供了一些优化策略来减少迭代计算时的shuffle开销。通过将中间结果持久化到内存中,可以在不同的迭代之间避免重复的shuffle操作。
- 在迭代计算过程中,如果需要在多个节点之间进行数据传输,Spark会进行shuffle操作(如
- 注意:Spark我们一般推荐只设置全局并行度,不要再算子上设置并行度
- 除了一些排序算子外,计算算子就让他默认开分区就可以了
-
Task与Executor内存管理:
- 在每个Executor中,内存被划分为不同的区域:用于缓存RDD的内存(storage memory)和用于执行任务的内存(execution memory)。当RDD被持久化到内存时,它将占用Executor的存储内存。
- Spark在内存管理中使用了LRU(最近最少使用)算法,当内存不足时,Spark会自动将不常用的RDD从内存中移除,腾出空间给新的RDD数据。
- 如果内存不足,持久化的数据会被溢出到磁盘中(如果选择了
MEMORY_AND_DISK
级别),从而保证计算的稳定性。
-
迭代式算法的应用场景: 内存迭代计算在以下类型的算法中有显著优势:
- 机器学习算法:如梯度下降法、K-means聚类等,这些算法往往需要对同一批数据进行多次迭代计算。持久化数据可以避免重复的I/O和数据读取。
- 图计算:如PageRank算法,每次迭代都需要从上次的结果进行计算,持久化上次迭代的结果到内存中可以加速整个计算过程。
内存迭代计算的优化策略
-
合理选择持久化级别:
- 对于频繁访问的RDD,应该将其缓存到内存中(
MEMORY_ONLY
或MEMORY_AND_DISK
),以减少重复计算。 - 如果数据量较大,且内存不足,可以选择将RDD序列化后再缓存(
MEMORY_ONLY_SER
),以减少内存占用。
- 对于频繁访问的RDD,应该将其缓存到内存中(
-
避免不必要的shuffle操作:
- 在迭代计算中,shuffle操作往往是性能瓶颈。通过优化算法的设计,减少shuffle操作的频率,可以提高内存迭代计算的性能。例如,在进行分组操作之前可以先对数据进行预处理,以减少后续的shuffle开销。
-
使用broadcast变量:
- 对于在多个节点之间共享的只读数据,可以使用广播变量(
Broadcast
)将其缓存到每个节点的内存中。这样可以减少每次计算时的数据传输开销,提升迭代计算的效率。
- 对于在多个节点之间共享的只读数据,可以使用广播变量(
-
数据倾斜优化:
- 在shuffle操作中,如果某些分区的数据量过大,可能会导致计算负载不均衡。可以通过对数据进行预分区或自定义分区器的方式,来减少数据倾斜,提升迭代计算的效率。
Spark的内存迭代计算通过RDD的持久化和DAG调度机制,使得迭代计算过程更加高效,特别适用于机器学习和图计算等需要对同一数据集进行多次计算的场景。通过合理使用持久化和优化内存管理,开发者可以有效提升迭代计算的性能,减少重复计算和shuffle操作的开销。
-
Spark是怎么做内存计算的?DAG的作用?Stage阶段划分的作用?
- Spark会产生DAG图
- DAG图会基于分区和宽窄依赖关系划分阶段
- 个个阶段的内部都是窄依赖,窄依赖内,如果形成前后1:1的分区对应关系,就可以产生许多内存送代计算的管道
- 这些内存送代计算的管道,就是一个个具体的执行Task
- 一Task是一个具体的线程,任务跑在一个线程内,就是走内存计算了
-
Spark为什么比MR快
- Spark的算子丰富,MapReduce算子医乏(Map和Reduce),MapReduce这个编程模型,很难在一套MR中处理复杂的任务,很多的复杂任务,是需要写多个MapReduce进行串联.多个MR串联通过磁盘交互数据(编程模型上Spark占优,算子多)
- Spark可以执行内存送代,算子之间形成DAG基于依赖划分阶段后,在阶段内形成内存送代管道。但是MapReduce的Map和Reduce之间的交互依旧是通过硬盘来交互的。(算子交互上,和计算上可以尽量多的内存计算而非磁盘迭代)