  • How JobControl Is Implemented

    Original article: http://www.cnblogs.com/archimedes/p/hadoop-jobcontrol.html (please credit the source when reposting).

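    Motivating example: Bayesian classification

    Bayesian classification is a statistical classification method based on probability and statistics. It consists of two steps: training on samples and classification.

    It is implemented with several MapReduce jobs, as shown in the figure. Training can be carried out by three MapReduce jobs:

    The first job (ExtractJob) extracts document features; it needs only a map phase.

    The second job (ClassPriorJob) computes the prior probability of each class: it counts the documents in each class and derives the class probabilities.

    The third job (ConditionalProbilityJob) computes the conditional probability of each word: it counts how often each <label, word> pair occurs across all documents and derives the conditional probabilities.

    The latter two jobs are implemented much like WordCount. Classification is done by a single job (PredictJob). Its map() function computes the probability that each document to be classified belongs to each class, and its reduce() function picks the most probable class for each document and outputs <docid, label> (the document numbered docid belongs to class label).


    A complete Bayesian classification algorithm may therefore need four MapReduce jobs that depend on one another. The traditional approach is to create a JobConf object for each job and submit the jobs one after another (serially) in dependency order, as follows: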

    // Create a JobConf object for each of the four jobs
    JobConf extractJobConf = new JobConf(ExtractJob.class);
    JobConf classPriorJobConf = new JobConf(ClassPriorJob.class);
    JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob.class);
    JobConf predictJobConf = new JobConf(PredictJob.class);
    ...// configure each JobConf
    // Submit the jobs one by one, in dependency order
    JobClient.runJob(extractJobConf);
    JobClient.runJob(classPriorJobConf);
    JobClient.runJob(conditionalProbilityJobConf);
    JobClient.runJob(predictJobConf);

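    With JobControl, the user only needs to declare the dependencies between jobs through addDependingJob(); JobControl then schedules the jobs according to those dependencies. The code looks like this: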

    JobConf extractJobConf = new JobConf(ExtractJob.class);
    JobConf classPriorJobConf = new JobConf(ClassPriorJob.class);
    JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob.class);
    JobConf predictJobConf = new JobConf(PredictJob.class);
    ...// configure each JobConf
    // Create the Job objects. Note that JobControl requires every job to be wrapped in a Job object
    Job extractJob = new Job(extractJobConf);
    Job classPriorJob = new Job(classPriorJobConf);
    Job conditionalProbilityJob = new Job(conditionalProbilityJobConf);
    Job predictJob = new Job(predictJobConf);
    // Declare the dependencies, which turns the four jobs into a DAG
    classPriorJob.addDependingJob(extractJob);
    conditionalProbilityJob.addDependingJob(extractJob);
    predictJob.addDependingJob(classPriorJob);
    predictJob.addDependingJob(conditionalProbilityJob);
    // Create the JobControl object that monitors and schedules the jobs
    JobControl JC = new JobControl("Naive Bayes");
    JC.addJob(extractJob); // add the four jobs to JobControl
    JC.addJob(classPriorJob);
    JC.addJob(conditionalProbilityJob);
    JC.addJob(predictJob);
    JC.run(); // submit the DAG of jobs

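    At run time, extractJob, which depends on no other job, is scheduled first. Once it completes, classPriorJob and conditionalProbilityJob are scheduled at the same time, and predictJob is scheduled only after both of them have finished. Comparing the two approaches leads to a simple conclusion: writing a DAG of jobs with JobControl is more convenient, and it lets jobs that do not depend on each other run in parallel.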
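
    The scheduling order described above can be reproduced without Hadoop. The following is a minimal sketch in plain Java, assuming a hypothetical helper class DagWaves (not part of Hadoop) that groups jobs into waves: every job in a wave has all of its dependencies in earlier waves, so the jobs inside one wave could run in parallel. It models only the ordering, not JobControl's polling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of Hadoop: groups jobs into scheduling waves. */
public class DagWaves {

    /**
     * @param deps maps each job name to the names of the jobs it depends on
     * @return waves of job names; each job's dependencies all sit in earlier waves
     */
    public static List<List<String>> waves(Map<String, List<String>> deps) {
        List<List<String>> result = new ArrayList<>();
        Set<String> done = new HashSet<>();
        while (done.size() < deps.size()) {
            List<String> wave = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : deps.entrySet()) {
                // A job is schedulable once every job it depends on is done.
                if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
                    wave.add(e.getKey());
                }
            }
            if (wave.isEmpty()) {
                throw new IllegalArgumentException("cyclic or unresolved dependency");
            }
            done.addAll(wave);
            result.add(wave);
        }
        return result;
    }

    /** Dependency map of the four Bayes jobs from the example above. */
    public static Map<String, List<String>> bayesJobs() {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("extractJob", Collections.<String>emptyList());
        deps.put("classPriorJob", Arrays.asList("extractJob"));
        deps.put("conditionalProbilityJob", Arrays.asList("extractJob"));
        deps.put("predictJob", Arrays.asList("classPriorJob", "conditionalProbilityJob"));
        return deps;
    }

    public static void main(String[] args) {
        System.out.println(waves(bayesJobs()));
    }
}
```

    For the four Bayes jobs this gives three waves: extractJob alone, then classPriorJob together with conditionalProbilityJob, then predictJob.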

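    JobControl design overview

    JobControl consists of two classes: Job and JobControl. The Job class encapsulates one MapReduce job together with its dependencies. It monitors the state of the jobs it depends on and updates its own state accordingly; the state transitions are shown in the figure. A job starts in the WAITING state. If it has no dependencies, or all the jobs it depends on have completed successfully, it moves to READY. A READY job can be submitted to the Hadoop cluster, at which point it enters RUNNING. From RUNNING it ends up in SUCCESS or FAILED, depending on how the run goes. Note that if a job that this job depends on fails, this job fails as well (it is marked DEPENDENT_FAILED), which produces a "domino effect": every downstream job fails too.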
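
    The transition out of WAITING can be sketched as a small pure function. This is a simplified model, not the Hadoop class: the enum State and the method resolveWaiting are names invented for illustration, and the sketch ignores job submission entirely. It scans the dependencies in order and stops at the first one that is unfinished or failed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified model of the Job state machine; not the Hadoop class. */
public class JobStateSketch {

    public enum State { WAITING, READY, RUNNING, SUCCESS, FAILED, DEPENDENT_FAILED }

    /** Decides the next state of a WAITING job from the states of its dependencies. */
    public static State resolveWaiting(List<State> dependencies) {
        for (State s : dependencies) {
            if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
                return State.DEPENDENT_FAILED; // failure propagates downstream
            }
            if (s != State.SUCCESS) {
                return State.WAITING;          // a dependency has not finished yet
            }
        }
        return State.READY;                    // no dependencies, or all succeeded
    }

    public static void main(String[] args) {
        // Domino effect on a chain a -> b -> c, where a has failed.
        State a = State.FAILED;
        State b = resolveWaiting(Arrays.asList(a));
        State c = resolveWaiting(Arrays.asList(b));
        System.out.println(b + " " + c);
        System.out.println(resolveWaiting(Collections.<State>emptyList()));
    }
}
```

    In the chain a -> b -> c, a failure of a marks b as DEPENDENT_FAILED, and that in turn marks c as DEPENDENT_FAILED, which is the domino effect described above.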

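    JobControl encapsulates a set of MapReduce jobs and their dependencies. It keeps jobs in separate hash tables according to their state and moves them between tables following the state transitions shown in the figure, until all jobs have finished. In the implementation, JobControl contains a thread that periodically monitors and updates the state of every job, promotes jobs whose dependencies have completed, and submits jobs that are in the READY state. It also provides APIs to suspend, resume, and stop this thread.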

    A closer look at the Job class

    The Job class begins by defining its fields: the state the job is in and other related information. The code is as follows:

    import java.util.ArrayList;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    
      // A job is in exactly one of the following states
      final public static int SUCCESS = 0;    // succeeded
      final public static int WAITING = 1;    // waiting for its dependencies
      final public static int RUNNING = 2;    // running
      final public static int READY = 3;    // ready to be submitted
      final public static int FAILED = 4;    // failed
      final public static int DEPENDENT_FAILED = 5;    // a job it depends on failed
        
      private JobConf theJobConf;
      private int state;
      private String jobID;         // assigned and used by the JobControl class
      private JobID mapredJobID;    // the job ID assigned by map/reduce
      private String jobName;        // external name, assigned/used by the client app
      private String message;        // some info for human consumption,
      // e.g. the reason why the job failed
      private ArrayList<Job> dependingJobs;    // the jobs the current job depends on
      private JobClient jc = null;        // the map reduce job client

    Two constructors follow:

      /** 
       * Construct a job.
       * @param jobConf a mapred job configuration representing a job to be executed.
       * @param dependingJobs an array of jobs the current job depends on
       */
      public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
        this.theJobConf = jobConf;
        this.dependingJobs = dependingJobs;
        this.state = Job.WAITING;
        this.jobID = "unassigned";
        this.mapredJobID = null; //not yet assigned 
        this.jobName = "unassigned";
        this.message = "just initialized";
        this.jc = new JobClient(jobConf);
      }
      
      /**
       * Construct a job.
       * 
       * @param jobConf mapred job configuration representing a job to be executed.
       * @throws IOException
       */
      public Job(JobConf jobConf) throws IOException {
        this(jobConf, null);
      }

    Next, the class overrides the toString() method inherited from Object:

      @Override
      public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append("job name:\t").append(this.jobName).append("\n");
        sb.append("job id:\t").append(this.jobID).append("\n");
        sb.append("job state:\t").append(this.state).append("\n");
        sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" 
            : this.mapredJobID).append("\n");
        sb.append("job message:\t").append(this.message).append("\n");
            
        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
          sb.append("job has no depending job:\t").append("\n");
        } else {
          sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
          for (int i = 0; i < this.dependingJobs.size(); i++) {
            sb.append("\t depending job ").append(i).append(":\t");
            sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
          }
        }
        return sb.toString();
      }

    Then comes a long run of getters and setters for the fields:

      /**
       * @return the job name of this job
       */
      public String getJobName() {
        return this.jobName;
      }
        
      /**
       * Set the job name for  this job.
       * @param jobName the job name
       */
      public void setJobName(String jobName) {
        this.jobName = jobName;
      }
        
      /**
       * @return the job ID of this job assigned by JobControl
       */
      public String getJobID() {
        return this.jobID;
      }
        
      /**
       * Set the job ID for  this job.
       * @param id the job ID
       */
      public void setJobID(String id) {
        this.jobID = id;
      }
        
      /**
       * @return the mapred ID of this job
       * @deprecated use {@link #getAssignedJobID()} instead
       */
      @Deprecated
      public String getMapredJobID() {
        return this.mapredJobID.toString();
      }
        
      /**
       * Set the mapred ID for this job.
       * @param mapredJobID the mapred job ID for this job.
       * @deprecated use {@link #setAssignedJobID(JobID)} instead
       */
      @Deprecated
      public void setMapredJobID(String mapredJobID) {
        this.mapredJobID = JobID.forName(mapredJobID);
      }
        
      /**
       * @return the mapred ID of this job as assigned by the 
       * mapred framework.
       */
      public JobID getAssignedJobID() {
        return this.mapredJobID;
      }
      
      /**
       * Set the mapred ID for this job as assigned by the 
       * mapred framework.
       * @param mapredJobID the mapred job ID for this job.
       */
      public void setAssignedJobID(JobID mapredJobID) {
        this.mapredJobID = mapredJobID;
      }
      
      /**
       * @return the mapred job conf of this job
       */
      public JobConf getJobConf() {
        return this.theJobConf;
      }
        
      /**
       * Set the mapred job conf for this job.
       * @param jobConf the mapred job conf for this job.
       */
      public void setJobConf(JobConf jobConf) {
        this.theJobConf = jobConf;
      }
        
      /**
       * @return the state of this job
       */
      public synchronized int getState() {
        return this.state;
      }
        
      /**
       * Set the state for this job.
       * @param state the new state for this job.
       */
      protected synchronized void setState(int state) {
        this.state = state;
      }
        
      /**
       * @return the message of this job
       */
      public String getMessage() {
        return this.message;
      }
        
      /**
       * Set the message for this job.
       * @param message the message for this job.
       */
      public void setMessage(String message) {
        this.message = message;
      }
        
      /**
       * @return the job client of this job
       */
      public JobClient getJobClient(){
              return this.jc;
      }
    
      /**
       * @return the depending jobs of this job
       */
      public ArrayList<Job> getDependingJobs() {
        return this.dependingJobs;
      }

    While a Job is in the WAITING state, the jobs it depends on can be added to its dependency list:

      /**
       * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
       * is waiting to run, not during or afterwards.
       * 
       * @param dependingJob Job that this Job depends on.
       * @return <tt>true</tt> if the Job was added.
       */
      public synchronized boolean addDependingJob(Job dependingJob) {
        if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
          if (this.dependingJobs == null) {
            this.dependingJobs = new ArrayList<Job>();
          }
          return this.dependingJobs.add(dependingJob);
        } else {
          return false;
        }
      }

    The class also provides methods that test whether the job has completed and whether it is ready:

      /**
       * @return true if this job is in a complete state
       */
      public boolean isCompleted() {
        return this.state == Job.FAILED || 
          this.state == Job.DEPENDENT_FAILED ||
          this.state == Job.SUCCESS;
      }
        
      /**
       * @return true if this job is in READY state
       */
      public boolean isReady() {
        return this.state == Job.READY;
      }
        

    checkRunningState() checks the state of a running Job and, if it has completed, determines whether it succeeded or failed:

    /**
       * Check the state of this running job. The state may 
       * remain the same, become SUCCESS or FAILED.
       */
      private void checkRunningState() {
        RunningJob running = null;
        try {
          running = jc.getJob(this.mapredJobID);
          if (running.isComplete()) {
            if (running.isSuccessful()) {
              this.state = Job.SUCCESS;
            } else {
              this.state = Job.FAILED;
              this.message = "Job failed!";
              try {
                running.killJob();
              } catch (IOException e1) {
    
              }
              try {
                this.jc.close();
              } catch (IOException e2) {
    
              }
            }
          }
    
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
          try {
            if (running != null)
              running.killJob();
          } catch (IOException e1) {
    
          }
          try {
            this.jc.close();
          } catch (IOException e1) {
    
          }
        }
      }

    The checkState() method checks and updates the state of the Job:

    /**
       * Check and update the state of this job. The state changes  
       * depending on its current state and the states of the depending jobs.
       */
       synchronized int checkState() {
        if (this.state == Job.RUNNING) {
          checkRunningState();
        }
        if (this.state != Job.WAITING) {
          return this.state;
        }
        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
          this.state = Job.READY;
          return this.state;
        }
        Job pred = null;
        int n = this.dependingJobs.size();
        for (int i = 0; i < n; i++) {
          pred = this.dependingJobs.get(i);
          int s = pred.checkState();
          if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
            break; // a pred is still not completed, continue in WAITING
            // state
          }
          if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
            this.state = Job.DEPENDENT_FAILED;
            this.message = "depending job " + i + " with jobID "
              + pred.getJobID() + " failed. " + pred.getMessage();
            break;
          }
          // pred must be in success state
          if (i == n - 1) {
            this.state = Job.READY;
          }
        }
        return this.state;
      }
        

    Finally, the submit() method submits the Job:

      /**
       * Submit this job to mapred. The state becomes RUNNING if submission 
       * is successful, FAILED otherwise.  
       */
      protected synchronized void submit() {
        try {
          if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
            FileSystem fs = FileSystem.get(theJobConf);
            Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
            for (int i = 0; i < inputPaths.length; i++) {
              if (!fs.exists(inputPaths[i])) {
                try {
                  fs.mkdirs(inputPaths[i]);
                } catch (IOException e) {
    
                }
              }
            }
          }
          RunningJob running = jc.submitJob(theJobConf);
          this.mapredJobID = running.getID();
          this.state = Job.RUNNING;
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
        }
      }
        
    }

    The complete source code of the Job class is as follows:

    package org.apache.hadoop.mapred.jobcontrol;
    
    import java.io.IOException;
    import java.util.ArrayList;
    
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.util.StringUtils;
    
    /** This class encapsulates a MapReduce job and its dependency. It monitors 
     *  the states of the depending jobs and updates the state of this job.
     *  A job starts in the WAITING state. If it does not have any depending jobs, or
     *  all of the depending jobs are in SUCCESS state, then the job state will become
     *  READY. If any depending jobs fail, the job will fail too. 
     *  When in READY state, the job can be submitted to Hadoop for execution, with
     *  the state changing into RUNNING state. From RUNNING state, the job can get into 
     *  SUCCESS or FAILED state, depending the status of the job execution.
     *  
     */
    
    public class Job {
    
      // A job will be in one of the following states
      final public static int SUCCESS = 0;
      final public static int WAITING = 1;
      final public static int RUNNING = 2;
      final public static int READY = 3;
      final public static int FAILED = 4;
      final public static int DEPENDENT_FAILED = 5;
        
        
      private JobConf theJobConf;
      private int state;
      private String jobID;         // assigned and used by JobControl class
      private JobID mapredJobID; // the job ID assigned by map/reduce
      private String jobName;        // external name, assigned/used by client app
      private String message;        // some info for human consumption, 
      // e.g. the reason why the job failed
      private ArrayList<Job> dependingJobs;    // the jobs the current job depends on
        
      private JobClient jc = null;        // the map reduce job client
        
      /** 
       * Construct a job.
       * @param jobConf a mapred job configuration representing a job to be executed.
       * @param dependingJobs an array of jobs the current job depends on
       */
      public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
        this.theJobConf = jobConf;
        this.dependingJobs = dependingJobs;
        this.state = Job.WAITING;
        this.jobID = "unassigned";
        this.mapredJobID = null; //not yet assigned 
        this.jobName = "unassigned";
        this.message = "just initialized";
        this.jc = new JobClient(jobConf);
      }
      
      /**
       * Construct a job.
       * 
       * @param jobConf mapred job configuration representing a job to be executed.
       * @throws IOException
       */
      public Job(JobConf jobConf) throws IOException {
        this(jobConf, null);
      }
        
      @Override
      public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append("job name:\t").append(this.jobName).append("\n");
        sb.append("job id:\t").append(this.jobID).append("\n");
        sb.append("job state:\t").append(this.state).append("\n");
        sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" 
            : this.mapredJobID).append("\n");
        sb.append("job message:\t").append(this.message).append("\n");
            
        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
          sb.append("job has no depending job:\t").append("\n");
        } else {
          sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
          for (int i = 0; i < this.dependingJobs.size(); i++) {
            sb.append("\t depending job ").append(i).append(":\t");
            sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
          }
        }
        return sb.toString();
      }
        
      /**
       * @return the job name of this job
       */
      public String getJobName() {
        return this.jobName;
      }
        
      /**
       * Set the job name for  this job.
       * @param jobName the job name
       */
      public void setJobName(String jobName) {
        this.jobName = jobName;
      }
        
      /**
       * @return the job ID of this job assigned by JobControl
       */
      public String getJobID() {
        return this.jobID;
      }
        
      /**
       * Set the job ID for  this job.
       * @param id the job ID
       */
      public void setJobID(String id) {
        this.jobID = id;
      }
        
      /**
       * @return the mapred ID of this job
       * @deprecated use {@link #getAssignedJobID()} instead
       */
      @Deprecated
      public String getMapredJobID() {
        return this.mapredJobID.toString();
      }
        
      /**
       * Set the mapred ID for this job.
       * @param mapredJobID the mapred job ID for this job.
       * @deprecated use {@link #setAssignedJobID(JobID)} instead
       */
      @Deprecated
      public void setMapredJobID(String mapredJobID) {
        this.mapredJobID = JobID.forName(mapredJobID);
      }
        
      /**
       * @return the mapred ID of this job as assigned by the 
       * mapred framework.
       */
      public JobID getAssignedJobID() {
        return this.mapredJobID;
      }
      
      /**
       * Set the mapred ID for this job as assigned by the 
       * mapred framework.
       * @param mapredJobID the mapred job ID for this job.
       */
      public void setAssignedJobID(JobID mapredJobID) {
        this.mapredJobID = mapredJobID;
      }
      
      /**
       * @return the mapred job conf of this job
       */
      public JobConf getJobConf() {
        return this.theJobConf;
      }
        
    
      /**
       * Set the mapred job conf for this job.
       * @param jobConf the mapred job conf for this job.
       */
      public void setJobConf(JobConf jobConf) {
        this.theJobConf = jobConf;
      }
        
      /**
       * @return the state of this job
       */
      public synchronized int getState() {
        return this.state;
      }
        
      /**
       * Set the state for this job.
       * @param state the new state for this job.
       */
      protected synchronized void setState(int state) {
        this.state = state;
      }
        
      /**
       * @return the message of this job
       */
      public String getMessage() {
        return this.message;
      }
        
      /**
       * Set the message for this job.
       * @param message the message for this job.
       */
      public void setMessage(String message) {
        this.message = message;
      }
        
    
      /**
       * @return the job client of this job
       */
      public JobClient getJobClient(){
              return this.jc;
      }
    
      /**
       * @return the depending jobs of this job
       */
      public ArrayList<Job> getDependingJobs() {
        return this.dependingJobs;
      }
      
      /**
       * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
       * is waiting to run, not during or afterwards.
       * 
       * @param dependingJob Job that this Job depends on.
       * @return <tt>true</tt> if the Job was added.
       */
      public synchronized boolean addDependingJob(Job dependingJob) {
        if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
          if (this.dependingJobs == null) {
            this.dependingJobs = new ArrayList<Job>();
          }
          return this.dependingJobs.add(dependingJob);
        } else {
          return false;
        }
      }
        
      /**
       * @return true if this job is in a complete state
       */
      public boolean isCompleted() {
        return this.state == Job.FAILED || 
          this.state == Job.DEPENDENT_FAILED ||
          this.state == Job.SUCCESS;
      }
        
      /**
       * @return true if this job is in READY state
       */
      public boolean isReady() {
        return this.state == Job.READY;
      }
        
      /**
       * Check the state of this running job. The state may 
       * remain the same, become SUCCESS or FAILED.
       */
      private void checkRunningState() {
        RunningJob running = null;
        try {
          running = jc.getJob(this.mapredJobID);
          if (running.isComplete()) {
            if (running.isSuccessful()) {
              this.state = Job.SUCCESS;
            } else {
              this.state = Job.FAILED;
              this.message = "Job failed!";
              try {
                running.killJob();
              } catch (IOException e1) {
    
              }
              try {
                this.jc.close();
              } catch (IOException e2) {
    
              }
            }
          }
    
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
          try {
            if (running != null)
              running.killJob();
          } catch (IOException e1) {
    
          }
          try {
            this.jc.close();
          } catch (IOException e1) {
    
          }
        }
      }
        
      /**
       * Check and update the state of this job. The state changes  
       * depending on its current state and the states of the depending jobs.
       */
       synchronized int checkState() {
        if (this.state == Job.RUNNING) {
          checkRunningState();
        }
        if (this.state != Job.WAITING) {
          return this.state;
        }
        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
          this.state = Job.READY;
          return this.state;
        }
        Job pred = null;
        int n = this.dependingJobs.size();
        for (int i = 0; i < n; i++) {
          pred = this.dependingJobs.get(i);
          int s = pred.checkState();
          if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
            break; // a pred is still not completed, continue in WAITING
            // state
          }
          if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
            this.state = Job.DEPENDENT_FAILED;
            this.message = "depending job " + i + " with jobID "
              + pred.getJobID() + " failed. " + pred.getMessage();
            break;
          }
          // pred must be in success state
          if (i == n - 1) {
            this.state = Job.READY;
          }
        }
    
        return this.state;
      }
        
      /**
       * Submit this job to mapred. The state becomes RUNNING if submission 
       * is successful, FAILED otherwise.  
       */
      protected synchronized void submit() {
        try {
          if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
            FileSystem fs = FileSystem.get(theJobConf);
            Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
            for (int i = 0; i < inputPaths.length; i++) {
              if (!fs.exists(inputPaths[i])) {
                try {
                  fs.mkdirs(inputPaths[i]);
                } catch (IOException e) {
    
                }
              }
            }
          }
          RunningJob running = jc.submitJob(theJobConf);
          this.mapredJobID = running.getID();
          this.state = Job.RUNNING;
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
        }
      }
        
    }

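    A closer look at the JobControl class

    The JobControl class begins by defining its fields: the state of the control thread and other related information. The code is as follows: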

      // The thread can be in one of the following state
      private static final int RUNNING = 0;
      private static final int SUSPENDED = 1;
      private static final int STOPPED = 2;
      private static final int STOPPING = 3;
      private static final int READY = 4;
        
      private int runnerState;            // the thread state
        
      private Map<String, Job> waitingJobs;
      private Map<String, Job> readyJobs;
      private Map<String, Job> runningJobs;
      private Map<String, Job> successfulJobs;
      private Map<String, Job> failedJobs;
        
      private long nextJobID;
      private String groupName;

    The constructor follows:

      /** 
       * Construct a job control for a group of jobs.
       * @param groupName a name identifying this group
       */
      public JobControl(String groupName) {
        this.waitingJobs = new Hashtable<String, Job>();
        this.readyJobs = new Hashtable<String, Job>();
        this.runningJobs = new Hashtable<String, Job>();
        this.successfulJobs = new Hashtable<String, Job>();
        this.failedJobs = new Hashtable<String, Job>();
        this.nextJobID = -1;
        this.groupName = groupName;
        this.runnerState = JobControl.READY;
      }

    Next is a helper, toArrayList, that copies the jobs held in a Map into an ArrayList:

    private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
        ArrayList<Job> retv = new ArrayList<Job>();
        synchronized (jobs) {
          for (Job job : jobs.values()) {
            retv.add(job);
          }
        }
        return retv;
    }

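    The class also has a few getters, plus the private getNextJobID() helper that generates job IDs from the group name and a counter: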

      /**
       * @return the jobs in the success state
       */
      public ArrayList<Job> getSuccessfulJobs() {
        return JobControl.toArrayList(this.successfulJobs);
      }
      public ArrayList<Job> getFailedJobs() {
        return JobControl.toArrayList(this.failedJobs);
      }
      private String getNextJobID() {
        nextJobID += 1;
        return this.groupName + this.nextJobID;
      }

    There are also methods that put a Job into a job queue:

     private static void addToQueue(Job aJob, Map<String, Job> queue) {
        synchronized(queue) {
          queue.put(aJob.getJobID(), aJob);
        }        
      }
        
      private void addToQueue(Job aJob) {
        Map<String, Job> queue = getQueue(aJob.getState());
        addToQueue(aJob, queue);    
      }

    To decide which queue a Job belongs in, getQueue() returns the queue that corresponds to a given job state:

      private Map<String, Job> getQueue(int state) {
        Map<String, Job> retv = null;
        if (state == Job.WAITING) {
          retv = this.waitingJobs;
        } else if (state == Job.READY) {
          retv = this.readyJobs;
        } else if (state == Job.RUNNING) {
          retv = this.runningJobs;
        } else if (state == Job.SUCCESS) {
          retv = this.successfulJobs;
        } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
          retv = this.failedJobs;
        } 
        return retv;
      }

    Methods for adding new Jobs:

      /**
       * Add a new job.
       * @param aJob the new job
       */
      synchronized public String addJob(Job aJob) {
        String id = this.getNextJobID();
        aJob.setJobID(id);
        aJob.setState(Job.WAITING);
        this.addToQueue(aJob);
        return id;    
      }
        
      /**
       * Add a collection of jobs
       * 
       * @param jobs
       */
      public void addJobs(Collection<Job> jobs) {
        for (Job job : jobs) {
          addJob(job);
        }
      }

    Methods for reading the thread state and for stopping, suspending, and resuming the thread:

    /**
       * @return the thread state
       */
      public int getState() {
        return this.runnerState;
      }
        
      /**
       * set the thread state to STOPPING so that the 
       * thread will stop when it wakes up.
       */
      public void stop() {
        this.runnerState = JobControl.STOPPING;
      }
        
      /**
       * suspend the running thread
       */
      public void suspend () {
        if (this.runnerState == JobControl.RUNNING) {
          this.runnerState = JobControl.SUSPENDED;
        }
      }
        
      /**
       * resume the suspended thread
       */
      public void resume () {
        if (this.runnerState == JobControl.SUSPENDED) {
          this.runnerState = JobControl.RUNNING;
        }
      }
        

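    The following methods re-check the running and waiting Jobs and put each one into the queue that matches its new state; startReadyJobs() submits the Jobs that are ready: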

      synchronized private void checkRunningJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.runningJobs;
        this.runningJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
            if (state != Job.RUNNING) {
            System.out.println("The state of the running job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
            }
          */
          this.addToQueue(nextJob);
        }
      }
        
      synchronized private void checkWaitingJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.waitingJobs;
        this.waitingJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
            if (state != Job.WAITING) {
            System.out.println("The state of the waiting job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
            }
          */
          this.addToQueue(nextJob);
        }
      }
        
      synchronized private void startReadyJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.readyJobs;
        this.readyJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
          nextJob.submit();
          //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
          this.addToQueue(nextJob);
        }    
      }
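
    The three methods above share one pattern: replace a queue with a fresh empty table, then re-check every job from the old table and file it under whatever state it is in now. A minimal sketch of that pattern follows. It is an illustration only: the class QueueRefileSketch and its methods refile and jobsIn are hypothetical names, jobs are reduced to their ID strings, and the state check is passed in as a function instead of calling checkState().

```java
import java.util.EnumMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of the "swap the queue, then re-file every job" pattern. */
public class QueueRefileSketch {

    public enum State { WAITING, READY, RUNNING, SUCCESS, FAILED }

    // One queue of job IDs per state.
    private final Map<State, Set<String>> queues = new EnumMap<>(State.class);

    public QueueRefileSketch() {
        for (State s : State.values()) {
            queues.put(s, new LinkedHashSet<>());
        }
    }

    public synchronized void add(String jobId, State state) {
        queues.get(state).add(jobId);
    }

    /** Empties the queue for {@code from} and re-files each job by its current state. */
    public synchronized void refile(State from, Function<String, State> check) {
        Set<String> old = queues.put(from, new LinkedHashSet<>());
        for (String jobId : old) {
            // Jobs whose state did not change land back in the fresh 'from' queue.
            queues.get(check.apply(jobId)).add(jobId);
        }
    }

    /** Returns a copy of the queue for the given state. */
    public synchronized Set<String> jobsIn(State state) {
        return new LinkedHashSet<>(queues.get(state));
    }

    public static void main(String[] args) {
        QueueRefileSketch q = new QueueRefileSketch();
        q.add("group0", State.RUNNING);
        q.add("group1", State.RUNNING);
        // Pretend group0 has finished successfully and group1 is still running.
        q.refile(State.RUNNING, id -> id.equals("group0") ? State.SUCCESS : State.RUNNING);
        System.out.println(q.jobsIn(State.RUNNING) + " " + q.jobsIn(State.SUCCESS));
    }
}
```

    Swapping in a fresh table before iterating means the loop never modifies the collection it is walking over, which is presumably why the original code is written this way.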

    A method that tells whether all Jobs have finished:

      synchronized public boolean allFinished() {
        return this.waitingJobs.size() == 0 &&
          this.readyJobs.size() == 0 &&
          this.runningJobs.size() == 0;
      }
        

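    The run() method, which checks the states of the running jobs, updates the states of the waiting jobs, and submits the jobs in the ready state: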

      /**
       *  The main loop for the thread.
       *  The loop does the following:
       *      Check the states of the running jobs
       *      Update the states of waiting jobs
       *      Submit the jobs in ready state
       */
      public void run() {
        this.runnerState = JobControl.RUNNING;
        while (true) {
          while (this.runnerState == JobControl.SUSPENDED) {
            try {
              Thread.sleep(5000);
            }
            catch (Exception e) {
                        
            }
          }
          checkRunningJobs();    
          checkWaitingJobs();        
          startReadyJobs();        
          if (this.runnerState != JobControl.RUNNING && 
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
          try {
            Thread.sleep(5000);
          }
          catch (Exception e) {
                    
          }
          if (this.runnerState != JobControl.RUNNING && 
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
        }
        this.runnerState = JobControl.STOPPED;
      }
    
    }
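
Since run() loops until it is told to stop, a client typically starts it on its own thread, polls allFinished(), and then calls stop(). The sketch below illustrates that lifecycle with a hypothetical `MiniControl` class; it is not Hadoop code. Its "jobs" are plain names that complete one polling round after submission, the polling interval is a constructor parameter instead of the fixed 5000 ms, and an `AtomicInteger` replaces the plain `int` state field. All of these are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical miniature of the JobControl thread; not Hadoop code. */
class MiniControl implements Runnable {
  private static final int RUNNING = 0, STOPPED = 2, STOPPING = 3, READY = 4;

  private final AtomicInteger runnerState = new AtomicInteger(READY);
  private final long pollMillis;
  private final List<String> ready = new ArrayList<>();
  private final List<String> running = new ArrayList<>();
  private final List<String> successful = new ArrayList<>();

  MiniControl(long pollMillis) { this.pollMillis = pollMillis; }

  synchronized void addJob(String name) { ready.add(name); }

  synchronized boolean allFinished() { return ready.isEmpty() && running.isEmpty(); }

  synchronized List<String> getSuccessfulJobs() { return new ArrayList<>(successful); }

  boolean isStopped() { return runnerState.get() == STOPPED; }

  void stop() { runnerState.set(STOPPING); }

  /** One polling round: running jobs complete, then ready jobs are "submitted". */
  private synchronized void pollOnce() {
    successful.addAll(running);
    running.clear();
    running.addAll(ready);
    ready.clear();
  }

  @Override
  public void run() {
    // Only enter the loop if stop() has not already been requested.
    runnerState.compareAndSet(READY, RUNNING);
    while (runnerState.get() == RUNNING) {
      pollOnce();
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt, then exit
        break;
      }
    }
    runnerState.set(STOPPED);
  }
}

class MiniControlDemo {
  /** Runs the given jobs to completion and returns the names that succeeded. */
  static List<String> runAll(List<String> names) {
    MiniControl control = new MiniControl(10);
    for (String name : names) {
      control.addJob(name);
    }
    Thread thread = new Thread(control, "mini-control");
    thread.start();
    try {
      while (!control.allFinished()) {
        Thread.sleep(10);            // client-side polling
      }
      control.stop();                // without this the control thread never exits
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for jobs", e);
    }
    return control.getSuccessfulJobs();
  }

  public static void main(String[] args) {
    System.out.println(runAll(Arrays.asList("extract", "classPrior")));
  }
}
```

The point to carry back to the real class is the last step: allFinished() returning true does not end the loop by itself, so the client has to call stop() explicitly.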

    The complete JobControl class:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.mapred.jobcontrol;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Hashtable;
    import java.util.Map;
    
    /** This class encapsulates a set of MapReduce jobs and its dependency. It tracks 
     *  the states of the jobs by placing them into different tables according to their 
     *  states. 
     *  
     *  This class provides APIs for the client app to add a job to the group and to get 
     *  the jobs in the group in different states. When a 
     *  job is added, an ID unique to the group is assigned to the job. 
     *  
     *  This class has a thread that submits jobs when they become ready, monitors the
     *  states of the running jobs, and updates the states of jobs based on the state changes 
     *  of their depending jobs states. The class provides APIs for suspending/resuming
     *  the thread,and for stopping the thread.
     *  
     */
    public class JobControl implements Runnable{
    
      // The thread can be in one of the following state
      private static final int RUNNING = 0;
      private static final int SUSPENDED = 1;
      private static final int STOPPED = 2;
      private static final int STOPPING = 3;
      private static final int READY = 4;
        
      private int runnerState;            // the thread state
        
      private Map<String, Job> waitingJobs;
      private Map<String, Job> readyJobs;
      private Map<String, Job> runningJobs;
      private Map<String, Job> successfulJobs;
      private Map<String, Job> failedJobs;
        
      private long nextJobID;
      private String groupName;
        
      /** 
       * Construct a job control for a group of jobs.
       * @param groupName a name identifying this group
       */
      public JobControl(String groupName) {
        this.waitingJobs = new Hashtable<String, Job>();
        this.readyJobs = new Hashtable<String, Job>();
        this.runningJobs = new Hashtable<String, Job>();
        this.successfulJobs = new Hashtable<String, Job>();
        this.failedJobs = new Hashtable<String, Job>();
        this.nextJobID = -1;
        this.groupName = groupName;
        this.runnerState = JobControl.READY;
      }
        
      private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
        ArrayList<Job> retv = new ArrayList<Job>();
        synchronized (jobs) {
          for (Job job : jobs.values()) {
            retv.add(job);
          }
        }
        return retv;
      }
        
      /**
       * @return the jobs in the waiting state
       */
      public ArrayList<Job> getWaitingJobs() {
        return JobControl.toArrayList(this.waitingJobs);
      }
        
      /**
       * @return the jobs in the running state
       */
      public ArrayList<Job> getRunningJobs() {
        return JobControl.toArrayList(this.runningJobs);
      }
        
      /**
       * @return the jobs in the ready state
       */
      public ArrayList<Job> getReadyJobs() {
        return JobControl.toArrayList(this.readyJobs);
      }
        
      /**
       * @return the jobs in the success state
       */
      public ArrayList<Job> getSuccessfulJobs() {
        return JobControl.toArrayList(this.successfulJobs);
      }
        
      public ArrayList<Job> getFailedJobs() {
        return JobControl.toArrayList(this.failedJobs);
      }
        
      private String getNextJobID() {
        nextJobID += 1;
        return this.groupName + this.nextJobID;
      }
        
      private static void addToQueue(Job aJob, Map<String, Job> queue) {
        synchronized(queue) {
          queue.put(aJob.getJobID(), aJob);
        }        
      }
        
      private void addToQueue(Job aJob) {
        Map<String, Job> queue = getQueue(aJob.getState());
        addToQueue(aJob, queue);    
      }
        
      private Map<String, Job> getQueue(int state) {
        Map<String, Job> retv = null;
        if (state == Job.WAITING) {
          retv = this.waitingJobs;
        } else if (state == Job.READY) {
          retv = this.readyJobs;
        } else if (state == Job.RUNNING) {
          retv = this.runningJobs;
        } else if (state == Job.SUCCESS) {
          retv = this.successfulJobs;
        } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
          retv = this.failedJobs;
        } 
        return retv;
      }
    
      /**
       * Add a new job.
       * @param aJob the new job
       */
      synchronized public String addJob(Job aJob) {
        String id = this.getNextJobID();
        aJob.setJobID(id);
        aJob.setState(Job.WAITING);
        this.addToQueue(aJob);
        return id;    
      }
        
      /**
       * Add a collection of jobs
       * 
       * @param jobs
       */
      public void addJobs(Collection<Job> jobs) {
        for (Job job : jobs) {
          addJob(job);
        }
      }
        
      /**
       * @return the thread state
       */
      public int getState() {
        return this.runnerState;
      }
        
      /**
       * set the thread state to STOPPING so that the 
       * thread will stop when it wakes up.
       */
      public void stop() {
        this.runnerState = JobControl.STOPPING;
      }
        
      /**
       * suspend the running thread
       */
      public void suspend () {
        if (this.runnerState == JobControl.RUNNING) {
          this.runnerState = JobControl.SUSPENDED;
        }
      }
        
      /**
       * resume the suspended thread
       */
      public void resume () {
        if (this.runnerState == JobControl.SUSPENDED) {
          this.runnerState = JobControl.RUNNING;
        }
      }
        
      synchronized private void checkRunningJobs() {
            
        Map<String, Job> oldJobs = null;
        oldJobs = this.runningJobs;
        this.runningJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
            if (state != Job.RUNNING) {
            System.out.println("The state of the running job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
            }
          */
          this.addToQueue(nextJob);
        }
      }
        
      synchronized private void checkWaitingJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.waitingJobs;
        this.waitingJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
            if (state != Job.WAITING) {
            System.out.println("The state of the waiting job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
            }
          */
          this.addToQueue(nextJob);
        }
      }
        
      synchronized private void startReadyJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.readyJobs;
        this.readyJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
          nextJob.submit();
          //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
          this.addToQueue(nextJob);
        }    
      }
        
      synchronized public boolean allFinished() {
        return this.waitingJobs.size() == 0 &&
          this.readyJobs.size() == 0 &&
          this.runningJobs.size() == 0;
      }
        
      /**
       *  The main loop for the thread.
       *  The loop does the following:
       *      Check the states of the running jobs
       *      Update the states of waiting jobs
       *      Submit the jobs in ready state
       */
      public void run() {
        this.runnerState = JobControl.RUNNING;
        while (true) {
          while (this.runnerState == JobControl.SUSPENDED) {
            try {
              Thread.sleep(5000);
            }
            catch (Exception e) {
                        
            }
          }
          checkRunningJobs();    
          checkWaitingJobs();        
          startReadyJobs();        
          if (this.runnerState != JobControl.RUNNING && 
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
          try {
            Thread.sleep(5000);
          }
          catch (Exception e) {
                    
          }
          if (this.runnerState != JobControl.RUNNING && 
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
        }
        this.runnerState = JobControl.STOPPED;
      }
    
    }

    References

    《Hadoop技术内幕:深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: An In-Depth Look at MapReduce Architecture Design and Implementation)

  • Original article: https://www.cnblogs.com/wuyudong/p/hadoop-jobcontrol.html