DAG的宽窄依赖以及阶段划分
窄依赖:父RDD的一个分区,全部将数据发个子RDD的一个分区 宽依赖:父RDD的一个分区,将数据发给子RDD的多个分区 宽依赖还以一个别分shuffle,其原理过程与hadoop的shuffle阶段差不多
DAG的宽依赖与窄依赖
基于DAG的宽依赖划分阶段,阶段内部都是窄依赖可以构建内存送代的管道
在Spark中,依赖是指RDD之间的关系,也就是如何从一个RDD转换为另一个RDD。依赖分为两种:窄依赖(narrow dependency)和宽依赖(wide dependency)。
窄依赖(Narrow Dependency)
- 在窄依赖中,每个父RDD的分区至多会被一个子RDD的分区所使用。也就是说,一个分区的数据只被下游的一个分区依赖,数据不会发生跨分区的依赖关系。
- 由于数据局限在少数分区内处理,窄依赖可以在同一节点上执行,不需要数据的网络传输。
- 常见的窄依赖操作包括:
map()
、filter()
、flatMap()
、union()
(同一父RDD的分区合并)等。
示例:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val mappedRDD = rdd.map(_ * 2)
在这个例子中,map()
操作形成的是窄依赖,因为每个输入分区的数据只作用于输出的一个分区。
宽依赖(Wide Dependency)
- 在宽依赖中,子RDD的每个分区依赖于多个父RDD的分区。也就是说,一个分区的数据会被多个下游分区使用,通常会涉及多个分区之间的数据传输。
- 由于需要跨分区的数据传输,宽依赖会触发数据的shuffle操作,导致网络传输的开销。
- 常见的宽依赖操作包括:
groupByKey()
、reduceByKey()
、join()
等。
示例:
val rdd = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
val reducedRDD = rdd.reduceByKey(_ + _)
在这个例子中,reduceByKey()
操作形成的是宽依赖,因为需要对相同的键值在不同分区之间进行聚合,这涉及跨节点的shuffle。
DAG中的阶段划分(Stage)
- Stage 的内部一定是窄依赖
- Stage 由Action算子或者读写文件开始
- Stage 的产生一定会经过宽依赖
在Spark的执行过程中,DAG会被划分成若干个阶段(stage)。阶段是任务执行的基本单元,它的划分依据是宽依赖与窄依赖的不同。
- 窄依赖的Transformation:可以在同一个阶段中顺序执行,因为它们只涉及在同一节点的本地数据操作。
- 宽依赖的Transformation:会触发数据shuffle,需要在不同阶段之间进行数据交换。这意味着每当遇到宽依赖时,DAG就需要划分为新的阶段。
阶段划分规则:
- 窄依赖的操作可以合并在同一个阶段执行:Spark可以将多个窄依赖操作组合成一个阶段,在同一批次中顺序执行,不需要额外的数据传输。
- 宽依赖操作会触发shuffle,划分新的阶段:当遇到宽依赖操作时,数据需要在不同节点之间重新分配,这通常通过网络传输完成(如shuffle操作)。因此,宽依赖会终止当前阶段并开始一个新的阶段。
阶段划分举例
val rdd = sc.textFile("hdfs://path/to/input")
val wordsRDD = rdd.flatMap(_.split(" ")) // 窄依赖
val pairsRDD = wordsRDD.map(word => (word, 1)) // 窄依赖
val wordCountRDD = pairsRDD.reduceByKey(_ + _) // 宽依赖 (shuffle)
val result = wordCountRDD.collect() // Action 触发
DAG阶段划分:
textFile
、flatMap
和map
操作都是窄依赖,Spark会将它们放在同一个阶段中。这是第一个阶段,所有这些操作都可以顺序执行,不需要shuffle。reduceByKey
是宽依赖操作,它会触发数据的shuffle。因此,reduceByKey
将会被划分到第二个阶段,因为需要跨分区的数据聚合。collect
作为Action操作,触发整个DAG的执行。
DAG划分:
- 第一阶段:
textFile
->flatMap
->map
(窄依赖) - 第二阶段:
reduceByKey
(宽依赖,触发shuffle)
- 窄依赖:同一分区数据处理可以在同一个阶段中进行,不触发shuffle。
- 宽依赖:需要跨分区的数据传输,触发shuffle,会划分新的阶段。
- DAG的阶段划分是为了优化并行度,尽可能减少shuffle操作,从而提升Spark任务的执行效率。