Spark Resource Scheduling

    I. The difference between task scheduling and resource scheduling

    Task scheduling refers to scheduling a job's tasks, which is carried out by DAGScheduler, TaskScheduler, and SchedulerBackend.

    Resource scheduling refers to how an application obtains cluster resources; it is carried out by the schedule method.

    II. Resource scheduling demystified

    Because the Master is responsible for resource management and scheduling, the resource-scheduling method schedule lives in the Master.scala class. schedule is invoked whenever a program registers or the available resources change, for example during registration (including the registration of workers, drivers, and applications; note that executors register with SparkDeploySchedulerBackend instead):

    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
    }

    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * every time a new app joins or resource availability changes.
     */

    This method fires whenever a new application joins or the available resources change (for example, when executors or workers are added or removed):

    private def schedule(): Unit = {
      // If the Master's state is not ALIVE, scheduling is meaningless, so return early
      if (state != RecoveryState.ALIVE) { return }
      // Drivers take strict precedence over executors
      // Randomizing the order of the workers helps balance the load across them
      val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
      // Only workers in the ALIVE state may take part in resource allocation
      for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
        // Iterate over the waiting drivers. This applies to cluster mode only;
        // in client mode the driver is started locally and never waits here.
        for (driver <- waitingDrivers) {
          // When the worker's free memory and cores cover what the driver needs,
          // launch the driver on this (randomly ordered) worker
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)
            waitingDrivers -= driver // remove the launched driver from the waiting queue
          }
        }
      }
      startExecutorsOnWorkers()
    }

    Walkthrough of the schedule code (the simple parts are explained in the inline comments above):

    1 Random.shuffle(workers) shuffles the order of the workers held in the Master's cached data structure:
     
    def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
      val buf = new ArrayBuffer[T] ++= xs // build a temporary buffer holding the input elements

      def swap(i1: Int, i2: Int) { // swap the two elements at the given indices
        val tmp = buf(i1)
        buf(i1) = buf(i2)
        buf(i2) = tmp
      }

      // Fisher-Yates shuffle: walk backwards through the buffer, swapping each
      // element with a randomly chosen earlier one, which scrambles the order
      for (n <- buf.length to 2 by -1) {
        val k = nextInt(n)
        swap(n - 1, k)
      }

      (bf(xs) ++= buf).result // return a new, randomized collection (here, the workers)
    }

    2 waitingDrivers

    private val waitingDrivers = new ArrayBuffer[DriverInfo]
     
    As you can see, waitingDrivers is an array whose elements are DriverInfo. A DriverInfo carries the driver's startTime (launch time), id, desc (the driver's description), and submitDate (submission date):
     
    private[deploy] class DriverInfo(
        val startTime: Long,
        val id: String,
        val desc: DriverDescription,
        val submitDate: Date)
      extends Serializable {
     
    The description contains the following fields:
     
    private[deploy] case class DriverDescription(
        jarUrl: String,     // URL of the application jar
        mem: Int,           // memory required by the driver, in MB
        cores: Int,         // CPU cores required by the driver
        supervise: Boolean, // if set when spark-submit runs the driver in cluster mode,
                            // the driver is automatically restarted when it dies
        command: Command) { // launch command and environment information

      override def toString: String = s"DriverDescription (${command.mainClass})"
    }
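
    For illustration, here is a hypothetical, simplified construction showing what each field carries; the real DriverDescription and Command are private to Spark and are built internally when spark-submit runs in cluster mode:

    // Hypothetical stand-ins for Spark's private classes, for illustration only
    case class Command(mainClass: String, arguments: Seq[String], environment: Map[String, String])
    case class DriverDescription(jarUrl: String, mem: Int, cores: Int,
                                 supervise: Boolean, command: Command)

    val desc = DriverDescription(
      jarUrl = "hdfs:///jars/my-app.jar", // where workers can fetch the user jar (hypothetical path)
      mem = 1024,                         // driver memory, in MB
      cores = 1,                          // driver CPU cores
      supervise = true,                   // restart the driver if it dies (cluster mode)
      command = Command("com.example.MyApp", Seq("--input", "/data"), Map.empty))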

    3 launchDriver: Spark must launch the driver before any of the subsequent, more specific scheduling can take place

    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
      logInfo("Launching driver " + driver.id + " on worker " + worker.id)
      worker.addDriver(driver)     // record which worker the driver runs on
      driver.worker = Some(worker) // driver and worker hold references to each other
      // The Master sends a remote RPC instruction to the worker, telling it to start the driver
      worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
      driver.state = DriverState.RUNNING // once launched, mark the driver's state as RUNNING
    }
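
    Putting sections 1 to 3 together, here is a minimal sketch of the placement rule, using hypothetical simplified types rather than Spark's actual WorkerInfo and DriverInfo: a driver lands on the first shuffled ALIVE worker whose free memory and cores cover its requirements.

    // Hypothetical simplified types, for illustration only
    case class WorkerRes(id: String, memoryFree: Int, coresFree: Int)
    case class DriverReq(id: String, mem: Int, cores: Int)

    // Mirror of the fit check in schedule(): the first worker with enough headroom wins
    def placeDriver(shuffledWorkers: Seq[WorkerRes], driver: DriverReq): Option[WorkerRes] =
      shuffledWorkers.find(w => w.memoryFree >= driver.mem && w.coresFree >= driver.cores)

    val workers = Seq(WorkerRes("w1", 2048, 2), WorkerRes("w2", 8192, 8))
    println(placeDriver(workers, DriverReq("driver-0", mem = 4096, cores = 2)))
    // Some(WorkerRes(w2,8192,8)): w1 lacks memory, so the driver goes to w2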

    4 startExecutorsOnWorkers: scheduling follows a simple FIFO queue. By default Spark launches executors in FIFO order: resources are allocated to the next application only after the previous application's demand has been satisfied.

    /**
     * Schedule and launch executors on workers
     */
    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      // Before allocating executors we check whether the app still needs cores;
      // if it does not, no executors are allocated for it
      for (app <- waitingApps if app.coresLeft > 0) {
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor // cores requested per executor
        // Filter out workers that don't have enough resources to launch an executor:
        // the worker must be ALIVE, and its free memory and cores must cover a single
        // executor's requirements. The survivors are then sorted by free cores in
        // descending order, so the best-provisioned workers are used first.
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        // spreadOutApps makes the application run on as many nodes as possible, which
        // usually also improves data locality, since data is normally spread across the
        // machines; this is the default. The call returns an array holding the number of
        // cores assigned to each worker, satisfying the demand as far as possible.
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them.
        // The Master sends remote instructions to the workers to start the ExecutorBackend
        // processes, and sends ExecutorAdded messages to the driver.
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }
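
    The per-worker core counts are computed by scheduleExecutorsOnWorkers, which is not quoted here. As a rough sketch of the idea (my own simplified version assuming one core per executor, not Spark's actual implementation): with spreadOut enabled, cores are handed out round-robin across the usable workers; with it disabled, each worker is filled up before moving to the next.

    // Simplified sketch of spread-out vs. consolidated core assignment,
    // assuming one core per executor; not Spark's real scheduleExecutorsOnWorkers
    def assignCores(freeCores: Array[Int], coresNeeded: Int, spreadOut: Boolean): Array[Int] = {
      val assigned = Array.fill(freeCores.length)(0)
      var remaining = coresNeeded
      var pos = 0
      // Keep assigning while demand remains and at least one worker has spare cores
      while (remaining > 0 && assigned.zip(freeCores).exists { case (a, f) => a < f }) {
        if (assigned(pos) < freeCores(pos)) {
          assigned(pos) += 1 // give this worker one more core
          remaining -= 1
          if (spreadOut) pos = (pos + 1) % freeCores.length // round-robin to the next worker
        } else {
          pos = (pos + 1) % freeCores.length // this worker is full, move on
        }
      }
      assigned
    }

    println(assignCores(Array(8, 8, 8), 8, spreadOut = true).toList)  // List(3, 3, 2)
    println(assignCores(Array(8, 8, 8), 8, spreadOut = false).toList) // List(8, 0, 0)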