zoukankan      html  css  js  c++  java
  • Hadoop源码分析25 JobInProgress 主要容器

    1. JobInProgress中的容器

    1.1 maps、reduces、cleanup、setup

     TaskInProgress maps[]= new TaskInProgress[0];

     

     TaskInProgress reduces[]= new TaskInProgress[0];

     

     TaskInProgress cleanup[]= new TaskInProgress[0];

     

     TaskInProgress setup[]= new TaskInProgress[0];

     

    在方法initTasks()中被初始化,

       maps = new TaskInProgress[numMapTasks];

       for(int i=0; i < numMapTasks;++i) {

         maps[i]= new TaskInProgress(jobId,jobFile,

                                      splits[i],

                                      jobtracker,conf,this,i, numSlotsPerMap);

       }

       this.reduces=new TaskInProgress[numReduceTasks];

       for(int i = 0;i < numReduceTasks;i++) {

         reduces[i]=  new TaskInProgress(jobId,jobFile,

                                      numMapTasks,i,

                                      jobtracker,conf,this,numSlotsPerReduce);

         nonRunningReduces.add(reduces[i]);

       }

       // create cleanup two cleanup tips, one map and one reduce.

       cleanup = new TaskInProgress[2];

     

       // cleanup map tip. This map doesn't use any splits. Just assign an empty

       // split.

       TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;

       cleanup[0]= new TaskInProgress(jobId,jobFile,emptySplit,

               jobtracker,conf,this,numMapTasks,1);

       cleanup[0].setJobCleanupTask();

     

       // cleanup reduce tip.

       cleanup[1]= new TaskInProgress(jobId,jobFile,numMapTasks,

                          numReduceTasks,jobtracker,conf,this,1);

       cleanup[1].setJobCleanupTask();

     

       // create two setup tips, one map and one reduce.

       setup=new TaskInProgress[2];

     

       // setup map tip. This map doesn't use any split. Just assign an empty

       // split.

       setup[0]= new TaskInProgress(jobId,jobFile,emptySplit,

               jobtracker,conf,this,numMapTasks+ 1,1);

       setup[0].setJobSetupTask();

     

       // setup reduce tip.

       setup[1]= new TaskInProgress(jobId,jobFile,numMapTasks,

                          numReduceTasks+ 1,jobtracker,conf,this,1);

       setup[1].setJobSetupTask();

     

    1.2 nonRunningMapCache

    //NetworkTopology Node to the set of TIPs

     Map<Node,List<TaskInProgress>> nonRunningMapCache;

     

    在方法 initTasks()中被初始化:

    nonRunningMapCache = createCache(splits, maxLevel);

     

    createCache

    private Map<Node,List<TaskInProgress>> createCache(

                                    TaskSplitMetaInfo[] splits, int maxLevel)

                                    throws UnknownHostException{

    ....

    }

    即根据 TaskSplitMetaInfo 创建 Node 到 TaskInProgress 的映射。

     

     

    1.3 runningMapCache

     

     // Map of NetworkTopology Node to set of running TIPs

     Map<Node,Set<TaskInProgress>> runningMapCache = new IdentityHashMap<Node,Set<TaskInProgress>>();

    在 scheduleMap(TaskInProgress tip) 中被填充数据:

       for(String host: splitLocations) {

         Node node = jobtracker.getNode(host);

     

         for(int j = 0;j < maxLevel;++j) {

           Set<TaskInProgress> hostMaps = runningMapCache.get(node);

           if(hostMaps== null){

             // create a cache if needed

             hostMaps = new LinkedHashSet<TaskInProgress>();

             runningMapCache.put(node,hostMaps);

           }

           hostMaps.add(tip);

           node = node.getParent();

         }

       }

     

    1.4 nonLocalMaps

     // A list of non-local, non-running maps

     final List<TaskInProgress> nonLocalMaps = new LinkedList<TaskInProgress>();

    在 createCache(TaskSplitMetaInfo[] splits, int maxLevel) 中被填充数据:

       for(int i = 0;i < splits.length;i++) {

         String[] splitLocations = splits[i].getLocations();

         if(splitLocations ==null||splitLocations.length== 0){

           nonLocalMaps.add(maps[i]);

           continue;

         }

       ......

      }

     

    1.5 failedMaps

     // Set of failed, non-running maps sorted by #failures

     final SortedSet<TaskInProgress> failedMaps = new TreeSet<TaskInProgress>(failComparator);

    在 failedTask(TaskInProgress tip, TaskAttemptID taskid, TaskStatus status, TaskTracker taskTracker, boolean wasRunning, boolean wasComplete, boolean wasAttemptRunning) 中被填充

     

         if (!isComplete){

             retireMap(tip);

             failMap(tip);

           }

          

        

    其中

    private synchronized void failMap(TaskInProgress tip) {

        ......

       failedMaps.add(tip);

     }

     

     

    1.6 nonLocalRunningMaps

     

     // A set of non-local running maps

    Set<TaskInProgress> nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();

    在 scheduleMap(TaskInProgress tip) 中被填充

       String[] splitLocations = tip.getSplitLocations();

     

       // Add the TIP to the list of non-local running TIPs

       if(splitLocations ==null||splitLocations.length== 0){

         nonLocalRunningMaps.add(tip);

         return;

       }

     

    1.6 nonRunningReduces

     // A list of non-running reduce TIPs

    Set<TaskInProgress> nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);

     

    在 initTasks() 中被初始化

       this.reduces=new TaskInProgress[numReduceTasks];

       for(int i = 0;i < numReduceTasks;i++) {

         reduces[i]= new TaskInProgress(jobId,jobFile,

                                      numMapTasks,i,

                                      jobtracker,conf,this,numSlotsPerReduce);

         nonRunningReduces.add(reduces[i]);

       }

     

    在 failedTask(TaskInProgress tip, TaskAttemptID taskid, TaskStatus status, TaskTracker taskTracker, boolean wasRunning, boolean wasComplete, boolean wasAttemptRunning) 中被填充

     

           

       

    if(!isComplete){

             retireReduce(tip);

             failReduce(tip);

     }

     

     private synchronized void failReduce(TaskInProgress tip) {

       。。。。。。

       nonRunningReduces.add(tip);

     }

     

     

    1.7 runningReduces

     

     // A set of running reduce TIPs

     Set<TaskInProgress> runningReduces = new LinkedHashSet<TaskInProgress>();

     

    调用如下:

     protected synchronized void scheduleReduce(TaskInProgress tip) {

       。。。

       runningReduces.add(tip);

     }

     

     private synchronized void retireReduce(TaskInProgress tip) {

     ......

       runningReduces.remove(tip);

     }

     

    1.8 mapCleanupTasks、reduceCleanupTasks

     // A list of cleanup tasks for the map task attempts, to be launched

     List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();

    // A list of cleanup tasks for the reduce task attempts, to be launched

     List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();

     

     在 updateTaskStatus(TaskInProgress tip, TaskStatus status) 中填充

     

      if(state ==TaskStatus.State.FAILED_UNCLEAN||

                    state == TaskStatus.State.KILLED_UNCLEAN){

           tip.incompleteSubTask(taskid, this.status);

     

           if(tip.isMapTask()){

             mapCleanupTasks.add(taskid);

           } else{

             reduceCleanupTasks.add(taskid);

           }

           jobtracker.removeTaskEntry(taskid);

         }

     

     

     

    1.9 taskCompletionEvents

     List<TaskCompletionEvent> taskCompletionEvents =

    new ArrayList<TaskCompletionEvent>(numMapTasks+numReduceTasks+10);

     

    updateTaskStatus(TaskInProgress tip, TaskStatus status)中填充

     

    taskEvent= new TaskCompletionEvent(taskCompletionEventTracker,

                                               taskid,

                                               tip.idWithinJob(),

                                               status.getIsMap()&&

                                               !tip.isJobCleanupTask() &&

                                               !tip.isJobSetupTask(),

                                               taskCompletionStatus,

                                               httpTaskLogLocation

                                              );

    this.taskCompletionEvents.add(taskEvent);

    taskCompletionEventTracker++;

     

    1.10 trackerToFailuresMap

     // Map of trackerHostName -> no. of task failures

    Map<String,Integer> trackerToFailuresMap = new TreeMap<String,Integer>();

     

    addTrackerTaskFailure中调用

    synchronized void addTrackerTaskFailure(String trackerName, TaskTracker taskTracker) {

       if(flakyTaskTrackers < (clusterSize*CLUSTER_BLACKLIST_PERCENT)){

         String trackerHostName =convertTrackerNameToHostName(trackerName);

     

         Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);

         if(trackerFailures== null){

           trackerFailures = 0;

         }

         trackerToFailuresMap.put(trackerHostName,++trackerFailures);

       ......

    1.11 firstTaskLaunchTimes

     // First *task launch time

     final Map<TaskType,Long> firstTaskLaunchTimes =

         new EnumMap<TaskType,Long>(TaskType.class);

     

    setFirstTaskLaunchTime中调用

     void setFirstTaskLaunchTime(TaskInProgress tip) {

       TaskType key = tip.getFirstTaskType();

     

       synchronized(firstTaskLaunchTimes){

         // Could be optimized to do only one lookup with a little more code

         if(!firstTaskLaunchTimes.containsKey(key)){

           firstTaskLaunchTimes.put(key,tip.getExecStartTime());

         }

       }

     }

     

    1.12 mapTaskIdToFetchFailuresMap

     // Map of mapTaskId -> no. of fetch failures

     private Map<TaskAttemptID,Integer> mapTaskIdToFetchFailuresMap =

       new TreeMap<TaskAttemptID,Integer>();

     

    在 fetchFailureNotification(TaskInProgress tip, TaskAttemptID mapTaskId, String mapTrackerName, TaskAttemptID reduceTaskId, String reduceTrackerName) 中调用:

     

    Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);

    fetchFailures= (fetchFailures == null)? 1 : (fetchFailures+1);

    mapTaskIdToFetchFailuresMap.put(mapTaskId,fetchFailures);

    1.13 trackersReservedForMaps、trackersReservedForReduces

     private Map<TaskTracker,FallowSlotInfo> trackersReservedForMaps =

       new HashMap<TaskTracker,FallowSlotInfo>();

     

     private Map<TaskTracker,FallowSlotInfo> trackersReservedForReduces =

       new HashMap<TaskTracker,FallowSlotInfo>();

    在 reserveTaskTracker(TaskTracker taskTracker, TaskType type, int numSlots) 中调用

    Map<TaskTracker,FallowSlotInfo> map = (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;

    map.put(taskTracker,info);

  • 相关阅读:
    golang 垃圾回收 gc
    LINUX下目标文件的BSS段、数据段、代码段
    使用Golang利用ectd实现一个分布式锁
    KNN算法介绍
    机器学习
    golang map to struct
    NoSQL数据库-MongoDB和Redis
    Go语言中的单引号、双引号、反引号
    广告制胜无它,顺应人性尔——leo鉴书63
    从周迅发布恋情 看百度百科的社会价值
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276495.html
Copyright © 2011-2022 走看看