zoukankan      html  css  js  c++  java
  • TaskTracker启动过程源码级分析

      TaskTracker也是作为一个单独的JVM来运行的,其main函数就是TaskTracker的入口函数,当运行start-all.sh时,脚本就是通过SSH运行该函数来启动TaskTracker的。

      TaskTracker是JobTracker和Task之间的桥梁:一方面,从JobTracker接收并执行各种命令:运行任务、提交任务、杀死任务等;另一方面,将本地节点上各个任务的状态通过心跳周期性汇报给JobTracker。TaskTracker与JobTracker和Task之间采用了RPC协议进行通信。

      TaskTracker的功能:1.汇报心跳:Tracker周期性将所有节点上各种信息通过心跳机制汇报给JobTracker。这些信息包括两部分:机器级别信息:节点健康情况、资源使用情况等;任务级别信息:任务执行进度、任务运行状态等。2.执行命令:JobTracker会给TaskTracker下达各种命令,主要包括:启动任务(LaunchTaskAction)、提交任务(CommitTaskAction)、杀死任务(KillTaskAction)、杀死作业(KillJobAction)和重新初始化(TaskTrackerReinitAction)。
      main()方法中主要工作就是初始化TaskTracker并启动它。
      一、初始化:TaskTracker tt = new TaskTracker(conf),TaskTracker(conf)获取本机的一些配置信息,初始化服务器并启动服务器(StatusHttpServer)。
      获取map和reduce最大的slot数量;获取jobTracker的地址jobTrackAddr;构造TaskTracker的HttpServer,可以从浏览器查看该tasktracker的运行情况及所有task;获取从HDFS下载文件的本地目录localStorage(即"mapred.local.dir");构造一个TaskController(是一个接口)默认是DefaultTaskController,用来控制任务的初始化、终结、清理tasks的,也可以启动和杀死tasks的JVM;调用initialize()方法,该方法是真正的构造函的地方,使用单独的方法可以再次调用;启动HttpServer。
      initialize()方法会清理一些历史文件(第一次启动TaskTracker不会有历史文件),并新建一些目录;初始化一些参数;新建一个JvmManager来管理着mapJvmManager和reduceJvmManager,这俩都是JvmManagerForType,JvmManager.launchJvm(TaskRunner t, JvmEnv env)方法会启动Task JVM运行Task,这个过程还比较复杂后续再说;构造taskReportServer,Task通过RPC向该Server汇报进度;构造一个distributedCacheManager分布式缓存管理并启动CleanupThread线程周期性的检查和清理不再使用的文件;然后实例化InterTrackerProtocol的jobClient,InterTrackerProtocol是用于TaskTracker和JobTracker通信的协议;启动一个MapEventsFetcherThread线程用于获取Map任务的输出数据信息,获取已运行完成的Map Task列表,为Reduce Task远程拷贝数据做准备;初始化内存,做任务内存监控;构造两个TaskLauncher线程:mapLauncher和reduceLauncher并启动,这个线程的run函数就是不断监测taskToLaunch队列中是否有新的 TaskTracker.TaskInProgress对象加入。如果有则从中取出一个对象,然后调用TaskTracker类的 startNewTask(TaskInProgress tip)来启动一个task。
      二、启动:tt.run()启动TaskTracker线程,维护一个和JobTracker通信的链接,其中run函数主要调用了offerService函数。JobTracker从不会主动与TaskTracker通信,总是被动等待TaskTracker汇报信息并领取其对应的命令。  
     1   public void run() {
     2     try {
     3       getUserLogManager().start();
     4       startCleanupThreads();
     5       boolean denied = false;
     6       while (running && !shuttingDown && !denied) {
     7         boolean staleState = false;
     8         try {
     9           // This while-loop attempts reconnects if we get network errors
    10           while (running && !staleState && !shuttingDown && !denied) { //如果连接断开,则会循环尝试连接JobTracker
    11             try {
    12                 //如果连接JobTracker服务成功,TaskTracker就会调用offerService()函数进入主执行循环中。
    13                 //这个循环会每隔一段时间与JobTracker通讯一次,调用transmitHeartBeat(),获得HeartbeatResponse信息。
    14               State osState = offerService();    
    15               if (osState == State.STALE) {
    16                 staleState = true;
    17               } else if (osState == State.DENIED) {
    18                 denied = true;
    19               }
    20             } catch (Exception ex) {
    21               if (!shuttingDown) {
    22                 LOG.info("Lost connection to JobTracker [" +
    23                          jobTrackAddr + "].  Retrying...", ex);
    24                 try {
    25                   Thread.sleep(5000);
    26                 } catch (InterruptedException ie) {
    27                 }
    28               }
    29             }
    30           }
    31         } finally {
    32           close();
    33         }
    34         if (shuttingDown) { return; }
    35         LOG.warn("Reinitializing local state");
    36         initialize();    //初始化所有成员和参数
    37       }
    38       if (denied) {
    39         shutdown();
    40       }
    41     } catch (IOException iex) {
    42       LOG.error("Got fatal exception while reinitializing TaskTracker: " +
    43                 StringUtils.stringifyException(iex));
    44       return;
    45     }
    46     catch (InterruptedException i) {
    47       LOG.error("Got interrupted while reinitializing TaskTracker: " +
    48           i.getMessage());
    49       return;
    50     }
    51   }

      getUserLogManager().start()会启动两个线程:用户日志清理线程和监控日志行为线程;会启动一个taskCleanupThread线程,这个线程会始终监视BlockingQueue<TaskTrackerAction> tasksToCleanup(用来存放要被清理的TaskInProgress),里面存储的是KillJobAction或者是KillTaskAction类型;接下来是两层的while循环,其中内层循环是一旦连接断开就循环尝试连接JobTracker,循环主体是State osState = offerService(),offerService()方法一旦执行如果没有其它情况将会一直执行,如果内层while循环退出则尝试重新initialize(),如果offerService()方法返回的状态是DENIED则退出所有循环并执行shutdown()。

       offerService()方法代码较多,如下:
      1 /**
      2    * Main service loop.  Will stay in this loop forever.
      3    */
      4   State offerService() throws Exception {
      5     long lastHeartbeat = System.currentTimeMillis();//上一次发心跳距现在时间
      6     //此循环主要根据控制完成task个数控制心跳间隔。//TaskTracker进行是一直存在的
      7     while (running && !shuttingDown) {
      8       try {
      9         long now = System.currentTimeMillis();//获得当前时间    
     10         
     11         // accelerate to account for multiple finished tasks up-front
     12       //通过完成的任务数动态控制心跳间隔时间  
     13         long remaining = 
     14           (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
     15         while (remaining > 0) {
     16           // sleeps for the wait time or 
     17           // until there are *enough* empty slots to schedule tasks
     18           synchronized (finishedCount) {
     19             finishedCount.wait(remaining);
     20             
     21             // Recompute
     22             now = System.currentTimeMillis();
     23             remaining = 
     24               (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
     25             
     26             if (remaining <= 0) {
     27               // Reset count 
     28               finishedCount.set(0);
     29               break;
     30             }
     31           }
     32         }
     33 
     34         // If the TaskTracker is just starting up:
     35         // 1. Verify the buildVersion
     36         // 2. Get the system directory & filesystem
     37         if(justInited) {
     38           String jobTrackerBV = jobClient.getBuildVersion();
     39           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
     40             String msg = "Shutting down. Incompatible buildVersion." +
     41             "
    JobTracker's: " + jobTrackerBV + 
     42             "
    TaskTracker's: "+ VersionInfo.getBuildVersion();
     43             LOG.error(msg);
     44             try {
     45               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
     46             } catch(Exception e ) {
     47               LOG.info("Problem reporting to jobtracker: " + e);
     48             }
     49             return State.DENIED;
     50           }
     51           
     52           String dir = jobClient.getSystemDir();
     53           if (dir == null) {
     54             throw new IOException("Failed to get system directory");
     55           }
     56           systemDirectory = new Path(dir);
     57           systemFS = systemDirectory.getFileSystem(fConf);
     58         }
     59 
     60         now = System.currentTimeMillis();
     61         if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
     62           localStorage.checkDirs();
     63           lastCheckDirsTime = now;
     64           int numFailures = localStorage.numFailures();
     65           // Re-init the task tracker if there were any new failures
     66           if (numFailures > lastNumFailures) {
     67             lastNumFailures = numFailures;
     68             return State.STALE;
     69           }
     70         }
     71 
     72         // Send the heartbeat and process the jobtracker's directives
     73         //在transmitHeartBeat()函数处理中,
     74         //TaskTracker会创建一个新的TaskTrackerStatus对象记录目前任务的执行状况,
     75         //检查目前执行的Task数目以及本地磁盘的空间使用情况等,
     76         //如果可以接收新的Task则设置heartbeat()的askForNewTask参数为true。
     77         //然后通过IPC接口调用JobTracker的heartbeat()方法发送过去,
     78         //heartbeat()返回值TaskTrackerAction数组。
     79         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);//发送Heartbeat到JobTracker,得到response 
     80 
     81         // Note the time when the heartbeat returned, use this to decide when to send the
     82         // next heartbeat   
     83         lastHeartbeat = System.currentTimeMillis();
     84         
     85         // Check if the map-event list needs purging
     86         Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
     87         if (jobs.size() > 0) {
     88           synchronized (this) {
     89             // purge the local map events list
     90             for (JobID job : jobs) {
     91               RunningJob rjob;
     92               synchronized (runningJobs) {
     93                 rjob = runningJobs.get(job);          
     94                 if (rjob != null) {
     95                   synchronized (rjob) {
     96                     FetchStatus f = rjob.getFetchStatus();
     97                     if (f != null) {
     98                       f.reset();
     99                     }
    100                   }
    101                 }
    102               }
    103             }
    104 
    105             // Mark the reducers in shuffle for rollback
    106             synchronized (shouldReset) {
    107               for (Map.Entry<TaskAttemptID, TaskInProgress> entry 
    108                    : runningTasks.entrySet()) {
    109                 if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
    110                   this.shouldReset.add(entry.getKey());
    111                 }
    112               }
    113             }
    114           }
    115         }
    116         //然后调用HeartbeatResponse的getActions()函数获得JobTracker传过来的所有指令即一个TaskTrackerAction数组。
    117         TaskTrackerAction[] actions = heartbeatResponse.getActions();
    118         if(LOG.isDebugEnabled()) {
    119           LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + 
    120                     heartbeatResponse.getResponseId() + " and " + 
    121                     ((actions != null) ? actions.length : 0) + " actions");
    122         }
    123         if (reinitTaskTracker(actions)) {
    124           return State.STALE;
    125         }
    126             
    127         // resetting heartbeat interval from the response.
    128         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
    129         justStarted = false;
    130         justInited = false;
    131         if (actions != null){ 
    132             //遍历这个数组,如果是一个新任务指令即LaunchTaskAction则调用调用addToTaskQueue加入到待执行队列,
    133             //否则加入到tasksToCleanup队列,交给一个taskCleanupThread线程来处理,如执行KillJobAction或者KillTaskAction等。
    134           for(TaskTrackerAction action: actions) {
    135             if (action instanceof LaunchTaskAction) {
    136               addToTaskQueue((LaunchTaskAction)action);  //如果是运行一个新的Task,则将Action添加到任务队列中,加入TaskLauncher线程的执行队列
    137             } else if (action instanceof CommitTaskAction) {
    138               CommitTaskAction commitAction = (CommitTaskAction)action;
    139               if (!commitResponses.contains(commitAction.getTaskID())) {
    140                 LOG.info("Received commit task action for " + 
    141                           commitAction.getTaskID());
    142                 commitResponses.add(commitAction.getTaskID());
    143               }
    144             } else {//杀死任务或作业
    145               tasksToCleanup.put(action);
    146             }
    147           }
    148         }
    149       //杀死一定时间没没有汇报进度的task 
    150         markUnresponsiveTasks();
    151       //当剩余磁盘空间小于mapred.local.dir.minspacekill(默认为0)时,寻找合适的任务将其杀掉以释放空间 
    152         killOverflowingTasks();
    153             
    154         //we've cleaned up, resume normal operation
    155         if (!acceptNewTasks && isIdle()) {
    156           acceptNewTasks=true;
    157         }
    158         //The check below may not be required every iteration but we are 
    159         //erring on the side of caution here. We have seen many cases where
    160         //the call to jetty's getLocalPort() returns different values at 
    161         //different times. Being a real paranoid here.
    162         checkJettyPort(server.getPort());
    163       } catch (InterruptedException ie) {
    164         LOG.info("Interrupted. Closing down.");
    165         return State.INTERRUPTED;
    166       } catch (DiskErrorException de) {
    167         String msg = "Exiting task tracker for disk error:
    " +
    168           StringUtils.stringifyException(de);
    169         LOG.error(msg);
    170         synchronized (this) {
    171           jobClient.reportTaskTrackerError(taskTrackerName, 
    172                                            "DiskErrorException", msg);
    173         }
    174         return State.STALE;
    175       } catch (RemoteException re) {
    176         String reClass = re.getClassName();
    177         if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
    178           LOG.info("Tasktracker disallowed by JobTracker.");
    179           return State.DENIED;
    180         }
    181       } catch (Exception except) {
    182         String msg = "Caught exception: " + 
    183           StringUtils.stringifyException(except);
    184         LOG.error(msg);
    185       }
    186     }
    187 
    188     return State.NORMAL;
    189   }
    View Code

      方法中有一个循环,没有其他状况会一直循环执行,循环内首先会阻塞心跳间隔时间,心跳间隔是动态的会不断修正的;如果TaskTracker是刚刚启动,需要先确认版本一致否则直接返回State.DENIED,然后获取system directory和filesystem;超过磁盘检查间隔就对磁盘进行检查,是否有损坏,然后检查localStorage.numFailures(),如果大于lastNumFailures,则直接返回State.STALE,对TaskTracker重新初始化; transmitHeartBeat(now)方法会发送心跳到JobTracker,并返回心跳响应信息heartbeatResponse,相应信息包括两部分,一个是作业集合recoveredJobs它是上次关闭JobTracker时正在运行的作业集合,重启JobTracker后需要恢复这些作业的运行状态(前提是用户启用了作业恢复功能),而TaskTracker收到该作业集合后,需重置这些作业对应Reduce Task的FetchStatus信息,从而迫使这些Reduce Task重新从Map Task端拷贝数据,另一部分是需要执行的命令列表;heartbeatResponse.getActions()获取JobTracker发送过来的指令数组,首先要检查的是是否有重新初始化TaskTracker的action,如果有则返回State.STALE,会重新初始化TaskTracker;修正心跳间隔;遍历actions数组,根据action的类型加入不同的数据结构中,是LaunchTaskAction:如果是action.getTask().isMapTask()==true,则将action加入mapLauncher,否则加入reduceLauncher,这两个launcher都是TaskLauncher(该类是TaskTracker类的一个内部类,具有一个数据成员,是 TaskTracker.TaskInProgress类型的队列。在此特别注意,在TaskTracker类内部所提到的TaskInProgress 类均为TaskTracker的内部类,我们用TaskTracker.TaskInProgress表示,一定要和mapred包中的 TaskInProgress类区分,后者我们直接用TaskInProgress表示);是CommitTaskAction:则放入commitResponses,commitResponses.add(commitAction.getTaskID());其他的类型放入tasksToCleanup.put(action),表示要清理,如执行KillJobAction或者KillTaskAction等。markUnresponsiveTasks()方法遍历runningTasks杀死一定时间没没有汇报进度的task(purgeTask(tip, true)方法)。killOverflowingTasks()方法,当剩余磁盘空间小于mapred.local.dir.minspacekill(默认为0)时,寻找合适的任务将其杀掉以释放空间,期间不接受任何新的task,acceptNewTasks=false,通过findTaskToKill(null)方法(会遍历runningTasks,优先考虑杀死reduce任务)找到合适的TaskInProgress killMe,执行purgeTask(killMe, false)。一旦空间不足而出现杀死task的情况出现,就会一直不接受任何新的task,直到所有的task执行完毕和所有的清理task也执行完毕,但仍会正常向JobTracker发送心跳信息,信息内容就会有所变化。

      transmitHeartBeat(now)方法是构造并将心跳信息发送到JobTracker。主要是构造一个TaskTrackerStatus这是要发送的东西,其内容包括任务的运行状态信息、TaskTracker资源信息、健康状态信息。如果可以接收新的Task则设置askForNewTask参数为true。当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数;当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数;acceptNewTasks==true。如果askForNewTask==true则对TaskTrackerStatus的实例status进行一些设置。然后对healthStatus = status.getHealthStatus()中的healthStatus进行一些设置。然后向JobTracker发送心跳并接受相应信息HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted,justInited,askForNewTask, heartbeatResponseId)这是一个RPC调用,具体可以查看JobTracker.heartbeat方法,后续再讲。调用成功后更新heartbeatResponseId。遍历所有task的状态TaskStatus,对那些状态为SUCCEEDED或者FAILED或者KILLED,做一些统计信息,根据task类型使得mapTotal或者reduceTotal减一,runningTasks去除这个task,然后清空runningTasks中所有的TaskInProgress.getStatus().clearStatus()状态信息,这些状态信息是瞬时的,仅发送一次,status = null,这些瞬时的状态信息是在构造TaskTrackerStatus时通过cloneAndResetRunningTaskStatuses(sendCounters)生成的。最后返回心跳结果heartbeatResponse。

      这样offerService()方法通过while循环一直阻塞一定的心跳间隔,然后获取JobTracker的心跳应答信息,根据其中的action添加到不同的数据结构中,并做一些检查控制TaskTracker能够较为合理的运行,总是一遍又一遍的做这些。

      

      至此,TaskTracker的启动过程讲解完了,错误之处还请指正。

      参考:1、http://www.2cto.com/kf/201311/260826.html
         2、董西成,《hadoop技术内幕---深入理解MapReduce架构设计与实现原理》
  • 相关阅读:
    Java
    maven打包
    maven
    memset用法祥解
    HTML5 canvas save和restore方法讲解
    修复无线链接时断时连
    VirtualBox内Linux系统怎样与Windows共享文件夹
    Ubuntu默认密码,及其修改
    mysql中char与varchar的区别分析
    javax.servlet包介绍
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/3745283.html
Copyright © 2011-2022 走看看