zoukankan      html  css  js  c++  java
  • spark 源码分析之四 -- TaskScheduler的创建和启动过程

    在 spark 源码分析之二 -- SparkContext 的初始化过程 中,第 14 步 和 16 步分别描述了 TaskScheduler的 初始化 和 启动过程。

    话分两头,先说 TaskScheduler的初始化过程

    TaskScheduler的实例化

    1 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

    其调用了org.apache.spark.SparkContext#createTaskScheduler , 源码如下:

     1 /**
     2    * Create a task scheduler based on a given master URL.
     3    * Return a 2-tuple of the scheduler backend and the task scheduler.
     4    */
     5   private def createTaskScheduler(
     6       sc: SparkContext,
     7       master: String,
     8       deployMode: String): (SchedulerBackend, TaskScheduler) = {
     9     import SparkMasterRegex._
    10 
    11     // When running locally, don't try to re-execute tasks on failure.
    12     val MAX_LOCAL_TASK_FAILURES = 1
    13 
    14     master match {
    15       case "local" =>
    16         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    17         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
    18         scheduler.initialize(backend)
    19         (backend, scheduler)
    20 
    21       case LOCAL_N_REGEX(threads) =>
    22         def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    23         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
    24         val threadCount = if (threads == "*") localCpuCount else threads.toInt
    25         if (threadCount <= 0) {
    26           throw new SparkException(s"Asked to run locally with $threadCount threads")
    27         }
    28         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    29         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    30         scheduler.initialize(backend)
    31         (backend, scheduler)
    32 
    33       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
    34         def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    35         // local[*, M] means the number of cores on the computer with M failures
    36         // local[N, M] means exactly N threads with M failures
    37         val threadCount = if (threads == "*") localCpuCount else threads.toInt
    38         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
    39         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    40         scheduler.initialize(backend)
    41         (backend, scheduler)
    42 
    43       case SPARK_REGEX(sparkUrl) =>
    44         val scheduler = new TaskSchedulerImpl(sc)
    45         val masterUrls = sparkUrl.split(",").map("spark://" + _)
    46         val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    47         scheduler.initialize(backend)
    48         (backend, scheduler)
    49 
    50       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
    51         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
    52         val memoryPerSlaveInt = memoryPerSlave.toInt
    53         if (sc.executorMemory > memoryPerSlaveInt) {
    54           throw new SparkException(
    55             "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
    56               memoryPerSlaveInt, sc.executorMemory))
    57         }
    58 
    59         val scheduler = new TaskSchedulerImpl(sc)
    60         val localCluster = new LocalSparkCluster(
    61           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
    62         val masterUrls = localCluster.start()
    63         val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    64         scheduler.initialize(backend)
    65         backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
    66           localCluster.stop()
    67         }
    68         (backend, scheduler)
    69 
    70       case masterUrl =>
    71         val cm = getClusterManager(masterUrl) match {
    72           case Some(clusterMgr) => clusterMgr
    73           case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
    74         }
    75         try {
    76           val scheduler = cm.createTaskScheduler(sc, masterUrl)
    77           val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    78           cm.initialize(scheduler, backend)
    79           (backend, scheduler)
    80         } catch {
    81           case se: SparkException => throw se
    82           case NonFatal(e) =>
    83             throw new SparkException("External scheduler cannot be instantiated", e)
    84         }
    85     }
    86   }

    不同的实现如下:

      

     实例化部分剖析完毕,下半部分重点剖析yarn-client mode 下 TaskScheduler 的启动过程

    yarn-client模式TaskScheduler 启动过程

    初始化调度池

    yarn-client 模式下,TaskScheduler的实现是 org.apache.spark.scheduler.cluster.YarnScheduler, TaskSchedulerBackend的实现是org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend

    在org.apache.spark.SparkContext#createTaskScheduler 方法中,有如下调用:

     1 case masterUrl =>
     2         val cm = getClusterManager(masterUrl) match {
     3           case Some(clusterMgr) => clusterMgr
     4           case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
     5         }
     6         try {
     7           val scheduler = cm.createTaskScheduler(sc, masterUrl)
     8           val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
     9           cm.initialize(scheduler, backend)
    10           (backend, scheduler)
    11         } catch {
    12           case se: SparkException => throw se
    13           case NonFatal(e) =>
    14             throw new SparkException("External scheduler cannot be instantiated", e)
    15         }

     其中的,cm.initialize(scheduler, backend)中的cm 是org.apache.spark.scheduler.cluster.YarnClusterManager,TaskScheduler的实现是 org.apache.spark.scheduler.cluster.YarnScheduler, TaskSchedulerBackend的实现是org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend。YarnClusterManager 的 initialize 方法实现如下:

    1   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    2     scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
    3   }

    其并没有实现 initialize, 父类TaskSchedulerImpl 的实现如下:

     1 def initialize(backend: SchedulerBackend) {
     2     this.backend = backend
     3     schedulableBuilder = {
     4       schedulingMode match {
     5         case SchedulingMode.FIFO =>
     6           new FIFOSchedulableBuilder(rootPool)
     7         case SchedulingMode.FAIR =>
     8           new FairSchedulableBuilder(rootPool, conf)
     9         case _ =>
    10           throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
    11           s"$schedulingMode")
    12       }
    13     }
    14     schedulableBuilder.buildPools()
    15   }

     可以看出,其重要作用就是设置 TaskScheduler 的 TaskSchedulerBackend 引用。

    调度模式主要有FIFO和FAIR两种模式。默认是FIFO模式,可以使用spark.scheduler.mode 参数来设定。使用建造者模式来创建 Pool 对象。

    其中,org.apache.spark.scheduler.FIFOSchedulableBuilder#buildPools是一个空实现,即没有做任何的操作;而 org.apache.spark.scheduler.FairSchedulableBuilder#buildPools会加载 相应调度分配策略文件;策略文件可以使用 spark.scheduler.allocation.file 参数来设定,如果没有设定会进一步加载默认的 fairscheduler.xml 文件,如果还没有,则不加载。如果有调度池的配置,则根据配置配置调度pool并将其加入到 root 池中。最后初始化 default 池并将其加入到 root 池中。

    在HeartBeatReceiver 中设定 taskscheduler 变量

    1 _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

     首先,_heartbeatReceiver 是一个 RpcEndPointRef 对象,其请求最终会被 HeartbeatReceiver(Endpoint)接收并处理。即org.apache.spark.HeartbeatReceiver#receiveAndReply方法:

      

    1 case TaskSchedulerIsSet =>
    2       scheduler = sc.taskScheduler
    3       context.reply(true)

    具体的关于RPC的相关解释,会在后面有专门的文章篇幅介绍。在这里就不做过多解释。 // TODO

    启动TaskScheduler

    org.apache.spark.SparkContext 的初始化方法有如下代码启动 TaskScheduler:

    1 _taskScheduler.start()

     yarn-client模式下,运行中调用了 org.apache.spark.scheduler.cluster.YarnScheduler 的 start 方法,它沿用了父类 TaskSchedulerImpl 的实现:

     1 override def start() {
     2     // 1. 启动 task scheduler backend
     3     backend.start()
     4     // 2. 设定 speculationScheduler 定时任务
     5     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
     6       logInfo("Starting speculative execution thread")
     7       speculationScheduler.scheduleWithFixedDelay(new Runnable {
     8         override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
     9           checkSpeculatableTasks()
    10         }
    11       }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    12     }
    13   }

    第1步:task scheduler backend 的启动:org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend#start的方法如下:

     1 /**
     2    * Create a Yarn client to submit an application to the ResourceManager.
     3    * This waits until the application is running.
     4    */
     5   override def start() {
     6     // 1. 获取driver 的 host 和 port
     7     val driverHost = conf.get("spark.driver.host")
     8     val driverPort = conf.get("spark.driver.port")
     9     val hostport = driverHost + ":" + driverPort
    10     // 2. 设定 driver 的 web UI 地址
    11     sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }
    12 
    13     val argsArrayBuf = new ArrayBuffer[String]()
    14     argsArrayBuf += ("--arg", hostport)
    15 
    16     logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
    17     val args = new ClientArguments(argsArrayBuf.toArray)
    18     totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
    19     // 3. 启动 deploy client,并切初始化 driverClient 的 Rpc environment,并在该RPC 环境中初始化master 和 driver 的rpc endpoint
    20     client = new Client(args, conf)
    21     // 4. 将 application id 绑定到 yarn 上
    22     bindToYarn(client.submitApplication(), None)
    23 
    24     // SPARK-8687: Ensure all necessary properties have already been set before
    25     // we initialize our driver scheduler backend, which serves these properties
    26     // to the executors
    27     super.start()
    28    // 5. 检查 yarn application的状态,不能为 kill, finished等等
    29     waitForApplication()
    30    // 6. 监控线程
    31     monitorThread = asyncMonitorApplication()
    32     monitorThread.start()
    33   }

    重点解释一下第三步,涉及的源码步如下:

     1 object Client {
     2   def main(args: Array[String]) {
     3     // scalastyle:off println
     4     if (!sys.props.contains("SPARK_SUBMIT")) {
     5       println("WARNING: This client is deprecated and will be removed in a future version of Spark")
     6       println("Use ./bin/spark-submit with "--master spark://host:port"")
     7     }
     8     // scalastyle:on println
     9     new ClientApp().start(args, new SparkConf())
    10   }
    11 }
    12 
    13 private[spark] class ClientApp extends SparkApplication {
    14 
    15   override def start(args: Array[String], conf: SparkConf): Unit = {
    16     val driverArgs = new ClientArguments(args)
    17 
    18     if (!conf.contains("spark.rpc.askTimeout")) {
    19       conf.set("spark.rpc.askTimeout", "10s")
    20     }
    21     Logger.getRootLogger.setLevel(driverArgs.logLevel)
    22 
    23     val rpcEnv =
    24       RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    25 
    26     val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
    27       map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    28     rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    29 
    30     rpcEnv.awaitTermination()
    31   }
    32 
    33 }

    可以看到,在Client 的main方法中,初始化了ClientApp 对象,并调用了其 start 方法,在start 方法中, 首先解析了 driver的 参数。然后创建了 driver 端的 RPC environment,然后 根据解析的 master 的信息,初始化 master 的endpointref,并且建立了 client endpoint 并返回 client endpoint ref。

    定时执行推测任务

    下面继续看 org.apache.spark.scheduler.cluster.YarnScheduler 的 start 方法 的 第二步方法,首先 spark 推测任务 feature 默认是关闭的,原因如果有很多任务都延迟了,那么它会再启动一个相同的任务,这样可能会消耗掉所有的资源,对集群资源和提交到集群上的任务造成不可控的影响。启动了一个延迟定时器,定时地执行 checkSpeculatableTasks 方法,如下:

     1 // Check for speculatable tasks in all our active jobs.
     2   def checkSpeculatableTasks() {
     3     var shouldRevive = false
     4     synchronized {
     5       shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION) // 1. 推测是否应该跑一个新任务
     6     }
     7     if (shouldRevive) {
     8       backend.reviveOffers() // 2. 跑一个新任务
     9     }
    10   }

    其中,第一步推断任务,有两个实现一个是Pool 的实现,一个是TaskSetManager 的实现,Pool 会递归调用子Pool来获取 speculatable tasks。如果需要推测,则运行task scheduler backend 的 reviveOffers方法,大致思路如下,首先获取 executor 上的空闲资源,然后将这些资源分配给 推测的 task,供其使用。

    总结,本篇源码剖析了在Spark Context 启动过程中, 以 yarn-client 模式为例,剖析了task scheduler 是如何启动的。

    其中关于RpcEnv的介绍直接略过了,下一篇会专门讲解Spark 中内置的Rpc 机制的整体架构以及其是如何运行的。

  • 相关阅读:
    观察者模式
    strchr
    行转列
    Ja.Net:融合 Java 1.5 和 .NET !
    主题:借JavaFX之风,Swing终于熬到了出头之日
    DOM和SAX概念的总结
    几个linux的命令技巧
    gcc编译的东东
    详细介绍DOM和SAX
    oracle的number类型默认长度是多少?
  • 原文地址:https://www.cnblogs.com/johnny666888/p/11122802.html
Copyright © 2011-2022 走看看