首先需要了解FairScheduler是如何在各个Pool之间分配资源,以及每个Pool如何在Job之间分配资源的。FairScheduler的分配资源发生在update()方法中,而该方法由一个线程UpdateThread每隔updateInterval(由mapred.fairscheduler.update.interval参数决定,默认是500ms)就调用一次,以保证资源分配的实时性。
FairScheduler的资源分配算法由SchedulingAlgorithms的computeFairShares()方法实现,原理是通过二分查找选择出一个使得资源分配数最接近实际资源数的值。具体可以去阅读下SchedulingAlgorithms.computeFairShares()的源码(有点难理解,最好debug下)。
下面就来看看FairScheduler如何从众多的任务中选择出一个任务,即任务调度。
1.FairScheduler.assignTasks():该方法的调用是发生在JT接收到来自TT的心跳,在返回响应时会根据TT的实际情况选择一个任务交由TT执行,具体可参考http://blog.csdn.net/vickyway/article/details/17127559。该方法为指定TT选择一组适合其执行的Task。
// Compute total runnable maps and reduces, and currently running ones int runnableMaps = 0; int runningMaps = 0; int runnableReduces = 0; int runningReduces = 0; for (Pool pool: poolMgr.getPools()) { runnableMaps += pool.getMapSchedulable().getDemand(); runningMaps += pool.getMapSchedulable().getRunningTasks(); runnableReduces += pool.getReduceSchedulable().getDemand(); runningReduces += pool.getReduceSchedulable().getRunningTasks(); }
此处计算所有的Pool(资源池)总的runnableMaps(所有Map任务运行所需的Slot数量),runningMaps(运行中的Map任务数量),runnableReduces(所有Reduce任务运行所需的Slot数量),runningReduces(运行中的Reduce任务数量)。
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); // Compute total map/reduce slots // In the future we can precompute this if the Scheduler becomes a // listener of tracker join/leave events. int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus); int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
接着根据JT获取集群状态,获取totalMapSlots(集群中所有可运行Map的Slot数量)和totalReduceSlots(集群中所有可运行Reduce的Slot数量).
// Update time waited for local maps for jobs skipped on last heartbeat updateLocalityWaitTimes(currentTime);
次数是更新上一次TT发送心跳时没有进行更新time waited for local maps的Job进行更新time waited for local maps。
2.FairScheduler.updateLocalityWaitTimes:
/** * Update locality wait times for jobs that were skipped at last heartbeat. */ private void updateLocalityWaitTimes(long currentTime) { long timeSinceLastHeartbeat = (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime); lastHeartbeatTime = currentTime; for (JobInfo info: infos.values()) { if (info.skippedAtLastHeartbeat) { info.timeWaitedForLocalMap += timeSinceLastHeartbeat; info.skippedAtLastHeartbeat = false; } } }
首先计算出从上次心跳到现在的时间间隔(timeSinceLastHeartbeat),并更新上次的心跳时间。然后遍历infos(存放JobInProgress-->JobInfo的集合)中skippedAtLastHeartbeat==true的Job的JobInfo,将其timeWaitedForLocalMap值增加timeSinceLastHeartbeat,并将JobInfo的skippedAtLastHeartbeat设为false。回到FairScheduler。
3.FairScheduler.assignTasks():
// Check for JT safe-mode if (taskTrackerManager.isInSafeMode()) { LOG.info("JobTracker is in safe-mode, not scheduling any tasks."); return null; }
检查JT是否处于SafeMode,处于SafeMode不进行任何任务的调度。
TaskTrackerStatus tts = tracker.getStatus(); int mapsAssigned = 0; // loop counter for map in the below while loop int reducesAssigned = 0; // loop counter for reduce in the below while int mapCapacity = maxTasksToAssign(TaskType.MAP, tts); int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts); boolean mapRejected = false; // flag used for ending the loop boolean reduceRejected = false; // flag used for ending the loop // Keep track of which jobs were visited for map tasks and which had tasks // launched, so that we can later mark skipped jobs for delay scheduling Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>(); Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>(); Set<JobInProgress> launchedMap = new HashSet<JobInProgress>(); ArrayList<Task> tasks = new ArrayList<Task>();
这段代码是初始化一些在调度任务时需要用到的变量,mapsAssigned和reducesAssigned记录已选择的Map/Reduce任务数量,mapCapacity和reduceCapacity记录该TT最大可接收到Map/Reduce任务数量,mapRejected和reduceRejected用来标识是否还可继续接收Map/Reduce任务,visitedForMap和visitedForReduce队列用来记录为寻找可执行的Task而访问的Job,launchedMap队列用来记录选择的Map任务,tasks队列用来存放选择的任务。下面看看maxTasksToAssign()方法是如何计算TT最大可接收的Map/Reduce数量的。
4.FairScheduler.maxTasksToAssign:
protected int maxTasksToAssign(TaskType type, TaskTrackerStatus tts) { if (!assignMultiple) return 1; int cap = (type == TaskType.MAP) ? mapAssignCap : reduceAssignCap; int availableSlots = (type == TaskType.MAP) ? tts.getAvailableMapSlots(): tts.getAvailableReduceSlots(); if (cap == -1) // Infinite cap; use the TaskTracker's slot count return availableSlots; else return Math.min(cap, availableSlots); }
此处的assignMultiple变量是由mapred.fairscheduler.assignmultiple参数决定,默认是true,表示是否可同时调度Map和Reduce任务。mapAssignCap和reduceAssignCap变量分别是由mapred.fairscheduler.assignmultiple.maps参数和mapred.fairscheduler.assignmultiple.reduces参数决定,默认值都是-1,表示一次心跳最大可调度的Map/Reduce数量,-1表示无限制。availableSlots表示该TT在发送心跳时可使用的Map/Reduce slot数量,所以接收的任务不能超过该值。
5.FairScheduler.assignTasks():
下面的代码是一段无限循环,知道满足一定条件才退出,分段来看看循环内部。
if (!mapRejected) { if (mapsAssigned == mapCapacity || runningMaps == runnableMaps || !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots, mapsAssigned)) { eventLog.log("INFO", "Can't assign another MAP to " + trackerName); mapRejected = true; } } if (!reduceRejected) { if (reducesAssigned == reduceCapacity || runningReduces == runnableReduces || !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots, reducesAssigned)) { eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName); reduceRejected = true; } } if (mapRejected && reduceRejected || !assignMultiple && tasks.size() > 0) { break; // This is the only exit of the while (true) loop }
这一段主要是判断是否退出循环,即通过跟新mapRejected和reduceRejected值来决定是否退出循环。当mapsAssigned==mapCapacity,即已选择的Map数量已达到TT可接收的最大值时,或者runningMaps==runnableMaps,即所有的Map任务都已运行,或者loadMgr.canAssignReduce(tts, runnableReduces,totalReduceSlots, reducesAssigned)返回false,即LoadManager(实现类是CapBasedLoadManager)任务不可再继续调度Map任务。Reduce同上。下面看看LoadManager如何判断是否可以继续调度Map/Reduce任务。
6.CapBasedLoadManager.LoadManager():
public boolean canAssignMap(TaskTrackerStatus tracker, int totalRunnableMaps, int totalMapSlots, int alreadyAssigned) { int cap = getCap(totalRunnableMaps, tracker.getMaxMapSlots(), totalMapSlots); return tracker.countMapTasks() + alreadyAssigned < cap; }
int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) { double load = maxDiff + ((double)totalRunnableTasks) / totalSlots; return (int) Math.ceil(localMaxTasks * Math.min(1.0, load)); }
maxDiff值由mapred.fairscheduler.load.max.diff参数决定,默认是0.0f。该方法根据集群总的任务运行数与集群总的Slot数量的比例,来判断一个TT应该运行多个任务,据此决定是否继续向TT发送任务。
上面根据一定条件判断mapRejected和reduceRejected的值,下面通过判断mapRejected和reduceRejected值以及assignMultiple==false是已选择的tasks数量是否大于0,因为当assignMultiple==false时只能选择一个任务。当判断出需要退出循环时,则直接退出循环。
7.FairScheduler.assignTasks():
TaskType taskType; if (mapRejected) { taskType = TaskType.REDUCE; } else if (reduceRejected) { taskType = TaskType.MAP; } else { // If both types are available, choose the task type with fewer running // tasks on the task tracker to prevent that task type from starving if (tts.countMapTasks() + mapsAssigned <= tts.countReduceTasks() + reducesAssigned) { taskType = TaskType.MAP; } else { taskType = TaskType.REDUCE; } }
下面是决定选择Map任务还是Reduce任务。如果mapRejected==true,则选择Reduce任务,相反如何reduceRejected==true,则选择Map任务,当两者都==false时,根据TT上已运行的Map数量+已为该TT选择的Map任务数量与TT上已运行的Reduce数量+已为该TT选择的Reduce任务数量之间的大小决定如何选择,当相等时优选选择Map任务。上面是一些准备工作,下面就开始进行任务的调度了。
8.FairScheduler.assignTasks():
// Get the map or reduce schedulables and sort them by fair sharing List<PoolSchedulable> scheds = getPoolSchedulables(taskType); Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
第一句是获取所有的Map/Reduce类型的PoolScheduler。每个Pool中都存放着两个PoolScheduler,一个用来调度Map任务——mapSchedulable,另一个用来调度Reduce任务——reduceSchedulable。然后根据SchedulingAlgorithms.FairShareComparator进行排序,该排序算法主要是根据每个Pool或者Job中运行中的任务与Pool或者Job的自身状态之间的一个比率关系进行排序,即按运行中的任务数/Math.min(minShare,demand)升序排序,按运行中的任务数/weight升序排序。下面看看SchedulingAlgorithms.FairShareComparator类。
public static class FairShareComparator implements Comparator<Schedulable> { @Override public int compare(Schedulable s1, Schedulable s2) { double minShareRatio1, minShareRatio2; double tasksToWeightRatio1, tasksToWeightRatio2; int minShare1 = Math.min(s1.getMinShare(), s1.getDemand()); int minShare2 = Math.min(s2.getMinShare(), s2.getDemand()); boolean s1Needy = s1.getRunningTasks() < minShare1; boolean s2Needy = s2.getRunningTasks() < minShare2; minShareRatio1 = s1.getRunningTasks() / Math.max(minShare1, 1.0); minShareRatio2 = s2.getRunningTasks() / Math.max(minShare2, 1.0); tasksToWeightRatio1 = s1.getRunningTasks() / s1.getWeight(); tasksToWeightRatio2 = s2.getRunningTasks() / s2.getWeight(); int res = 0; if (s1Needy && !s2Needy) res = -1; else if (s2Needy && !s1Needy) res = 1; else if (s1Needy && s2Needy) res = (int) Math.signum(minShareRatio1 - minShareRatio2); else // Neither schedulable is needy res = (int) Math.signum(tasksToWeightRatio1 - tasksToWeightRatio2); if (res == 0) { // Jobs are tied in fairness ratio. Break the tie by submit time and job // name to get a deterministic ordering, which is useful for unit tests. res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); if (res == 0) res = s1.getName().compareTo(s2.getName()); } return res; } }
先说一下:compare(a,b)-->-1,则a,b;compare(a,b)-->1,则b,a(老是记不住)。这个比较算法还是较简单的,原理就是哪个Scheduler中的运行中的任务数越接近其承受能力那么排序就越靠后,这也是很合理的,优先调度较轻松的Scheduler(表达不好,嘿嘿)。排序好了就可以有序的进行任务调度了。
9.FairScheduler.assignTasks():
boolean foundTask = false; for (Schedulable sched: scheds) { // This loop will assign only one task eventLog.log("INFO", "Checking for " + taskType + " task in " + sched.getName()); Task task = taskType == TaskType.MAP ? sched.assignTask(tts, currentTime, visitedForMap) : sched.assignTask(tts, currentTime, visitedForReduce); if (task != null) { foundTask = true; JobInProgress job = taskTrackerManager.getJob(task.getJobID()); eventLog.log("ASSIGN", trackerName, taskType, job.getJobID(), task.getTaskID()); // Update running task counts, and the job's locality level if (taskType == TaskType.MAP) { launchedMap.add(job); mapsAssigned++; runningMaps++; updateLastMapLocalityLevel(job, task, tts); } else { reducesAssigned++; runningReduces++; } // Add task to the list of assignments tasks.add(task); break; // This break makes this loop assign only one task } // end if(task != null) } // end for(Schedulable sched: scheds)
foundTask标志是否选择到任务,每次遍历只选择一个Task,因为每个选择一个Task之后,Scheduler的状态都会发生变化,然后再重新进行排序,再选择。这里可以看出Task的选择是调用Scheduler的assignTask()方法选择的。Scheduler有两个实现,分别是PoolScheduler和JobScheduler,此处是PoolScheduler。下面来看看PoolScheduler的assignTask()方法。
10.PoolScheduler.assignTask():
public Task assignTask(TaskTrackerStatus tts, long currentTime, Collection<JobInProgress> visited) throws IOException { int runningTasks = getRunningTasks(); if (runningTasks >= poolMgr.getMaxSlots(pool.getName(), taskType)) { return null; } SchedulingMode mode = pool.getSchedulingMode(); Comparator<Schedulable> comparator; if (mode == SchedulingMode.FIFO) { comparator = new SchedulingAlgorithms.FifoComparator(); } else if (mode == SchedulingMode.FAIR) { comparator = new SchedulingAlgorithms.FairShareComparator(); } else { throw new RuntimeException("Unsupported pool scheduling mode " + mode); } Collections.sort(jobScheds, comparator); for (JobSchedulable sched: jobScheds) { Task task = sched.assignTask(tts, currentTime, visited); if (task != null) return task; } return null; }
首先获取该PoolScheduler运行中的Task数量,然后判断如果运行中的任务数大于该Pool的该类型任务(Map/Reduce)的最大数量,则不调度任务,返回null。然后根据SchedulingMode mode = pool.getSchedulingMode()获取Pool的调度模式(FIFO/FAIR),即FairScheduler在对Pool中的Job进行调度时支持两种调度方式:FIFO和FAIR。FIFO:先进先出,先添加的Job先调度,使用SchedulingAlgorithms.FifoComparator比较器;FAIR:根据公平原则进行调度(和Pool的调度一样,也是使用SchedulingAlgorithms.FairShareComparator比较器)。该参数由定义Pool时的schedulingMode参数指定。下面简单说一下FIFO调度规则。
FIFO:先根据Hadoop自带的Job的优先级priority(分为5个等级,优先级从高到低依次是:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW),由在创建Job时通过mapred.job.priority参数指定,默认是NORMAL。然后根据Job的StartTime进行比较,越早的Job优先调度。
FAIR方式和PoolScheduler调度时一样。使用比较器对PoolScheduler中的Job(JobScheduler)进行排序。排序完成之后,遍历JobScheduler,通过调用JobScheduler的assignTask()方法选择任务。下面看看JobScheduler的assignTask()方法。
11.JobScheduler.assignTask():
public Task assignTask(TaskTrackerStatus tts, long currentTime, Collection<JobInProgress> visited) throws IOException { if (isRunnable()) { visited.add(job); TaskTrackerManager ttm = scheduler.taskTrackerManager; ClusterStatus clusterStatus = ttm.getClusterStatus(); int numTaskTrackers = clusterStatus.getTaskTrackers(); // check with the load manager whether it is safe to // launch this task on this taskTracker. LoadManager loadMgr = scheduler.getLoadManager(); if (!loadMgr.canLaunchTask(tts, job, taskType)) { return null; } if (taskType == TaskType.MAP) { LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel( job, currentTime); scheduler.getEventLog().log( "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel); switch (localityLevel) { case NODE: return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers, ttm.getNumberOfUniqueHosts()); case RACK: return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers, ttm.getNumberOfUniqueHosts()); default: return job.obtainNewMapTask(tts, numTaskTrackers, ttm.getNumberOfUniqueHosts()); } } else { return job.obtainNewReduceTask(tts, numTaskTrackers, ttm.getNumberOfUniqueHosts()); } } else { return null; } }
可以看出只有运行中的Job才能调度Task。visited.add(job)将该Job添加到visited队列,表示该Job在任务调度时有被访问过。然后通过LoadManager.canLaunchTask()方法判断是否可以在该TT上运行任务,这里默认是true。针对Map任务需要考虑任务的本地化,即尽可能的使Map任务运行在存放着输入文件的TT上,以提高Map任务运行效率。LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(job, currentTime)是获取Map任务的一个本地化级别,然后根据本地化级别调用不同方法获取不同的Task,而对于Reduce任务则直接选择一个任务即可。下面看看FairScheduler.getAllowedLocalityLevel()方法。
12.FairScheduler.getAllowedLocalityLevel():
JobInfo info = infos.get(job); if (info == null) { // Job not in infos (shouldn't happen) LOG.error("getAllowedLocalityLevel called on job " + job + ", which does not have a JobInfo in infos"); return LocalityLevel.ANY; } if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information return LocalityLevel.ANY; }
首先任务的本地化级别存在四个级别:NODE(表示Map的输入文件需要与任务运行的TT在一个节点上),NODEGROUP(表示Map的输入文件需要与任务运行的TT在一个节点组上),RACK(表示Map的输入文件需要与任务运行的TT在一个节机架上),ANY(无任何要求)。首先获取Job的JobInfo信息,如果不存在对应的JobInfo信息则返回LocalityLevel.ANY,如果Job的nonLocalMaps队列不为空也返回LocalityLevel.ANY。nonLocalMaps是在Job进行初始化时通过判断Job的Split如果没有Location则将该Split对应的Map任务添加到nonLocalMaps队列。
Pool pool = poolMgr.getPool(job); PoolSchedulable sched = pool.getMapSchedulable(); long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName()); long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout(); if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout || currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { eventLog.log("INFO", "No delay scheduling for " + job.getJobID() + " because it is being starved"); return LocalityLevel.ANY; }
判断该Job所在的Pool是否处于饥饿状态,是的话则直接返回LocalityLevel.ANY。此处根据Pool的minShareTimeout和fairShareTimeout两个属性值进行判断。Pool的lastTimeAtMinShare和lastTimeAtHalfFairShare值是在FairScheduler的update()方法中更新的,而该方法由一个线程一直调用。
// In the common case, compute locality level based on time waited switch(info.lastMapLocalityLevel) { case NODE: // Last task launched was node-local if (info.timeWaitedForLocalMap >= nodeLocalityDelay + rackLocalityDelay) return LocalityLevel.ANY; else if (info.timeWaitedForLocalMap >= nodeLocalityDelay) return LocalityLevel.RACK; else return LocalityLevel.NODE; case RACK: // Last task launched was rack-local if (info.timeWaitedForLocalMap >= rackLocalityDelay) return LocalityLevel.ANY; else return LocalityLevel.RACK; default: // Last task was non-local; can launch anywhere return LocalityLevel.ANY; }
下面根据Job的lastMapLocalityLevel,即该Job上一次调度Map任务时所选择的的LocalityLevel值决定本次如何进行Map任务的调度。如果lastMapLocalityLevel==NODE,则表示Job上一次调度Map任务是本地化等级是NODE,当等待时间timeWaitedForLocalMap>(nodeLocalityDelay + rackLocalityDelay)这两个属性之和时则选择LocalityLevel.ANY;但是如果timeWaitedForLocalMap只是>nodeLocalityDelay,那么则可以选择RACK级别的本地化,如果timeWaitedForLocalMap<nodeLocalityDelay,那么则选择NODE级别的本地化。timeWaitedForLocalMap的值是在FairScheduler的assignTasks()方法中更新的,即FairScheduler开始调度任务之前会先更新FairScheduler上保存的所有Job的JobInfo中的timeWaitedForLocalMap值,已确保后面能够正确选择Map任务的本地化级别。当lastMapLocalityLevel==RACK时,只需要timeWaitedForLocalMap>=rackLocalityDelay就可以返回ANY,否则返回RACK;默认返回RACK。这里就计算出在选择Map任务时的本地化级别,之后Job在选择Map任务时会根据本地化级别选择不同的任务进行运行。回到JobScheduler.assignTask()方法。
13.JobScheduler.assignTask():
switch (localityLevel) { case NODE: return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers, ttm.getNumberOfUniqueHosts()); case RACK: return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers, ttm.getNumberOfUniqueHosts()); default: return job.obtainNewMapTask(tts, numTaskTrackers, ttm.getNumberOfUniqueHosts()); }
这是在选择Map任务时根据本地化级别会调用不同的方法选择不同的任务。主要是obtainNewNodeLocalMapTask(),obtainNewNodeOrRackLocalMapTask(),obtainNewMapTask()三个方法,以及选择Reduce任务的obtainNewReduceTask()方法。这四个方法内部还是有点复杂的下次再深入分析。
到这里JobScheduler的assignTask()就完成了,返回Task,回到PoolScheduler的assignTask()方法,可以看到只要得到一个Task,PoolScheduler就会返回该Task,所以继续回到FairScheduler的assignTask()方法。在FairScheduler的assignTask()方法中可以看到,当返回一个Task之后会标志foundTask=true,如果是Map任务则会将Task对应的Job添加到launchedMap中,然后调用updateLastMapLocalityLevel()方法更新Job的JobInfo的lastMapLocalityLevel和timeWaitedForLocalMap值,以便下次正确的选择Map任务。
14.FairScheduler.assignTask():
if (!foundTask) { if (taskType == TaskType.MAP) { mapRejected = true; } else { reduceRejected = true; } }
该处很简单,判断如果没有选到Map任务或者Reduce任务,则将相应的标志设为true。
for (JobInProgress job: visitedForMap) { if (!launchedMap.contains(job)) { infos.get(job).skippedAtLastHeartbeat = true; } }
visitedForMap该值在进行调度Map任务时每访问一个Job都会被记录在该队列中,如果被访问的Job并不在launchedMap队列(存放被选中Map任务的Job)中,则将该Job对应的JobInfo的skippedAtLastHeartbeat参数设为true,表示本次心跳没有选择该Job的Map任务。这个skippedAtLastHeartbeat参数会影响Job的timeWaitedForLocalMap值,具体可以参考FairScheduler的updateLocalityWaitTimes()方法。
以上就是FairScheduler调度任务源码的一些简单的解析,如有错误之处,请指出,谢谢。