Spark 基于 MapReduce (MR) 开发

  • Spark 是一种强大的分布式计算框架,最初受到 Hadoop MapReduce 的启发。与 Hadoop 不同的是,Spark 专注于内存计算和优化处理流程,提高了计算性能和效率。

Spark 框架特点

Spark 是一个开源的大数据处理框架,专注于 内存计算,提供了更高效的数据处理能力。与传统基于磁盘的 Hadoop MapReduce 相比,Spark 使用内存作为数据处理的中介,极大提高了数据分析和处理的速度。它是一种 快速通用可扩展 的大数据分析计算框架。

  • 快速:Spark 的核心特性是内存计算,能将任务执行速度提升 10 倍以上。它支持批处理和流处理。
    • Spark支持内存计算,并且通过DAG(由向无环图)执行引擎支持无环数据流,所以计算同样的任务在内存中比Hadoop的MR快100左右,在硬盘中快10倍左右。
  1. Spark处理数据时,中间结果通常存储与内存中
  2. Spark提供了丰富的算子(API 即Spark自带的函数),可以使得复杂的任务在一个Spark程序中完成
  • 易于使用: Spark是由Scala语言编写的,但是支持Java、Python、R、SQL、Scala等多种语言,有良好的语言适应性,而且且提供大量的API方便编程
  • 通用:Spark 不仅限于批量数据处理,它支持 SQL 查询、流处理、机器学习、图计算等多种任务。
    • 拥有Spark SQL 、 Spark Streaming 、 MLlib 、 GraphX 等多种工具库
  • 可扩展:Spark 具有很好的扩展性,能够在大规模集群上处理海量数据。
  • 支持多种运行方式:Spark支持多种运行方式,包括在Hadoop,Mesos上,也支持在Standalone上独立运行,同时支持k8s(Kubernetes)上运行
    • 对于数据源而言,Spark支持从HDFS、HBase、Kafka、Cassandra等多种途径获取

Spark 相比于 Hadoop MR

Hadoop 与 Spark 的对比

  • Hadoop MapReduce:以磁盘为基础的计算模式,适合处理离线批量数据,处理速度较慢。
  • Spark:基于内存的计算模式,更加高效,尤其适合实时数据处理和复杂的分析任务。

1. 开发语言

  • MapReduce (MR):最早的 Hadoop MapReduce 使用 Java 编写。虽然 Java 具有广泛的使用范围,但它在处理大规模数据时显得不够灵活。
  • Spark:Spark 采用了 Scala 作为主要的开发语言。Scala 具有函数式编程的特性,特别适合进行大量的数据处理。除了 Scala,Spark 也支持 Java、Python 和 R 等语言。

2. 处理方式

20240903220327.png

MapReduce工作示意图

  • Hadoop:Hadoop 最早出现时,主要基于磁盘的外存计算(即 MapReduce),每次处理完数据后都会将结果写入磁盘,因此处理效率较低。
20240903220327.png
Spark工作示意图

Spark架构

Spark 架构 由以下几个部分组成:

  • Driver 程序:负责创建 SparkContext 对象,并将应用程序逻辑划分为多个 Task
  • Executor 进程:负责执行 Task,并将结果返回给 Driver
  • Cluster Manager :负责管理集群中的资源和节点,可以是 YARN ,也可以是其他类型
  • SparkContext 对象:负责与 Cluster Manager 交互申请资源,并创建 Executor
  • RDD :弹性分布式数据集,是 Spark 的核心抽象,表示一个不可变、分区、可并行操作的数据集合
  • DAGScheduler :负责将 RDD 的操作转换为 DAG (有向无环图),并将 DAG 划分为多个 Stage
  • TaskScheduler :负责将 Stage 划分为多个 Task,并将 Task 分配给 Executor 执行

Spark框架中的角色

资源管理层

  • 集群资源管理者(Matser):Master

  • 单机资源管理者(Woker):Woker

