zoukankan      html  css  js  c++  java
  • JobTracker作业调度分析

    转自:http://blog.csdn.net/Androidlushangderen/article/details/41408517

    JobTracker的作业调度给我感觉就是比较宏观意义上的操作。倘若你只了解了MapReduce的工作原理是远远不够的,这时去学习一下他在宏观层面的原理实现也是对我们非常有帮助的。首先我们又得从上次分析的任务提交之后的操作说起,Job作业通过RPC通信提交到JobTracker端之后,接下来会触发到下面的方法;

    1. /** 
    2.    * 初始化作业操作 
    3.    */  
    4.   public void initJob(JobInProgress job) {  
    5.     if (null == job) {  
    6.       LOG.info("Init on null job is not valid");  
    7.       return;  
    8.     }  
    9.               
    10.     try {  
    11.       JobStatus prevStatus = (JobStatus)job.getStatus().clone();  
    12.       LOG.info("Initializing " + job.getJobID());  
    13.       //初始化Task任务  
    14.       job.initTasks();  
    15.       ......  

    接着会执行initTasks的方法,但不是JobTracker,而是JobInProgress类中的方法:

    1. /** 
    2.   * Construct the splits, etc.  This is invoked from an async 
    3.   * thread so that split-computation doesn't block anyone. 
    4.   */  
    5.  public synchronized void initTasks()   
    6.  throws IOException, KillInterruptedException, UnknownHostException {  
    7.    if (tasksInited || isComplete()) {  
    8.      return;  
    9.    }  
    10.    ......  
    11.      
    12.    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);  
    13.    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);  
    14.    this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);  
    15.    this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);  
    16.   
    17.    //根据numMapTasks任务数,创建MapTask的总数  
    18.    maps = new TaskInProgress[numMapTasks];  
    19.    for(int i=0; i < numMapTasks; ++i) {  
    20.      inputLength += splits[i].getInputDataLength();  
    21.      maps[i] = new TaskInProgress(jobId, jobFile,   
    22.                                   splits[i],   
    23.                                   jobtracker, conf, this, i, numSlotsPerMap);  
    24.    }  
    25.    ......  
    26.   
    27.    //  
    28.    // Create reduce tasks  
    29.    //根据numReduceTasks,创建Reduce的Task数量  
    30.    this.reduces = new TaskInProgress[numReduceTasks];  
    31.    for (int i = 0; i < numReduceTasks; i++) {  
    32.      reduces[i] = new TaskInProgress(jobId, jobFile,   
    33.                                      numMapTasks, i,   
    34.                                      jobtracker, conf, this, numSlotsPerReduce);  
    35.      nonRunningReduces.add(reduces[i]);  
    36.    }  
    37.   
    38.    ......  
    39.      
    40.    // create cleanup two cleanup tips, one map and one reduce.  
    41.    //创建2个clean up Task任务,1个是Map Clean-Up Task,一个是Reduce Clean-Up Task   
    42.    cleanup = new TaskInProgress[2];  
    43.   
    44.    // cleanup map tip. This map doesn't use any splits. Just assign an empty  
    45.    // split.  
    46.    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;  
    47.    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,   
    48.            jobtracker, conf, this, numMapTasks, 1);  
    49.    cleanup[0].setJobCleanupTask();  
    50.   
    51.    // cleanup reduce tip.  
    52.    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,  
    53.                       numReduceTasks, jobtracker, conf, this, 1);  
    54.    cleanup[1].setJobCleanupTask();  
    55.   
    56.    // create two setup tips, one map and one reduce.  
    57.    //原理同上  
    58.    setup = new TaskInProgress[2];  
    59.   
    60.    // setup map tip. This map doesn't use any split. Just assign an empty  
    61.    // split.  
    62.    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,   
    63.            jobtracker, conf, this, numMapTasks + 1, 1);  
    64.    setup[0].setJobSetupTask();  
    65.   
    66.    // setup reduce tip.  
    67.    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,  
    68.                       numReduceTasks + 1, jobtracker, conf, this, 1);  
    69.    setup[1].setJobSetupTask();  
    70.      
    71.    ......  

    可以看见,在这里JobInProgress首次被划分为了很多的小的Task任务的形式存在,而这些小的任务是以TaskInProgress的类表示。在这里MapReduce把1个作业做出了如下的分解,numMapTasks个Map Task ,numReduceTasks个Reduce Task,2个CleanUp任务,2个SetUp任务,(Map Reduce,每个各占1个),好,可以大致勾画一下,1个JobInProgress的执行流程了。

         ok,initTask的任务已经完成,也就是说前面初始化的准备工作都已经完成了,后面就等着JobTacker分配作业给TaskTracker了。在这里MapReduce用的是HeartBeat的形式,就是心跳机制,心跳包在这里主要有3个作用:

    1.判断TaskTracker是否活着

    2.获取各个TaskTracker上的资源使用情况和任务的进度

    3.给TaskTracker分配任务

    而这里用到的就是第三作用。HeartBeat的调用形式同样是Hadoop自带的RPC实现方式。JobTracker不会直接分配作业给TaskTracker,中间会经过一个叫TaskScheduler掉调度器,这个可以用户自定义实现,满足不同的需求设计,在Hadoop中有默认的实现,所以你会看到大致这样的一个模型流程:

            所以接下来JobTracker首先会收到很多来自TaskTracker的心跳包,判断此TaskTracker是否是无任务状态的,无任务的话,马上让TaskSchedulera分配任务给他:

    1. public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,   
    2.                                                   boolean restarted,  
    3.                                                   boolean initialContact,  
    4.                                                   boolean acceptNewTasks,   
    5.                                                   short responseId)   
    6.     throws IOException {  
    7.    ....  
    8.         
    9.     //通过心跳机制发送命令回应  
    10.     // Initialize the response to be sent for the heartbeat  
    11.     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);  
    12.     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();  
    13.     boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());  
    14.     // Check for new tasks to be executed on the tasktracker  
    15.     if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {  
    16.       TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);  
    17.       if (taskTrackerStatus == null) {  
    18.         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);  
    19.       } else {  
    20.         List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);  
    21.         //说明此TaskTtracker上无任务了  
    22.         if (tasks == null ) {  
    23.           //为此TaskTracker分配任务  
    24.           tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));  
    25.         }  

    接下来就是TaskScheduler的方法了,不过得找出他的实现类,TaskScheduler只是一个基类:

    1. public synchronized List<Task> assignTasks(TaskTracker taskTracker)  
    2.     throws IOException {  
    3.   TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();   
    4.   ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();  
    5.   final int numTaskTrackers = clusterStatus.getTaskTrackers();  
    6.   final int clusterMapCapacity = clusterStatus.getMaxMapTasks();  
    7.   final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();  
    8.   
    9.   //获取作业队列  
    10.   Collection<JobInProgress> jobQueue =  
    11.     jobQueueJobInProgressListener.getJobQueue();  
    12.    .....  
    13.       for (JobInProgress job : jobQueue) {  
    14.         if (job.getStatus().getRunState() != JobStatus.RUNNING ||  
    15.             job.numReduceTasks == 0) {  
    16.           continue;  
    17.         }  
    18.   
    19.           
    20.         //在这里分配了一个新的Reduce任务  
    21.         Task t =   
    22.           job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,   
    23.                                   taskTrackerManager.getNumberOfUniqueHosts()  
    24.                                   );  
    25.         .....  

    首先获取一个作业列表,在里面挑出一个作业给,在比如从里面挑出1个Reduce的任务区给整个TaskTracker执行,因为我们刚刚已经知道,所有的Task都是以TaskInProgress形式被包含于JobInProgress中的,所以又来到了JobInProgress中了

    1. /** 
    2.    * Return a ReduceTask, if appropriate, to run on the given tasktracker. 
    3.    * We don't have cache-sensitivity for reduce tasks, as they 
    4.    *  work on temporary MapRed files.   
    5.    */  
    6.   public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,  
    7.                                                int clusterSize,  
    8.                                                int numUniqueHosts  
    9.                                               ) throws IOException {  
    10.     .....  
    11.   
    12.     int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts,   
    13.                                     status.reduceProgress());  
    14.     if (target == -1) {  
    15.       return null;  
    16.     }  
    17.       
    18.     //这里继续调用方法,获取目标任务  
    19.     Task result = reduces[target].getTaskToRun(tts.getTrackerName());  
    20.     if (result != null) {  
    21.       addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);  
    22.     }  
    23.   
    24.     return result;  
    25.   }  

    此时就执行了一个TIP就是TaskInProgress里面去执行了,此时的转变就是JIP->TIP的转变。继续往里看,这时候来到的是TaskInProgress的类里面了:

    1. public Task getTaskToRun(String taskTracker) throws IOException {  
    2.     if (0 == execStartTime){  
    3.       // assume task starts running now  
    4.       execStartTime = jobtracker.getClock().getTime();  
    5.     }  
    6.   
    7.     // Create the 'taskid'; do not count the 'killed' tasks against the job!  
    8.     TaskAttemptID taskid = null;  
    9.     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {  
    10.       // Make sure that the attempts are unqiue across restarts  
    11.       int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;  
    12.       //启动一次TA尝试  
    13.       taskid = new TaskAttemptID( id, attemptId);  
    14.       ++nextTaskId;  
    15.     } else {  
    16.       LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +  
    17.               " (plus " + numKilledTasks + " killed)"  +   
    18.               " attempts for the tip '" + getTIPId() + "'");  
    19.       return null;  
    20.     }  
    21.   
    22.     //加入到相应的数据结构中  
    23.     return addRunningTask(taskid, taskTracker);  
    24.   }  

    在这里明显的执行了所谓的TA尝试,就是说这是一次Task的尝试执行,因为不能保证这次任务就一定能执行成功。把这次尝试的任务ID加入系统变量中,就来到了addRunningTask,也就是说来到了方法执行的最末尾:

    1. /** 
    2.    * Adds a previously running task to this tip. This is used in case of  
    3.    * jobtracker restarts. 
    4.    * 添加任务 
    5.    */  
    6.   public Task addRunningTask(TaskAttemptID taskid,   
    7.                              String taskTracker,  
    8.                              boolean taskCleanup) {  
    9.     .....  
    10.     //添加任务和taskTracker的映射关系  
    11.     activeTasks.put(taskid, taskTracker);  
    12.     tasks.add(taskid);  
    13.   
    14.     // Ask JobTracker to note that the task exists  
    15.     //在JobTracker中增加一对任务记录  
    16.     jobtracker.createTaskEntry(taskid, taskTracker, this);  
    17.   
    18.     // check and set the first attempt  
    19.     if (firstTaskId == null) {  
    20.       firstTaskId = taskid;  
    21.     }  
    22.     return t;  
    23.   }  

    在这里,就增加了任务和TaskTracker的一些任务运行信息的变量关系。后面就等着TaskTracker自己去把任务挑出来,执行就OK了,上面这个步骤从TIP->TA的转变。我们把这种结构流程叫做“三层多叉树”的方式结构。

    整个作业的调度的时序关系图如下:

  • 相关阅读:
    capwap学习笔记——初识capwap(一)(转)
    capwap学习笔记——capwap的前世今生(转)
    实现一个简单的C++协程库
    c++ 异常处理(1)
    一个浮点数计算的问题
    c++11 中的 move 与 forward
    c++中的左值与右值
    说说尾递归
    boost bind及function的简单实现
    [译] 玩转ptrace (一)
  • 原文地址:https://www.cnblogs.com/cxzdy/p/5044001.html
Copyright © 2011-2022 走看看