在 spark 源码分析之二 -- SparkContext 的初始化过程 中,第 14 步 和 16 步分别描述了 TaskScheduler的 初始化 和 启动过程。
话分两头,先说 TaskScheduler的初始化过程
TaskScheduler的实例化
1 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
其调用了org.apache.spark.SparkContext#createTaskScheduler , 源码如下:
1 /** 2 * Create a task scheduler based on a given master URL. 3 * Return a 2-tuple of the scheduler backend and the task scheduler. 4 */ 5 private def createTaskScheduler( 6 sc: SparkContext, 7 master: String, 8 deployMode: String): (SchedulerBackend, TaskScheduler) = { 9 import SparkMasterRegex._ 10 11 // When running locally, don't try to re-execute tasks on failure. 12 val MAX_LOCAL_TASK_FAILURES = 1 13 14 master match { 15 case "local" => 16 val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) 17 val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1) 18 scheduler.initialize(backend) 19 (backend, scheduler) 20 21 case LOCAL_N_REGEX(threads) => 22 def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 23 // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads. 24 val threadCount = if (threads == "*") localCpuCount else threads.toInt 25 if (threadCount <= 0) { 26 throw new SparkException(s"Asked to run locally with $threadCount threads") 27 } 28 val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) 29 val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) 30 scheduler.initialize(backend) 31 (backend, scheduler) 32 33 case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => 34 def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 35 // local[*, M] means the number of cores on the computer with M failures 36 // local[N, M] means exactly N threads with M failures 37 val threadCount = if (threads == "*") localCpuCount else threads.toInt 38 val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) 39 val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount) 40 scheduler.initialize(backend) 41 (backend, scheduler) 42 43 case SPARK_REGEX(sparkUrl) => 44 val scheduler = new TaskSchedulerImpl(sc) 45 val masterUrls = sparkUrl.split(",").map("spark://" + _) 46 val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) 47 scheduler.initialize(backend) 48 (backend, scheduler) 49 50 case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => 51 // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang. 52 val memoryPerSlaveInt = memoryPerSlave.toInt 53 if (sc.executorMemory > memoryPerSlaveInt) { 54 throw new SparkException( 55 "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format( 56 memoryPerSlaveInt, sc.executorMemory)) 57 } 58 59 val scheduler = new TaskSchedulerImpl(sc) 60 val localCluster = new LocalSparkCluster( 61 numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf) 62 val masterUrls = localCluster.start() 63 val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) 64 scheduler.initialize(backend) 65 backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => { 66 localCluster.stop() 67 } 68 (backend, scheduler) 69 70 case masterUrl => 71 val cm = getClusterManager(masterUrl) match { 72 case Some(clusterMgr) => clusterMgr 73 case None => throw new SparkException("Could not parse Master URL: '" + master + "'") 74 } 75 try { 76 val scheduler = cm.createTaskScheduler(sc, masterUrl) 77 val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) 78 cm.initialize(scheduler, backend) 79 (backend, scheduler) 80 } catch { 81 case se: SparkException => throw se 82 case NonFatal(e) => 83 throw new SparkException("External scheduler cannot be instantiated", e) 84 } 85 } 86 }
不同的实现如下:
实例化部分剖析完毕,下半部分重点剖析yarn-client mode 下 TaskScheduler 的启动过程
yarn-client模式TaskScheduler 启动过程
初始化调度池
yarn-client 模式下,TaskScheduler的实现是 org.apache.spark.scheduler.cluster.YarnScheduler, TaskSchedulerBackend的实现是org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend
在org.apache.spark.SparkContext#createTaskScheduler 方法中,有如下调用:
1 case masterUrl => 2 val cm = getClusterManager(masterUrl) match { 3 case Some(clusterMgr) => clusterMgr 4 case None => throw new SparkException("Could not parse Master URL: '" + master + "'") 5 } 6 try { 7 val scheduler = cm.createTaskScheduler(sc, masterUrl) 8 val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler) 9 cm.initialize(scheduler, backend) 10 (backend, scheduler) 11 } catch { 12 case se: SparkException => throw se 13 case NonFatal(e) => 14 throw new SparkException("External scheduler cannot be instantiated", e) 15 }
其中的,cm.initialize(scheduler, backend)中的cm 是org.apache.spark.scheduler.cluster.YarnClusterManager,TaskScheduler的实现是 org.apache.spark.scheduler.cluster.YarnScheduler, TaskSchedulerBackend的实现是org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend。YarnClusterManager 的 initialize 方法实现如下:
1 override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { 2 scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) 3 }
其并没有实现 initialize, 父类TaskSchedulerImpl 的实现如下:
1 def initialize(backend: SchedulerBackend) { 2 this.backend = backend 3 schedulableBuilder = { 4 schedulingMode match { 5 case SchedulingMode.FIFO => 6 new FIFOSchedulableBuilder(rootPool) 7 case SchedulingMode.FAIR => 8 new FairSchedulableBuilder(rootPool, conf) 9 case _ => 10 throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " + 11 s"$schedulingMode") 12 } 13 } 14 schedulableBuilder.buildPools() 15 }
可以看出,其重要作用就是设置 TaskScheduler 的 TaskSchedulerBackend 引用。
调度模式主要有FIFO和FAIR两种模式。默认是FIFO模式,可以使用spark.scheduler.mode 参数来设定。使用建造者模式来创建 Pool 对象。
其中,org.apache.spark.scheduler.FIFOSchedulableBuilder#buildPools是一个空实现,即没有做任何的操作;而 org.apache.spark.scheduler.FairSchedulableBuilder#buildPools会加载 相应调度分配策略文件;策略文件可以使用 spark.scheduler.allocation.file 参数来设定,如果没有设定会进一步加载默认的 fairscheduler.xml 文件,如果还没有,则不加载。如果有调度池的配置,则根据配置配置调度pool并将其加入到 root 池中。最后初始化 default 池并将其加入到 root 池中。
在HeartBeatReceiver 中设定 taskscheduler 变量
1 _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
首先,_heartbeatReceiver 是一个 RpcEndPointRef 对象,其请求最终会被 HeartbeatReceiver(Endpoint)接收并处理。即org.apache.spark.HeartbeatReceiver#receiveAndReply方法:
1 case TaskSchedulerIsSet => 2 scheduler = sc.taskScheduler 3 context.reply(true)
具体的关于RPC的相关解释,会在后面有专门的文章篇幅介绍。在这里就不做过多解释。 // TODO
启动TaskScheduler
org.apache.spark.SparkContext 的初始化方法有如下代码启动 TaskScheduler:
1 _taskScheduler.start()
yarn-client模式下,运行中调用了 org.apache.spark.scheduler.cluster.YarnScheduler 的 start 方法,它沿用了父类 TaskSchedulerImpl 的实现:
1 override def start() { 2 // 1. 启动 task scheduler backend 3 backend.start() 4 // 2. 设定 speculationScheduler 定时任务 5 if (!isLocal && conf.getBoolean("spark.speculation", false)) { 6 logInfo("Starting speculative execution thread") 7 speculationScheduler.scheduleWithFixedDelay(new Runnable { 8 override def run(): Unit = Utils.tryOrStopSparkContext(sc) { 9 checkSpeculatableTasks() 10 } 11 }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS) 12 } 13 }
第1步:task scheduler backend 的启动:org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend#start的方法如下:
1 /** 2 * Create a Yarn client to submit an application to the ResourceManager. 3 * This waits until the application is running. 4 */ 5 override def start() { 6 // 1. 获取driver 的 host 和 port 7 val driverHost = conf.get("spark.driver.host") 8 val driverPort = conf.get("spark.driver.port") 9 val hostport = driverHost + ":" + driverPort 10 // 2. 设定 driver 的 web UI 地址 11 sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) } 12 13 val argsArrayBuf = new ArrayBuffer[String]() 14 argsArrayBuf += ("--arg", hostport) 15 16 logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" ")) 17 val args = new ClientArguments(argsArrayBuf.toArray) 18 totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf) 19 // 3. 启动 deploy client,并切初始化 driverClient 的 Rpc environment,并在该RPC 环境中初始化master 和 driver 的rpc endpoint 20 client = new Client(args, conf) 21 // 4. 将 application id 绑定到 yarn 上 22 bindToYarn(client.submitApplication(), None) 23 24 // SPARK-8687: Ensure all necessary properties have already been set before 25 // we initialize our driver scheduler backend, which serves these properties 26 // to the executors 27 super.start() 28 // 5. 检查 yarn application的状态,不能为 kill, finished等等 29 waitForApplication() 30 // 6. 监控线程 31 monitorThread = asyncMonitorApplication() 32 monitorThread.start() 33 }
重点解释一下第三步,涉及的源码步如下:
1 object Client { 2 def main(args: Array[String]) { 3 // scalastyle:off println 4 if (!sys.props.contains("SPARK_SUBMIT")) { 5 println("WARNING: This client is deprecated and will be removed in a future version of Spark") 6 println("Use ./bin/spark-submit with "--master spark://host:port"") 7 } 8 // scalastyle:on println 9 new ClientApp().start(args, new SparkConf()) 10 } 11 } 12 13 private[spark] class ClientApp extends SparkApplication { 14 15 override def start(args: Array[String], conf: SparkConf): Unit = { 16 val driverArgs = new ClientArguments(args) 17 18 if (!conf.contains("spark.rpc.askTimeout")) { 19 conf.set("spark.rpc.askTimeout", "10s") 20 } 21 Logger.getRootLogger.setLevel(driverArgs.logLevel) 22 23 val rpcEnv = 24 RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) 25 26 val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL). 27 map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME)) 28 rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf)) 29 30 rpcEnv.awaitTermination() 31 } 32 33 }
可以看到,在Client 的main方法中,初始化了ClientApp 对象,并调用了其 start 方法,在start 方法中, 首先解析了 driver的 参数。然后创建了 driver 端的 RPC environment,然后 根据解析的 master 的信息,初始化 master 的endpointref,并且建立了 client endpoint 并返回 client endpoint ref。
定时执行推测任务
下面继续看 org.apache.spark.scheduler.cluster.YarnScheduler 的 start 方法 的 第二步方法,首先 spark 推测任务 feature 默认是关闭的,原因如果有很多任务都延迟了,那么它会再启动一个相同的任务,这样可能会消耗掉所有的资源,对集群资源和提交到集群上的任务造成不可控的影响。启动了一个延迟定时器,定时地执行 checkSpeculatableTasks 方法,如下:
1 // Check for speculatable tasks in all our active jobs. 2 def checkSpeculatableTasks() { 3 var shouldRevive = false 4 synchronized { 5 shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION) // 1. 推测是否应该跑一个新任务 6 } 7 if (shouldRevive) { 8 backend.reviveOffers() // 2. 跑一个新任务 9 } 10 }
其中,第一步推断任务,有两个实现一个是Pool 的实现,一个是TaskSetManager 的实现,Pool 会递归调用子Pool来获取 speculatable tasks。如果需要推测,则运行task scheduler backend 的 reviveOffers方法,大致思路如下,首先获取 executor 上的空闲资源,然后将这些资源分配给 推测的 task,供其使用。
总结,本篇源码剖析了在Spark Context 启动过程中, 以 yarn-client 模式为例,剖析了task scheduler 是如何启动的。
其中关于RpcEnv的介绍直接略过了,下一篇会专门讲解Spark 中内置的Rpc 机制的整体架构以及其是如何运行的。