I. Principle Diagram
II. Source Code Analysis
1. Executor Registration Mechanism
The executor that a worker starts for an Application is in fact a CoarseGrainedExecutorBackend process. It registers with the driver as follows:

###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala

    /**
     * Called from the actor's initialization hook.
     */
    override def preStart() {
      logInfo("Connecting to driver: " + driverUrl)
      // Obtain a reference to the driver's actor
      driver = context.actorSelection(driverUrl)
      // Send a RegisterExecutor message to the driver; the driver side is an inner class of CoarseGrainedSchedulerBackend
      // Once the driver has registered the executor, it replies with a RegisteredExecutor message
      driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    }

###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala

    override def receiveWithLogging = {
      // Once the driver has registered the executor, it replies with a RegisteredExecutor message
      // At that point CoarseGrainedExecutorBackend creates an Executor object to serve as its execution handle
      // Most of the backend's functionality is in fact implemented by Executor
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        val (hostname, _) = Utils.parseHostPort(hostPort)
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
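The handshake above can be reduced to a toy model that runs without Akka or Spark on the classpath. This is only a sketch: `ToyDriver`, `ToyBackend`, and `RegistrationRejected` are hypothetical stand-ins, the messages carry fewer fields than the real ones, and actor message passing is replaced by plain method calls.

```scala
// Toy model of the registration handshake. All class names here are
// illustrative stand-ins, not Spark's real classes.
sealed trait Message
final case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) extends Message
case object RegisteredExecutor extends Message
// Hypothetical rejection message, added so the failure path is visible.
final case class RegistrationRejected(reason: String) extends Message

// Stand-in for the driver side: remembers which executors have registered.
final class ToyDriver {
  private val registered = scala.collection.mutable.Map.empty[String, Int]

  def receive(msg: Message): Message = msg match {
    case RegisterExecutor(id, _, _) if registered.contains(id) =>
      RegistrationRejected(s"Duplicate executor ID: $id")
    case RegisterExecutor(id, _, cores) =>
      registered(id) = cores
      RegisteredExecutor
    case other =>
      RegistrationRejected(s"Unexpected message: $other")
  }
}

// Stand-in for the backend: it creates its executor handle only after
// the driver has confirmed the registration.
final class ToyBackend(executorId: String, hostPort: String, cores: Int) {
  var executor: Option[String] = None

  // Mirrors preStart(): send RegisterExecutor, then handle the reply.
  def preStart(driver: ToyDriver): Unit =
    receive(driver.receive(RegisterExecutor(executorId, hostPort, cores)))

  def receive(msg: Message): Unit = msg match {
    case RegisteredExecutor           => executor = Some(s"Executor($executorId)")
    case RegistrationRejected(reason) => println(s"Registration failed: $reason")
    case _                            => ()
  }
}

object RegistrationDemo {
  def main(args: Array[String]): Unit = {
    val driver = new ToyDriver
    val backend = new ToyBackend("exec-1", "host1:7078", 4)
    backend.preStart(driver)
    println(backend.executor)
  }
}
```

The point of the sketch is the ordering: the backend holds no executor until the driver's confirmation arrives, which is why the LaunchTask handler checks for a null executor before doing anything else.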
3. Launching a Task
###org.apache.spark.executor/CoarseGrainedExecutorBackend.scala

    // Launch a task
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        // Deserialize the task description
        val ser = env.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // Delegate to the internal execution handle: Executor.launchTask() starts the task
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

###org.apache.spark.executor/Executor.scala

    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer) {
      // A TaskRunner is created for every task
      // TaskRunner implements Java's Runnable interface
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
        serializedTask)
      // Record the TaskRunner in the in-memory map of running tasks
      runningTasks.put(taskId, tr)
      // Executor owns a Java thread pool. The task is wrapped in a Runnable (the TaskRunner)
      // and submitted to the pool, which runs it on one of its worker threads
      // The pool handles scheduling: with a bounded pool, a TaskRunner submitted while
      // every thread is busy waits in the queue until a thread becomes free
      threadPool.execute(tr)
    }
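The launchTask() pattern (wrap the task in a Runnable, record it in a map, submit it to a pool) can be tried in isolation with only java.util.concurrent. The following is a minimal sketch, not Spark's implementation: `ToyExecutor`, `ToyTaskRunner`, `finished`, and `shutdownAndWait` are hypothetical names, and the fixed-size pool is an assumption chosen so that queuing is observable, since the excerpt does not show how `threadPool` is constructed.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Executors, TimeUnit}

// Hypothetical stand-in for Executor: tracks running tasks and hands them to a pool.
final class ToyExecutor(poolSize: Int) {
  // Assumption: a fixed-size pool, so tasks beyond poolSize wait in the pool's queue.
  private val threadPool = Executors.newFixedThreadPool(poolSize)
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  val finished = new ConcurrentLinkedQueue[Long]()

  // Plays the role of TaskRunner: a Runnable that cleans up after itself.
  private final class ToyTaskRunner(taskId: Long, body: () => Unit) extends Runnable {
    override def run(): Unit =
      try body()
      finally {
        runningTasks.remove(taskId)
        finished.add(taskId)
      }
  }

  def launchTask(taskId: Long)(body: => Unit): Unit = {
    val tr = new ToyTaskRunner(taskId, () => body)
    runningTasks.put(taskId, tr) // record the task before it starts
    threadPool.execute(tr)       // runs now, or queues if all threads are busy
  }

  // Stop accepting tasks and wait for submitted ones; true if all completed in time.
  def shutdownAndWait(): Boolean = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}

object LaunchTaskDemo {
  def main(args: Array[String]): Unit = {
    val executor = new ToyExecutor(poolSize = 2)
    // Five tasks on two threads: three of them wait in the queue at first.
    (1L to 5L).foreach { id =>
      executor.launchTask(id) {
        Thread.sleep(50) // simulate work
      }
    }
    executor.shutdownAndWait()
    println(s"finished ${executor.finished.size} tasks, ${executor.runningTasks.size} still tracked")
  }
}
```

Whether submitted work actually queues depends on how the pool is built. A cached pool such as the one returned by `Executors.newCachedThreadPool` starts a new thread instead of queuing, so it is worth checking the pool construction before relying on the queuing behavior.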