  • Hadoop 2.2.0 Job Source Code Reading Notes

      Everything discussed here refers to the code as it appears in version 2.2.0.

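      Overview:

      From the point of view of the user who creates and submits it, a Job can be configured when it is created, its execution can be controlled, and its running state can be queried. Once a Job has been submitted it can no longer be configured; any attempt to do so throws an IllegalStateException.

      Normally a user creates, describes, and submits a Job through the Job class, and monitors its progress through the same class. Here is a simple example: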

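    // Create a new Job (the public constructors are deprecated in 2.2.0)
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(MyJob.class);
    
    // Specify various job-specific parameters     
    job.setJobName("myjob");
    
    // Job itself has no setInputPath/setOutputPath; the paths are set through
    // org.apache.hadoop.mapreduce.lib.input.FileInputFormat and
    // org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
    FileInputFormat.addInputPath(job, new Path("in"));
    FileOutputFormat.setOutputPath(job, new Path("out"));
    
    job.setMapperClass(MyJob.MyMapper.class);
    job.setReducerClass(MyJob.MyReducer.class);
    // Submit the job, then poll for progress until the job is complete
    job.waitForCompletion(true);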

      Basic structure:

      The Job class lives in the org.apache.hadoop.mapreduce package. It extends JobContextImpl and implements the JobContext interface.

      Static constants defined by Job:

    private static final Log LOG = LogFactory.getLog(Job.class);
    
      @InterfaceStability.Evolving
      public static enum JobState {DEFINE, RUNNING};
      private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
      public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
      /** Key in mapred-*.xml that sets completionPollIntervalMillis */
      public static final String COMPLETION_POLL_INTERVAL_KEY = 
        "mapreduce.client.completion.pollinterval";
      
      /** Default completionPollIntervalMillis is 5000 ms. */
      static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
      /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
      public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
        "mapreduce.client.progressmonitor.pollinterval";
      /** Default progMonitorPollIntervalMillis is 1000 ms. */
      static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
    
      public static final String USED_GENERIC_PARSER = 
        "mapreduce.client.genericoptionsparser.used";
      public static final String SUBMIT_REPLICATION = 
        "mapreduce.client.submit.file.replication";
      private static final String TASKLOG_PULL_TIMEOUT_KEY =
               "mapreduce.client.tasklog.timeout";
      private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

      Private fields defined by Job:

     private JobState state = JobState.DEFINE;
     private JobStatus status;
     private long statustime;
     private Cluster cluster;

      The static initializer that loads the configuration files when the Job class is loaded:

    static {
        ConfigUtil.loadResources();
     }

      The configuration files loaded are mapred-default.xml, mapred-site.xml, yarn-default.xml, and yarn-site.xml.
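To make the timing of a static initializer concrete, here is a minimal plain-Java sketch. It does not use Hadoop: the class `StaticInitSketch` and its `LOADED` list are hypothetical stand-ins that only record the four resource names, assuming the real loader registers them in the order listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a class that registers default resources
// once, at class initialization. It is not Hadoop's ConfigUtil.
class StaticInitSketch {
    // Records which resource names were registered, in order.
    static final List<String> LOADED = new ArrayList<>();

    static {
        // Runs exactly once, before the first instance is created
        // or the first static member is used.
        LOADED.add("mapred-default.xml");
        LOADED.add("mapred-site.xml");
        LOADED.add("yarn-default.xml");
        LOADED.add("yarn-site.xml");
    }

    public static void main(String[] args) {
        new StaticInitSketch();
        new StaticInitSketch();
        // Still four entries: creating instances does not rerun the block.
        System.out.println(LOADED.size());
    }
}
```

Because the block runs during class initialization, any code path that touches the class first, whether a constructor or a static factory, sees the resources already registered.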

      

      Job constructors:

      @Deprecated
      public Job() throws IOException {
        this(new Configuration());
      }
    
      @Deprecated
      public Job(Configuration conf) throws IOException {
        this(new JobConf(conf));
      }
    
      @Deprecated
      public Job(Configuration conf, String jobName) throws IOException {
        this(conf);
        setJobName(jobName);
      }
    
      Job(JobConf conf) throws IOException {
        super(conf, null);
        // propagate existing user credentials to job
        this.credentials.mergeAll(this.ugi.getCredentials());
        this.cluster = null;
      }
    
      Job(JobStatus status, JobConf conf) throws IOException {
        this(conf);
        setJobID(status.getJobID());
        this.status = status;
        state = JobState.RUNNING;
      }

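      Note that all three public constructors (the no-argument one and the two that take a Configuration) are marked @Deprecated, so Hadoop discourages building a Job through them. The two constructors that take a JobConf are package-private and cannot be called from user code; they are what the static getInstance methods described next use internally.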

      

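      Factory methods for obtaining a Job instance:

        Besides the constructors, the Job class also provides several static methods that return a Job instance. Their definitions are: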

     /**
       * Creates a new {@link Job} with no particular {@link Cluster} .
       * A Cluster will be created with a generic {@link Configuration}.
       * 
       * @return the {@link Job} , with no connection to a cluster yet.
       * @throws IOException
       */
      public static Job getInstance() throws IOException {
        // create with a null Cluster
        return getInstance(new Configuration());
      }
          
      /**
       * Creates a new {@link Job} with no particular {@link Cluster} and a 
       * given {@link Configuration}.
       * 
       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
       * that any necessary internal modifications do not reflect on the incoming 
       * parameter.
       * 
       * A Cluster will be created from the conf parameter only when it's needed.
       * 
       * @param conf the configuration
       * @return the {@link Job} , with no connection to a cluster yet.
       * @throws IOException
       */
      public static Job getInstance(Configuration conf) throws IOException {
        // create with a null Cluster
        JobConf jobConf = new JobConf(conf);
        return new Job(jobConf);
      }
    
          
      /**
       * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
       * A Cluster will be created from the conf parameter only when it's needed.
       *
       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
       * that any necessary internal modifications do not reflect on the incoming 
       * parameter.
       * 
       * @param conf the configuration
       * @return the {@link Job} , with no connection to a cluster yet.
       * @throws IOException
       */
      public static Job getInstance(Configuration conf, String jobName)
               throws IOException {
        // create with a null Cluster
        Job result = getInstance(conf);
        result.setJobName(jobName);
        return result;
      }
      
      /**
       * Creates a new {@link Job} with no particular {@link Cluster} and given
       * {@link Configuration} and {@link JobStatus}.
       * A Cluster will be created from the conf parameter only when it's needed.
       * 
       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
       * that any necessary internal modifications do not reflect on the incoming 
       * parameter.
       * 
       * @param status job status
       * @param conf job configuration
       * @return the {@link Job} , with no connection to a cluster yet.
       * @throws IOException
       */
      public static Job getInstance(JobStatus status, Configuration conf) 
      throws IOException {
        return new Job(status, new JobConf(conf));
      }
    
      /**
       * Creates a new {@link Job} with no particular {@link Cluster}.
       * A Cluster will be created from the conf parameter only when it's needed.
       *
       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
       * that any necessary internal modifications do not reflect on the incoming 
       * parameter.
       * 
       * @param ignored
       * @return the {@link Job} , with no connection to a cluster yet.
       * @throws IOException
       * @deprecated Use {@link #getInstance()}
       */
      @Deprecated
      public static Job getInstance(Cluster ignored) throws IOException {
        return getInstance();
      }
      
      /**
       * Creates a new {@link Job} with no particular {@link Cluster} and given
       * {@link Configuration}.
       * A Cluster will be created from the conf parameter only when it's needed.
       * 
       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
       * that any necessary internal modifications do not reflect on the incoming 
       * parameter.
       * 
       * @param ignored
       * @param conf job configuration
       * @return the {@link Job} , with no connection to a cluster yet.
       * @throws IOException
       * @deprecated Use {@link #getInstance(Configuration)}
       */
      @Deprecated
      public static Job getInstance(Cluster ignored, Configuration conf) 
          throws IOException {
        return getInstance(conf);
      }
      
      /**
       * Creates a new {@link Job} with no particular {@link Cluster} and given
       * {@link Configuration} and {@link JobStatus}.
       * A Cluster will be created from the conf parameter only when it's needed.
       * 
       * The <code>Job</code> makes a copy of the <code>Configuration</code> so 
       * that any necessary internal modifications do not reflect on the incoming 
       * parameter.
       * 
       * @param cluster cluster
       * @param status job status
       * @param conf job configuration
       * @return the {@link Job} , with no connection to a cluster yet.
       * @throws IOException
       */
      @Private
      public static Job getInstance(Cluster cluster, JobStatus status, 
          Configuration conf) throws IOException {
        Job job = getInstance(status, conf);
        job.setCluster(cluster);
        return job;
      }

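        As these definitions show, some of the factory methods accept a Cluster. Only the @Private overload actually stores it on the Job; the two deprecated overloads ignore the argument, and in every other case the Cluster is created later, when it is first needed.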
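The Javadoc above repeats that the Job copies the incoming Configuration so that internal changes do not reflect on the caller's object. The following is a minimal sketch of that static-factory-with-defensive-copy idea in plain Java. It assumes a `Map<String, String>` in place of Configuration; the class `FactoryCopySketch` and the key `job.name` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of an object built through static factories.
class FactoryCopySketch {
    private final Map<String, String> conf;

    // Private constructor: callers go through getInstance.
    private FactoryCopySketch(Map<String, String> conf) {
        this.conf = conf;
    }

    // Copies the caller's settings so later changes on either side
    // do not leak to the other.
    static FactoryCopySketch getInstance(Map<String, String> conf) {
        return new FactoryCopySketch(new HashMap<>(conf));
    }

    // Overload that sets a name on top of the copied settings.
    static FactoryCopySketch getInstance(Map<String, String> conf, String jobName) {
        FactoryCopySketch result = getInstance(conf);
        result.conf.put("job.name", jobName); // "job.name" is an illustrative key
        return result;
    }

    String get(String key) {
        return conf.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> userConf = new HashMap<>();
        userConf.put("reduces", "2");
        FactoryCopySketch job = getInstance(userConf, "myjob");
        userConf.put("reduces", "8"); // changed after the job was created
        System.out.println(job.get("reduces"));              // the job keeps its own copy
        System.out.println(userConf.containsKey("job.name")); // the caller's map is untouched
    }
}
```

The same layering appears in the real overloads: the variant that takes a job name delegates to the single-argument variant and then applies the extra setting.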

        

        Methods that return the polling intervals:

     /** The interval at which monitorAndPrintJob() prints status */
      public static int getProgressPollInterval(Configuration conf) {
        // Read progress monitor poll interval from config. Default is 1 second.
        int progMonitorPollIntervalMillis = conf.getInt(
          PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
        if (progMonitorPollIntervalMillis < 1) {
          LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + 
            " has been set to an invalid value; "
            + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
          progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
        }
        return progMonitorPollIntervalMillis;
      }
    
      /** The interval at which waitForCompletion() should check. */
      public static int getCompletionPollInterval(Configuration conf) {
        int completionPollIntervalMillis = conf.getInt(
          COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
        if (completionPollIntervalMillis < 1) { 
          LOG.warn(COMPLETION_POLL_INTERVAL_KEY + 
           " has been set to an invalid value; "
           + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
          completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
        }
        return completionPollIntervalMillis;
      }

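        These two methods return, respectively, the interval at which the Job's running status is fetched and printed, and the interval at which the Job is checked for completion. In both cases a configured value below 1 is rejected with a warning and replaced by the default.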
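The read-validate-fall-back pattern used by both methods can be tried out without a cluster. The sketch below assumes `java.util.Properties` as a stand-in for Configuration; the class `PollIntervalSketch` is hypothetical, while the key and the 5000 ms default are the ones from the constants listed earlier.

```java
import java.util.Properties;

// Hypothetical stand-in for the interval lookup; Properties replaces Configuration.
class PollIntervalSketch {
    static final String KEY = "mapreduce.client.completion.pollinterval";
    static final int DEFAULT_MILLIS = 5000;

    // Returns the configured interval, or the default when the key is
    // missing or the value is below 1. A non-numeric value would throw
    // NumberFormatException in this sketch.
    static int getPollInterval(Properties props) {
        int millis = Integer.parseInt(
            props.getProperty(KEY, String.valueOf(DEFAULT_MILLIS)));
        if (millis < 1) {
            System.err.println(KEY + " has been set to an invalid value; "
                + "replacing with " + DEFAULT_MILLIS);
            millis = DEFAULT_MILLIS;
        }
        return millis;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(getPollInterval(props)); // key missing: default is used
        props.setProperty(KEY, "250");
        System.out.println(getPollInterval(props)); // valid value is returned as-is
        props.setProperty(KEY, "0");
        System.out.println(getPollInterval(props)); // invalid value: default is used
    }
}
```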

        

        Methods that require synchronization:

    synchronized void ensureFreshStatus() 
          throws IOException {
        if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
          updateStatus();
        }
      }
    
    
     /** Some methods need to update status immediately. So, refresh
       * immediately
       * @throws IOException
       */
      synchronized void updateStatus() throws IOException {
        try {
          this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
            @Override
            public JobStatus run() throws IOException, InterruptedException {
              return cluster.getClient().getJobStatus(status.getJobID());
            }
          });
        }
        catch (InterruptedException ie) {
          throw new IOException(ie);
        }
        if (this.status == null) {
          throw new IOException("Job status not available ");
        }
        this.statustime = System.currentTimeMillis();
      }
    
    
     private synchronized void connect()
              throws IOException, InterruptedException, ClassNotFoundException {
        if (cluster == null) {
          cluster = 
            ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                       public Cluster run()
                              throws IOException, InterruptedException, 
                                     ClassNotFoundException {
                         return new Cluster(getConfiguration());
                       }
                     });
        }
      } 
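The pairing of ensureFreshStatus() and updateStatus() amounts to a small time-based cache: the status is fetched again only when the cached copy is older than MAX_JOBSTATUS_AGE (2 seconds). Below is a minimal plain-Java sketch of that idea. The class `FreshStatusSketch`, the injected clock, and the `fetchCount` counter are hypothetical and exist only to make the behavior observable; the remote call is replaced by a `Supplier`.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical model of a status cache with a maximum age.
class FreshStatusSketch {
    static final long MAX_AGE_MILLIS = 1000 * 2;

    private final Supplier<String> remote; // stands in for the cluster call
    private final LongSupplier clock;      // injectable clock, for testing
    private String status;
    private long statusTime;
    int fetchCount = 0;                    // counts remote fetches

    FreshStatusSketch(Supplier<String> remote, LongSupplier clock) {
        this.remote = remote;
        this.clock = clock;
    }

    // Refreshes only if the cached status is older than the maximum age.
    synchronized void ensureFreshStatus() {
        if (clock.getAsLong() - statusTime > MAX_AGE_MILLIS) {
            updateStatus();
        }
    }

    // Refreshes unconditionally and records when the refresh happened.
    synchronized void updateStatus() {
        status = remote.get();
        fetchCount++;
        statusTime = clock.getAsLong();
    }

    synchronized String getStatus() {
        ensureFreshStatus();
        return status;
    }

    public static void main(String[] args) {
        long[] now = {10_000L};
        FreshStatusSketch s = new FreshStatusSketch(() -> "RUNNING", () -> now[0]);
        s.getStatus();       // first call fetches
        now[0] += 1_000;
        s.getStatus();       // 1 s old: served from the cache
        now[0] += 1_500;
        s.getStatus();       // 2.5 s old: fetched again
        System.out.println(s.fetchCount);
    }
}
```

Marking all three methods synchronized keeps the status and its timestamp consistent when several threads query progress at once; Java monitors are reentrant, so the nested calls do not deadlock.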

        

        Methods that set configuration parameters:

        

    /**
       * Set the number of reduce tasks for the job.
       * @param tasks the number of reduce tasks
       * @throws IllegalStateException if the job is submitted
       */
      public void setNumReduceTasks(int tasks) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setNumReduceTasks(tasks);
      }
    
      /**
       * Set the current working directory for the default file system.
       * 
       * @param dir the new current working directory.
       * @throws IllegalStateException if the job is submitted
       */
      public void setWorkingDirectory(Path dir) throws IOException {
        ensureState(JobState.DEFINE);
        conf.setWorkingDirectory(dir);
      }
    
      /**
       * Set the {@link InputFormat} for the job.
       * @param cls the <code>InputFormat</code> to use
       * @throws IllegalStateException if the job is submitted
       */
      public void setInputFormatClass(Class<? extends InputFormat> cls
                                      ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, 
                      InputFormat.class);
      }
    
      /**
       * Set the {@link OutputFormat} for the job.
       * @param cls the <code>OutputFormat</code> to use
       * @throws IllegalStateException if the job is submitted
       */
      public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                       ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, 
                      OutputFormat.class);
      }
    
      /**
       * Set the {@link Mapper} for the job.
       * @param cls the <code>Mapper</code> to use
       * @throws IllegalStateException if the job is submitted
       */
      public void setMapperClass(Class<? extends Mapper> cls
                                 ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
      }
    
      /**
       * Set the Jar by finding where a given class came from.
       * @param cls the example class
       */
      public void setJarByClass(Class<?> cls) {
        ensureState(JobState.DEFINE);
        conf.setJarByClass(cls);
      }
    
      /**
       * Set the job jar 
       */
      public void setJar(String jar) {
        ensureState(JobState.DEFINE);
        conf.setJar(jar);
      }
    
      /**
       * Set the reported username for this job.
       * 
       * @param user the username for this job.
       */
      public void setUser(String user) {
        ensureState(JobState.DEFINE);
        conf.setUser(user);
      }
    
      /**
       * Set the combiner class for the job.
       * @param cls the combiner to use
       * @throws IllegalStateException if the job is submitted
       */
      public void setCombinerClass(Class<? extends Reducer> cls
                                   ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
      }
    
      /**
       * Set the {@link Reducer} for the job.
       * @param cls the <code>Reducer</code> to use
       * @throws IllegalStateException if the job is submitted
       */
      public void setReducerClass(Class<? extends Reducer> cls
                                  ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
      }
    
      /**
       * Set the {@link Partitioner} for the job.
       * @param cls the <code>Partitioner</code> to use
       * @throws IllegalStateException if the job is submitted
       */
      public void setPartitionerClass(Class<? extends Partitioner> cls
                                      ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setClass(PARTITIONER_CLASS_ATTR, cls, 
                      Partitioner.class);
      }
    
      /**
       * Set the key class for the map output data. This allows the user to
       * specify the map output key class to be different than the final output
       * value class.
       * 
       * @param theClass the map output key class.
       * @throws IllegalStateException if the job is submitted
       */
      public void setMapOutputKeyClass(Class<?> theClass
                                       ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setMapOutputKeyClass(theClass);
      }
    
      /**
       * Set the value class for the map output data. This allows the user to
       * specify the map output value class to be different than the final output
       * value class.
       * 
       * @param theClass the map output value class.
       * @throws IllegalStateException if the job is submitted
       */
      public void setMapOutputValueClass(Class<?> theClass
                                         ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setMapOutputValueClass(theClass);
      }
    
      /**
       * Set the key class for the job output data.
       * 
       * @param theClass the key class for the job output data.
       * @throws IllegalStateException if the job is submitted
       */
      public void setOutputKeyClass(Class<?> theClass
                                    ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setOutputKeyClass(theClass);
      }
    
      /**
       * Set the value class for job outputs.
       * 
       * @param theClass the value class for job outputs.
       * @throws IllegalStateException if the job is submitted
       */
      public void setOutputValueClass(Class<?> theClass
                                      ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setOutputValueClass(theClass);
      }
    
      /**
       * Define the comparator that controls how the keys are sorted before they
       * are passed to the {@link Reducer}.
       * @param cls the raw comparator
       * @throws IllegalStateException if the job is submitted
       */
      public void setSortComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setOutputKeyComparatorClass(cls);
      }
    
      /**
       * Define the comparator that controls which keys are grouped together
       * for a single call to 
       * {@link Reducer#reduce(Object, Iterable, 
       *                       org.apache.hadoop.mapreduce.Reducer.Context)}
       * @param cls the raw comparator to use
       * @throws IllegalStateException if the job is submitted
       */
      public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                             ) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setOutputValueGroupingComparator(cls);
      }
    
      /**
       * Set the user-specified job name.
       * 
       * @param name the job's new name.
       * @throws IllegalStateException if the job is submitted
       */
      public void setJobName(String name) throws IllegalStateException {
        ensureState(JobState.DEFINE);
        conf.setJobName(name);
      }
    
      /**
       * Turn speculative execution on or off for this job. 
       * 
       * @param speculativeExecution <code>true</code> if speculative execution 
       *                             should be turned on, else <code>false</code>.
       */
      public void setSpeculativeExecution(boolean speculativeExecution) {
        ensureState(JobState.DEFINE);
        conf.setSpeculativeExecution(speculativeExecution);
      }
    
      /**
       * Turn speculative execution on or off for this job for map tasks. 
       * 
       * @param speculativeExecution <code>true</code> if speculative execution 
       *                             should be turned on for map tasks,
       *                             else <code>false</code>.
       */
      public void setMapSpeculativeExecution(boolean speculativeExecution) {
        ensureState(JobState.DEFINE);
        conf.setMapSpeculativeExecution(speculativeExecution);
      }
    
      /**
       * Turn speculative execution on or off for this job for reduce tasks. 
       * 
       * @param speculativeExecution <code>true</code> if speculative execution 
       *                             should be turned on for reduce tasks,
       *                             else <code>false</code>.
       */
      public void setReduceSpeculativeExecution(boolean speculativeExecution) {
        ensureState(JobState.DEFINE);
        conf.setReduceSpeculativeExecution(speculativeExecution);
      }
    
      /**
       * Specify whether job-setup and job-cleanup is needed for the job 
       * 
       * @param needed If <code>true</code>, job-setup and job-cleanup will be
       *               considered from {@link OutputCommitter} 
       *               else ignored.
       */
      public void setJobSetupCleanupNeeded(boolean needed) {
        ensureState(JobState.DEFINE);
        conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
      }
    
      /**
       * Set the given set of archives
       * @param archives The list of archives that need to be localized
       */
      public void setCacheArchives(URI[] archives) {
        ensureState(JobState.DEFINE);
        DistributedCache.setCacheArchives(archives, conf);
      }
    
      /**
       * Set the given set of files
       * @param files The list of files that need to be localized
       */
      public void setCacheFiles(URI[] files) {
        ensureState(JobState.DEFINE);
        DistributedCache.setCacheFiles(files, conf);
      }
    
      /**
       * Add a archives to be localized
       * @param uri The uri of the cache to be localized
       */
      public void addCacheArchive(URI uri) {
        ensureState(JobState.DEFINE);
        DistributedCache.addCacheArchive(uri, conf);
      }
      
      /**
       * Add a file to be localized
       * @param uri The uri of the cache to be localized
       */
      public void addCacheFile(URI uri) {
        ensureState(JobState.DEFINE);
        DistributedCache.addCacheFile(uri, conf);
      }
    
      /**
       * Add an file path to the current set of classpath entries It adds the file
       * to cache as well.
       * 
       * Files added with this method will not be unpacked while being added to the
       * classpath.
       * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
       * method instead.
       *
       * @param file Path of the file to be added
       */
      public void addFileToClassPath(Path file)
        throws IOException {
        ensureState(JobState.DEFINE);
        DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
      }
    
      /**
       * Add an archive path to the current set of classpath entries. It adds the
       * archive to cache as well.
       * 
       * Archive files will be unpacked and added to the classpath
       * when being distributed.
       *
       * @param archive Path of the archive to be added
       */
      public void addArchiveToClassPath(Path archive)
        throws IOException {
        ensureState(JobState.DEFINE);
        DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
      }
    
      /**
       * Originally intended to enable symlinks, but currently symlinks cannot be
       * disabled.
       */
      @Deprecated
      public void createSymlink() {
        ensureState(JobState.DEFINE);
        DistributedCache.createSymlink(conf);
      }
      
      /** 
       * Expert: Set the number of maximum attempts that will be made to run a
       * map task.
       * 
       * @param n the number of attempts per map task.
       */
      public void setMaxMapAttempts(int n) {
        ensureState(JobState.DEFINE);
        conf.setMaxMapAttempts(n);
      }
    
      /** 
       * Expert: Set the number of maximum attempts that will be made to run a
       * reduce task.
       * 
       * @param n the number of attempts per reduce task.
       */
      public void setMaxReduceAttempts(int n) {
        ensureState(JobState.DEFINE);
        conf.setMaxReduceAttempts(n);
      }
    
      /**
       * Set whether the system should collect profiler information for some of 
       * the tasks in this job? The information is stored in the user log 
       * directory.
       * @param newValue true means it should be gathered
       */
      public void setProfileEnabled(boolean newValue) {
        ensureState(JobState.DEFINE);
        conf.setProfileEnabled(newValue);
      }
    
      /**
       * Set the profiler configuration arguments. If the string contains a '%s' it
       * will be replaced with the name of the profiling output file when the task
       * runs.
       *
       * This value is passed to the task child JVM on the command line.
       *
       * @param value the configuration string
       */
      public void setProfileParams(String value) {
        ensureState(JobState.DEFINE);
        conf.setProfileParams(value);
      }
    
      /**
       * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
       * must also be called.
       * @param newValue a set of integer ranges of the map ids
       */
      public void setProfileTaskRange(boolean isMap, String newValue) {
        ensureState(JobState.DEFINE);
        conf.setProfileTaskRange(isMap, newValue);
      } 
      
      /**
       * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
       * tokens upon job completion. Defaults to true.
       */
      public void setCancelDelegationTokenUponJobCompletion(boolean value) {
        ensureState(JobState.DEFINE);
        conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
      }

        The point to pay close attention to is that every setter checks the state first: a Job can be configured only while it is in the DEFINE state.
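The guard that every setter calls can be modelled in a few lines. This is a minimal sketch, not Hadoop's implementation: the class `StateGuardSketch`, its `submit()` method, and the exception message are assumptions made for illustration, while the two state names follow the JobState enum shown earlier.

```java
// Hypothetical model of a configurable object that locks after submission.
class StateGuardSketch {
    enum State { DEFINE, RUNNING }

    private State state = State.DEFINE;
    private int numReduceTasks = 1;

    // Throws if the object is not in the expected state.
    private void ensureState(State expected) {
        if (state != expected) {
            throw new IllegalStateException(
                "Job in state " + state + " instead of " + expected);
        }
    }

    // Setters are allowed only before submission.
    void setNumReduceTasks(int tasks) {
        ensureState(State.DEFINE);
        numReduceTasks = tasks;
    }

    // Submission moves the object out of the configurable state.
    void submit() {
        ensureState(State.DEFINE);
        state = State.RUNNING;
    }

    int getNumReduceTasks() {
        return numReduceTasks;
    }

    public static void main(String[] args) {
        StateGuardSketch job = new StateGuardSketch();
        job.setNumReduceTasks(4); // fine: still in DEFINE
        job.submit();
        try {
            job.setNumReduceTasks(8); // rejected: already RUNNING
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(job.getNumReduceTasks());
    }
}
```

Putting the check at the top of each setter means a rejected call leaves the configuration unchanged.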

        

        The method that prints to the screen:

    /**
       * Dump stats to screen.
       */
      @Override
      public String toString() {
        ensureState(JobState.RUNNING);
        String reasonforFailure = " ";
        int numMaps = 0;
        int numReduces = 0;
        try {
          updateStatus();
          if (status.getState().equals(JobStatus.State.FAILED))
            reasonforFailure = getTaskFailureEventString();
          numMaps = getTaskReports(TaskType.MAP).length;
          numReduces = getTaskReports(TaskType.REDUCE).length;
        } catch (IOException e) {
        } catch (InterruptedException ie) {
        }
        StringBuffer sb = new StringBuffer();
        sb.append("Job: ").append(status.getJobID()).append("\n");
        sb.append("Job File: ").append(status.getJobFile()).append("\n");
        sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
        sb.append("\n");
        sb.append("Uber job : ").append(status.isUber()).append("\n");
        sb.append("Number of maps: ").append(numMaps).append("\n");
        sb.append("Number of reduces: ").append(numReduces).append("\n");
        sb.append("map() completion: ");
        sb.append(status.getMapProgress()).append("\n");
        sb.append("reduce() completion: ");
        sb.append(status.getReduceProgress()).append("\n");
        sb.append("Job state: ");
        sb.append(status.getState()).append("\n");
        sb.append("retired: ").append(status.isRetired()).append("\n");
        sb.append("reason for failure: ").append(reasonforFailure);
        return sb.toString();
      }

      

      Methods for obtaining task progress:

     /**
       * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
       * and 1.0.  When all map tasks have completed, the function returns 1.0.
       * 
       * @return the progress of the job's map-tasks.
       * @throws IOException
       */
      public float mapProgress() throws IOException {
        ensureState(JobState.RUNNING);
        ensureFreshStatus();
        return status.getMapProgress();
      }
    
      /**
       * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
       * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
       * 
       * @return the progress of the job's reduce-tasks.
       * @throws IOException
       */
      public float reduceProgress() throws IOException {
        ensureState(JobState.RUNNING);
        ensureFreshStatus();
        return status.getReduceProgress();
      }
    
      /**
       * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 
       * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
       * 
       * @return the progress of the job's cleanup-tasks.
       * @throws IOException
       */
      public float cleanupProgress() throws IOException, InterruptedException {
        ensureState(JobState.RUNNING);
        ensureFreshStatus();
        return status.getCleanupProgress();
      }
    
      /**
       * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 
       * and 1.0.  When all setup tasks have completed, the function returns 1.0.
       * 
       * @return the progress of the job's setup-tasks.
       * @throws IOException
       */
      public float setupProgress() throws IOException {
        ensureState(JobState.RUNNING);
        ensureFreshStatus();
        return status.getSetupProgress();
      }
  • Original article: https://www.cnblogs.com/fantiantian/p/3795702.html