  • How Spark 1.3.x and Spark 2.x differ in allocating CPU cores when launching executors

    ***The executor-to-worker allocation discussed here uses the spreadOut strategy as the example***

    Key code in the 1.3 version:

    for (app <- waitingApps if app.coresLeft > 0) { // only handle apps that still have unassigned cores
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse // usable ALIVE workers, sorted by free cores in descending order
      val numUsable = usableWorkers.length // number of usable workers, e.g. 5
      val assigned = new Array[Int](numUsable) // cores assigned to each candidate worker in this round; one slot per worker, all initialized to 0
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // cores still to assign = min(the app's remaining cores (e.g. 5), total free cores across usable workers (e.g. 10))
      var pos = 0
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) { // round-robin: does this worker still have a free core beyond what has already been assigned to it?
          toAssign -= 1
          assigned(pos) += 1 // give the worker at index pos one more core
        }
        pos = (pos + 1) % numUsable // round-robin on to the next worker that may still have resources
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      for (pos <- 0 until numUsable) {
        if (assigned(pos) > 0) { // if assigned(pos) > 0, launch an executor on the worker at that index
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) // record the new executor in the app's bookkeeping
          launchExecutor(usableWorkers(pos), exec) // tell the worker to launch the executor
          app.state = ApplicationState.RUNNING
        }
      }
    }

    The loop above shows clearly that, under spread-out allocation, each pass hands a worker exactly one core. This is why setting --executor-cores in spark-submit does not necessarily take effect in 1.3.x.
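
    To make this concrete, here is a minimal, self-contained sketch (not Spark source; the three workers and their core counts are made-up numbers) that replays the 1.3.x round-robin loop on toy data:

    // Standalone illustration of the 1.3.x round-robin assignment; hypothetical numbers only.
    object RoundRobinDemo {
      def main(args: Array[String]): Unit = {
        val coresFree = Array(4, 4, 4)              // three hypothetical workers, 4 free cores each
        val coresLeft = 10                          // cores the app still needs
        val numUsable = coresFree.length
        val assigned  = new Array[Int](numUsable)   // cores assigned per worker in this round
        var toAssign  = math.min(coresLeft, coresFree.sum)
        var pos = 0
        while (toAssign > 0) {
          if (coresFree(pos) - assigned(pos) > 0) { // this worker still has a spare core
            toAssign -= 1
            assigned(pos) += 1                      // exactly one core is handed out per visit
          }
          pos = (pos + 1) % numUsable               // move on to the next worker
        }
        println(assigned.mkString(", "))            // prints "4, 3, 3": cores spread one at a time
      }
    }

    Whatever per-executor core count was requested, the cores end up spread one at a time across the workers.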

    Spark 2.x corrects this: it actually reads the --executor-cores value, and only falls back to the 1.3.x one-core-at-a-time behavior when that value is not set. The code is as follows:

     private def scheduleExecutorsOnWorkers(
          app: ApplicationInfo,
          usableWorkers: Array[WorkerInfo],
          spreadOutApps: Boolean): Array[Int] = {
        val coresPerExecutor = app.desc.coresPerExecutor
        val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
        val oneExecutorPerWorker = coresPerExecutor.isEmpty
        val memoryPerExecutor = app.desc.memoryPerExecutorMB
        val numUsable = usableWorkers.length
        val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
        val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
        var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    
        /** Return whether the specified worker can launch an executor for this app. */
        def canLaunchExecutor(pos: Int): Boolean = {
          val keepScheduling = coresToAssign >= minCoresPerExecutor
          val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    
          // If we allow multiple executors per worker, then we can always launch new executors.
          // Otherwise, if there is already an executor on this worker, just give it more cores.
          val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
          if (launchingNewExecutor) {
            val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
            val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
            val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
            keepScheduling && enoughCores && enoughMemory && underLimit
          } else {
            // We're adding cores to an existing executor, so no need
            // to check memory and executor limits
            keepScheduling && enoughCores
          }
        }
    
        // Keep launching executors until no more workers can accommodate any
        // more executors, or if we have reached this application's limits
        var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
        while (freeWorkers.nonEmpty) {
          freeWorkers.foreach { pos =>
            var keepScheduling = true
            while (keepScheduling && canLaunchExecutor(pos)) {
              coresToAssign -= minCoresPerExecutor
              assignedCores(pos) += minCoresPerExecutor
    
              // If we are launching one executor per worker, then every iteration assigns 1 core
              // to the executor. Otherwise, every iteration assigns cores to a new executor.
              if (oneExecutorPerWorker) {
                assignedExecutors(pos) = 1
              } else {
                assignedExecutors(pos) += 1
              }
    
              // Spreading out an application means spreading out its executors across as
              // many workers as possible. If we are not spreading out, then we should keep
              // scheduling executors on this worker until we use all of its resources.
              // Otherwise, just move on to the next worker.
              if (spreadOutApps) {
                keepScheduling = false
              }
            }
          }
          freeWorkers = freeWorkers.filter(canLaunchExecutor)
        }
        assignedCores
      }
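
    To see the difference in behaviour, here is a minimal, self-contained sketch (not Spark source; WorkerSpec, the memory figures and the worker list are hypothetical, and the executor-limit check is omitted) that mimics the spread-out branch of scheduleExecutorsOnWorkers above, once with coresPerExecutor unset and once with it set to 4:

    // Simplified re-implementation of the spread-out scheduling loop, for illustration only.
    object ScheduleDemo {
      case class WorkerSpec(coresFree: Int, memoryFreeMB: Int) // hypothetical stand-in for WorkerInfo

      def schedule(workers: Array[WorkerSpec],
                   coresLeft: Int,
                   coresPerExecutor: Option[Int],
                   memoryPerExecutorMB: Int): Array[Int] = {
        val minCores = coresPerExecutor.getOrElse(1)
        val oneExecutorPerWorker = coresPerExecutor.isEmpty
        val assignedCores = new Array[Int](workers.length)
        val assignedExecutors = new Array[Int](workers.length)
        var coresToAssign = math.min(coresLeft, workers.map(_.coresFree).sum)

        // Same idea as canLaunchExecutor above, minus the app.executorLimit check.
        def canLaunch(pos: Int): Boolean = {
          val enoughCores = workers(pos).coresFree - assignedCores(pos) >= minCores
          val newExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
          val enoughMemory =
            workers(pos).memoryFreeMB - assignedExecutors(pos) * memoryPerExecutorMB >= memoryPerExecutorMB
          coresToAssign >= minCores && enoughCores && (!newExecutor || enoughMemory)
        }

        var free = workers.indices.filter(canLaunch)
        while (free.nonEmpty) {
          free.foreach { pos =>
            if (canLaunch(pos)) {                   // spread out: one executor's worth of cores per pass
              coresToAssign -= minCores
              assignedCores(pos) += minCores
              assignedExecutors(pos) = if (oneExecutorPerWorker) 1 else assignedExecutors(pos) + 1
            }
          }
          free = free.filter(canLaunch)
        }
        assignedCores
      }

      def main(args: Array[String]): Unit = {
        val workers = Array(WorkerSpec(8, 8192), WorkerSpec(8, 8192), WorkerSpec(8, 8192))
        // coresPerExecutor unset: cores are handed out one at a time, as in 1.3.x
        println(schedule(workers, 10, None, 1024).mkString(", "))     // 4, 3, 3
        // coresPerExecutor = 4: cores are handed out in chunks of 4
        println(schedule(workers, 10, Some(4), 1024).mkString(", "))  // 4, 4, 0
      }
    }

    With coresPerExecutor unset the result is the same one-core-at-a-time spread as in 1.3.x; with it set, cores are handed out in whole-executor chunks, so a worker either hosts a full executor or nothing. In standalone mode coresPerExecutor comes from spark.executor.cores (the --executor-cores flag), and app.coresLeft is typically bounded by spark.cores.max (--total-executor-cores); when the total is not a multiple of the per-executor size, the remainder (10 - 8 = 2 cores in this sketch) is simply left unassigned.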
  • Original article: https://www.cnblogs.com/zzq-include/p/9276915.html