本文是Scheduler模块源码分析的第二篇,第一篇Spark Scheduler模块源码分析之DAGScheduler主要分析了DAGScheduler。本文接下来结合Spark-1.6.0的源码继续分析TaskScheduler和SchedulerBackend。
一、TaskScheduler和SchedulerBackend类结构和继承关系
之所以把这部分放在最前面,是想让大家在阅读后续文章时对TaskScheduler和SchedulerBackend是什么有一个概念。因为有些方法是从父类继承的,如果不先把类结构梳理清楚,到后面遇到多个XXXScheduler或者XXXBackend时迷失方向。
1、TaskScheduler
其实,严格来说TaskScheduler是一个trait,打开org.apache.spark.scheduler.TaskScheduler
就可以看到。在源码中真正使用的还是TaskScheduler的实现类。至于使用的是哪个实现类,和Spark的部署模式有关,本文中使用的是yarn-client模式,对应YarnScheduler类,第二部分创建TaskScheduler时会有分析。
TaskScheduler及其子类的类结构和继承关系,以及重要的变量和方法都列举在下面的UML图中,下图中只列出有具体实现的方法。
2、SchedulerBackend
这里说到的org.apache.spark.scheduler.SchedulerBackend
也是一个trait,根据第二节SchedulerBackend类创建过程中的分析,从yarn-client模式下使用的YarnClientSchedulerBackend往上分析,类结构图如下
从上图中可以看出yarn-client模式下使用的YarnClientSchedulerBackend类继承自一个ExecutorAllocationClient的trait,所以YarnClientSchedulerBackend可以与cluster manager联系,用于请求和kill Executor。同时也继承自SchedulerBackend trait,所以YarnClientSchedulerBackend也可以用于请求资源。
其实每一个SchedulerBackend的具体实现起到的作用基本相同,就是向当前等待分配计算资源的Task分配计算资源(即Executors),并且在分配的Executor上启动Task,完成计算的资源调度过程。
二、TaskScheduler和SchedulerBackend的创建
1、SparkContext#createTaskScheduler创建实例
在DAGScheduler中已经提到TaskScheduler和SchedulerBackend都是在SparkContext中同时生成的。
val (sched , ts) = SparkContext.createTaskScheduler(this, master) // 这里的master是"spark.master"参数的值,String类型
_schedulerBackend = sched//生成 schedulerBackend
_taskScheduler = ts//生成 taskScheduler
_taskScheduler .start()
进入到SparkContext.createTaskScheduler
方法中,该方法中根据master类型,生成不同的TaskScheduler和SchedulerBackend实例,下面仍然以“yarn-client”模式进行分析,其他模式请进入该方法中查看。
master match { // 获取master的类型
case "yarn-client" =>
val scheduler = try {
val clazz = Utils.classForName( "org.apache.spark.scheduler.cluster.YarnScheduler" )
val cons = clazz.getConstructor( classOf[SparkContext])
// yarn-client模式,生成一个YarnScheduler类型的scheduler实例
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case e: Exception => {
throw new SparkException( "YARN mode not available ?" , e)
}
}
val backend = try {
val clazz =
Utils.classForName( "org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend" )
// yarn-client模式,生成一个YarnClientSchedulerBackend实例
val cons = clazz.getConstructor( classOf[TaskSchedulerImpl] , classOf [SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException( "YARN mode not available ?" , e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
}
从上面代码中可以看到“yarn-client”模式下,scheduler的类型是org.apache.spark.scheduler.cluster.YarnScheduler
,backend的类型是org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
。
2、构建调度池
从上面代码中可以看到,在生成这两个对象后,接下来将backend传入scheduler的初始化方法中进行初始化,TaskSchedulerImpl.initialize
方法中主要是根据调度模式初始化调度池,关于Spark调度模式,已经在Spark调度模式-FIFO和FAIR中有详细的分析。
3、Application的提交
回到SparkContext,在TaskScheduler和SchedulerBackend的具体实例构建后,调用TaskSchedulerImpl.start
方法,启动taskScheduler。如果启动了speculative功能(即设置spark.speculation=true
)的话,会后台启动一个线程检查符合speculation条件的task。在TaskSchedulerImpl.start方法中,调用backend.start方法,启动schedulerBackend。在YarnClientSchedulerBackend.start方法中,根据配置参数生成一个Client类型的对象,然后调用Client.submitApplication
方法,提交该Application.
YarnClientSchedulerBackend#start方法
override def start() {
val driverHost = conf.get("spark.driver.host") // 获取driver的IP地址
val driverPort = conf.get("spark.driver.port") // 获取driver的端口号
val hostport = driverHost + ":" + driverPort
sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) } // 为对应UI地址绑定ui对象
val argsArrayBuf = new ArrayBuffer[String]() // 获取启动参数
argsArrayBuf += ("--arg", hostport)
argsArrayBuf ++= getExtraClientArguments
logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
val args = new ClientArguments(argsArrayBuf.toArray, conf)
totalExpectedExecutors = args.numExecutors // 获取启动时指定的Executor个数
client = new Client(args, conf) // 生成driver端的client
appId = client.submitApplication() // 通过client提交application
super.start() // 最终调用了CoarseGrainedSchedulerBackend中的start方法
waitForApplication() // 等待Application开始运行
if (conf.contains("spark.yarn.credentials.file")) {
YarnSparkHadoopUtil.get.startExecutorDelegationTokenRenewer(conf)
}
// 启动一个监控线程,当SparkContext结束或者该Application在Yarn上运行结束,这个线程才会结束
monitorThread = asyncMonitorApplication()
monitorThread.start()
}
有关Application的提交与监控,有兴趣的话,可以从YarnClientSchedulerBackend#start
方法开始进一步的分析。
三、Task的提交
在Spark Scheduler模块源码分析之DAGScheduler中提到,DAGScheduler完成对Stage的划分后,会提交Stage。从这里开始,继续从上一篇文章最后的四.3 DAGScheduler.submitMissingTasks方法开始进行分析。
1、DAGScheduler#submitMissingTasks
这个方法比较长,这里只列举出源代码中的主要逻辑,搞清楚DAGScheduler提交Stage时如何开始对Task的提交的。
private def submitMissingTasks(stage: Stage, jobId: Int) {
//取得当前Stage需要计算的partition
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
...
// 将当前stage存入running状态的stage列表中
runningStages += stage
// 判断当前stage是ShuffleMapStage还是ResultStage
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
...
// 向listenerBus提交StageSubmitted事件
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
...
// 根据stage的类型获取其中包含的task
val tasks: Seq[Task[_]] = try {
stage match {
// ShuffleMapStage中对应的是ShuffleMapTask
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.internalAccumulators)
}
// ResultStage中对应的是ResultTask
case stage: ResultStage =>
val job = stage.activeJob.get
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, stage.internalAccumulators)
}
}
}
...
if (tasks.size > 0) { // 如果当前Stege中有task
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
// 根据tasks生成TaskSet,然后通过TaskScheduler.submitTasks方法提交TaskSet
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else { // 如果当前Stege中不包含task
// 由于前面已经向listenerBus中提交了StageSubmitted事件,现在这个Stege中没有task运行
// 则正常流程时,该stage不会被标记为结束。那么需要手动指定该stege为finish状态。
markStageAsFinished(stage, None)
// log中的显示信息
val debugString = stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})"
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
}
}
这个方法的主要过程和逻辑都已经在源码注释中进行了分析,在提交Stage时,对于不同的ShuffleMapStage和ResultStage,有不同的处理逻辑。最终根据Stage对于的rdd partition生成tasks组,然后通过TaskScheduler.submitTasks
方法,将tasks生成的TaseSet进行提交。
这里面有一点需要注意的是,在Stage提交时,会向LiveListenerBus发送一个SparkListenerStageSubmitted事件,正常情况下,随着Stage中的task运行结束,会最终将Stage设置成完成状态。但是,对于空的Stage,不会有task运行,所以该Stage也就不会结束,需要在提交时手动将Stage的运行状态设置成Finished。
2、TaskSchedulerImpl#submitTasks
上一步中生成的TaskSet对象传入该方法中,那么首先看一下TaskSet的结构。在TaskSet中,有一个Task类型的数组包含当前Stage对应的Task,然后就是一些stageId,stageAttemptId,以及priority等信息。从前面代码中可以看到,这里传入的优先级是jobId,越早提交的job越优先运行。
private[spark] class TaskSet(
val tasks: Array[Task[_]],
val stageId: Int,
val stageAttemptId: Int,
val priority: Int,
val properties: Properties) {
val id: String = stageId + "." + stageAttemptId
override def toString: String = "TaskSet " + id
}
接下来进入TaskScheduler.submitTasks
方法,主要的过程都在注释中。这里需要再分析一些TaskSetManager的zombie状态。isZombie的默认值为false,进入true状态有两种情况:TaskSetManager中的tasks都执行成功了,或者这些tasks不再需要执行(比如当前stage被cancel)。之所以在tasks都执行成功后将该TaskSetManager设置为zombie状态而不是直接清除该对象,是为了从TaseSetManager中获取task的运行状况信息。
那么对于isZombie为false的TaseSetManager,即表示其中的tasks仍然需要执行,如果对于当前stage,有一个taskSet正在执行,并且此次提交的taskSet和正在执行的那个不是同一个,那么就会出现同一个Stage执行两个不同的TaskSet的状况,这是不允许的。
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// 生成一个TaskSetManager类型对象,
// task最大重试次数,由参数spark.task.maxFailures设置,默认为4
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
// key为stageId,value为一个HashMap,这个HashMap中的key为stageAttemptId,value为TaskSetManager对象
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
// 如果当前这个stageId对应的HashMap[Int, TaskSetManager]中存在某个taskSet
// 使得当前的taskSet和这个taskSet不是同一个,并且当前这个TaskSetManager不是zombie进程
// 即对于同一个stageId,如果当前这个TaskSetManager不是zombie进程,即其中的tasks需要运行,
// 并且对当前stageId,有两个不同的taskSet在运行
// 那么就应该抛出异常,确保同一个Stage在正常运行情况下不能有两个taskSet在运行
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
// 根据调度模式生成FIFOSchedulableBuilder或者FairSchedulableBuilder,将当前的TaskSetManager提交到调度池中
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
// 向schedulerBackend申请资源
backend.reviveOffers()
}
schedulableBuilder.addTaskSetManager
这部分是向调度池提交任务的,可以参考Spark调度模式-FIFO和FAIR。
3、CoarseGrainedSchedulerBackend#reviveOffers
这个方法比较简单,只有一行代码
override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}
4、NettyRpcEnv#send
在NettyRpcEnv中有几个重载的send方法,最终进入的下面这个。由于该方法的逻辑比较简单,下面直接列出debug时该方法中各变量的内容。其中192.168.1.88是spark客户端,即提交Spark应用的机器,通过192.168.1.88:42060端口发送一个ReviveOffers的message。
接下来代码会进入下一节中,执行CoarseGrainedSchedulerBackend.receive方法。
这一段主要会在第四节Task的运行和运行结果的处理中用到。在NettyRpcEnv中有一个重要的变量dispatcher,其类型为org.apache.spark.rpc.netty.Dispatcher,在Dispatcher中会生成一个ThreadPoolExecutor类型的threadpool变量。这个threadpool变量会在后续的Executor模块中进一步分析。
5、CoarseGrainedSchedulerBackend#receive
通过netty发送一个请求资源的消息后,CoarseGrainedSchedulerBackend的receive方法则会接收分配到的资源。
在该方法中,由于接收到的是ReviveOffers,会调用makeOffers方法开始生成资源。
override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data) =>
...
case ReviveOffers =>
makeOffers()
case KillTask(taskId, executorId, interruptThread) =>
...
}
6、CoarseGrainedSchedulerBackend#makeOffers
在本次debug过程中,只启动了两个Executor,分别在m001和m002机器上。从下图中可以看出,workOffers中获取到了两个Executor,executorId分别为1和2。
接下来会执行launchTasks方法,该方法接收的参数是schedulerBackend分配的计算资源,有关计算资源的分配,在后续的7-8-9-10小节中深入分析,关于launchTasks方法,会在第四节-Task的运行和运行结果的处理进一步分析。
7、TaskSchedulerImpl#resourceOffers
资源生成好封装成WorkerOffer类型的队列后,就开始进入SchedulerBackend中,由SchedulerBackend分配这些资源。
/**
* 由cluster manager来调用,为task分配节点上的资源。
* 根据优先级为task分配资源,
* 采用round-robin方式使task均匀分布到集群的各个节点上。
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
var newExecAvail = false
...
// 为避免多个Task集中分配到某些机器上,对这些Task进行随机打散.
val shuffledOffers = Random.shuffle(offers)
// .
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) //存储分配好资源的task
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// 从调度池中获取排好序的TaskSetManager,由调度池确定TaskSet的执行顺序
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) { // 按顺序取出各taskSet
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) { // 如果该executor是新分配来的
taskSet.executorAdded() // 重新计算TaskSetManager的就近原则
}
}
// 为从rootPool中获得的TaskSetManager列表分配资源。就近顺序是:
// PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
// 对每一个taskSet,按照就近顺序分配最近的executor来执行task
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
// 将前面随机打散的WorkOffers计算资源按照就近原则分配给taskSet,用于执行其中的task
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
8、TaskSchedulerImpl#resourceOfferSingleTaskSet
这个方法主要是在分配的executor资源上,执行taskSet中包含的所有task。首先遍历分配到的executor,如果当前executor中的cores个数满足配置的单个task需要的core数要求(该core数由参数spark.task.cpus
确定,默认值为1),才能在该executor上启动task。
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
for (i <- 0 until shuffledOffers.size) { // 顺序遍历当前存在的Executor
val execId = shuffledOffers(i).executorId // Executor ID
val host = shuffledOffers(i).host // Executor所在的host
if (availableCpus(i) >= CPUS_PER_TASK) { // 如果当前executor上的core数满足配置的单个task的core数要求
try {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { // 为当前stage分配一个executor
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet // 存储该task->taskSet的映射关系
taskIdToExecutorId(tid) = execId // 存储该task分配到的executorId
executorIdToTaskCount(execId) += 1 // 该executor上执行的task个数加一
executorsByHost(host) += execId // 存储host->executorId的映射关系
availableCpus(i) -= CPUS_PER_TASK // 该Executor可用core数减一
assert(availableCpus(i) >= 0) // 如果启动task后,该executor上的core数大于等于0,才算正常启动。
launchedTask = true
}
} catch {
...
}
}
}
return launchedTask
}
9、TaskSetManager#resourceOffer
上面方法中,将executor分配给taskSet之后在这个方法中,最终启动一个个task,并未启动的task生成task描述对象TaskDescription。isZombie的值在上面已经讲过了,如果为false,则表示当前TaskSetManager中的task没有全部执行成功,可以继续执行剩余task。
每次分配一个executor给TaskSetManager,然后TaskSetManager从未执行的task队列中取出一个task分配到该executor上执行。
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (!isZombie) {
...
// 顺序取出TaskSetManager中未执行的task
dequeueTask(execId, host, allowedLocality) match {
case Some((index, taskLocality, speculative)) => {
val task = tasks(index) // 取出一个task
val taskId = sched.newTaskId() // 分配一个新的taskId
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
// 生成taskInfo对象存储当前task的相关信息
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
taskInfos(taskId) = info // 记录taskId->taskInfo映射关系
taskAttempts(index) = info :: taskAttempts(index)
...
// 序列化并返回task
val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
...
}
// 如果序列化后的task大小超过100KB时,直接抛出异常
if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
addRunningTask(taskId)
// We used to log the time it takes to serialize the task, but task size is already
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
s"$taskLocality, ${serializedTask.limit} bytes)")
// 为该task准备好执行环境后,开始执行task
sched.dagScheduler.taskStarted(task, info)
// 返回一个TaskDescription信息
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
taskName, index, serializedTask))
}
case _ =>
}
}
None
}
10、DAGScheduler#taskStarted
为task准备好执行环境和资源后,在TaskScheduler和SchedulerBackend中绕了一大圈后又再次回到了DAGScheduler中,调用taskStarted方法,最终将该task运行起来。
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
}
在这个方法中通过eventProcessLoop的post方法提交一个BeginEvent事件到DAGSchedulerEventProcessLoop中。有关DAGSchedulerEventProcessLoop,在Spark Scheduler模块源码分析之DAGScheduler中已经分析过了。
四、Task的运行和运行结果的处理
这一部分主要分析Task运行起来后的过程,以及运行结束后的结果如何处理。第三节中讲到,一个Stage中包含一组Tasks,这些Task在分配的Executor上执行。
在三、6-CoarseGrainedSchedulerBackend#makeOffers的最后,调用了CoarseGrainedSchedulerBackend#launchTasks方法。
1、CoarseGrainedSchedulerBackend#launchTasks
在该方法中,会将当前task在driver上序列化后发送到executor上运行。序列化后的task大小,如果超过128MB-200KB,当前task不能执行,并且把task对应的taskSetManager设置成zombie模式,因此其中剩余的task都不再执行。如果不超过该限制,则会把task分发到executor上。
序列化后的Task发送到Executor上执行,是通过Akka来进行的。其实上面那个限制大小也是Akka决定的。在AkkaUtils类中可以看到,这个限制由两个参数来确定spark.akka.frameSize,默认值为128
,经过处理后,最终计算成128MB,表示Akka最大能传递的消息大小。除了用于发送序列化后的task数据之外,Akka本身会使用到一部分空间存储一些额外的数据,这一部分的大小为200KB。所以在默认情况下,对Akka来说,一个消息最大能传递的数据大小为128MB - 200KB。这两个参数在后面Executor中也会用到。
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
// 序列化当前task
val serializedTask = ser.serialize(task)
// 如果当前task序列化后的大小超过了128MB-200KB,跳过当前task,并把对应的taskSetManager置为zombie模式
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
AkkaUtils.reservedSizeBytes)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else { // 序列化后的task大小不超过限制时,将当前task发送到Executor上执行。
// 获取当前task分配到的executor相关信息
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) //将tasks发送到Executor
}
}
}
根据当前task所分配的executor信息,将该executor可用core减去配置的CPUS_PER_TASK,然后调用send方法发送一个LaunchTask消息。
2、CoarseGrainedExecutorBackend#receive
Driver和Executor之间仍然使用Netty进行通信,在Driver端send一个LaunchTask的消息后,在Executor端会对应有一个receive方法接收消息。在下面代码中,可以看到Executor除了响应LaunchTask之外还能处理的其他事件。这些事件都继承自CoarseGrainedClusterMessage类。
override def receive: PartialFunction[Any, Unit] = {
// 注册Executor消息
case RegisteredExecutor(hostname) =>
logInfo("Successfully registered with driver")
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
System.exit(1)
// 启动Task事件
case LaunchTask(data) =>
// 如果分配的executor为空,即当前task无分配的executor,则直接退出
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
// 否则,反序列化Akka从driver端传输过来的task数据
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
// executor上启动task
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
// 杀死Task事件
case KillTask(taskId, _, interruptThread) =>
if (executor == null) {
logError("Received KillTask command but executor was null")
System.exit(1)
} else {
executor.killTask(taskId, interruptThread)
}
// 停止Executor事件
case StopExecutor =>
logInfo("Driver commanded a shutdown")
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
// a message to self to actually do the shutdown.
self.send(Shutdown)
// 终止事件
case Shutdown =>
executor.stop()
stop()
rpcEnv.shutdown()
}
3、Executor#launchTask
略。。
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
4、Executor#run
这个方法是Executor执行task的主要方法。Task在Executor中执行完成后,会通过向Driver发送StatusUpdate的消息来通知Driver任务的状态更新为TaskState.FINISHED。
在Executor运行Task时,得到计算结果会存入org.apache.spark.scheduler.DirectTaskResult。在将结果传回Driver时,会根据结果的大小有不同的策略:对于较大的结果,将其以taskId为key存入org.apache.storage.BlockManager,如果结果不大,则直接回传给Driver。回传是通过AKKA来实现的,所以能够回传的值会有一个由AKKA限制的大小,这里涉及到一个参数spark.akka.frameSize,默认为128,单位为Byte
,在源码中最终转换成了128MB。表示AKKA最大能传递的消息大小为128MB,但是同时AKKA会保留一部分空间用于存储其他数据,这部分的大小为200KB,那么结果如果小于128MB - 200KB的话就可以直接返回该值,否则的话,在不大于1G的情况下(可以通过参数spark.driver.maxResultSize
来修改,默认为1g),会通过BlockManager来传递。详细信息会在Executor模块中描述。完整情况如下:
(1)如果结果大于1G,直接丢弃
(2)如果结果小于等于1G,大于128MB - 200KB,通过BlockManager记录结果的tid和其他信息
(3)如果结果小于128MB - 200 KB,直接返回该值
override def run(): Unit = {
...
logInfo(s"Running $taskName (TID $taskId)")
// 开始执行Task,
// yarn-client模式下,调用CoarseGrainedExecutorBackend的statusUpdate方法
// 将该Task的运行状态置为RUNNING
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
...
try {
// 反序列化task,获取该Task需要使用到的file和jar文件
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
task.setTaskMemoryManager(taskMemoryManager)
// 判断如果该task被kill了,直接抛出异常
if (killed) {
throw new TaskKilledException
}
...
var threwException = true
val (value, accumUpdates) = try {
// 调用Task.run方法,开始运行task
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
} finally {
...
}
...
// 生成DirectTaskResult对象,并序列化Task的运行结果
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
val serializedResult: ByteBuffer = {
// 如果序列化后的结果比spark.driver.maxResultSize配置的还大,直接丢弃该结果
if (maxResultSize > 0 && resultSize > maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
}
// 如果序列化后的结果小于上面的配置,而大于spark.akka.frameSize - 200KB
// 结果通过BlockManager回传
else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
}
// 如果结果小于spark.akka.frameSize - 200KB,则可通过AKKA直接返回Task的该执行结果
else {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}
// 更新当前Task的状态为finished
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
case ffe: FetchFailedException =>
...
case _: TaskKilledException | _: InterruptedException if task.killed =>
...
case cDE: CommitDeniedException =>
...
case t: Throwable =>
...
} finally {
// 总runnint状态的task列表中将该task移除
runningTasks.remove(taskId)
}
}
}
5、CoarseGrainedExecutorBackend#statusUpdate
上面代码中,在Task开始时以及Task处理完毕后都会通过本方法更新Task的状态。这里的msg是StatusUpdate类型,通过调用driver的send方法将该msg发出。
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
val msg = StatusUpdate(executorId, taskId, state, data)
driver match {
case Some(driverRef) => driverRef.send(msg)
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
6、CoarseGrainedSchedulerBackend#receive
这个方法最终获取到上面发出的msg,这个方法在三-5、CoarseGrainedSchedulerBackend#receive中已经分析过,当时是接收请求资源的消息。这里继续分析接收到StatusUpdate后的逻辑。
override def receive: PartialFunction[Any, Unit] = {
case StatusUpdate(executorId, taskId, state, data) =>
// 调用TaskSchedulerImpl.statusUpdate方法处理task状态变更消息
scheduler.statusUpdate(taskId, state, data.value)
...
case ReviveOffers =>
makeOffers()
case KillTask(taskId, executorId, interruptThread) =>
...
}
7、TaskSchedulerImpl#statusUpdate
在这个方法中,主要分析完成状态和失败状态的Task后续处理流程的入口。
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
synchronized {
try {
...
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
// 如果Task的状态为FINISHED或FAILED或KILLED或LOST,该task就被认为执行结束
// 将运行完成的Task从相关记录器中移除
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { execId =>
if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
}
}
}
// 任务成功完成
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid) // TaskSetManager标记该任务已经结束
// 成功Task后续处理逻辑入口
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
}
// 如果TASK状态为FAILED或KILLED或LOST
else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskSet.removeRunningTask(tid)
// 处理失败任务,加入到失败任务处理队列中
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
case None =>
...
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// 如果当前Executor lost,则通知DAGScheduler该Executor已经lost。
// SchedulerBackend申请新的Executor
if (failedExecutor.isDefined) {
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}
7.1、成功执行的Task
7.2、执行失败的Task