ExecutorBackend
很简单的接口: 只定义了一个statusUpdate方法, Executor通过它把task的状态更新发送给cluster scheduler
package org.apache.spark.executor

/**
 * A pluggable interface used by the Executor to send updates to the cluster scheduler.
 */
private[spark] trait ExecutorBackend {

  /**
   * Reports a state change of a single task.
   *
   * The result type is spelled out explicitly instead of relying on procedure syntax, so the
   * contract of this abstract member is visible in the signature.
   *
   * @param taskId identifier of the task the update refers to
   * @param state  the task state being reported
   * @param data   payload accompanying the update (e.g. a serialized task result or a
   *               serialized failure reason)
   */
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
}
StandaloneExecutorBackend
维护executor, 并负责注册executor以及executor和driver之间的通信
/**
 * Actor-based ExecutorBackend. On start-up it asks the driver to register this executor; once
 * registration succeeds it creates the Executor, hands incoming task-launch requests to it,
 * and relays task status changes back to the driver.
 *
 * @param driverUrl  actor URL used to look up the driver
 * @param executorId id sent to the driver on registration and with every status update
 * @param hostPort   sent to the driver on registration; its host part is passed to the Executor
 * @param cores      core count sent to the driver on registration
 */
private[spark] class StandaloneExecutorBackend(
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int)
  extends Actor
  with ExecutorBackend
  with Logging {

  // Both references start out null: `driver` is assigned in preStart, `executor` only after
  // the driver has confirmed the registration.
  var executor: Executor = null
  var driver: ActorRef = null

  /** Creates the driver actor reference and sends the registration request to it. */
  override def preStart(): Unit = {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorFor(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores)
  }

  /** Handles the driver's replies, task-launch requests and driver-loss notifications. */
  override def receive = {
    // Registration accepted: the Executor can now be created.
    case RegisteredExecutor(sparkProperties) =>
      logInfo("Successfully registered with driver")
      // Make this host instead of hostPort ?
      val host = Utils.parseHostPort(hostPort)._1
      executor = new Executor(executorId, host, sparkProperties)

    // Registration rejected: nothing useful can be done, so the process exits.
    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    case LaunchTask(taskDesc) =>
      logInfo("Got assigned task " + taskDesc.taskId)
      if (executor != null) {
        // This backend is passed along so the task can report its status through statusUpdate.
        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
      } else {
        // A launch request arrived before the Executor was created.
        logError("Received launchTask but executor was null")
        System.exit(1)
      }

    // NOTE(review): no context.watch / event-stream subscription is visible in this excerpt;
    // confirm these notifications are actually requested somewhere.
    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
      logError("Driver terminated or disconnected! Shutting down.")
      System.exit(1)
  }

  /** Forwards a task state change, tagged with this executor's id, to the driver actor. */
  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
    driver ! StatusUpdate(executorId, taskId, state, data)
  }
}
Executor
Executor维护一个threadPool, 可以同时run多个task. 线程池本身的大小在代码里是固定的(核心线程数1, 最多128个线程), 实际同时运行多少个task取决于core的个数(注册时上报给driver的cores)
所以对于launchTask, 就是把task包装成TaskRunner, 交给threadPool中的一个thread去run
/**
 * Runs tasks on a pool of worker threads.
 *
 * @param executorId    passed to SparkEnv.createFromSystemProperties
 * @param slaveHostname passed to SparkEnv.createFromSystemProperties
 * @param properties    not referenced in the part of the class shown here
 */
private[spark] class Executor(
    executorId: String,
    slaveHostname: String,
    properties: Seq[(String, String)])
  extends Logging {

  // Initialize Spark environment (using system properties read above)
  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
  SparkEnv.set(env)

  // Start worker thread pool: core size 1, at most 128 threads, 600 s keep-alive. The
  // SynchronousQueue hands each submitted task straight to a thread instead of queueing it.
  val threadPool = new ThreadPoolExecutor(
    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])

  /**
   * Wraps the serialized task in a TaskRunner and submits it to the worker thread pool.
   *
   * @param context        backend handed to the TaskRunner
   * @param taskId         id of the task to run
   * @param serializedTask serialized task handed to the TaskRunner
   */
  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer): Unit = {
    val runner = new TaskRunner(context, taskId, serializedTask)
    threadPool.execute(runner)
  }
}
TaskRunner
/**
 * Runnable that executes a single task and reports the outcome through the backend:
 * FINISHED together with the serialized TaskResult on success, FAILED together with a
 * serialized failure reason otherwise.
 *
 * NOTE(review): `env`, `ser`, `updateDependencies`, `taskStart`, `attemptedTask`,
 * `startGCTime` and `getTotalGCTime` are not declared in this excerpt; they are presumably
 * provided by the enclosing scope - confirm.
 *
 * @param context        backend that receives the status updates
 * @param taskId         id of the task being run
 * @param serializedTask serialized task together with its file/jar dependencies
 */
class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
  extends Runnable {

  override def run(): Unit = {
    try {
      SparkEnv.set(env)
      Accumulators.clear()

      // First deserialization step: split the payload into dependency lists and task bytes.
      val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
      updateDependencies(taskFiles, taskJars)

      // Second deserialization step: rebuild the task with the thread's context class loader.
      val loader = Thread.currentThread.getContextClassLoader
      val task = ser.deserialize[Task[Any]](taskBytes, loader)
      attemptedTask = Some(task)
      logInfo("Its epoch is " + task.epoch)
      env.mapOutputTracker.updateEpoch(task.epoch)

      // Run the actual task logic.
      taskStart = System.currentTimeMillis()
      val outcome = task.run(taskId.toInt)
      val taskFinish = System.currentTimeMillis() // not used in the part shown here

      // Bundle the return value, accumulator updates and metrics (null when absent), then
      // serialize the bundle.
      val accumulatorUpdates = Accumulators.values
      val taskResult = new TaskResult(outcome, accumulatorUpdates, task.metrics.orNull)
      val resultBytes = ser.serialize(taskResult)
      logInfo("Serialized size of result for " + taskId + " is " + resultBytes.limit)

      // Report completion, together with the serialized result, via statusUpdate.
      context.statusUpdate(taskId, TaskState.FINISHED, resultBytes)
      logInfo("Finished task ID " + taskId)
    } catch {
      // Every failure path also notifies the backend through statusUpdate.
      case ffe: FetchFailedException =>
        val fetchFailure = ffe.toTaskEndReason
        context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(fetchFailure))

      case cause: Throwable =>
        val elapsedMillis = (System.currentTimeMillis() - taskStart).toInt
        // Metrics are only available if the task had already been deserialized.
        val metrics = attemptedTask.flatMap(_.metrics)
        metrics.foreach { m =>
          m.executorRunTime = elapsedMillis
          m.jvmGCTime = getTotalGCTime - startGCTime
        }
        val failure =
          ExceptionFailure(cause.getClass.getName, cause.toString, cause.getStackTrace, metrics)
        context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))

        // TODO: Should we exit the whole executor here? On the one hand, the failed task may
        // have left some weird state around depending on when the exception was thrown, but on
        // the other hand, maybe we could detect that when future tasks fail and exit then.
        logError("Exception in task ID " + taskId, cause)
        //System.exit(1)
    }
  }
}