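    Spark Resource Scheduling Source Code Analysis: The spreadOutApps and Non-spreadOutApps Scheduling Algorithms

    1. spreadOutApps: an application's cores are spread as evenly as possible across all usable workers.

    2. Non-spreadOutApps: an application's cores are packed onto as few workers as possible, using up one worker's free cores before moving to the next.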
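
The difference between the two strategies can be sketched on plain arrays of free core counts. This is a simplified model, not the Master's code: `AllocationSketch`, `spreadOut`, and `pack` are hypothetical names, and the sketch ignores memory checks, worker state, and executor launching.

```scala
object AllocationSketch {
  /** Spread-out: hand out one core at a time, round-robin over the workers. */
  def spreadOut(coresFree: Array[Int], coresWanted: Int): Array[Int] = {
    // Visit workers with the most free cores first (ties keep index order here).
    val order = coresFree.indices.sortBy(i => -coresFree(i))
    val assigned = new Array[Int](coresFree.length)
    // Never try to assign more cores than the workers have in total.
    var toAssign = math.min(coresWanted, coresFree.sum)
    var pos = 0
    while (toAssign > 0) {
      val w = order(pos)
      if (coresFree(w) - assigned(w) > 0) {
        assigned(w) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % order.length
    }
    assigned
  }

  /** Pack: take as many cores as possible from each worker before moving on. */
  def pack(coresFree: Array[Int], coresWanted: Int): Array[Int] = {
    val assigned = new Array[Int](coresFree.length)
    var left = coresWanted
    for (w <- coresFree.indices) {
      if (left > 0) {
        val coresToUse = math.min(coresFree(w), left)
        assigned(w) = coresToUse
        left -= coresToUse
      }
    }
    assigned
  }
}
```

With ten workers that each have 10 free cores, `AllocationSketch.spreadOut(Array.fill(10)(10), 20)` assigns 2 cores to every worker, while `AllocationSketch.pack(Array.fill(10)(10), 20)` assigns 10 cores to each of the first two workers and none to the rest.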

     

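    Source code analysis

    org.apache.spark.deploy.master.Master

    1. If the master's state is not ALIVE, return immediately.
    2. Schedule drivers.
    3. Schedule applications (the core of the method and the most important part).

    The source is as follows: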

    /*
     * schedule() handles resource scheduling for the standalone Master.
     */
    private def schedule() {
      // If the master's state is not ALIVE, return immediately.
      // In other words, a standby master never schedules drivers or applications.
      if (state != RecoveryState.ALIVE) { return }

      // First schedule drivers, they take strict precedence over applications
      // Randomization helps balance drivers

      // Random.shuffle returns the elements of the given collection in random order.
      // Take all workers that have registered so far, keep only those whose state is ALIVE,
      // and shuffle them.
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0

      // Schedule drivers first.
      // When is a driver registered with the master, and therefore scheduled here?
      // Only when the application is submitted in standalone cluster deploy mode.
      // In client mode the driver starts locally in the submitting process, and on YARN
      // this Master is not involved at all, so there is no driver for it to schedule.

      // Driver scheduling:
      // iterate over the waitingDrivers ArrayBuffer.
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0

        // Keep looping while some alive worker has not been visited yet
        // (numWorkersVisited < numWorkersAlive) and this driver has not been
        // launched yet (launched is false).
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1

          // If the worker's free memory is at least what the driver needs,
          // and its free cores are at least what the driver needs...
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            // ...launch the driver on this worker,
            launchDriver(worker, driver)
            // and remove it from the waitingDrivers queue.
            waitingDrivers -= driver
            launched = true
          }

          // Move the cursor to the next worker.
          curPos = (curPos + 1) % numWorkersAlive
        }
      }

      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      // Application scheduling (the core of this method).
      // There are two algorithms: spreadOutApps and non-spreadOutApps.
      if (spreadOutApps) {
        // Try to spread out each app among all the nodes, until it has all its cores

        // Iterate over the ApplicationInfo entries in waitingApps, keeping only the
        // applications that still have cores left to schedule.
        for (app <- waitingApps if app.coresLeft > 0) {
          // From workers, keep those that are ALIVE and usable by this application,
          // then sort them by free cores in descending order.
          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
            .filter(canUse(app, _)).sortBy(_.coresFree).reverse
          val numUsable = usableWorkers.length
          // A zero-filled array holding the number of cores to assign on each worker.
          val assigned = new Array[Int](numUsable) // Number of cores to give on each node
          // How many cores to assign in total: the smaller of the cores the app still
          // needs and the total free cores across the usable workers.
          var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

          // This algorithm spreads each application's executors evenly across the workers.
          // For example, with 20 cores to assign and 10 usable workers, the loop goes around
          // the workers twice, giving each worker 1 core per pass,
          // so every worker ends up with 2 cores.

          // Keep looping while there are still cores left to assign.
          var pos = 0
          while (toAssign > 0) {
            // If the worker's free cores exceed the cores already assigned on it,
            // the worker still has a core to give.
            if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
              // One fewer core left to assign, since one is now placed on this worker.
              toAssign -= 1
              // One more core assigned on this worker.
              assigned(pos) += 1
            }
            // Move the cursor to the next worker.
            pos = (pos + 1) % numUsable
          }

          // Now that we've decided how many cores to give on each node, let's actually give them
          // With the per-worker core counts decided,
          // iterate over the workers.
          for (pos <- 0 until numUsable) {
            // Only act on workers that were assigned at least one core.
            if (assigned(pos) > 0) {
              // First, add an executor to the application's internal bookkeeping.
              // This creates an ExecutorDesc that records how many cores the executor gets.
              // spark-submit lets you request a number of executors, and the cores and memory
              // for each executor. Under this code, however, the actual number of executors and
              // the cores per executor can differ from what was requested,
              // because allocation here is driven by the total core count. For example, suppose
              // 3 executors with 3 cores each are requested and there are 9 workers with 1 core each.
              // The total is 9 cores, so the algorithm assigns 1 core on each worker and starts
              // one executor per worker.
              // The result is 9 executors with 1 core each.
              val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
              // Launch the executor on the worker.
              launchExecutor(usableWorkers(pos), exec)
              // Mark the application as RUNNING.
              app.state = ApplicationState.RUNNING
            }
          }
        }
      } else {
        // Pack each app into as few nodes as possible until we've assigned all its cores

        // The non-spreadOutApps algorithm.

        // This is the opposite of spreadOutApps: spreadOutApps spreads an app's cores evenly
        // across the workers, while non-spreadOutApps packs them onto as few workers as possible.
        // For example, with 10 workers of 10 cores each and an app that needs 20 cores in total,
        // the cores go to just two workers, each filled to its full 10 cores.
        // Any other app then has to be placed on the next worker.
        // If spark-submit requested 10 executors with 2 cores each, 20 cores in total,
        // only 2 executors are started, each with 10 cores.

        // Place each application on as few workers as possible.
        // Iterate over the workers that are ALIVE and still have free cores.
        for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
          // Iterate over the applications that still have cores left to assign.
          for (app <- waitingApps if app.coresLeft > 0) {
            // If this worker can be used by the application...
            if (canUse(app, worker)) {
              // ...take the smaller of the worker's free cores and the cores the app still needs.
              val coresToUse = math.min(worker.coresFree, app.coresLeft)
              // If the worker has no free cores left, assign nothing.
              if (coresToUse > 0) {
                // Add an executor to the app.
                val exec = app.addExecutor(worker, coresToUse)
                // Launch the executor on the worker.
                launchExecutor(worker, exec)
                // Mark the application as RUNNING.
                app.state = ApplicationState.RUNNING
              }
            }
          }
        }
      }
    }
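
The driver placement loop in the listing can be tried in isolation with a small model. The sketch below is an illustration under stated assumptions: `DriverPlacementSketch`, `Worker`, `Driver`, and `place` are hypothetical stand-ins, not Spark classes. The shuffle is left out so the result is deterministic, and the resource bookkeeping that `launchDriver` is assumed to perform on the worker is done inline.

```scala
object DriverPlacementSketch {
  // Hypothetical stand-ins holding only the fields the loop reads.
  final case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  /** Returns driver id -> worker id for every driver that could be placed. */
  def place(workers: IndexedSeq[Worker], drivers: Seq[Driver]): Map[String, String] = {
    var placements = Map.empty[String, String]
    // Shared across drivers, so each search resumes where the previous one stopped.
    var curPos = 0
    for (driver <- drivers) {
      var launched = false
      var visited = 0
      // Visit each worker at most once per driver.
      while (visited < workers.size && !launched) {
        val worker = workers(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Assumption: reserving resources here mirrors what launching a driver does.
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          placements = placements.updated(driver.id, worker.id)
          launched = true
        }
        curPos = (curPos + 1) % workers.size
      }
    }
    placements
  }
}
```

Because `curPos` is not reset between drivers, consecutive drivers tend to land on different workers. A driver that fits on no worker is simply skipped in this sketch; in the listing it stays in `waitingDrivers`.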
  • Original article: https://www.cnblogs.com/haoyy/p/6173703.html