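Spark has two main scheduling modes: FIFO and FAIR. By default Spark schedules in FIFO (first in, first out) order: whatever is submitted first runs first, and later jobs wait for the earlier ones (they can start right away only if the jobs ahead of them do not need the whole cluster). FAIR (fair scheduling) mode lets you group jobs into scheduling pools; each pool can be given a different weight, and the weights decide the order in which tasks run. The mode is set with spark.scheduler.mode, whose valid values are FAIR and FIFO.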
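As a rough illustration of how this setting behaves, here is a minimal Scala sketch. It assumes the configuration is a plain Map[String, String]. SchedulingMode, ModeConfig and resolveMode are hypothetical names, not Spark's API. The sketch only encodes what the text states: FIFO is the default, and FAIR and FIFO are the only accepted values. Case-insensitive matching is an assumption of the sketch.

```scala
import java.util.Locale

// Hypothetical mode type: only the two values the article lists.
sealed trait SchedulingMode
case object FIFO extends SchedulingMode
case object FAIR extends SchedulingMode

object ModeConfig {
  // Configuration key from the article; FIFO is used when it is absent.
  val Key = "spark.scheduler.mode"

  // Hypothetical helper: trims and upper-cases the value (an assumption of
  // this sketch), then rejects anything other than FIFO or FAIR.
  def resolveMode(conf: Map[String, String]): SchedulingMode =
    conf.getOrElse(Key, "FIFO").trim.toUpperCase(Locale.ROOT) match {
      case "FIFO" => FIFO
      case "FAIR" => FAIR
      case other  => throw new IllegalArgumentException(s"Unrecognized $Key: $other")
    }
}
```

In a real application the key is set on the SparkConf, for example new SparkConf().set("spark.scheduler.mode", "FAIR"), or passed to spark-submit as --conf spark.scheduler.mode=FAIR.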
1. Comparing the scheduling pools
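FIFO builds no sub-pools: its buildPools method is empty, and addTaskSetManager adds every TaskSetManager directly to rootPool.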
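FAIR overrides buildPools to read the pool definitions from fairscheduler.xml, by default $SPARK_HOME/conf/fairscheduler.xml (spark.scheduler.allocation.file can point to a different file). Its addTaskSetManager method puts each TaskSetManager into the matching child pool of rootPool. The pool is the one named by the job's spark.scheduler.pool property, or the pool called "default" if that property is unset.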
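The two FAIR steps above can be sketched in plain Scala. This is a simplified model, not Spark's FairSchedulableBuilder:

- Pool, PoolConf, FairPools, buildPools and addTaskSet are hypothetical names.
- A TaskSetManager is reduced to a name string.
- The XML is parsed with the JDK's javax.xml.parsers.

The XML layout (a pool element with a name attribute and schedulingMode, weight and minShare children) follows Spark's fairscheduler.xml template. The fallbacks for missing values (FIFO, weight 1, minShare 0) are assumed to match Spark's documented defaults.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element
import scala.collection.mutable

// Hypothetical, simplified pool model; these are not Spark's classes.
final case class PoolConf(name: String, mode: String, weight: Int, minShare: Int)

final class Pool(val conf: PoolConf) {
  val taskSets = mutable.ArrayBuffer.empty[String] // stand-ins for TaskSetManagers
}

object FairPools {
  // Same layout as the fairscheduler.xml template shipped with Spark.
  val xml: String =
    """<?xml version="1.0"?>
      |<allocations>
      |  <pool name="production">
      |    <schedulingMode>FAIR</schedulingMode>
      |    <weight>1</weight>
      |    <minShare>2</minShare>
      |  </pool>
      |  <pool name="test">
      |    <schedulingMode>FIFO</schedulingMode>
      |    <weight>2</weight>
      |    <minShare>3</minShare>
      |  </pool>
      |</allocations>""".stripMargin

  // buildPools-like step: one child pool per <pool> element, in file order.
  // Missing fields fall back to FIFO / weight 1 / minShare 0.
  def buildPools(text: String): mutable.LinkedHashMap[String, Pool] = {
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      .parse(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)))
    val nodes = doc.getElementsByTagName("pool")
    val pools = mutable.LinkedHashMap.empty[String, Pool]
    for (i <- 0 until nodes.getLength) {
      val e = nodes.item(i).asInstanceOf[Element]
      def field(tag: String, default: String): String = {
        val list = e.getElementsByTagName(tag)
        if (list.getLength == 0) default else list.item(0).getTextContent.trim
      }
      val name = e.getAttribute("name")
      pools(name) = new Pool(PoolConf(name, field("schedulingMode", "FIFO"),
        field("weight", "1").toInt, field("minShare", "0").toInt))
    }
    pools
  }

  // addTaskSetManager-like step: route by pool name ("default" if none);
  // a pool missing from the file is created with default settings.
  def addTaskSet(pools: mutable.LinkedHashMap[String, Pool],
                 taskSet: String, poolName: Option[String]): Pool = {
    val name = poolName.getOrElse("default")
    val pool = pools.getOrElseUpdate(name, new Pool(PoolConf(name, "FIFO", 1, 0)))
    pool.taskSets += taskSet
    pool
  }
}
```

In a real application the pool is chosen on the driver, for example with sc.setLocalProperty("spark.scheduler.pool", "production") before the action that submits the job.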
2. Comparing the scheduling algorithms
FIFO:
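FIFO ordering is easy to follow. It first compares priority; for a TaskSetManager this is the ID of the job that submitted it. Only when the priorities are equal does it compare stageId. In both cases the smaller value runs first.
This makes sense: a stage with a smaller stageId is usually an upstream (parent) stage. It was created at the deepest level of the recursion and is submitted to the pool first.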
```scala
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  // Returns true if s1 should be scheduled before s2.
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    // Compare priority first: for a TaskSetManager this is the job ID.
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      // Same priority: the smaller stageId goes first.
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
```
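Because Schedulable and SchedulingAlgorithm are private[spark], the comparator above cannot be called from user code. To watch the ordering in action, here is a self-contained Scala sketch. It applies the same priority-then-stageId rule to a hypothetical TaskSetInfo case class, a stand-in for Schedulable that is not part of Spark. It uses Integer.compare instead of subtraction, which gives the same sign without the risk of Int overflow. The sample values are made up for illustration.

```scala
// Hypothetical stand-in for Spark's Schedulable: only the fields FIFO reads.
final case class TaskSetInfo(name: String, priority: Int, stageId: Int)

object FifoOrder {
  // Same rule as FIFOSchedulingAlgorithm: smaller priority (job ID) first,
  // then smaller stageId. Returns true if s1 goes before s2.
  def comparator(s1: TaskSetInfo, s2: TaskSetInfo): Boolean = {
    var res = Integer.compare(s1.priority, s2.priority)
    if (res == 0) res = Integer.compare(s1.stageId, s2.stageId)
    res < 0
  }

  def order(sets: Seq[TaskSetInfo]): Seq[String] =
    sets.sortWith(comparator).map(_.name)

  // Hypothetical sample: two stages of job 0 and one stage of job 1.
  val sample = Seq(
    TaskSetInfo("job1-stage0", priority = 1, stageId = 0),
    TaskSetInfo("job0-stage7", priority = 0, stageId = 7),
    TaskSetInfo("job0-stage3", priority = 0, stageId = 3))
  // order(sample) == Seq("job0-stage3", "job0-stage7", "job1-stage0")
}
```

The sample shows that priority dominates: job1-stage0 has the smallest stageId but still runs last, because its job was submitted after job 0.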
FAIR:
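FAIR mode is a little more involved, but the comparator is still easy to follow. For two schedulables s1 and s2 (pools or TaskSetManagers):
1. A schedulable is "needy" when its runningTasks is below its minShare. runningTasks is the number of tasks currently running, roughly the cores in use since each task normally takes one core. If exactly one of the two is needy, it goes first;
2. If both are needy, compare runningTasks / max(minShare, 1); the smaller ratio goes first;
3. If neither is needy, compare runningTasks / weight; the smaller ratio goes first, so with equal running tasks the one with the larger weight runs first;
4. If the ratios are equal, compare the names as strings; the lexicographically smaller name goes first.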
```scala
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  // Returns true if s1 should be scheduled before s2.
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    // "Needy": fewer running tasks than the configured minimum share.
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

    var compare: Int = 0
    if (s1Needy && !s2Needy) {
      return true                                                // only s1 is needy
    } else if (!s1Needy && s2Needy) {
      return false                                               // only s2 is needy
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)         // both needy
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) // neither needy
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name                                          // tie: smaller name first
    }
  }
}
```
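The FAIR comparator is also internal to Spark. The plain-Scala sketch below re-implements the same decision sequence on a hypothetical Sched case class (not a Spark type) so it can be run standalone. It uses java.lang.Double.compare and String.compareTo. For ordinary non-NaN ratios these agree with the compareTo and < calls in the original. The sample values are invented for illustration.

```scala
// Hypothetical stand-in for a pool or TaskSetManager: only the fields FAIR reads.
final case class Sched(name: String, runningTasks: Int, minShare: Int, weight: Int)

object FairOrder {
  // Same decision sequence as FairSchedulingAlgorithm.comparator:
  // needy first, then the smaller ratio, then the smaller name.
  def comparator(s1: Sched, s2: Sched): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    if (s1Needy != s2Needy) {
      s1Needy // exactly one is needy, and it goes first
    } else {
      val compare =
        if (s1Needy) // both needy: runningTasks / max(minShare, 1)
          java.lang.Double.compare(
            s1.runningTasks.toDouble / math.max(s1.minShare, 1),
            s2.runningTasks.toDouble / math.max(s2.minShare, 1))
        else // neither needy: runningTasks / weight
          java.lang.Double.compare(
            s1.runningTasks.toDouble / s1.weight,
            s2.runningTasks.toDouble / s2.weight)
      if (compare != 0) compare < 0 else s1.name.compareTo(s2.name) < 0
    }
  }

  def order(xs: Seq[Sched]): Seq[String] = xs.sortWith(comparator).map(_.name)

  // Hypothetical sample values.
  val sample = Seq(
    Sched("A", runningTasks = 1, minShare = 2, weight = 1), // needy, 1 / 2 = 0.5
    Sched("B", runningTasks = 0, minShare = 4, weight = 1), // needy, 0 / 4 = 0.0
    Sched("C", runningTasks = 4, minShare = 0, weight = 2), // 4 / 2 = 2.0
    Sched("D", runningTasks = 3, minShare = 0, weight = 1), // 3 / 1 = 3.0
    Sched("E", runningTasks = 2, minShare = 0, weight = 1)) // 2 / 1 = 2.0
  // order(sample) == Seq("B", "A", "C", "E", "D")
}
```

In the sample, B and A are needy, so they go first, with the lower minShare ratio ahead. C and E tie at a runningTasks / weight ratio of 2.0 and are separated by name. D has the highest ratio and comes last. C is running twice as many tasks as E, yet its double weight keeps it level with E.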