任务计算层

  • 单任务管理者(Master):Driver

    • 运行位置:
      • Standalone 和 Client 模式:在提交应用程序的客户端节点上运行,而不是在 Worker 或 Master 节点上。
      • Cluster 模式(如 YARN 或 Kubernetes):Driver 可以被安排在集群内运行。例如在 YARN 的 Cluster 模式下,Driver 会在 YARN 的 ApplicationMaster 中运行;在 Kubernetes 中,Driver 被作为 Kubernetes Pod 运行在集群中。
      • 本地模式:如果运行在本地模式(local[*]),Driver 和 Executor 都在本地运行,并不涉及分布式集群。
    • 职责:
      • 应用逻辑控制:Driver 负责整个应用程序的主逻辑和代码的执行。
      • 任务调度:Driver 创建 SparkContext,解析应用程序中的转换和行动,将作业划分为多个阶段和任务。
      • 作业分配:将作业划分为多个任务并分配到集群中的 Executors。
      • 状态监控:Driver 实时监控作业和任务的状态,收集和汇总任务执行的结果。
      • 容错管理:负责处理和重新调度因失败而需要重试的任务。
    • 生命周期:
      • 启动:Driver 在应用程序启动时创建,通常伴随 SparkContext 的初始化。它作为整个应用的主进程,在 Driver 初始化后会启动任务分配和调度。
      • 应用程序运行期间:Driver 在应用程序运行期间会持续保持活动状态,直到所有任务执行完成。它还管理任务的执行状态和处理错误。
      • 结束:当应用程序完成或失败时,Driver 会关闭,释放所有资源,包括 SparkContext。在 Cluster 模式下,Driver 进程和相应的资源也会被集群管理器回收。
    efac0489bb9cce7aa44dfbc5a3ddc410.png
Driver角色
  • 单任务执行者(Woker):Executor
    • 运行位置:Executor 总是运行在 Worker 节点上(或等效的资源节点上,如 YARN Container、Kubernetes Pod 等),它不会出现在 Master 节点或客户端节点上。
    • 职责
      • 执行 Task:Executor 负责接收并执行 Driver 分配的 Task(任务),这些任务处理特定的数据分区。
      • 数据缓存:Executor 可以在内存中缓存中间数据或计算结果,从而提升多次数据处理的性能。
      • 任务结果返回:在任务完成后,Executor 会将计算结果返回给 Driver,或在某些情况下将数据保存在 Worker 节点上以便后续阶段使用。
    • 生命周期
      • Executor 的生命周期与应用程序绑定,当应用程序启动时,Executors 在 Worker 上被分配资源启动,并在应用程序结束时被销毁。因此,每个应用在运行时拥有独立的 Executors,多个应用的 Executors 彼此隔离。
Executor角色

Spark 运行模式

1. 单机模式(Local 模式)

  • 该模式主要用于本地开发和测试。
  • Spark 在本地运行单个进程,不涉及集群的概念。
  • 适合小规模的作业和调试任务。
  1. 本质启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task
  • Local模式可以限制模拟Spark集群环境的线程数量,即Local[N]Local[*]
    • Local[N] 模式:指定固定的 N 个线程来并行执行任务,超出 CPU 核心数时可能会导致线程争用。
    • Local[*] 模式:根据机器的实际 CPU 核心数动态分配线程,使得并行度等于 CPU 核心数,较为优化资源分配。
    • Local:单线程模式,整个应用程序在一个线程中执行,无并行性

无论是 Local[N] 还是 Local[*] 模式,具体的线程调度和 CPU 分配仍取决于操作系统的线程管理,Spark 本身不会强制线程与 CPU 核心一一映射。

单机模式

2. 分布式模式

  • 系统利用多个节点的资源来进行任务分配,常见的分布式模式。

1. YARN 部署环境

参考文章1

  • 资源由 Hadoop 的 YARN 提供,Spark 作为应用运行在 YARN 上,这种模式通常称Spark on YARN

Hadoop YARN 架构

YARN 本身是一个资源调度框架,负责对运行在内部的计算框架进行资源调度管理。其为分布式计算任务提供两个独立的进程:

  • 资源管理
  • 计算调度
资源管理
资源层面

在集群中,管理资源有两个角色,组成管理计算框架:

  • ResourceManager
    • 负责为任务分配资源
  • NodeManager
    • 每个计算节点上仅且只有一个NodeManager(毕竟一个物理节点不需要多位管理者)负责向ResourceManager,周期性传输节点信息(CPU,内存,磁盘,网络等监控信息)
计算调度
计算调度

计算发生在 NodeManager 所在的节点上,有两个角色:

  • ApplicationMaster 负责向 ResourcesManager 申请资源,并调度作业(Task)
  • Container 负责执行具体的计算作业

在 YARN 中,每个应用程序都有一个 ApplicationMaster ,负责与 ResourceManager 交互申请资源,并与 NodeManager 交互启动和监控任务。

Spark on YARN

参考文章2

