zoukankan      html  css  js  c++  java
  • 输出、状态hadoop源码TaskAttemptID TaskTrackerAction JobTracker,FileOutputCommitter相关by小雨

    首先声明,我是一个菜鸟。一下文章中出现技术误导情况盖不负责

        1,TaskAttemptID代表task attempt,一个task attempt就是一个map/reduce task 的一个例实taskid,而个每TaskAttemptID由两部分构成:TaskID+task序列号

        eg:

        attempt_200707121733_0003_m_000005_0

        代表2007年07月12日17点33分启动的第0003号作业(job)的第00005号map任务的第0号task attempt

        2,TaskTrackerAction的类型有,LaunchTaskAction(),KillTaskAction(),KillJobAction(),ReinitTrackerAction(),CommitTaskAction();

     /**
       * Ennumeration of various 'actions' that the {@link JobTracker}
       * directs the {@link TaskTracker} to perform periodically.
       *
       */
      public static enum ActionType {
        /** Launch a new task. */
        LAUNCH_TASK,
        
        /** Kill a task. */
        KILL_TASK,
        
        /** Kill any tasks of this job and cleanup. */
        KILL_JOB,
        
        /** Reinitialize the tasktracker. */
        REINIT_TRACKER,

        /** Ask a task to save its output. */
        COMMIT_TASK
      };
     
      /**
       * A factory-method to create objects of given {@link ActionType}.
       * @param actionType the {@link ActionType} of object to create.
       * @return an object of {@link ActionType}.
       */
      public static TaskTrackerAction createAction(ActionType actionType) {
        TaskTrackerAction action = null;
        
        switch (actionType) {
        case LAUNCH_TASK:
          {
            action = new LaunchTaskAction();
          }
          break;
        case KILL_TASK:
          {
            action = new KillTaskAction();
          }
          break;
        case KILL_JOB:
          {
            action = new KillJobAction();
          }
          break;
        case REINIT_TRACKER:
          {
            action = new ReinitTrackerAction();
          }
          break;
        case COMMIT_TASK:
          {
            action = new CommitTaskAction();
          }
          break;
        }

        return action;
      }

        3,JobTracker中相干:

               offerService()

                           |--taskScheduler.start();

                           |--recoveryManager.recover();

        通过RecoveryManager的recover()法方恢复Job

                           |-- this.expireTrackersThread.start();

        检测已失效的TaskTracker点节。

        // Used to expire TaskTrackers that have gone down

                           |-- this.retireJobsThread.start();

        除清那些已成完很长时光仍然存在队列中的job

                           |--expireLaunchingTaskThread.start();

        停止那些在超时时光内未告报进度的Tasks。

        A thread to timeout tasks that have been assigned to task trackers,
       * but that haven't reported back yet.

                           |--completedJobsStoreThread.start();

                           |--this.interTrackerServer.start();/ start the inter-tracker server once the jt is ready
     

          /**
       * Run forever
       */
      public void offerService() throws InterruptedException, IOException {
        // Prepare for recovery. This is done irrespective of the status of restart
        // flag.
        while (true) {
          try {
            recoveryManager.updateRestartCount();
            break;
          } catch (IOException ioe) {
            LOG.warn("Failed to initialize recovery manager. ", ioe);
            // wait for some time
            Thread.sleep(FS_ACCESS_RETRY_PERIOD);
            LOG.warn("Retrying...");
          }
        }

        taskScheduler.start();
        
        //  Start the recovery after starting the scheduler
        try {
          recoveryManager.recover();
        } catch (Throwable t) {
          LOG.warn("Recovery manager crashed! Ignoring.", t);
        }
        
        this.expireTrackersThread = new Thread(this.expireTrackers,
                                              "expireTrackers");
        this.expireTrackersThread.start();
        this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
        this.retireJobsThread.start();
        expireLaunchingTaskThread.start();

        if (completedJobStatusStore.isActive()) {
          completedJobsStoreThread = new Thread(completedJobStatusStore,
                                                "completedjobsStore-housekeeper");
          completedJobsStoreThread.start();
        }

        // start the inter-tracker server once the jt is ready
        this.interTrackerServer.start();
        
        synchronized (this) {
          state = State.RUNNING;
        }

        4, Job的五种状态

     public static final int RUNNING = 1;
      public static final int SUCCEEDED = 2;
      public static final int FAILED = 3;
      public static final int PREP = 4;

      public static final int KILLED = 5;

        

        5,jobStatus信息:

      this.jobid = jobid;
         this.setupProgress = setupProgress;
         this.mapProgress = mapProgress;
         this.reduceProgress = reduceProgress;
         this.cleanupProgress = cleanupProgress;
         this.runState = runState;

        6,FileOutputCommitter相干

        setupJob:Hadoop初始化时设置job的输出;

        commitJob:当job成完时,除清job的输出,这个法方在馈反来回的job状态为SUCCEEDED时用调;

        cleanupJob:job结束后除清job的输出;

         // do the clean up of temporary directory

        abortJob:当job的返回状态是FAILEDKILLED时,行执该数函,用于终止作业的输出;

        setupTask:设置task的输出;

        // FileOutputCommitter's setupTask doesn't do anything. Because the
           // temporary task directory is created on demand when the
           // task is writing.

        needsTaskCommit:检测task否是须要交提;

        commitTask:将task的输出移到作业的输出目录;

        // Move the task outputs to their final place

        // Delete the temporary task-specific output directory

        abortTask:取消task的输出;

        

         

        

        

        

        

    文章结束给大家分享下程序员的一些笑话语录: IBM和波音777
      波音777是有史以来第一架完全在电脑虚拟现实中设计制造的飞机,所用的设备完全由IBM公司所提供。试飞前,波音公司的总裁非常热情的邀请IBM的技术主管去参加试飞,可那位主管却说道:“啊,非常荣幸,可惜那天是我妻子的生日,So..”..
      波音公司的总载一听就生气了:“胆小鬼,我还没告诉你试飞的日期呢!”

  • 相关阅读:
    vue组件系列-数字滚动组件
    重新振兴自己
    EL表达式
    org.hibernate.PropertyAccessException: Null value was assigned to a property of primitive type sette
    Struts2常用标签总结
    mybatis+strut2+spring整合总结
    hibernate的详细注解以及例子
    Struts2基于注解的Action配置
    【干货】如何通过OPC自定义接口来实现客户端数据的读取?
    即将离职,共享下我的知识库
  • 原文地址:https://www.cnblogs.com/jiangu66/p/3025998.html
Copyright © 2011-2022 走看看