在Spark消息通信原理(三)(https://www.cnblogs.com/SysoCjs/p/11355900.html)中第(6)点提到过,Executor是任务执行的容器,executor接收到LaunchTask消息之后(其实是GoraseGrainedExecutorBackend接收到来自DriverEndpoint的LaunchTask消息后,调用Executor的launchTasks方法),就会执行任务。执行任务时,会创建TaskRunner进程,放到thredPool中,统一由Executor进行调度。
TaskRunner有一个run方法,方法里主要做的是:对发送过来的Task本身(ShuffleMapTask和ResultTask),以及它所依赖的jar等文件进行反序列,然后对反序列后的Task交给Task对象的run方法。由于Task是一个抽象类,具体实现交给两个子类ShuffleMapTask和ResultTask。
TaskRunner.run方法的部分源码:
override def run(): Unit = {
//生成内存管理实例——taskMemoryManager,用于任务运行期间内存的管理
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInastance()
//向Driver终端发送任务运行开始消息,通知Driver对状态进行更新
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
startGCTime = computeTotalGcTime()
try{
//对任务运行时所需要的文件、jar、代码等进行反序列
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
task.setTaskMemoryManager(taskMemoryManager)
//如果任务在反序列之前被kill掉,则抛出异常
if(killed){
throw new TaskKilledException
}
env.mapOutputTracker.updateEpoch(task.epoch)
//调用Task的runTask方法,由于Task是一个抽象类,所以具体实现交给它的子类——ShuffleMapTask和ResultTask
taskStart = System.currentTimeMillis()
var threwException = true
val value = try{
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
res
}finally{...}
...
}
}
不同的Task实体类,它们处理计算的结果的方式上会有所差别。
对于ShuffleMapTask,计算结果会写到BlockManager之中,最终返回给DAGScheduler的是一个MapStatus对象,该对象管理了ShuffleMapTask的相关存储信息,这些存储信息并不是计算结果本身,而是运算结果到BlockManager的相关联系,这些存储信息将会成为下一阶段的任务需要获得的输入数据时的依据。ShuffleMapTask.runTask部分源码:
override def runTask(context: TaskContext):MapStatus = {
val deserializeStartTime = System.currentTimeMills()
//反序列化获取RDD和RDD的依赖
val ser = SparkEnv.get.closureSerializer.newInastance()
val (rdd, dep) = ser.derialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
var writer:ShuffleWriter[Any, Any] = null
try{
val manager = SparkEnv.get.ShuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
//首先调用rdd.iterator,如果RDD已经Cache或者Checkpoint,则直接读取结果;否则计算。
writer.write(rdd.iterator(partition,context).asInstanceOf[Iterator[_<:Product2[Any, Any]]])
//关闭writer,返回结果,包含数据的location和size等元数据信息
writer.stop(success = true).get
}catch{...}
}
对于ResultTask,它的计算结果以func函数的形式返回。ResultTask.runTask部分源码:
override def runTask(context:TaskContext):U = {
//反序列化广播变量,得到RDD
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T]. (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserialiZeStartTime
metrics = Some(context.taskMetrics)
//返回
func(context, rdd.iterator(partition, context))
}