Spark概念知识
Spark 基于 MapReduce (MR) 开发
- Spark 是一种强大的分布式计算框架,最初受到 Hadoop MapReduce 的启发。与 Hadoop 不同的是,Spark 专注于内存计算和优化处理流程,提高了计算性能和效率。
Spark 框架特点
Spark 是一个开源的大数据处理框架,专注于 内存计算,提供了更高效的数据处理能力。与传统基于磁盘的 Hadoop MapReduce 相比,Spark 使用内存作为数据处理的中介,极大提高了数据分析和处理的速度。它是一种 快速、通用 和 可扩展 的大数据分析计算框架。
- 快速:Spark 的核心特性是内存计算,能将任务执行速度提升 10 倍以上。它支持批处理和流处理。
- Spark支持内存计算,并且通过DAG(由向无环图)执行引擎支持无环数据流,所以计算同样的任务在内存中比Hadoop的MR快100左右,在硬盘中快10倍左右。
- Spark处理数据时,中间结果通常存储与内存中
- Spark提供了丰富的算子(API 即Spark自带的函数),可以使得复杂的任务在一个Spark程序中完成
- 易于使用: Spark是由Scala语言编写的,但是支持Java、Python、R、SQL、Scala等多种语言,有良好的语言适应性,而且且提供大量的API方便编程
- 通用:Spark 不仅限于批量数据处理,它支持 SQL 查询、流处理、机器学习、图计算等多种任务。
- 拥有Spark SQL 、 Spark Streaming 、 MLlib 、 GraphX 等多种工具库
- 可扩展:Spark 具有很好的扩展性,能够在大规模集群上处理海量数据。
- 支持多种运行方式:Spark支持多种运行方式,包括在Hadoop,Mesos上,也支持在Standalone上独立运行,同时支持k8s(Kubernetes)上运行
- 对于数据源而言,Spark支持从HDFS、HBase、Kafka、Cassandra等多种途径获取
Spark 相比于 Hadoop MR
Hadoop 与 Spark 的对比:
- Hadoop MapReduce:以磁盘为基础的计算模式,适合处理离线批量数据,处理速度较慢。
- Spark:基于内存的计算模式,更加高效,尤其适合实时数据处理和复杂的分析任务。
1. 开发语言:
- MapReduce (MR):最早的 Hadoop MapReduce 使用 Java 编写。虽然 Java 具有广泛的使用范围,但它在处理大规模数据时显得不够灵活。
- Spark:Spark 采用了 Scala 作为主要的开发语言。Scala 具有函数式编程的特性,特别适合进行大量的数据处理。除了 Scala,Spark 也支持 Java、Python 和 R 等语言。
2. 处理方式:
MapReduce工作示意图
- Hadoop:Hadoop 最早出现时,主要基于磁盘的外存计算(即 MapReduce),每次处理完数据后都会将结果写入磁盘,因此处理效率较低。
Spark架构
Spark 架构 由以下几个部分组成:
- Driver 程序:负责创建 SparkContext 对象,并将应用程序逻辑划分为多个 Task
- Executor 进程:负责执行 Task,并将结果返回给 Driver
- Cluster Manager :负责管理集群中的资源和节点,可以是 YARN ,也可以是其他类型
- SparkContext 对象:负责与 Cluster Manager 交互申请资源,并创建 Executor
- RDD :弹性分布式数据集,是 Spark 的核心抽象,表示一个不可变、分区、可并行操作的数据集合
- DAGScheduler :负责将 RDD 的操作转换为 DAG (有向无环图),并将 DAG 划分为多个 Stage
- TaskScheduler :负责将 Stage 划分为多个 Task,并将 Task 分配给 Executor 执行
Spark框架中的角色
资源管理层
-
集群资源管理者(Matser):Master
-
单机资源管理者(Woker):Woker
任务计算层
-
单任务管理者(Master):Driver
- 运行位置:
- Standalone 和 Client 模式:在提交应用程序的客户端节点上运行,而不是在 Worker 或 Master 节点上。
- Cluster 模式(如 YARN 或 Kubernetes):Driver 可以被安排在集群内运行。例如在 YARN 的 Cluster 模式下,Driver 会在 YARN 的 ApplicationMaster 中运行;在 Kubernetes 中,Driver 被作为 Kubernetes Pod 运行在集群中。
- 本地模式:如果运行在本地模式(
local[*]
),Driver 和 Executor 都在本地运行,并不涉及分布式集群。
- 职责:
- 应用逻辑控制:Driver 负责整个应用程序的主逻辑和代码的执行。
- 任务调度:Driver 创建 SparkContext,解析应用程序中的转换和行动,将作业划分为多个阶段和任务。
- 作业分配:将作业划分为多个任务并分配到集群中的 Executors。
- 状态监控:Driver 实时监控作业和任务的状态,收集和汇总任务执行的结果。
- 容错管理:负责处理和重新调度因失败而需要重试的任务。
- 生命周期:
- 启动:Driver 在应用程序启动时创建,通常伴随 SparkContext 的初始化。它作为整个应用的主进程,在 Driver 初始化后会启动任务分配和调度。
- 应用程序运行期间:Driver 在应用程序运行期间会持续保持活动状态,直到所有任务执行完成。它还管理任务的执行状态和处理错误。
- 结束:当应用程序完成或失败时,Driver 会关闭,释放所有资源,包括 SparkContext。在 Cluster 模式下,Driver 进程和相应的资源也会被集群管理器回收。
- 运行位置:
- 单任务执行者(Woker):Executor
- 运行位置:Executor 总是运行在 Worker 节点上(或等效的资源节点上,如 YARN Container、Kubernetes Pod 等),它不会出现在 Master 节点或客户端节点上。
- 职责:
- 执行 Task:Executor 负责接收并执行 Driver 分配的 Task(任务),这些任务处理特定的数据分区。
- 数据缓存:Executor 可以在内存中缓存中间数据或计算结果,从而提升多次数据处理的性能。
- 任务结果返回:在任务完成后,Executor 会将计算结果返回给 Driver,或在某些情况下将数据保存在 Worker 节点上以便后续阶段使用。
- 生命周期:
- Executor 的生命周期与应用程序绑定,当应用程序启动时,Executors 在 Worker 上被分配资源启动,并在应用程序结束时被销毁。因此,每个应用在运行时拥有独立的 Executors,多个应用的 Executors 彼此隔离。
Spark 运行模式
1. 单机模式(Local 模式):
- 该模式主要用于本地开发和测试。
- Spark 在本地运行单个进程,不涉及集群的概念。
- 适合小规模的作业和调试任务。
- 本质启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task
- Local模式可以限制模拟Spark集群环境的线程数量,即Local[N] 或 Local[*]
- Local[N] 模式:指定固定的
N
个线程来并行执行任务,超出 CPU 核心数时可能会导致线程争用。 - Local[*] 模式:根据机器的实际 CPU 核心数动态分配线程,使得并行度等于 CPU 核心数,较为优化资源分配。
- Local:单线程模式,整个应用程序在一个线程中执行,无并行性
- Local[N] 模式:指定固定的
无论是 Local[N] 还是 Local[*] 模式,具体的线程调度和 CPU 分配仍取决于操作系统的线程管理,Spark 本身不会强制线程与 CPU 核心一一映射。
2. 分布式模式:
- 系统利用多个节点的资源来进行任务分配,常见的分布式模式。
1. YARN 部署环境
- 资源由 Hadoop 的 YARN 提供,Spark 作为应用运行在 YARN 上,这种模式通常称Spark on YARN。
Hadoop YARN 架构
YARN 本身是一个资源调度框架,负责对运行在内部的计算框架进行资源调度管理。其为分布式计算任务提供两个独立的进程:
- 资源管理
- 计算调度
资源管理
在集群中,管理资源有两个角色,组成管理计算框架:
ResourceManager
- 负责为任务分配资源
NodeManager
- 每个计算节点上仅且只有一个
NodeManager
(毕竟一个物理节点不需要多位管理者)负责向ResourceManager
,周期性传输节点信息(CPU,内存,磁盘,网络等监控信息)
- 每个计算节点上仅且只有一个
计算调度
计算发生在 NodeManager
所在的节点上,有两个角色:
ApplicationMaster
负责向ResourcesManager
申请资源,并调度作业(Task)Container
负责执行具体的计算作业
在 YARN 中,每个应用程序都有一个 ApplicationMaster
,负责与 ResourceManager
交互申请资源,并与 NodeManager
交互启动和监控任务。
Spark on YARN
Spark on YARN 是指将 Spark 应用程序运行在 YARN 集群上,利用 YARN 提供的资源管理和计算调度功能。在这种架构下,Spark 中的角色和 YARN 中的角色一一对应:
ClusterManager
相当于ResourceManager
,负责资源调度WorkerNode
相当于NodeManager
,负责管理节点状态Driver
相当于ApplicationMaster
,负责调度作业,可以运行在集群外部的客户端,也可以运行在集群内部的某个节点上Executor
相当于Container
,负责具体的计算任务
Yarn-Cluster模式
使用spark-submit提交一个任务到高可用的YARN集群,使用cluster模式:
- –deploy-mode cluster :submit程序的时候可以指定
./bin/spark-submit
–class org.apache.spark.examples.mainTest
–master yarn
–deploy-mode cluster
–executor-memory 512m
–total-executor-cores 1
~/jars/spark-examples_test.jar
在YARN-Cluster模式中,当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:
- 第一个阶段是把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动;
- 第二个阶段是由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成
具体流程:
- Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;
- ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;
- ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
- 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;
- ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
- 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己;
Yarn-Client模式
使用spark-submit提交一个任务到高可用的YARN集群,使用client模式:
- –deploy-mode client :submit程序的时候可以指定
./bin/spark-submit
–class org.apache.spark.examples.mainTest
–master yarn
–deploy-mode client
–executor-memory 512m
–total-executor-cores 1
~/jars/spark-examples_test.jar
- Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler、SparkEnv对象等,由于选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;
- ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;
- Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);
- 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
- Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
- 应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己;
2. Spark Standalone 模式
【 注意】Standalone的两种模式下(client/Cluster),Master在接到Driver注册Spark应用程序的请求后,会获取其所管理的剩余资源能够启动一个Executor的所有Worker,然后在这些Worker之间分发Executor,此时的分发只考虑Worker上的资源是否足够使用,直到当前应用程序所需的所有Executor都分配完毕,Executor反向注册完毕后,Driver开始执行main程序。
Standalone Client 模式
- 这是 Spark 自带的任务调度模式。
- 由 Spark 本身提供资源调度的功能,可以将集群资源用于任务执行。
- 在国内的生产环境中不常用,但在小型集群中可以作为独立的调度方式。
- 在Standalone Client模式下,Driver在任务提交的本地机器上运行,
- Driver启动后向Master注册应用程序,Master根据submit脚本的资源需求找到内部资源至少可以启动一个Executor的所有Worker,
- 然后在这些Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,所有的Executor注册完成后,
- Driver开始执行main函数,之后执行到Action算子时,开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
Standalone Cluster 模式
- 在 Standalone Cluster 模式下,任务提交后,Master 会找到一个 Worker 启动 Driver进程,
- Driver 启动后向 Master 注册应用程序,
- Master 根据 submit 脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker,
- 然后在这些 Worker 之间分配 Executor,Worker 上的 Executor 启动后会向 Driver 反向注册,
- 所有的 Executor 注册完成后,Driver 开始执行 main 函数,之后执行到 Action 算子时,开始划分 stage,每个 stage 生成对应的 taskSet,之后将 task 分发到各个 Executor 上执行。
3. Mesos 模式
- Spark 使用 Mesos 平台进行资源调度和任务管理。
- 虽然具有较高的扩展性和灵活性,但在国内的应用较为少见。
3. 云原生应用
- Kubernetes上运行
- 新技术自行查询资料学习
Spark内核调度角色
Job
- Job 是由 Action 操作触发的 Spark 作业。当用户在 RDD、DataFrame 或 Dataset 上调用
collect()
、count()
或save()
等 Action 操作时,Spark 会生成一个 Job。 - Driver 负责将 Job 划分为多个 Stage,并将每个 Stage 进一步划分为 Task。
- 使用的调度器:DAGScheduler
DAGScheduler
是 Spark 中负责作业调度的高层调度器,负责将 Job 按依赖关系划分为多个 Stage,并管理 Stage 的依赖关系。- 它会解析宽依赖和窄依赖,将 Job 分解成 Stage,进行调度并提交给 TaskScheduler。
Action
- Action 是 Spark 作业的触发器,表示一个需要立即计算并返回结果的操作。
- 典型的 Action 包括
collect()
、count()
、take()
、saveAsTextFile()
等。Action 会强制执行延迟计算(Lazy Evaluation),并将 Job 提交给调度系统。 - 使用的调度器:DAGScheduler
DAGScheduler
在接收到 Action 请求时会生成一个 Job,并将其划分为 Stage,随后进一步划分为 Task。
Stage
依赖关系的分析
- Spark 会分析 RDD 或 DataFrame/Dataset 的依赖关系来划分 Stage,依赖关系分为两种:
- 窄依赖(Narrow Dependency):一个 RDD 的每个分区仅依赖于上一个 RDD 的一个分区。例如,
map
、filter
、union
等操作。 - 宽依赖(Wide Dependency):一个 RDD 的分区依赖于上一个 RDD 的多个分区,这会导致数据的 Shuffle 操作。例如,
groupByKey
、reduceByKey
、join
等操作。
- 窄依赖(Narrow Dependency):一个 RDD 的每个分区仅依赖于上一个 RDD 的一个分区。例如,
宽依赖产生新的 Stage
- 宽依赖的操作(如
reduceByKey
、groupByKey
)会引发数据 Shuffle,因此这些操作会产生一个新的 Stage。 - 在同一个 Stage 内,所有的操作都是窄依赖的,直到遇到第一个宽依赖操作才会划分新的 Stage。
DAGScheduler 划分 Stage
- DAGScheduler 是 Spark 中负责将 Job 划分为多个 Stage 的调度器。它会根据 RDD 的依赖关系构建一个 DAG(有向无环图),表示整个 Job 的执行流程。
- DAG 的每个节点表示一个 RDD 转换操作,DAGScheduler 会按照宽依赖的边界划分 Stage。
Stage 划分流程
- Stage 1:包含所有窄依赖的 Transformation 操作,直到遇到第一个宽依赖操作。
- Stage 2:从第一个宽依赖后的操作开始,形成新的 Stage,包含接下来的窄依赖操作,直到遇到下一个宽依赖操作。
- 重复上述步骤,直到所有 RDD 转换操作都划分完毕。
Task
- 数据分区(Partition):
- Task 的数量等于数据的分区数。当一个 Stage 被提交时,Spark 会查看该 Stage 处理的 RDD 或 DataFrame 的分区数量,每个分区对应一个 Task。
- 例如,如果一个 RDD 有 10 个分区,那么这个 Stage 会被划分成 至多10 个 Task。
stage中task的数量至多为RDD的partion数量,partion是影响task上限的决定性因素
-
Task生命周期:
-
宽依赖结束
- Shuffle 操作:当一个 Stage 内部的 Task 遇到宽依赖(如
groupByKey
、reduceByKey
、join
等操作)时,数据需要经过 Shuffle 阶段重新分配。 - 数据传输:此时,Task 将当前分区的数据写入本地磁盘或内存,然后通过网络传输给下一个 Stage 的 Task。
- 任务结束:在完成数据的 Shuffle 后,当前 Task 的执行即告结束,等待下一个 Stage 的 Task 从重新分区后的数据继续执行。
- Shuffle 操作:当一个 Stage 内部的 Task 遇到宽依赖(如
-
Job 边界结束
- Action 操作:当 Spark 遇到 Action(如
collect()
、count()
、saveAsTextFile()
)时,触发一个新的 Job。 - 数据传递:每个 Task 处理完当前数据分区后,会将结果返回给 Driver 或输出到指定存储位置。
- 任务结束:在 Action 操作完成后,该 Job 的 Task 生命周期结束,不会将数据传输给下一个 Task,因为 Action 操作标志着作业的终结。
- Action 操作:当 Spark 遇到 Action(如
-
-
Executor 的资源分配:
- 每个 Task 是分配到集群中的一个 Executor 上执行的独立计算单元。TaskScheduler 会根据 Executor 的可用资源,将 Task 分配给不同的 Executor 以实现并行执行。
- 如果有多个 Executor 并且分区数大于 Executor 数量,则 Task 会被分配到不同的 Executor 并行处理;如果分区数小于 Executor 数量,则某些 Executor 可能不会被使用。
-
Task 的执行流程
-
Task 提交到 Executor:
- Driver 会将 Stage 划分成多个 Task 后,通过 TaskScheduler 将每个 Task 分配到集群的 Executor 上。
-
Task 在 Executor 上执行:
- Task 在 Executor 上的线程池中运行,每个 Task 在单个分区的数据上执行计算。
- Task 会读取分区的数据,执行指定的转换操作(如
map
、reduce
),并将结果存储在内存或磁盘上。
-
Task 结果返回:
- 每个 Task 执行完成后,会将计算结果发送回 Driver 或保存为中间数据,供下一阶段使用(如 Shuffle 数据)。
-
Spark内核调度器
DAGScheduler
由于比较复杂,单开一个文章讲解DAGScheduler官方文档解析
TaskScheduler
由于比较复杂,单开一个文章讲解TaskScheduler官方文档解析