Analysis of Delay Scheduling in the Fair Scheduler

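      The main goal of delay scheduling is to improve data locality and so reduce the amount of data sent over the network. When a MapTask's input data is not local to the TaskTracker that offers a free slot, the scheduler delays that MapTask and gives the slot to a MapTask that does have locality.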

      The general idea of delay scheduling is as follows:

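      If the job can find a node-local MapTask, that task is returned. If not, the job is delayed: for up to nodeLocalityDelay it keeps looking for a node-local MapTask on later heartbeats and returns one if found.

      Once it has waited at least nodeLocalityDelay, it may return a rack-local (or node-local) MapTask. If none is found, it is delayed again: for up to a further rackLocalityDelay it keeps looking for a rack-local MapTask.

      Once it has waited at least nodeLocalityDelay + rackLocalityDelay in total, it may return an off-switch MapTask.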
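      The three rules above boil down to two cumulative thresholds. The following is a minimal, self-contained sketch that assumes the job's last map was node-local and that all times are in milliseconds; the class DelayRuleSketch and its enum are hypothetical names, not part of Hadoop.

```java
// Hypothetical sketch of the cumulative delay thresholds described above.
class DelayRuleSketch {
    enum Level { NODE, RACK, ANY }

    // Most relaxed locality level a job may use after waiting waitedMs,
    // assuming it starts out requiring node-local map tasks.
    static Level allowedLevel(long waitedMs, long nodeDelayMs, long rackDelayMs) {
        if (waitedMs >= nodeDelayMs + rackDelayMs) {
            return Level.ANY;   // waited past both delays: off-switch allowed
        } else if (waitedMs >= nodeDelayMs) {
            return Level.RACK;  // waited past the node delay: rack-local allowed
        }
        return Level.NODE;      // still inside the node delay: node-local only
    }

    public static void main(String[] args) {
        long delay = 4500; // assumed value for both delays
        for (long waited : new long[] {0, 3000, 4500, 9000}) {
            System.out.println(waited + " ms -> " + allowedLevel(waited, delay, delay));
        }
    }
}
```

      With both delays at 4500 ms, the job stays at NODE until 4500 ms, at RACK until 9000 ms, and at ANY afterwards.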

      The main variables related to delay scheduling in FairScheduler.java:

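    long nodeLocalityDelay;             // how long a job waits for a node-local map before it may launch rack-local ones
    long rackLocalityDelay;             // how much longer it waits for a rack-local map before it may launch off-switch ones
    boolean skippedAtLastHeartbeat;     // per job: was the job skipped (delayed) at the last heartbeat?
    long timeWaitedForLocalMap;         // per job: time waited since its last map task was launched
    LocalityLevel lastMapLocalityLevel; // per job: locality level of the last map task launched
    // By default both delays are 1.5 heartbeat intervals, capped at 15 s:
    nodeLocalityDelay = rackLocalityDelay =
      Math.min(15000, (long) (1.5 * jobTracker.getNextHeartbeatInterval()));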
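      As a worked example of the default formula: the delay is 1.5 heartbeat intervals, capped at 15000 ms. The sketch below assumes the heartbeat interval is given in milliseconds; the inputs 3000 and 12000 are illustrative values, not measurements, and DefaultDelaySketch is a hypothetical helper.

```java
// Hypothetical helper reproducing the default-delay formula above.
class DefaultDelaySketch {
    // 1.5 heartbeat intervals, but never more than 15000 ms.
    static long defaultLocalityDelay(int heartbeatIntervalMs) {
        return Math.min(15000, (long) (1.5 * heartbeatIntervalMs));
    }

    public static void main(String[] args) {
        System.out.println(defaultLocalityDelay(3000));  // 4500
        System.out.println(defaultLocalityDelay(12000)); // 15000 (18000 is capped)
    }
}
```

      Because nodeLocalityDelay and rackLocalityDelay receive the same value, a job whose last map was node-local would, with a 3 s interval, wait 2 * 4500 = 9000 ms before it may launch off-switch maps.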

      

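      In the Fair Scheduler, each job keeps two variables for delay scheduling: the locality level of its most recently launched MapTask (lastMapLocalityLevel) and the time it has waited since it started being skipped (timeWaitedForLocalMap). The decision works as follows (implemented in getAllowedLocalityLevel() in FairScheduler.java):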

    /**
     * Get the maximum locality level at which a given job is allowed to
     * launch tasks, based on how long it has been waiting for local tasks.
     * This is used to implement the "delay scheduling" feature of the Fair
     * Scheduler for optimizing data locality.
     * If the job has no locality information (e.g. it does not use HDFS), this
     * method returns LocalityLevel.ANY, allowing tasks at any level.
     * Otherwise, the job can only launch tasks at its current locality level
     * or lower, unless it has waited at least nodeLocalityDelay or
     * rackLocalityDelay milliseconds depends on the current level. If it
     * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
     * it can go to any level.
     */
    protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
        long currentTime) {
      JobInfo info = infos.get(job);
      if (info == null) { // Job not in infos (shouldn't happen)
        LOG.error("getAllowedLocalityLevel called on job " + job
            + ", which does not have a JobInfo in infos");
        return LocalityLevel.ANY;
      }
      if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
        return LocalityLevel.ANY;
      }
      // Don't wait for locality if the job's pool is starving for maps
      Pool pool = poolMgr.getPool(job);
      PoolSchedulable sched = pool.getMapSchedulable();
      long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
      long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
      if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
          currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
        eventLog.log("INFO", "No delay scheduling for "
            + job.getJobID() + " because it is being starved");
        return LocalityLevel.ANY;
      }
      // In the common case, compute locality level based on time waited
      switch(info.lastMapLocalityLevel) {
      case NODE: // Last task launched was node-local
        if (info.timeWaitedForLocalMap >=
            nodeLocalityDelay + rackLocalityDelay)
          return LocalityLevel.ANY;
        else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
          return LocalityLevel.RACK;
        else
          return LocalityLevel.NODE;
      case RACK: // Last task launched was rack-local
        if (info.timeWaitedForLocalMap >= rackLocalityDelay)
          return LocalityLevel.ANY;
        else
          return LocalityLevel.RACK;
      default: // Last task was non-local; can launch anywhere
        return LocalityLevel.ANY;
      }
    }

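      The method first returns ANY immediately if the job has no locality information (job.nonLocalMaps is non-empty) or if its pool is starving, i.e. it has been below its min share for longer than the min-share preemption timeout or below half its fair share for longer than the fair-share preemption timeout. Otherwise:

    1. If lastMapLocalityLevel is NODE:

      1) if timeWaitedForLocalMap >= nodeLocalityDelay + rackLocalityDelay, the job may launch a MapTask at any level (node-local, rack-local, or off-switch);

      2) otherwise, if timeWaitedForLocalMap >= nodeLocalityDelay, it may launch a node-local or rack-local MapTask;

      3) otherwise it may launch only a node-local MapTask.

    2. If lastMapLocalityLevel is RACK:

      1) if timeWaitedForLocalMap >= rackLocalityDelay, it may launch a MapTask at any level;

      2) otherwise it may launch a node-local or rack-local MapTask.

    3. Otherwise (the last map was off-switch), it may launch a MapTask at any level.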
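      The rules above can be checked with a standalone copy of the decision logic. This is a sketch under stated assumptions: JobInfo is reduced to its two fields, and the "no locality information" and "pool is starving" checks are passed in as hypothetical boolean parameters instead of being computed from the pool and job.

```java
// Standalone sketch of the decision in getAllowedLocalityLevel().
// Hypothetical: the early-exit checks are supplied as booleans.
class AllowedLevelSketch {
    enum LocalityLevel { NODE, RACK, ANY }

    static LocalityLevel allowed(LocalityLevel lastLevel, long timeWaited,
                                 long nodeDelay, long rackDelay,
                                 boolean hasNonLocalMaps, boolean starving) {
        if (hasNonLocalMaps || starving) {
            return LocalityLevel.ANY; // delay scheduling is bypassed
        }
        switch (lastLevel) {
            case NODE:
                if (timeWaited >= nodeDelay + rackDelay) return LocalityLevel.ANY;
                if (timeWaited >= nodeDelay) return LocalityLevel.RACK;
                return LocalityLevel.NODE;
            case RACK:
                // Only rackDelay applies: the job has already dropped to rack level
                return timeWaited >= rackDelay ? LocalityLevel.ANY : LocalityLevel.RACK;
            default:
                return LocalityLevel.ANY; // last map was off-switch
        }
    }

    public static void main(String[] args) {
        // After a rack-local map, waiting rackDelay is enough to go off-switch...
        System.out.println(allowed(LocalityLevel.RACK, 4500, 4500, 4500, false, false)); // ANY
        // ...while the same wait after a node-local map only unlocks rack-local maps.
        System.out.println(allowed(LocalityLevel.NODE, 4500, 4500, 4500, false, false)); // RACK
    }
}
```

      The asymmetry between the NODE and RACK cases is why lastMapLocalityLevel matters: a job that already launched a rack-local map does not have to sit through nodeLocalityDelay again.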

      The overall delay-scheduling workflow is implemented in assignTasks() in FairScheduler.java:

    @Override
    public synchronized List<Task> assignTasks(TaskTracker tracker)
        throws IOException {
      if (!initialized) // Don't try to assign tasks if we haven't yet started up
        return null;
      String trackerName = tracker.getTrackerName();
      eventLog.log("HEARTBEAT", trackerName);
      long currentTime = clock.getTime();

      // Compute total runnable maps and reduces, and currently running ones
      int runnableMaps = 0;
      int runningMaps = 0;
      int runnableReduces = 0;
      int runningReduces = 0;
      for (Pool pool: poolMgr.getPools()) {
        runnableMaps += pool.getMapSchedulable().getDemand();
        runningMaps += pool.getMapSchedulable().getRunningTasks();
        runnableReduces += pool.getReduceSchedulable().getDemand();
        runningReduces += pool.getReduceSchedulable().getRunningTasks();
      }

      ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
      // Compute total map/reduce slots
      // In the future we can precompute this if the Scheduler becomes a
      // listener of tracker join/leave events.
      int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
      int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);

      eventLog.log("RUNNABLE_TASKS",
          runnableMaps, runningMaps, runnableReduces, runningReduces);

      // Update time waited for local maps for jobs skipped on last heartbeat
      // Note 1
      updateLocalityWaitTimes(currentTime);

      // Check for JT safe-mode
      if (taskTrackerManager.isInSafeMode()) {
        LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
        return null;
      }

      TaskTrackerStatus tts = tracker.getStatus();

      int mapsAssigned = 0; // loop counter for map in the below while loop
      int reducesAssigned = 0; // loop counter for reduce in the below while
      int mapCapacity = maxTasksToAssign(TaskType.MAP, tts);
      int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts);
      boolean mapRejected = false; // flag used for ending the loop
      boolean reduceRejected = false; // flag used for ending the loop

      // Keep track of which jobs were visited for map tasks and which had tasks
      // launched, so that we can later mark skipped jobs for delay scheduling
      Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>();
      Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>();
      Set<JobInProgress> launchedMap = new HashSet<JobInProgress>();

      ArrayList<Task> tasks = new ArrayList<Task>();
      // Scan jobs to assign tasks until neither maps nor reduces can be assigned
      // Note 2
      while (true) {
        // Computing the ending conditions for the loop
        // Reject a task type if one of the following condition happens
        // 1. number of assigned task reaches per heartbeat limit
        // 2. number of running tasks reaches runnable tasks
        // 3. task is rejected by the LoadManager.canAssign
        if (!mapRejected) {
          if (mapsAssigned == mapCapacity ||
              runningMaps == runnableMaps ||
              !loadMgr.canAssignMap(tts, runnableMaps,
                  totalMapSlots, mapsAssigned)) {
            eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
            mapRejected = true;
          }
        }
        if (!reduceRejected) {
          if (reducesAssigned == reduceCapacity ||
              runningReduces == runnableReduces ||
              !loadMgr.canAssignReduce(tts, runnableReduces,
                  totalReduceSlots, reducesAssigned)) {
            eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
            reduceRejected = true;
          }
        }
        // Exit while (true) loop if
        // 1. neither maps nor reduces can be assigned
        // 2. assignMultiple is off and we already assigned one task
        if (mapRejected && reduceRejected ||
            !assignMultiple && tasks.size() > 0) {
          break; // This is the only exit of the while (true) loop
        }

        // Determine which task type to assign this time
        // First try choosing a task type which is not rejected
        TaskType taskType;
        if (mapRejected) {
          taskType = TaskType.REDUCE;
        } else if (reduceRejected) {
          taskType = TaskType.MAP;
        } else {
          // If both types are available, choose the task type with fewer running
          // tasks on the task tracker to prevent that task type from starving
          if (tts.countMapTasks() + mapsAssigned <=
              tts.countReduceTasks() + reducesAssigned) {
            taskType = TaskType.MAP;
          } else {
            taskType = TaskType.REDUCE;
          }
        }

        // Get the map or reduce schedulables and sort them by fair sharing
        List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
        // Sorting happens here, at the pool level
        Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
        boolean foundTask = false;
        // Note 3
        for (Schedulable sched: scheds) { // This loop will assign only one task
          eventLog.log("INFO", "Checking for " + taskType +
              " task in " + sched.getName());
          // Note 4
          Task task = taskType == TaskType.MAP ?
                      sched.assignTask(tts, currentTime, visitedForMap) :
                      sched.assignTask(tts, currentTime, visitedForReduce);
          if (task != null) {
            foundTask = true;
            JobInProgress job = taskTrackerManager.getJob(task.getJobID());
            eventLog.log("ASSIGN", trackerName, taskType,
                job.getJobID(), task.getTaskID());
            // Update running task counts, and the job's locality level
            if (taskType == TaskType.MAP) {
              launchedMap.add(job);
              mapsAssigned++;
              runningMaps++;
              // Note 5
              updateLastMapLocalityLevel(job, task, tts);
            } else {
              reducesAssigned++;
              runningReduces++;
            }
            // Add task to the list of assignments
            tasks.add(task);
            break; // This break makes this loop assign only one task
          } // end if(task != null)
        } // end for(Schedulable sched: scheds)

        // Reject the task type if we cannot find a task
        if (!foundTask) {
          if (taskType == TaskType.MAP) {
            mapRejected = true;
          } else {
            reduceRejected = true;
          }
        }
      } // end while (true)

      // Mark any jobs that were visited for map tasks but did not launch a task
      // as skipped on this heartbeat
      for (JobInProgress job: visitedForMap) {
        if (!launchedMap.contains(job)) {
          infos.get(job).skippedAtLastHeartbeat = true;
        }
      }

      // If no tasks were found, return null
      return tasks.isEmpty() ? null : tasks;
    }

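      Note 1: updateLocalityWaitTimes(). For every job that was skipped at the previous heartbeat, the time elapsed since that heartbeat is added to timeWaitedForLocalMap and skippedAtLastHeartbeat is reset to false. The flags themselves are set at the end of the previous assignTasks() call, which marks every job that was visited for a map task but launched none as skipped. The code is: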

    /**
     * Update locality wait times for jobs that were skipped at last heartbeat.
     */
    private void updateLocalityWaitTimes(long currentTime) {
      long timeSinceLastHeartbeat =
        (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
      lastHeartbeatTime = currentTime;
      for (JobInfo info: infos.values()) {
        if (info.skippedAtLastHeartbeat) {
          info.timeWaitedForLocalMap += timeSinceLastHeartbeat;
          info.skippedAtLastHeartbeat = false;
        }
      }
    }
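      To see how the wait time accumulates, here is a small simulation of the same bookkeeping. It is a sketch: JobInfo is cut down to the two fields used, the class name WaitTimeSketch is hypothetical, and the main method sets skippedAtLastHeartbeat by hand where the real scheduler would do so at the end of assignTasks(). Note that lastHeartbeatTime starts at 0, so the first heartbeat adds nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of updateLocalityWaitTimes(); names are hypothetical.
class WaitTimeSketch {
    static class JobInfo {
        boolean skippedAtLastHeartbeat;
        long timeWaitedForLocalMap;
    }

    private long lastHeartbeatTime = 0;
    final List<JobInfo> infos = new ArrayList<>();

    void updateLocalityWaitTimes(long currentTime) {
        long timeSinceLastHeartbeat =
            (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
        lastHeartbeatTime = currentTime;
        for (JobInfo info : infos) {
            if (info.skippedAtLastHeartbeat) {          // only skipped jobs accumulate
                info.timeWaitedForLocalMap += timeSinceLastHeartbeat;
                info.skippedAtLastHeartbeat = false;
            }
        }
    }

    public static void main(String[] args) {
        WaitTimeSketch s = new WaitTimeSketch();
        JobInfo job = new JobInfo();
        s.infos.add(job);
        // Heartbeats at t = 1000, 4000, 7000 ms; the job is skipped after each one.
        for (long t : new long[] {1000, 4000, 7000}) {
            s.updateLocalityWaitTimes(t);
            job.skippedAtLastHeartbeat = true; // what assignTasks() would do
        }
        System.out.println(job.timeWaitedForLocalMap); // 6000
    }
}
```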

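      Note 2: the while (true) loop keeps assigning MapTasks and ReduceTasks until neither type can be assigned. In each iteration the pool schedulables are sorted by fair share, and the inner for loop then assigns a single task (map or reduce). Schedulable has two subclasses, PoolSchedulable and JobSchedulable; the schedulables sorted here are pools, and each pool passes the request on to its jobs, so for the purposes of this analysis a Schedulable can be treated as a job.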
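      The control flow of this loop can be reduced to a toy model. The sketch below is not the Hadoop code: it assumes a per-heartbeat capacity for each task type, replaces pools and jobs with a count of pending tasks, and leaves out the LoadManager and assignMultiple. It keeps the two ideas that matter here: at most one task per iteration, and a task type is rejected once its capacity is used up or no task of that type can be found.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the while (true) loop in assignTasks(); all names and
// simplifications here are hypothetical.
class AssignLoopSketch {
    static List<String> assign(int mapCapacity, int reduceCapacity,
                               int pendingMaps, int pendingReduces,
                               int mapsOnTracker, int reducesOnTracker) {
        List<String> tasks = new ArrayList<>();
        int mapsAssigned = 0;
        int reducesAssigned = 0;
        boolean mapRejected = false;
        boolean reduceRejected = false;
        while (true) {
            // Reject a type once its per-heartbeat capacity is used up
            if (mapsAssigned == mapCapacity) mapRejected = true;
            if (reducesAssigned == reduceCapacity) reduceRejected = true;
            if (mapRejected && reduceRejected) {
                break; // the only exit, as in the real loop
            }
            // Prefer the type with fewer tasks on this tracker (ties go to MAP)
            boolean chooseMap;
            if (mapRejected) {
                chooseMap = false;
            } else if (reduceRejected) {
                chooseMap = true;
            } else {
                chooseMap = mapsOnTracker + mapsAssigned
                        <= reducesOnTracker + reducesAssigned;
            }
            if (chooseMap) {
                if (pendingMaps > 0) {
                    pendingMaps--;
                    mapsAssigned++;
                    tasks.add("MAP");
                } else {
                    mapRejected = true; // no map found: stop trying maps
                }
            } else {
                if (pendingReduces > 0) {
                    pendingReduces--;
                    reducesAssigned++;
                    tasks.add("REDUCE");
                } else {
                    reduceRejected = true; // no reduce found: stop trying reduces
                }
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        System.out.println(assign(2, 1, 5, 5, 0, 0)); // [MAP, REDUCE, MAP]
    }
}
```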

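      Notes 3 and 4: inside the for loop, assignTask() is called and ends up in JobSchedulable.assignTask(), which picks a suitable MapTask or ReduceTask. For a MapTask it first calls FairScheduler.getAllowedLocalityLevel() to determine which locality level is allowed (analyzed above), and then obtains a MapTask at that level. The code of assignTask() is: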

    @Override
    public Task assignTask(TaskTrackerStatus tts, long currentTime,
        Collection<JobInProgress> visited) throws IOException {
      if (isRunnable()) {
        visited.add(job);
        TaskTrackerManager ttm = scheduler.taskTrackerManager;
        ClusterStatus clusterStatus = ttm.getClusterStatus();
        int numTaskTrackers = clusterStatus.getTaskTrackers();

        // check with the load manager whether it is safe to
        // launch this task on this taskTracker.
        LoadManager loadMgr = scheduler.getLoadManager();
        if (!loadMgr.canLaunchTask(tts, job, taskType)) {
          return null;
        }
        if (taskType == TaskType.MAP) {
          // Determine the locality level this job is allowed to use
          LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
              job, currentTime);
          scheduler.getEventLog().log(
              "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
          switch (localityLevel) {
            case NODE:
              return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers,
                  ttm.getNumberOfUniqueHosts());
            case RACK:
              return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers,
                  ttm.getNumberOfUniqueHosts());
            default:
              return job.obtainNewMapTask(tts, numTaskTrackers,
                  ttm.getNumberOfUniqueHosts());
          }
        } else {
          return job.obtainNewReduceTask(tts, numTaskTrackers,
              ttm.getNumberOfUniqueHosts());
        }
      } else {
        return null;
      }
    }

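      As shown, this method calls the JobInProgress method that matches the allowed level: obtainNewNodeLocalMapTask() for NODE, obtainNewNodeOrRackLocalMapTask() for RACK, and obtainNewMapTask() otherwise.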
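      The effect of this dispatch can be pictured as filtering the job's pending maps by the allowed level. The sketch below is hypothetical: each pending map carries a precomputed locality relative to the requesting tracker (ANY standing for off-switch), and it assumes the more local task is preferred within the allowed level. Returning null mirrors the case where assignTask() finds nothing, which is what leads to the job being marked as skipped.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of choosing a map task under a locality cap.
class MapPickSketch {
    enum Level { NODE, RACK, ANY }

    static final class PendingMap {
        final String id;
        final Level locality; // locality relative to the requesting tracker
        PendingMap(String id, Level locality) {
            this.id = id;
            this.locality = locality;
        }
    }

    // Most local pending map whose locality does not exceed the allowed
    // level, or null if there is none.
    static PendingMap pick(List<PendingMap> pending, Level allowed) {
        for (Level level : Level.values()) {         // NODE, RACK, ANY in order
            if (level.compareTo(allowed) > 0) break; // beyond the allowed level
            for (PendingMap m : pending) {
                if (m.locality == level) return m;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<PendingMap> pending = Arrays.asList(
            new PendingMap("m1", Level.ANY), new PendingMap("m2", Level.RACK));
        System.out.println(pick(pending, Level.NODE));    // null: job is skipped
        System.out.println(pick(pending, Level.RACK).id); // m2
        System.out.println(pick(pending, Level.ANY).id);  // m2 (more local preferred)
    }
}
```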

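      Note 5: finally, updateLastMapLocalityLevel() updates the job's bookkeeping: lastMapLocalityLevel is set to the locality level of the map task just launched on this TaskTracker, and timeWaitedForLocalMap is reset to 0.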

    /**
     * Update a job's locality level and locality wait variables given that
     * it has just launched a map task on a given task tracker.
     */
    private void updateLastMapLocalityLevel(JobInProgress job,
        Task mapTaskLaunched, TaskTrackerStatus tracker) {
      JobInfo info = infos.get(job);
      boolean isNodeGroupAware = conf.getBoolean(
          "net.topology.nodegroup.aware", false);
      LocalityLevel localityLevel = LocalityLevel.fromTask(
          job, mapTaskLaunched, tracker, isNodeGroupAware);
      info.lastMapLocalityLevel = localityLevel;
      info.timeWaitedForLocalMap = 0;
      eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel);
    }
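      The update step can be illustrated as follows. This is a simplified, hypothetical stand-in for LocalityLevel.fromTask(): it treats a map as node-local if the tracker's host holds a replica of the split, rack-local if some replica is on the tracker's rack, and off-switch (ANY) otherwise, using an assumed host-to-rack map. It ignores node groups and the real topology code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of recording a launched map's locality.
class LocalityUpdateSketch {
    enum LocalityLevel { NODE, RACK, ANY }

    static final class JobInfo {
        LocalityLevel lastMapLocalityLevel = LocalityLevel.NODE;
        long timeWaitedForLocalMap;
    }

    // Simplified locality of a split (its replica hosts) relative to a tracker.
    static LocalityLevel fromTask(List<String> splitHosts, String trackerHost,
                                  Map<String, String> rackOf) {
        if (splitHosts.contains(trackerHost)) return LocalityLevel.NODE;
        String trackerRack = rackOf.get(trackerHost);
        for (String h : splitHosts) {
            if (trackerRack != null && trackerRack.equals(rackOf.get(h))) {
                return LocalityLevel.RACK;
            }
        }
        return LocalityLevel.ANY;
    }

    // Same two assignments as updateLastMapLocalityLevel().
    static void update(JobInfo info, LocalityLevel launchedAt) {
        info.lastMapLocalityLevel = launchedAt;
        info.timeWaitedForLocalMap = 0;
    }

    public static void main(String[] args) {
        Map<String, String> rackOf = new HashMap<>();
        rackOf.put("h1", "/r1");
        rackOf.put("h2", "/r1");
        rackOf.put("h3", "/r2");
        JobInfo info = new JobInfo();
        info.timeWaitedForLocalMap = 5000;
        // Split stored on h2, launched on h1 (same rack): rack-local
        update(info, fromTask(Arrays.asList("h2"), "h1", rackOf));
        System.out.println(info.lastMapLocalityLevel + " " + info.timeWaitedForLocalMap); // RACK 0
    }
}
```

      After such a rack-local launch, the RACK branch of getAllowedLocalityLevel() applies on later heartbeats, so the job needs only rackLocalityDelay of further skipping before it may launch off-switch maps.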

      This article is based on Hadoop 1.2.1. Corrections are welcome.

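      References: Dong Xicheng (董西成), 《Hadoop技术内幕:深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: In-depth Study of MapReduce Architecture Design and Implementation Principles)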

        https://issues.apache.org/jira/secure/attachment/12457515/fair_scheduler_design_doc.pdf

      If you repost this article, please credit the source: http://www.cnblogs.com/gwgyk/p/4568270.html
