zoukankan      html  css  js  c++  java
  • TaskTracker学习笔记

    转自:http://blog.csdn.net/androidlushangderen/article/details/41477061

     上次分析完JobTracker通过TaskScheduler如何把作业分配给TaskTracker,这次把目光 移动到TaskTracker上面。TaskTracker在这里其实是一个slave的从属关系。我在后面的分析会通过TaskTracker的执行流程,主要讲他的2个过程的分析1.作业启动执行2.与JobTracker的heatbeat的过程。2个过程都是非常的典型。

             与JobTracker一样,TaskTracker也是作为一项服务所运行的,他也有自己的main函数入口。下面是一张全局的TaskTracker执行过程流程图:

    jvmManager负责为每个Task分配一个java虚拟机环境让其执行,避免任务之间的干扰,TaskMemoryManager负责任务内存的监控,对于某些任务恶意消耗资源内存,会给予杀死此任务的处理。

            1.TaskTracker任务启动

            下面从main函数的入口开始分析一下TaskTracker的执行流程:

    1. /** 
    2.    * Start the TaskTracker, point toward the indicated JobTracker 
    3.    * taskTracker同样也是一个服务程序,main函数开始执行 
    4.    */  
    5.   public static void main(String argv[]) throws Exception {  
    6.     StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);  
    7.     if (argv.length != 0) {  
    8.       System.out.println("usage: TaskTracker");  
    9.       System.exit(-1);  
    10.     }  
    11.     try {  
    12.       //初始化作业配置  
    13.       JobConf conf=new JobConf();  
    14.       // enable the server to track time spent waiting on locks  
    15.       ReflectionUtils.setContentionTracing  
    16.         (conf.getBoolean("tasktracker.contention.tracking", false));  
    17.       //初始化度量统计系统  
    18.       DefaultMetricsSystem.initialize("TaskTracker");  
    19.       //根据作业配置初始化TaskTracker  
    20.       TaskTracker tt = new TaskTracker(conf);  
    21.       //注册MBean,方便外界工具检测TaskTracker的状态  
    22.       MBeans.register("TaskTracker", "TaskTrackerInfo", tt);  
    23.       //执行TaskTracker服务主程序  
    24.       tt.run();  
    25.     } catch (Throwable e) {  
    26.       LOG.error("Can not start task tracker because "+  
    27.                 StringUtils.stringifyException(e));  
    28.       System.exit(-1);  
    29.     }  
    30.   }  

    让后我们进入其中的执行主程序tt.run():

    1. /** 
    2.    * The server retry loop.   
    3.    * This while-loop attempts to connect to the JobTracker.  It only  
    4.    * loops when the old TaskTracker has gone bad (its state is 
    5.    * stale somehow) and we need to reinitialize everything. 
    6.    */  
    7.   public void run() {  
    8.     try {  
    9.       getUserLogManager().start();  
    10.       //开启CleanUp清理线程  
    11.       startCleanupThreads();  
    12.       boolean denied = false;  
    13.       while (running && !shuttingDown && !denied) {  
    14.         boolean staleState = false;  
    15.         try {  
    16.           // This while-loop attempts reconnects if we get network errors  
    17.           while (running && !staleState && !shuttingDown && !denied) {  
    18.             try {  
    19.               //offerService()执行了核心的启动操作  
    20.               State osState = offerService();  
    21.               if (osState == State.STALE) {  
    22.                 staleState = true;  
    23.               } else if (osState == State.DENIED) {  
    24.                 denied = true;  
    25.               }  
    26.               ......  

    我们可以看到,这里通过while操作,循环进行服务操作,如果拒绝服务,则会shutdown中断服务,服务的主要操作又在offerService方法中:

    1. /** 
    2.    * Main service loop.  Will stay in this loop forever. 
    3.    * 主要的循环服务操作 
    4.    */  
    5.   State offerService() throws Exception {  
    6.       .....  
    7.       // Send the heartbeat and process the jobtracker's directives  
    8.         //发送给JobTracker心跳包  
    9.         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);  
    10.   
    11.         // Note the time when the heartbeat returned, use this to decide when to send the  
    12.         // next heartbeat     
    13.         lastHeartbeat = System.currentTimeMillis();  
    14.           
    15.         ....          
    16.         //在这里获取了心跳回应中的action命令  
    17.         TaskTrackerAction[] actions = heartbeatResponse.getActions();  
    18.         if(LOG.isDebugEnabled()) {  
    19.           LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +   
    20.                     heartbeatResponse.getResponseId() + " and " +   
    21.                     ((actions != null) ? actions.length : 0) + " actions");  
    22.         }  
    23.         if (reinitTaskTracker(actions)) {  
    24.           return State.STALE;  
    25.         }  
    26.               
    27.         // resetting heartbeat interval from the response.  
    28.         heartbeatInterval = heartbeatResponse.getHeartbeatInterval();  
    29.         justStarted = false;  
    30.         justInited = false;  
    31.         if (actions != null){   
    32.           for(TaskTrackerAction action: actions) {  
    33.             if (action instanceof LaunchTaskAction) {  
    34.               //如果是执行Task任务指令,执行添加任务到任务队列中  
    35.               addToTaskQueue((LaunchTaskAction)action);  
    36.             } else if (action instanceof CommitTaskAction) {  
    37.              //如果是提交任务的指令,则执行后面的操作  
    38.               CommitTaskAction commitAction = (CommitTaskAction)action;  
    39.               if (!commitResponses.contains(commitAction.getTaskID())) {  
    40.                 LOG.info("Received commit task action for " +   
    41.                           commitAction.getTaskID());  
    42.                 commitResponses.add(commitAction.getTaskID());  
    43.               }  
    44.             } else {  
    45.               //其他的指令一律添加到tasksToCleanup队列中等待被处理  
    46.               tasksToCleanup.put(action);  
    47.             }  
    48.           }  
    49.         }  
    50.         .....  

    在这里我省略了比较多的代码,把执行任务相关的核心操作保留了,在这里就开始执行了后面的和Task相关的很多操作了,当然这些任务都是通过收到JobTracker的心跳包Response来获得的,在通过获取里面的TaskTrackerAction命令来判断执行的。TaskTrackerAction里面包含了1枚举类,包括了以下的相关指令:



    具体什么意思,看上面的英文解释就能理解了吧,上面代表了6种命令操作,我们侧重看第一个launch_task的命令执行,在上面的判断执行方法是addToTaskQueue();方法:

    1. private void addToTaskQueue(LaunchTaskAction action) {  
    2.     //任务类型加入到任务待执行的容器中  
    3.     if (action.getTask().isMapTask()) {  
    4.       mapLauncher.addToTaskQueue(action);  
    5.     } else {  
    6.       reduceLauncher.addToTaskQueue(action);  
    7.     }  
    8.   }  

    这里的mapLauncher,reduceLauncher的类型是TaskLauncher,他是一个线程类:

    1. class TaskLauncher extends Thread {  
    2.     private IntWritable numFreeSlots;  
    3.     private final int maxSlots;  
    4.     private List<TaskInProgress> tasksToLaunch;  
    5.    ....  

    也就是说,待执行的map,Reduce任务都是添加到taskToLauch中的,

    1. public void addToTaskQueue(LaunchTaskAction action) {  
    2.       //新建1个TIP,并加入tasksToLaunch列表  
    3.       synchronized (tasksToLaunch) {  
    4.         TaskInProgress tip = registerTask(action, this);  
    5.         tasksToLaunch.add(tip);  
    6.         //唤醒所有被tasksToLaunch wait的操作,说明此时有新的任务了  
    7.         tasksToLaunch.notifyAll();  
    8.       }  
    9.     }  

    加入之后唤醒相应的操作,这个就很好理解了,一定是在empty的时候被阻塞住了,

    1. public void run() {  
    2.       while (!Thread.interrupted()) {  
    3.         try {  
    4.           TaskInProgress tip;  
    5.           Task task;  
    6.           synchronized (tasksToLaunch) {  
    7.             while (tasksToLaunch.isEmpty()) {  
    8.               tasksToLaunch.wait();  
    9.             }  
    10.             //get the TIP  
    11.             tip = tasksToLaunch.remove(0);  
    12.             task = tip.getTask();  
    13.             LOG.info("Trying to launch : " + tip.getTask().getTaskID() +   
    14.                      " which needs " + task.getNumSlotsRequired() + " slots");  
    15.           }  
    16.           //wait for free slots to run  
    17.           .....  
    18.           //got a free slot. launch the task  
    19.           startNewTask(tip);  

    到了startNewTask就是开始所谓的任务了。到此为止,TaskTracker的任务执行这条路,我们算彻底打通了,相关时序图如下:

            2.Heateat过程

            下面我们看另外一个流程,心跳机制。此过程的实现同样的主要是在offerService的循环操作中。首先第一步,判断是否到了发送心跳包的时间,因为心跳包是隔周期性的时间发送的,所以这里必须会进行判读:

    1. /** 
    2.    * Main service loop.  Will stay in this loop forever. 
    3.    * 主要的循环服务操作 
    4.    */  
    5.   State offerService() throws Exception {  
    6.     long lastHeartbeat = System.currentTimeMillis();  
    7.   
    8.     while (running && !shuttingDown) {  
    9.       try {  
    10.         long now = System.currentTimeMillis();  
    11.           
    12.         // accelerate to account for multiple finished tasks up-front  
    13.         //判断上次心跳的时间+心跳等待时间是否已经到了当前时间,如果到了可以发送新的心跳包  
    14.         long remaining =   
    15.           (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;  
    16.         //如果还没到,时间有剩余,则要强行等待剩余的时间  
    17.         while (remaining > 0) {  
    18.           // sleeps for the wait time or   
    19.           // until there are *enough* empty slots to schedule tasks  
    20.           synchronized (finishedCount) {  
    21.             finishedCount.wait(remaining);  
    22.               
    23.             // Recompute  
    24.             now = System.currentTimeMillis();  
    25.             remaining =   
    26.               (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;  
    27.               
    28.             if (remaining <= 0) {  
    29.               // Reset count   
    30.               finishedCount.set(0);  
    31.               break;  
    32.             }  
    33.           }  
    34.         }  
    35.         .....  

    假设已经到达了发送时间了,会执行后面的操作,检测版本号,TaskTracker和JobTracker的版本号必须一致:

    1. .....  
    2.         // If the TaskTracker is just starting up:  
    3.         // 1. Verify the buildVersion  
    4.         // 2. Get the system directory & filesystem  
    5.         if(justInited) {  
    6.          //验证版本号,如果版本号不对,则返回拒绝状态  
    7.           String jobTrackerBV = jobClient.getBuildVersion();  
    8.           if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {  
    9.             String msg = "Shutting down. Incompatible buildVersion." +  
    10.             " JobTracker's: " + jobTrackerBV +   
    11.             " TaskTracker's: "+ VersionInfo.getBuildVersion();  
    12.             LOG.error(msg);  
    13.             try {  
    14.               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);  
    15.             } catch(Exception e ) {  
    16.               LOG.info("Problem reporting to jobtracker: " + e);  
    17.             }  
    18.             return State.DENIED;  
    19.           }  

    如果通过上述2个验证,基本上就达到了发送的条件了,下面就准备发送操作了:

    1. // Send the heartbeat and process the jobtracker's directives  
    2.         //发送给JobTracker心跳包  
    3.         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);  

    就是在上面这个方法中实现了发送的操作,此方法的返回值是JobTracker的心跳回复包,里面就包含着刚刚的TaskTrackerAction命令信息。我们进入transmitHeartBeat。之前分析过,心跳机制的有1个主要的作用就是汇报TaskTracker的资源使用情况和作业执行情况给JobTracker节点。以此可以让主节点可以进行资源调配。所以在上面的这个方法必不可少的操作是构建TaskTracker的Status状态信息。这个类包含的信息还比较多。下面是主要的此类的关系结构:

    里面2大包含类ResourceStatus(TaskTracker资源使用情况),TaskTrackerHealthStatus(TaskTracker节点健康状况)。首先当然是新建一个Status了:

    1. /** 
    2.    * Build and transmit the heart beat to the JobTracker 
    3.    * 将TaskTracker自身的状态信息发送给JobTracker,并获得一个心跳包的回应 
    4.    * @param now current time 
    5.    * @return false if the tracker was unknown 
    6.    * @throws IOException 
    7.    */  
    8.   HeartbeatResponse transmitHeartBeat(long now) throws IOException {  
    9.     ....  
    10.     //   
    11.     // Check if the last heartbeat got through...   
    12.     // if so then build the heartbeat information for the JobTracker;  
    13.     // else resend the previous status information.  
    14.     //  
    15.     if (status == null) {  
    16.       synchronized (this) {  
    17.         status = new TaskTrackerStatus(taskTrackerName, localHostname,   
    18.                                        httpPort,   
    19.                                        cloneAndResetRunningTaskStatuses(  
    20.                                          sendCounters),   
    21.                                        failures,   
    22.                                        maxMapSlots,  
    23.                                        maxReduceSlots);   
    24.       }  
    25.    

    后面就是各种获取节点CPU,内存等基本信息,这里就不列举了,不过这里提一点,对于TaskTracker是否还能运行任务,在这里是通过TaskTracker是否达到了它的maxSlot上限作为1个标准。一般1个Reduce Task占据1个slot单元,1个Map Task同样占据1个Slot单元,如果1个TaskTracker结点拥有好多slot单元,那么他就可以运行很多Task。

    1. //  
    2.     // Check if we should ask for a new Task  
    3.     // 检测TaskTracker是否需要一个新 Task任务  
    4.     //  
    5.     boolean askForNewTask;  
    6.     long localMinSpaceStart;  
    7.     synchronized (this) {  
    8.       //通过判断当前所占据的slots数量是否已经达到最大slot的数量作为标准  
    9.       askForNewTask =   
    10.         ((status.countOccupiedMapSlots() < maxMapSlots ||   
    11.           status.countOccupiedReduceSlots() < maxReduceSlots) &&   
    12.          acceptNewTasks);   
    13.       localMinSpaceStart = minSpaceStart;  
    14.     }  

    askForNewTask布尔类型就代表TaskTracker是否还能运行新的任务,封装好了这些Status信息之后,就要执行关键的发送步骤了:

    1. //  
    2. // Xmit the heartbeat  
    3. // 通过JobClient发送给JobTracker,并获得1个回复  
    4. //  
    5. HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,   
    6.                                                           justStarted,  
    7.                                                           justInited,  
    8.                                                           askForNewTask,   
    9.                                                           heartbeatResponseId);  

    是通过JobClient的方法发送的。得到的heartbeatResponse返回结果就是JobTracker结果了。至于里面JobClient具体怎么发送就不是本次分析的重点了,HeartBeat也分析完毕。同样看一下流程图:

          

         总结

         2个过程都是在offerService核心服务程序中执行的。了解完JobTracker和TaskTracker的工作原理,在聊了具体Task任务的执行的5个阶段,从微观Task细节的执行到宏观上作业调度的原理分析理解,的确对MapReduce计算模型的理解上升了许多的层次。

  • 相关阅读:
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    vue element 表单验证不通过,滚动到固对应位置
    vue源码之数据驱动
    vue源码之数据驱动
    vue源码之数据驱动
    每天一点点之数据结构与算法
    每天一点点之数据结构与算法
    python案例
    python案例
  • 原文地址:https://www.cnblogs.com/cxzdy/p/5044009.html
Copyright © 2011-2022 走看看