Spark on YARN 是指将 Spark 应用程序运行在 YARN 集群上,利用 YARN 提供的资源管理和计算调度功能。在这种架构下,Spark 中的角色和 YARN 中的角色一一对应:

  • ClusterManager 相当于ResourceManager,负责资源调度
  • WorkerNode 相当于 NodeManager,负责管理节点状态
  • Driver 相当于 ApplicationMaster,负责调度作业,可以运行在集群外部的客户端,也可以运行在集群内部的某个节点上
  • Executor 相当于 Container,负责具体的计算任务
SparkOnYarn集群节点分布
Yarn-Cluster模式

使用spark-submit提交一个任务到高可用的YARN集群,使用cluster模式:

  • –deploy-mode cluster :submit程序的时候可以指定
./bin/spark-submit
–class org.apache.spark.examples.mainTest
–master yarn
–deploy-mode cluster
–executor-memory 512m
–total-executor-cores 1
~/jars/spark-examples_test.jar
Yarn-Cluster模式

在YARN-Cluster模式中,当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:

  1. 第一个阶段是把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动;
  2. 第二个阶段是由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成

具体流程:

  • Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;
  • ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;
  • ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
  • 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;
  • ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
  • 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己;
Yarn-Client模式

使用spark-submit提交一个任务到高可用的YARN集群,使用client模式:

  • –deploy-mode client :submit程序的时候可以指定
./bin/spark-submit
–class org.apache.spark.examples.mainTest
–master yarn
–deploy-mode client
–executor-memory 512m
–total-executor-cores 1
~/jars/spark-examples_test.jar
Yarn-Client模式
  • Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler、SparkEnv对象等,由于选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;
  • ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;
  • Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);
  • 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
  • Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
  • 应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己;

2. Spark Standalone 模式

【 注意】Standalone的两种模式下(client/Cluster),Master在接到Driver注册Spark应用程序的请求后,会获取其所管理的剩余资源能够启动一个Executor的所有Worker,然后在这些Worker之间分发Executor,此时的分发只考虑Worker上的资源是否足够使用,直到当前应用程序所需的所有Executor都分配完毕,Executor反向注册完毕后,Driver开始执行main程序。

Standalone Client 模式

  • 这是 Spark 自带的任务调度模式。
  • 由 Spark 本身提供资源调度的功能,可以将集群资源用于任务执行。
  • 在国内的生产环境中不常用,但在小型集群中可以作为独立的调度方式。
Spark Standalone Client模式
  • 在Standalone Client模式下,Driver在任务提交的本地机器上运行,
  • Driver启动后向Master注册应用程序,Master根据submit脚本的资源需求找到内部资源至少可以启动一个Executor的所有Worker,
  • 然后在这些Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,所有的Executor注册完成后,
  • Driver开始执行main函数,之后执行到Action算子时,开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。

Standalone Cluster 模式

Spark Standalone Cluster模式
  • 在 Standalone Cluster 模式下,任务提交后,Master 会找到一个 Worker 启动 Driver进程,
  • Driver 启动后向 Master 注册应用程序,
  • Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker,
  • 然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向 Driver 反向注册,
  • 所有的 Executor 注册完成后,Driver 开始执行 main 函数,之后执行到 Action 算子时,开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个 Executor 上执行。

3. Mesos 模式

  • Spark 使用 Mesos 平台进行资源调度和任务管理。
  • 虽然具有较高的扩展性和灵活性,但在国内的应用较为少见。

3. 云原生应用

  • Kubernetes上运行
  • 新技术自行查询资料学习

Spark内核调度角色

Job

  • Job 是由 Action 操作触发的 Spark 作业。当用户在 RDD、DataFrame 或 Dataset 上调用 collect()count()save() 等 Action 操作时,Spark 会生成一个 Job。
  • Driver 负责将 Job 划分为多个 Stage,并将每个 Stage 进一步划分为 Task。
  • 使用的调度器:DAGScheduler
    • DAGScheduler 是 Spark 中负责作业调度的高层调度器,负责将 Job 按依赖关系划分为多个 Stage,并管理 Stage 的依赖关系。
    • 它会解析宽依赖和窄依赖,将 Job 分解成 Stage,进行调度并提交给 TaskScheduler。

Action

  • Action 是 Spark 作业的触发器,表示一个需要立即计算并返回结果的操作。
  • 典型的 Action 包括 collect()count()take()saveAsTextFile() 等。Action 会强制执行延迟计算(Lazy Evaluation),并将 Job 提交给调度系统。
  • 使用的调度器:DAGScheduler
    • DAGScheduler 在接收到 Action 请求时会生成一个 Job,并将其划分为 Stage,随后进一步划分为 Task。
