zoukankan      html  css  js  c++  java
  • Hadoop源码分析26 JobTracker主要容器和线程

    2.  JobTracker中的容器

    2.1 nodesAtMaxLevel、hostnameToNodeMap

      // (hostname --> Node (NetworkTopology))

      Map<String, Node> hostnameToNodeMap =

        Collections.synchronizedMap(new TreeMap<String, Node>());

    private Set<Node> nodesAtMaxLevel =

        Collections.newSetFromMap(new ConcurrentHashMap<Node, Boolean>());

     

     addHostToNodeMapping(String host, String networkLoc) 中:

    node = new NodeBase(host,networkLoc);

    。。。。。。

    hostnameToNodeMap.put(host,node);

    nodesAtMaxLevel.add(getParentNode(node,getNumTaskCacheLevels() - 1));

     

    2.2 jobInProgressListeners

    private final List<JobInProgressListener> jobInProgressListeners =

        new CopyOnWriteArrayList<JobInProgressListener>();

    调用:

    public void addJobInProgressListener(JobInProgressListener listener) {

        jobInProgressListeners.add(listener);

      }

    public void removeJobInProgressListener(JobInProgressListener listener) {

        jobInProgressListeners.remove(listener);

    }

    private void updateJobInProgressListeners(JobChangeEvent event) {

        for (JobInProgressListener listener : jobInProgressListeners) {

          listener.jobUpdated(event);

        }

    }

    2.3 jobs

    // All the known jobs.  (jobid->JobInProgress)

      Map<JobID, JobInProgress> jobs =

        Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());

     addJob(JobID jobId, JobInProgress job) 中填充:

        synchronized (jobs){

          synchronized (taskScheduler){

            jobs.put(job.getProfile().getJobID(),job);

            for (JobInProgressListener listener : jobInProgressListeners) {

              listener.jobAdded(job);

            }

          }

        }

    2.4 userToJobsMap

    // (user -> list of JobInProgress)

      TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =

        new TreeMap<String, ArrayList<JobInProgress>>();

    finalizeJob(JobInProgress job) 填充:

    String jobUser = job.getProfile().getUser();

    synchronized (userToJobsMap) {

          ArrayList<JobInProgress> userJobs = userToJobsMap.get(jobUser);

          if (userJobs == null) {

            userJobs = new ArrayList<JobInProgress>();

            userToJobsMap.put(jobUser,userJobs);

          }

          userJobs.add(job);

        }

    2.5 trackerToJobsToCleanup

    // (trackerID --> list of jobs to cleanup)

      Map<String, Set<JobID>> trackerToJobsToCleanup =

        new HashMap<String, Set<JobID>>();

    addJobForCleanup(JobID id) 填充:

    synchronized (trackerToJobsToCleanup) {

            Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker);

            if (jobsToKill == null) {

              jobsToKill = new HashSet<JobID>();

              trackerToJobsToCleanup.put(taskTracker,jobsToKill);

            }

            jobsToKill.add(id);

          }

    2.6 trackerToTasksToCleanup

    // (trackerID --> list of tasks to cleanup)

      Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup =

        new HashMap<String, Set<TaskAttemptID>>();

     

    updateTaskStatuses(TaskTrackerStatus status) 填充:

    if (!job.inited()){

            //if job is not yet initialized ... kill the attempt

            synchronized (trackerToTasksToCleanup){

              Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);

              if (tasks == null) {

                tasks = new HashSet<TaskAttemptID>();

                trackerToTasksToCleanup.put(trackerName,tasks);

              }

              tasks.add(taskId);

            }

            continue;

          }

     

    2.7 taskidToTIPMap、taskidToTrackerMap、trackerToTaskMap

    // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)

    Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =

        new TreeMap<TaskAttemptID, TaskInProgress>();

     

       // (taskid --> trackerID)

      TreeMap<TaskAttemptID, String> taskidToTrackerMap = new TreeMap<TaskAttemptID, String>();

      // (trackerID->TreeSet of taskids running at that tracker)

      TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap =

        new TreeMap<String, Set<TaskAttemptID>>();

     

    createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) 填充:

        taskidToTrackerMap.put(taskid,taskTracker);

        taskidToTIPMap.put(taskid,tip);

        Set<TaskAttemptID> taskset = trackerToTaskMap.get(taskTracker);

        if (taskset == null) {

          taskset = new TreeSet<TaskAttemptID>();

          trackerToTaskMap.put(taskTracker,taskset);

        }

    2.8 hostnameToTaskTracker、trackerExpiryQueue

    // This is used to keep track of all trackers running on one host. While

      // decommissioning the host, all the trackers on the host will be lost.

      Map<String, Set<TaskTracker>> hostnameToTaskTracker =

        Collections.synchronizedMap(new TreeMap<String, Set<TaskTracker>>());

      TreeSet<TaskTrackerStatus> trackerExpiryQueue =

        new TreeSet<TaskTrackerStatus>(

            new Comparator<TaskTrackerStatus>() {

                public int compare(TaskTrackerStatus p1, TaskTrackerStatus p2) {

                  if (p1.getLastSeen() < p2.getLastSeen()) {

                               return -1;

                  } else if (p1.getLastSeen() > p2.getLastSeen()) {

                       return 1;

                  } else {

                       return (p1.getTrackerName().compareTo(p2.getTrackerName()));

                  }

             }

         }

     );

     

     addNewTracker(TaskTracker taskTracker) 填充:

        TaskTrackerStatus status = taskTracker.getStatus();

        trackerExpiryQueue.add(status);

     

        // Register the tracker if its not registered

        String hostname = status.getHost();

        if (getNode(status.getTrackerName()) == null) {

          //Making the network location resolution inline ..

          resolveAndAddToTopology(hostname);

        }

     

        //add it to the set of tracker per host

        Set<TaskTracker> trackers = hostnameToTaskTracker.get(hostname);

        if (trackers == null) {

          trackers = Collections.synchronizedSet(new HashSet<TaskTracker>());

          hostnameToTaskTracker.put(hostname,trackers);

        }

    2.9 trackerToMarkedTasksMap

      // (trackerID --> TreeSet of completed taskids running at that tracker)

      TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap =

        new TreeMap<String, Set<TaskAttemptID>>();

     

      markCompletedTaskAttempt(String taskTracker, TaskAttemptID taskid) 调用:

        Set<TaskAttemptID> taskset = trackerToMarkedTasksMap.get(taskTracker);

        if (taskset == null) {

          taskset = new TreeSet<TaskAttemptID>();

          trackerToMarkedTasksMap.put(taskTracker,taskset);

        }

        taskset.add(taskid);

    2.10 trackerToHeartbeatResponseMap

    // (trackerID --> last sent HeartBeatResponse)

      Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap =

        new TreeMap<String, HeartbeatResponse>();

     HeartbeatResponse heartbeat(TaskTrackerStatus status,

                                                      boolean restarted,

                                                      boolean initialContact,

                                                      boolean acceptNewTasks,

                                                      short responseId) 中调用:

    //Update the trackerToHeartbeatResponseMap

        trackerToHeartbeatResponseMap.put(trackerName,response);

    2.11 taskTrackers

      private HashMap<String, TaskTracker> taskTrackers =

        new HashMap<String, TaskTracker>();

    updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status) 调用:

     

          TaskTracker taskTracker = taskTrackers.get(trackerName);

          if (taskTracker != null) {

            alreadyPresent = true;

          } else {

            taskTracker = new TaskTracker(trackerName);

          }

         

          taskTracker.setStatus(status);

          taskTrackers.put(trackerName,taskTracker);

    2.12 uniqueHostsMap

    Map<String, Integer> uniqueHostsMap = new ConcurrentHashMap<String, Integer>();

    updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status) 调用:

    if (status == null) {

            taskTrackers.remove(trackerName);

            Integer numTaskTrackersInHost =

              uniqueHostsMap.get(oldStatus.getHost());

            if (numTaskTrackersInHost != null) {

              numTaskTrackersInHost--;

              if (numTaskTrackersInHost > 0) {

                uniqueHostsMap.put(oldStatus.getHost(),numTaskTrackersInHost);

              }

              else {

                uniqueHostsMap.remove(oldStatus.getHost());

              }

            }

          }

    addHostCapacity(String hostName) 中:

            int numTrackersOnHost = 0;

            // add the capacity of trackers on the host

            for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {

              int mapSlots = status.getMaxMapSlots();

              totalMapTaskCapacity += mapSlots;

              int reduceSlots = status.getMaxReduceSlots();

              totalReduceTaskCapacity += reduceSlots;

              numTrackersOnHost++;

              getInstrumentation().decBlackListedMapSlots(mapSlots);

              getInstrumentation().decBlackListedReduceSlots(reduceSlots);

            }

            uniqueHostsMap.put(hostName,numTrackersOnHost);

            decrBlacklistedTrackers(numTrackersOnHost);

    2.13 内部类 ExpireLaunchingTasks:launchingTasks

      见 3.1

    2.14 内部类 FaultInfo:numFaults、blackRfbMap、grayRfbMap

    private static class FaultInfo {

        int[] numFaults;      // timeslice buckets

        private HashMap<ReasonForBlackListing, String> blackRfbMap;

        private HashMap<ReasonForBlackListing, String> grayRfbMap;

    }

    调用:

        void incrFaultCount(long timeStamp){

          checkRotation(timeStamp);

          ++numFaults[bucketIndex(timeStamp)];

        }

        public void addBlacklistedReason(ReasonForBlackListing rfb,

                                         String reason, boolean gray) {

          if (gray){

            grayRfbMap.put(rfb,reason);

          } else {

            blackRfbMap.put(rfb,reason);

          }

        }

        void setBlacklist(ReasonForBlackListing rfb, String trackerFaultReport,

                          boolean gray){

          if (gray){

            graylisted = true;

            this.grayRfbMap.put(rfb,trackerFaultReport);

          } else {

            blacklisted = true;

            this.blackRfbMap.put(rfb,trackerFaultReport);

          }

        }

     

    2.15 内部类 FaultyTrackersInfo:potentiallyFaultyTrackers

      private class FaultyTrackersInfo {

        // A map from hostName to its faults

        private Map<String, FaultInfo> potentiallyFaultyTrackers =

                  new HashMap<String, FaultInfo>();

    }

    调用:

    //Assumes JobTracker is locked on the entry

    private FaultInfo getFaultInfo(String hostName, boolean createIfNecessary) {

          FaultInfo fi = null;

          synchronized (potentiallyFaultyTrackers) {

            fi = potentiallyFaultyTrackers.get(hostName);

            if (fi == null && createIfNecessary) {

              fi = new FaultInfo(clock.getTime(), NUM_FAULT_BUCKETS,

                                 TRACKER_FAULT_BUCKET_WIDTH_MSECS);

              potentiallyFaultyTrackers.put(hostName,fi);

            }

          }

          return fi;

    }

     

    2.16 内部类 RecoveryManager:jobsToRecover、recoveredTrackers、hangingAttempts

      class RecoveryManager {

       

        Set<JobID> jobsToRecover; // set of jobs to be recovered

        Set<String> recoveredTrackers =

          Collections.synchronizedSet(new HashSet<String>());

        class JobRecoveryListener implements Listener {

                  // Maintains open transactions

             private Map<String, String> hangingAttempts =

                    new HashMap<String, String>();

         

        }

    }

    调用:

        void addJobForRecovery(JobID id) {

          jobsToRecover.add(id);

        }

        private void markTracker(String trackerName) {

          recoveredTrackers.add(trackerName);

        }

        private void processTaskAttempt(String taskAttemptId,

                                          JobHistory.TaskAttempt attempt)

       {   ......

           hangingAttempts.put(id.getTaskID().toString(),taskAttemptId);

           ......

       }

    2.17 内部类 RetireJobs:jobIDStatusMap、jobRetireInfoQ

      见 3.3

    3.JobTracker中的内部类线程

    3.1 线程ExpireLaunchingTasks

     

      

    private class ExpireLaunchingTasks implements Runnable{

        

        private Map<TaskAttemptID, Long> launchingTasks =

          new LinkedHashMap<TaskAttemptID, Long>();

    }

    将 launchingTasks 中超时的 TaskAttemptID 设置为失败,主要代码:

    TaskAttemptID taskId = pair.getKey();

    TaskInProgress tip = taskidToTIPMap.get(taskId);

    JobInProgress job = tip.getJob();

    .....

    job.failedTask(tip, taskId, "Error launching task",

                                         tip.isMapTask()?TaskStatus.Phase.MAP:

                                         TaskStatus.Phase.STARTING,

                                         TaskStatus.State.FAILED,

                                         trackerName);

     

     launchingTasks 在 JobTracker.ExpireLaunchingTasks.addNewTask(TaskAttemptID taskName)、

     

    JobTracker.ExpireLaunchingTasks.removeTask(TaskAttemptID taskName)

     

    中被添加、删除。

     

    3.2 线程ExpireTrackers

      ///////////////////////////////////////////////////////

      //Used to expire TaskTrackers that have gone down

      /////////////////////////////////////////////////////// 

    class ExpireTrackers implements Runnable{

    }

    trackerExpiryQueue 中超时的 TaskTracker 或者被更新,或者被移出 hostnameToTaskTracker。

    trackerExpiryQueue 在方法 JobTracker.addNewTracker(TaskTracker taskTracker)、JobTracker.RecoveryManager.recover() 中被添加/删除。

     

    3.3. 线程RetireJobs

      ///////////////////////////////////////////////////////

      // Used to remove old finished Jobs that have been around for too long

      ///////////////////////////////////////////////////////

    class RetireJobs implements Runnable{

        private final Map<JobID, RetireJobInfo> jobIDStatusMap =

          new HashMap<JobID, RetireJobInfo>();

        private final LinkedList<RetireJobInfo> jobRetireInfoQ =

          new LinkedList<RetireJobInfo>();

    }

    将 jobs(All the known jobs,jobid->JobInProgress)中已完成且超时的 job 移出 jobs、jobInProgressListeners,清理其 JobHistory,并放入 jobIDStatusMap、jobRetireInfoQ。

    jobIDStatusMap 存储 RetireJobInfo,会在 JobTracker.getJobCounters(JobID jobid)、JobTracker.getJobProfile(JobID jobid)、JobTracker.getJobStatus(JobID jobid) 中被调用,用于查询。

    jobRetireInfoQ 存储 RetireJobInfo,会在 JobTracker.generateRetiredJobTable(JobTracker tracker, int rowId)、JobTracker.getAllJobs() 中被调用,用于查询。

     

  • 相关阅读:
    二 ,Smarty模板技术/引擎——变量操作(1)
    一,Smarty模板技术/引擎——简介
    MVC模式学习--雇员管理系统项目开发
    mysqli扩展库---------预处理技术
    drupal7 上传文件中文乱码
    php根据IP获取IP所在城市
    php获取客户端IP
    drupal中安装CKEditor文本编辑器,并配置图片上传功能 之 方法一
    drupal7的node的内容的存储位置
    drupal7 安装百度编辑器Ueditor及后续使用
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276494.html
Copyright © 2011-2022 走看看