Spark 学习笔记 (二): 深入Spark计算引擎
先来回顾一下Spark的程序运行架构:
-
对于任何一个Spark程序,有且仅有一个
SparkContext
,其实一个SparkContext
就对应了一个Driver
; -
一个
Driver
就是一个进城,运行在一个节点上,程序的main函数就运行在Driver
上; -
main函数通过分析程序,将程序转化成一些列
Task
,然后分发到各个节点的Executor
上去执行;一个节点可以运行一个或多个Executor
;然后一个Executor
可以同时跑若干个Task
;-
每个节点有多少个Executor,每个Executor上有多少个Task,都是可以由用户指定的计算资源)
-
(分布式计算:主要就是需要分布式地调度计算资源和计算任务)
-
Job执行过程:作业、阶段与任务
Job逻辑执行图
Job的实际执行流程比用户头脑中的要复杂,需要先建立逻辑执行图(或者叫数据依赖图),然后划分逻辑执行图生成DAG型的物理执行图,然后生成具体Task
执行。
如何产生RDD,产生哪些RDD
一些典型的transformation()及其创建的RDD:
iterator(split) 的意思是 foreach record in the partition
RDD的依赖关系
实际上,从parent RDD 经过转换操作,生成RDD x的过过程中,需要考虑的问题是主要会分为三种:
-
RDD 本身的依赖关系。要生成的 RDD(以后用 RDD x 表示)是依赖一个 parent RDD,还是多个 parent RDDs?
- (这个比较直接可以由代码就可分析出比如 x = rdda.transformation(rddb) (e.g., x = a.join(b)) 就表示 RDD x 同时依赖于 RDD a 和 RDD b)
-
RDD x 中会有多少个 partition ?
- (这个一般可由用户只指定,不指定的话一般取
max(numPartitions[parent RDD 1], ..., numPartitions[parent RDD n])
- (这个一般可由用户只指定,不指定的话一般取
numPartitions[parent RDD n])`)
- RDD x 与其 parent RDDs 中
partition
之间是什么依赖关系?是依赖 parent RDD 中一个还是多个 partition?
主要需要考虑的是最后一个问题。这要结合不同 transformation() 的语义,不同的 transformation() 的依赖关系不同。
RDD x 中每个 partition 可以依赖于 parent RDD 中一个或者多个 partition。而且这个依赖可以是NarrowDependency 完全依赖(窄依赖)
或者ShuffleDependency 部分依赖(宽依赖)
。
下图展示了完全依赖和部分依赖
前三个是完全依赖,RDD x 中的 partition 与 parent RDD 中的 partition/partitions 完全相关。也就是说parent RDD中的每个partition只会被RDD x中的一个partiton使用!
最后一个是部分依赖,RDD x 中的 partition 只与 parent RDD 中的 partition 一部分数据相关,另一部分数据与 RDD x 中的其他 partition 相关。也就是说父RDD的每个partiton都有可能被多个子RDD的partition使用
- 部分依赖(宽依赖)是Spark计算的主要耗时阶段,需要重点优化的部分
在 Spark 中,完全依赖被称为 NarrowDependency,部分依赖被称为 ShuffleDependency。其实 ShuffleDependency 跟 MapReduce 中 shuffle 的数据依赖相同(mapper 将其 output 进行 partition,然后每个 reducer 会将所有 mapper 输出中属 于自己的 partition 通过 HTTP fetch 得到)
Job物理执行图
主要的问题是:给定job的逻辑执行图,如何生成物理执行图(也就是 stages 和 tasks)?
逻辑执行计划到物理执行计划的转化需要执行:
- (1) 划分
Stage
- (2) 生成
Task
Spark Task的类型只有两种:ShuffleMapTask
和ResultTask
问:每个Stage的Task数目?
- First Stage: 由hdfs block或hbase regioin 数目决定
- Other Stages: 由用户设置,默认与第一个阶段相等
Stage划分算法
根据RDD的依赖关系,划分Stage:
从后往前推算,遇到ShuffleDependency就断开,遇到NarrowDependency就将其加入该stage。 每个stage里面task的数目由该stage最后一个 RDD 中的partition个数决定
物理图的执行
生成了Stage和Task后,接下来的问题就是怎么去执行物理图:
回想 pipeline
的思想是 数据用的时候再算,而且数据是流到要计算的位置的
Result 产生的地方的就是要计算的位置,要确定 “需要计算的数据”,我们可以从后往前推,需要哪个 partition 就计算哪个 partition,如果 partition 里面没有数据,就继续向前推,形成 computing chain。这样推下去,结果就是:需要首先计算出每个 stage 最左边的 RDD 中的某些 partition
整个 computing chain 根据数据依赖关系自后向前建立,遇到 ShuffleDependency 后形成 stage。在每个 stage 中,每个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records 一个个 fetch 过来。
生成Job
用户的 driver 程序中一旦出现 action(),就会生成一个 job
每一个 job 包含 n 个 stage,最后一个 stage 产生 result。比如,第一章的 GroupByTest 例子中存在两个 job,一共产生了两组 result。在提交 job 过程中,DAGScheduler 会首先划分 stage,然后先提交无 parent stage 的 stages,并在提交过程中确定该 stage 的 task 个数及类型,并提交具体的 task。无 parent stage 的 stage 提交完后,依赖该 stage 的 stage 才能够提交。从 stage 和 task 的执行角度来讲,一个 stage 的 parent stages 执行完后,该 stage 才能执行
Spark 资源调度和任务调度
从TaskScheduler开始
TaskScheduler的主要作用就是获得需要处理的任务集合,并将其发送到集群进行处理。并且还有汇报任务运行状态的作用。 所以其是在Master端。具体有以下4个作用:
-
接收来自Executor的心跳信息,使Master知道该Executer的BlockManager还“活着”
-
对于失败的任务进行重试
-
对于stragglers(拖后腿的任务)放到其他的节点执行
-
向集群提交任务集,交给集群运行
调度和执行任务
-
作业调度
-
FIFO或Fair
-
优化机制:数据本地性和推测执行
-
-
任务执行
-
Task被序列化后,发送到executor上执行
-
ShuffleMapTask将中间数据写到本地,ResultTask远程读取数据
-
为什么要写在本地?
后面的 RDD 多个分区都要去读这个信息,如果放到内存,如果出现数据丢失,后面的所有步骤全部不能进行,违背了之前所说的需要父 RDD 分区数据全部 ready 的原则同一个 stage 里面的 task 是可以并发执行的,下一个 stage 要等前一个 stage ready
-
-
数据用的时候再算,而且数据是流到要计算的位置的
-
-
Spark 任务调度与执行模块,源码阅读与理解
Spark Shuffle过程
Shuffle是分布式计算框架的核心s数据交换方式,它的实现方式直接决定了计算框架的性能和可扩展性
Shuffle阶段主要解决的问题是:数据是怎么通过 ShuffleDependency 流向下一个 stage 的
产生shuffle的算子:join, cogroup, 和*ByKey(reduceByKey, groupByKey, sortByKey,…
Shuffle write
Shuffle中map端输出的数据要先写到磁盘,然后由reduce进行拉取
上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。每个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket
ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算得到 finalRDD 中对应 partition 的 records。每得到一个 record 就将其送到对应的 bucket 里,具体是哪个 bucket 由partitioner.partition(record.getKey()))决定。每个 bucket 里面的数据会不断被写到本地磁盘上,形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 reducer 会去 fetch 属于自己的FileSegment
,然后进入 shuffle read 阶段。
这样的实现很简单,但有几个问题:
-
产生的 FileSegment 过多
-
缓冲区占用内存空间大
第一个问题有一些方法去解决,下面介绍已经在 Spark 里面实现的 FileConsolidation 方法: 一个MapTask只产生一个FileSegment
;前面每个MapTask有几个Reduce就产生几个 FileSegment
;如下图:
在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i;数据块在数据文件中的偏移量存储在不同的索引文件中
Shuffle read
Shuffle read就是将之前Shuffle write写的数据拉过来再做处理;
fetch 来的 records 被逐个 aggreagte 到 HashMap 中,等到所有 records 都进入 HashMap,就得到最后的处理结果