zoukankan      html  css  js  c++  java
  • spark-3.0 application 调度算法解析

    spark 各个版本的application 调度算法还是有这明显的不同之处的。从spark1.3.0 到 spark 1.6.1、spark2.0 到 现在最新的spark 3.0 ,调度算法有了一定的修改。下面大家一起学习一下,最新的spark 版本spark-3.0的Application 调度机制。

    private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps) {
    //如果在 spark-submmit 脚本中,指定了每个executor 多少个 CPU core,
    // 则每个Executor 分配该个数的 core,
    // 否则 默认每个executor 只分配 1 个 CPU core
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
    // 当前 APP 还需要分配的 core 数 不能 小于 单个 executor 启动 的 CPU core 数
    if (app.coresLeft >= coresPerExecutor) {
    // Filter out workers that don't have enough resources to launch an executo/*ku*/r
    // 过滤出 状态 为 ALIVE,并且还能 发布 Executor 的 worker
    // 按照剩余的 CPU core 数 倒序
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
    .filter(canLaunchExecutor(_, app.desc))
    .sortBy(_.coresFree).reverse
    if (waitingApps.length == 1 && usableWorkers.isEmpty) {
    logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
    }
        // TODO:  默认采用 spreadOutApps  调度算法, 将 application需要的 executor资源 分派到  多个 worker 上去
          val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
    allocateWorkerResourceToExecutors(
    app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
    }
    }
    }
    }
    判断一个 worker 是否可以发布 executor
    private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = {
    canLaunch(
    worker,
    desc.memoryPerExecutorMB,
    desc.coresPerExecutor.getOrElse(1),
    desc.resourceReqsPerExecutor)
    }
    让我们看一看里面的 canlaunch 方法
    private def canLaunch(
    worker: WorkerInfo,
    memoryReq: Int,
    coresReq: Int,
    resourceRequirements: Seq[ResourceRequirement])
    : Boolean = {
    // worker 上 空闲的 内存值 要 大于等于 请求的 内存值
    val enoughMem = worker.memoryFree >= memoryReq
    // worker 上 空闲的 core 数 要 大于等于 请求的 core数
    val enoughCores = worker.coresFree >= coresReq
    // worker 是否满足 executor 请求的资源
    val enoughResources = ResourceUtils.resourcesMeetRequirements(
    worker.resourcesAmountFree, resourceRequirements)
    enoughMem && enoughCores && enoughResources
    }

    回到上面的 scheduleExecutorsOnWorkers
    private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
    val coresPerExecutor = app.desc.coresPerExecutor
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    // 默认情况下 是 开启 oneExecutorPerWorker 机制的,也就是默认是在 一个 worker 上 只启动 一个 executor的
    // 如果在spark -submit 脚本中设置了coresPerExecutor , 在worker资源充足的时候,则 会在每个worker 上,启动多个executor
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
    val numUsable = usableWorkers.length
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

    // 判断 Worker节点是否能够启动Executor
    def canLaunchExecutorForApp(pos: Int): Boolean = {

    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    val assignedExecutorNum = assignedExecutors(pos)

    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.

    // 如果spark -submit 脚本中设置了coresPerExecutor值,
    // 或者当前 这个worker 还没有为这个 application 分配 过 executor ,
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
    // TODO: 可以启动新的 Executor
    if (launchingNewExecutor) {
    val assignedMemory = assignedExecutorNum * memoryPerExecutor
    val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
    val assignedResources = resourceReqsPerExecutor.map {
    req => req.resourceName -> req.amount * assignedExecutorNum
    }.toMap
    val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
    case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
    }
    val enoughResources = ResourceUtils.resourcesMeetRequirements(
    resourcesFree, resourceReqsPerExecutor)
    val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
    keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
    } else {
    // We're adding cores to an existing executor, so no need
    // to check memory and executor limits
    // TODO: 不满足启动新的 Executor条件,则 在 老的 Executor 上 追加 core 数
    keepScheduling && enoughCores
    }
    }

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application's limits

    var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
    while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
    var keepScheduling = true
    while (keepScheduling && canLaunchExecutorForApp(pos)) {
    coresToAssign -= minCoresPerExecutor
    assignedCores(pos) += minCoresPerExecutor

    // If we are launching one executor per worker, then every iteration assigns 1 core
    // to the executor. Otherwise, every iteration assigns cores to a new executor.
    if (oneExecutorPerWorker) {
    //TODO: 如果该Worker节点不能启动新的 Executor,则每次在老的executor 上 分配 minCoresPerExecutor 个 CPU core(此时该值默认 为 1 )
    assignedExecutors(pos) = 1
    } else {
    //TODO: 如果该Worker节点可以启动新的 Executor,则每次在新的executor 上 分配 minCoresPerExecutor 个 CPU core(此时该值为 spark-submit脚本配置的 coresPerExecutor 值)
    assignedExecutors(pos) += 1
    }

    // Spreading out an application means spreading out its executors across as
    // many workers as possible. If we are not spreading out, then we should keep
    // scheduling executors on this worker until we use all of its resources.
    // Otherwise, just move on to the next worker.
    if (spreadOutApps) {
    // TODO: 这里传入 keepScheduling = false , 就是每次 worker上只分配 一次 core ,然后 到 下一个 worker 上 再去 分配 core,直到 worker
    // TODO: 完成一次遍历
    keepScheduling = false
    }
    }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
    }
    // 返回每个Worker节点分配的CPU核数
    assignedCores
    }

    再来分析 allocateWorkerResourceToExecutors
    private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
    val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
    // TODO : 当前 这个 application 追加 一次 Executor
    val exec = app.addExecutor(worker, coresToAssign, allocated)
    //TODO: 给worker 线程 发送 launchExecutor 命令
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
    }
    }
    ok,至此,spark最新版本 spark-3.0的Application 调度算法分析完毕!!!
  • 相关阅读:
    Spring Boot从入门到精通(一)搭建第一个Spring Boot程序
    程序员未来的出路究竟在哪里?一位老码农的心声
    ​IntelliJ IDEA使用技巧—使用EasyCode插件一键生成代码04期
    浅谈Java后端开发工程师腾讯面试经历分享总结
    Java面试技巧—如何自我介绍
    互联网大厂Java面试题集—Spring boot常见面试题(二)
    互联网大厂Java面试题集—Spring boot面试题(一)
    ActiveMQ消息队列从入门到实践(4)—使用Spring JMS收发消息
    ActiveMQ消息队列从入门到实践(1)—JMS的概念和JMS消息模型
    有多少程序员干到35岁,那么其他人去干什么了?
  • 原文地址:https://www.cnblogs.com/guodong1789/p/11982170.html
Copyright © 2011-2022 走看看