程序划分为Job

Stage

依赖关系的分析

  • Spark 会分析 RDD 或 DataFrame/Dataset 的依赖关系来划分 Stage,依赖关系分为两种:
    • 窄依赖(Narrow Dependency):一个 RDD 的每个分区仅依赖于上一个 RDD 的一个分区。例如,mapfilterunion 等操作。
    • 宽依赖(Wide Dependency):一个 RDD 的分区依赖于上一个 RDD 的多个分区,这会导致数据的 Shuffle 操作。例如,groupByKeyreduceByKeyjoin 等操作。

宽依赖产生新的 Stage

  • 宽依赖的操作(如 reduceByKeygroupByKey)会引发数据 Shuffle,因此这些操作会产生一个新的 Stage。
  • 在同一个 Stage 内,所有的操作都是窄依赖的,直到遇到第一个宽依赖操作才会划分新的 Stage

DAGScheduler 划分 Stage

  • DAGScheduler 是 Spark 中负责将 Job 划分为多个 Stage 的调度器。它会根据 RDD 的依赖关系构建一个 DAG(有向无环图),表示整个 Job 的执行流程。
  • DAG 的每个节点表示一个 RDD 转换操作,DAGScheduler 会按照宽依赖的边界划分 Stage

Stage 划分流程

  • Stage 1:包含所有窄依赖的 Transformation 操作,直到遇到第一个宽依赖操作。
  • Stage 2:从第一个宽依赖后的操作开始,形成新的 Stage,包含接下来的窄依赖操作,直到遇到下一个宽依赖操作。
  • 重复上述步骤,直到所有 RDD 转换操作都划分完毕。
Job划分Stage

Task

  1. 数据分区(Partition)
    • Task 的数量等于数据的分区数。当一个 Stage 被提交时,Spark 会查看该 Stage 处理的 RDD 或 DataFrame 的分区数量,每个分区对应一个 Task。
    • 例如,如果一个 RDD 有 10 个分区,那么这个 Stage 会被划分成 至多10 个 Task。

stage中task的数量至多为RDD的partion数量,partion是影响task上限的决定性因素

  1. Task生命周期

    • 宽依赖结束

      • Shuffle 操作:当一个 Stage 内部的 Task 遇到宽依赖(如 groupByKeyreduceByKeyjoin 等操作)时,数据需要经过 Shuffle 阶段重新分配。
      • 数据传输:此时,Task 将当前分区的数据写入本地磁盘或内存,然后通过网络传输给下一个 Stage 的 Task。
      • 任务结束:在完成数据的 Shuffle 后,当前 Task 的执行即告结束,等待下一个 Stage 的 Task 从重新分区后的数据继续执行。
    • Job 边界结束

      • Action 操作:当 Spark 遇到 Action(如 collect()count()saveAsTextFile())时,触发一个新的 Job。
      • 数据传递:每个 Task 处理完当前数据分区后,会将结果返回给 Driver 或输出到指定存储位置。
      • 任务结束:在 Action 操作完成后,该 Job 的 Task 生命周期结束,不会将数据传输给下一个 Task,因为 Action 操作标志着作业的终结。
  2. Executor 的资源分配

    • 每个 Task 是分配到集群中的一个 Executor 上执行的独立计算单元。TaskScheduler 会根据 Executor 的可用资源,将 Task 分配给不同的 Executor 以实现并行执行。
    • 如果有多个 Executor 并且分区数大于 Executor 数量,则 Task 会被分配到不同的 Executor 并行处理;如果分区数小于 Executor 数量,则某些 Executor 可能不会被使用。
  3. Task 的执行流程

    1. Task 提交到 Executor

      • Driver 会将 Stage 划分成多个 Task 后,通过 TaskScheduler 将每个 Task 分配到集群的 Executor 上。
    2. Task 在 Executor 上执行

      • Task 在 Executor 上的线程池中运行,每个 Task 在单个分区的数据上执行计算。
      • Task 会读取分区的数据,执行指定的转换操作(如 mapreduce),并将结果存储在内存或磁盘上。
    3. Task 结果返回

      • 每个 Task 执行完成后,会将计算结果发送回 Driver 或保存为中间数据,供下一阶段使用(如 Shuffle 数据)。
Stage划分Task

Spark内核调度器

DAGScheduler

由于比较复杂,单开一个文章讲解DAGScheduler官方文档解析


TaskScheduler

由于比较复杂,单开一个文章讲解TaskScheduler官方文档解析