  How JobControl Is Implemented

    Original article: http://www.cnblogs.com/archimedes/p/hadoop-jobcontrol.html (please credit the source when reposting).

    Motivating example: Bayesian classification

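    Bayesian classification is a statistical classification method built on probability theory. It consists of two steps: training on samples, and classification.

    It is implemented as several MapReduce jobs, as shown in the figure. Training can be done with three MapReduce jobs:

    The first job (ExtractJob) extracts document features; it needs only a Map phase.

    The second job (ClassPriorJob) computes the prior probability of each class: it counts the documents in each class and derives the class probabilities.

    The third job (ConditionalProbilityJob) computes the conditional probability of each word: it counts how often each <label, word> pair occurs across all documents and derives the conditional word probabilities.

    The last two jobs are implemented much like WordCount. Classification is done by a single job (PredictJob): its map() function computes, for each document to be classified, the probability that it belongs to each class, and its reduce() function picks the most probable class for each document and outputs <docid, label> (the document with ID docid belongs to class label).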
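    For reference, the quantities these jobs compute can be written in the standard multinomial naive Bayes form. This is a sketch under assumptions, since the article gives no formulas: N is the number of training documents, N_c the number of documents labeled c, count(c, w) the number of occurrences of the pair <c, w> (what ConditionalProbilityJob counts), and the sum in the last formula runs over the word occurrences of document d. ClassPriorJob yields P(c), ConditionalProbilityJob yields P(w | c), and PredictJob's map() evaluates the bracket for every class while its reduce() takes the arg max. Smoothing is omitted here; without it an unseen word contributes log 0, which is why implementations usually add some (for example add-one smoothing).

```latex
P(c) = \frac{N_c}{N}
\qquad
P(w \mid c) = \frac{\operatorname{count}(c, w)}{\sum_{w'} \operatorname{count}(c, w')}
\qquad
\hat{c}(d) = \arg\max_{c} \Big( \log P(c) + \sum_{w \in d} \log P(w \mid c) \Big)
```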


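    A complete Bayesian classifier may therefore need four MapReduce jobs with dependencies among them. The traditional approach is to create a JobConf object for each job and submit the jobs one after another (serially) in dependency order: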

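    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    // Create a JobConf object for each of the 4 jobs
    JobConf extractJobConf = new JobConf(ExtractJob.class);
    JobConf classPriorJobConf = new JobConf(ClassPriorJob.class);
    JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob.class);
    JobConf predictJobConf = new JobConf(PredictJob.class);
    // ... configure each JobConf
    // Submit the jobs in dependency order; runJob() blocks until the job
    // has finished, so the four jobs run strictly one after another
    JobClient.runJob(extractJobConf);
    JobClient.runJob(classPriorJobConf);
    JobClient.runJob(conditionalProbilityJobConf);
    JobClient.runJob(predictJobConf);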

    With JobControl, the user only declares the dependencies between jobs with addDependingJob(), and JobControl schedules the jobs accordingly:

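    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;

    JobConf extractJobConf = new JobConf(ExtractJob.class);
    JobConf classPriorJobConf = new JobConf(ClassPriorJob.class);
    JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob.class);
    JobConf predictJobConf = new JobConf(PredictJob.class);
    // ... configure each JobConf
    // Create the Job objects. Note that JobControl requires every job to be wrapped
    // in a Job object (the Job constructor throws IOException)
    Job extractJob = new Job(extractJobConf);
    Job classPriorJob = new Job(classPriorJobConf);
    Job conditionalProbilityJob = new Job(conditionalProbilityJobConf);
    Job predictJob = new Job(predictJobConf);
    // Declare the dependencies, which form a DAG of jobs
    classPriorJob.addDependingJob(extractJob);
    conditionalProbilityJob.addDependingJob(extractJob);
    predictJob.addDependingJob(classPriorJob);
    predictJob.addDependingJob(conditionalProbilityJob);
    // Create the JobControl object, which monitors and schedules the jobs
    JobControl JC = new JobControl("Naive Bayes");
    JC.addJob(extractJob); // add all 4 jobs to the JobControl
    JC.addJob(classPriorJob);
    JC.addJob(conditionalProbilityJob);
    JC.addJob(predictJob);
    // run() keeps looping until stop() is called, so run JobControl in its own
    // thread and stop it once all jobs have finished
    Thread jcThread = new Thread(JC);
    jcThread.start();
    while (!JC.allFinished()) {
        Thread.sleep(5000); // throws InterruptedException, which the caller must handle
    }
    JC.stop();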

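    At run time, extractJob, which depends on no other job, is scheduled first. As soon as it completes, classPriorJob and conditionalProbilityJob are both scheduled, and once both have finished, predictJob is scheduled. Comparing the two approaches leads to a simple conclusion: JobControl makes DAG-shaped workflows easier to write, and it lets jobs that do not depend on each other run in parallel.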
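    The scheduling order just described can be reproduced with a small Hadoop-free sketch that groups jobs into "waves": a job joins a wave once every job it depends on sits in an earlier wave, and all jobs in one wave may run in parallel. DagWaves and waves() are hypothetical names for illustration; the sketch models the idealized order only, not JobControl's own polling code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: groups a job DAG into waves of jobs that may run in parallel. */
class DagWaves {
    /** deps maps each job name to the names of the jobs it depends on. */
    static List<List<String>> waves(Map<String, List<String>> deps) {
        List<List<String>> result = new ArrayList<>();
        Set<String> done = new HashSet<>();
        Set<String> pending = new LinkedHashSet<>(deps.keySet()); // keeps insertion order
        while (!pending.isEmpty()) {
            List<String> wave = new ArrayList<>();
            for (String job : pending) {
                if (done.containsAll(deps.get(job))) { // every dependency already finished
                    wave.add(job);
                }
            }
            if (wave.isEmpty()) {
                throw new IllegalArgumentException("cycle or unknown dependency among " + pending);
            }
            pending.removeAll(wave);
            done.addAll(wave);
            result.add(wave);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("extractJob", List.of());
        deps.put("classPriorJob", List.of("extractJob"));
        deps.put("conditionalProbilityJob", List.of("extractJob"));
        deps.put("predictJob", List.of("classPriorJob", "conditionalProbilityJob"));
        System.out.println(waves(deps));
    }
}
```

    For the Bayes pipeline, main() prints [[extractJob], [classPriorJob, conditionalProbilityJob], [predictJob]]. A dependency cycle or a dependency on an unknown job leaves a round with no eligible job, which the sketch reports as an IllegalArgumentException instead of looping forever.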

    Design of JobControl

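    JobControl consists of two classes: Job and JobControl. The Job class wraps a MapReduce job together with its dependencies. Its main responsibility is to monitor the state of the jobs it depends on and update its own state accordingly; its state transition diagram is shown in the figure. A job starts in the WAITING state. If it has no dependencies, or all of its dependencies have completed successfully, it moves to READY. A READY job can be submitted to the Hadoop cluster, and it then enters RUNNING. From RUNNING, depending on how execution goes, it moves to SUCCESS or FAILED. Note that if a job that this job depends on fails, this job fails as well (state DEPENDENT_FAILED), which produces a "domino effect": every downstream job fails too.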
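    The transitions above can be written down as a small state machine. This is a simplified model rather than Hadoop's code: the real Job class uses int constants instead of an enum, and JobStateModel is a hypothetical name. The READY to FAILED edge is included because submit(), listed later, sets FAILED when submission throws an IOException.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical enum model of the Job states and their legal transitions. */
enum JobStateModel {
    WAITING, READY, RUNNING, SUCCESS, FAILED, DEPENDENT_FAILED;

    /** States this state may move to next. */
    Set<JobStateModel> next() {
        switch (this) {
            case WAITING: return EnumSet.of(READY, DEPENDENT_FAILED); // deps succeeded / one failed
            case READY:   return EnumSet.of(RUNNING, FAILED);         // submitted / submission failed
            case RUNNING: return EnumSet.of(SUCCESS, FAILED);         // outcome of the MapReduce job
            default:      return EnumSet.noneOf(JobStateModel.class); // terminal states
        }
    }

    /** Mirrors Job.isCompleted(): SUCCESS, FAILED and DEPENDENT_FAILED are terminal. */
    boolean isCompleted() {
        return next().isEmpty();
    }
}

class JobStateModelDemo {
    public static void main(String[] args) {
        for (JobStateModel s : JobStateModel.values()) {
            System.out.println(s + " -> " + s.next());
        }
    }
}
```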

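    JobControl wraps a group of MapReduce jobs and their dependencies. It keeps jobs in separate hash tables according to their state and moves them between the tables, following the transitions in the figure, until all jobs have finished. Internally, JobControl runs a thread that periodically checks and updates the state of every job, marks jobs whose dependencies have all completed as ready, submits jobs in the READY state, and so on. It also offers an API to suspend, resume, and stop this thread.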

    A closer look at the Job class

    The Job class begins by declaring its fields, including the job's state and other related information:

    import java.util.ArrayList;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    
      // A job is always in exactly one of the following states
      final public static int SUCCESS = 0;    // succeeded
      final public static int WAITING = 1;    // waiting for its dependencies
      final public static int RUNNING = 2;    // running
      final public static int READY = 3;      // ready to be submitted
      final public static int FAILED = 4;     // failed
      final public static int DEPENDENT_FAILED = 5;    // a job it depends on failed
        
      private JobConf theJobConf;
      private int state;
      private String jobID;         // assigned and used by the JobControl class
      private JobID mapredJobID;    // job ID assigned by the MapReduce framework
      private String jobName;       // external name, assigned/used by the client application
      private String message;       // human-readable information,
      // e.g. the reason the job failed
      private ArrayList<Job> dependingJobs;    // the jobs this job depends on
      private JobClient jc = null;  // MapReduce job client

    Next come two constructors:

      /** 
       * Construct a job.
       * @param jobConf a mapred job configuration representing a job to be executed.
       * @param dependingJobs an array of jobs the current job depends on
       */
      public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
        this.theJobConf = jobConf;
        this.dependingJobs = dependingJobs;
        this.state = Job.WAITING;
        this.jobID = "unassigned";
        this.mapredJobID = null; //not yet assigned 
        this.jobName = "unassigned";
        this.message = "just initialized";
        this.jc = new JobClient(jobConf);
      }
      
      /**
       * Construct a job.
       * 
       * @param jobConf mapred job configuration representing a job to be executed.
       * @throws IOException
       */
      public Job(JobConf jobConf) throws IOException {
        this(jobConf, null);
      }

    It then overrides toString() (inherited from Object):

  @Override
  public String toString() {
    StringBuffer sb = new StringBuffer();
    sb.append("job name:\t").append(this.jobName).append("\n");
    sb.append("job id:\t").append(this.jobID).append("\n");
    sb.append("job state:\t").append(this.state).append("\n");
    sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" 
        : this.mapredJobID).append("\n");
    sb.append("job message:\t").append(this.message).append("\n");
        
    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
      sb.append("job has no depending job:\t").append("\n");
    } else {
      sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
      for (int i = 0; i < this.dependingJobs.size(); i++) {
        sb.append("\t depending job ").append(i).append(":\t");
        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
      }
    }
    return sb.toString();
  }
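    Because toString() appends the raw int state, a WAITING job prints as "job state: 1". A small helper that maps the codes back to names makes such output easier to read. It is a hypothetical addition (JobStateNames is not part of Hadoop) that relies only on the constant values listed above, SUCCESS = 0 through DEPENDENT_FAILED = 5. It also uses StringBuilder instead of StringBuffer: a buffer local to one method is never shared between threads, so StringBuffer's synchronization buys nothing there.

```java
/** Hypothetical helper that turns Job's int state codes back into names. */
class JobStateNames {
    // Index = state code, using the same values as the constants in Job
    private static final String[] NAMES = {
        "SUCCESS", "WAITING", "RUNNING", "READY", "FAILED", "DEPENDENT_FAILED"
    };

    static String stateName(int state) {
        return (state >= 0 && state < NAMES.length) ? NAMES[state] : "UNKNOWN(" + state + ")";
    }

    /** Same "label:<tab>value" line layout as the first lines of Job.toString(), with a readable state. */
    static String describe(String jobName, String jobID, int state) {
        StringBuilder sb = new StringBuilder();
        sb.append("job name:\t").append(jobName).append("\n");
        sb.append("job id:\t").append(jobID).append("\n");
        sb.append("job state:\t").append(stateName(state)).append("\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(describe("extractJob", "Naive Bayes0", 1));
    }
}
```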

    Next comes a long series of getters and setters:

      /**
       * @return the job name of this job
       */
      public String getJobName() {
        return this.jobName;
      }
        
      /**
       * Set the job name for  this job.
       * @param jobName the job name
       */
      public void setJobName(String jobName) {
        this.jobName = jobName;
      }
        
      /**
       * @return the job ID of this job assigned by JobControl
       */
      public String getJobID() {
        return this.jobID;
      }
        
      /**
       * Set the job ID for  this job.
       * @param id the job ID
       */
      public void setJobID(String id) {
        this.jobID = id;
      }
        
      /**
       * @return the mapred ID of this job
       * @deprecated use {@link #getAssignedJobID()} instead
       */
      @Deprecated
      public String getMapredJobID() {
        return this.mapredJobID.toString();
      }
        
      /**
       * Set the mapred ID for this job.
       * @param mapredJobID the mapred job ID for this job.
       * @deprecated use {@link #setAssignedJobID(JobID)} instead
       */
      @Deprecated
      public void setMapredJobID(String mapredJobID) {
        this.mapredJobID = JobID.forName(mapredJobID);
      }
        
      /**
       * @return the mapred ID of this job as assigned by the 
       * mapred framework.
       */
      public JobID getAssignedJobID() {
        return this.mapredJobID;
      }
      
      /**
       * Set the mapred ID for this job as assigned by the 
       * mapred framework.
       * @param mapredJobID the mapred job ID for this job.
       */
      public void setAssignedJobID(JobID mapredJobID) {
        this.mapredJobID = mapredJobID;
      }
      
      /**
       * @return the mapred job conf of this job
       */
      public JobConf getJobConf() {
        return this.theJobConf;
      }
        
      /**
       * Set the mapred job conf for this job.
       * @param jobConf the mapred job conf for this job.
       */
      public void setJobConf(JobConf jobConf) {
        this.theJobConf = jobConf;
      }
        
      /**
       * @return the state of this job
       */
      public synchronized int getState() {
        return this.state;
      }
        
      /**
       * Set the state for this job.
       * @param state the new state for this job.
       */
      protected synchronized void setState(int state) {
        this.state = state;
      }
        
      /**
       * @return the message of this job
       */
      public String getMessage() {
        return this.message;
      }
        
      /**
       * Set the message for this job.
       * @param message the message for this job.
       */
      public void setMessage(String message) {
        this.message = message;
      }
        
      /**
       * @return the job client of this job
       */
      public JobClient getJobClient(){
              return this.jc;
      }
    
      /**
       * @return the depending jobs of this job
       */
      public ArrayList<Job> getDependingJobs() {
        return this.dependingJobs;
      }

    While a Job is in the WAITING state, jobs it depends on can be added to its dependency list:

      /**
       * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
       * is waiting to run, not during or afterwards.
       * 
       * @param dependingJob Job that this Job depends on.
       * @return <tt>true</tt> if the Job was added.
       */
      public synchronized boolean addDependingJob(Job dependingJob) {
        if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
          if (this.dependingJobs == null) {
            this.dependingJobs = new ArrayList<Job>();
          }
          return this.dependingJobs.add(dependingJob);
        } else {
          return false;
        }
      }
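    addDependingJob() only checks the job's own state; it accepts any Job, including the job itself or one that already depends on it. Because checkState() (listed later in this article) calls checkState() on every dependency recursively, a dependency cycle would keep recursing until the stack overflows. A caller could reject such edges up front with a reachability check. The sketch below does this on job names instead of Job objects; CycleGuard and wouldCreateCycle() are hypothetical names, not part of Hadoop.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical guard against dependency cycles, working on job names. */
class CycleGuard {
    /**
     * Returns true if making `job` depend on `newDep` would close a cycle, i.e. if
     * `job` is already reachable from `newDep` (or is `newDep` itself).
     * `deps` maps each job name to the names of the jobs it depends on.
     */
    static boolean wouldCreateCycle(Map<String, List<String>> deps, String job, String newDep) {
        Deque<String> stack = new ArrayDeque<>();
        Set<String> seen = new HashSet<>();
        stack.push(newDep);
        while (!stack.isEmpty()) {
            String cur = stack.pop();
            if (cur.equals(job)) {
                return true;
            }
            if (seen.add(cur)) {                 // visit each job only once
                for (String d : deps.getOrDefault(cur, List.of())) {
                    stack.push(d);
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = Map.of(
                "classPriorJob", List.of("extractJob"),
                "predictJob", List.of("classPriorJob"));
        System.out.println(wouldCreateCycle(deps, "extractJob", "predictJob")); // prints true
        System.out.println(wouldCreateCycle(deps, "predictJob", "extractJob")); // prints false
    }
}
```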

    There are also methods that check whether the job is complete and whether it is ready:

      /**
       * @return true if this job is in a complete state
       */
      public boolean isCompleted() {
        return this.state == Job.FAILED || 
          this.state == Job.DEPENDENT_FAILED ||
          this.state == Job.SUCCESS;
      }
        
      /**
       * @return true if this job is in READY state
       */
      public boolean isReady() {
        return this.state == Job.READY;
      }
        

    The following method checks the state of a running job and, if it has finished, records whether it succeeded or failed:

    /**
       * Check the state of this running job. The state may 
       * remain the same, become SUCCESS or FAILED.
       */
      private void checkRunningState() {
        RunningJob running = null;
        try {
          running = jc.getJob(this.mapredJobID);
          if (running.isComplete()) {
            if (running.isSuccessful()) {
              this.state = Job.SUCCESS;
            } else {
              this.state = Job.FAILED;
              this.message = "Job failed!";
              try {
                running.killJob();
              } catch (IOException e1) {
    
              }
              try {
                this.jc.close();
              } catch (IOException e2) {
    
              }
            }
          }
    
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
          try {
            if (running != null)
              running.killJob();
          } catch (IOException e1) {
    
          }
          try {
            this.jc.close();
          } catch (IOException e1) {
    
          }
        }
      }
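    The decision logic of checkRunningState() reduces to a small mapping from the polled status to the next state. The sketch below isolates it behind a hypothetical RunningStatus interface that mimics the two RunningJob queries used above (isComplete() and isSuccessful()); RunningStatus, FixedStatus and RunningStateMapper are illustrative names, and the cleanup done by the original (killJob() and closing the JobClient, with their exceptions ignored) is left out.

```java
import java.io.IOException;

/** Hypothetical stand-in for the two RunningJob methods that checkRunningState() calls. */
interface RunningStatus {
    boolean isComplete() throws IOException;
    boolean isSuccessful() throws IOException;
}

/** A fixed status, handy for tests. */
class FixedStatus implements RunningStatus {
    private final boolean complete;
    private final boolean successful;

    FixedStatus(boolean complete, boolean successful) {
        this.complete = complete;
        this.successful = successful;
    }

    public boolean isComplete() { return complete; }
    public boolean isSuccessful() { return successful; }
}

class RunningStateMapper {
    static final int SUCCESS = 0, RUNNING = 2, FAILED = 4; // same codes as Job

    /** Next state of a RUNNING job, following checkRunningState(). */
    static int nextState(RunningStatus status) {
        try {
            if (!status.isComplete()) {
                return RUNNING;            // not finished yet: state unchanged
            }
            return status.isSuccessful() ? SUCCESS : FAILED;
        } catch (IOException e) {
            return FAILED;                 // an error while polling also marks the job FAILED
        }
    }

    public static void main(String[] args) {
        System.out.println(nextState(new FixedStatus(true, true)));   // prints 0 (SUCCESS)
        System.out.println(nextState(new FixedStatus(false, false))); // prints 2 (RUNNING)
    }
}
```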

    checkState() checks and updates the job's state:

    /**
       * Check and update the state of this job. The state changes  
       * depending on its current state and the states of the depending jobs.
       */
       synchronized int checkState() {
        if (this.state == Job.RUNNING) {
          checkRunningState();
        }
        if (this.state != Job.WAITING) {
          return this.state;
        }
        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
          this.state = Job.READY;
          return this.state;
        }
        Job pred = null;
        int n = this.dependingJobs.size();
        for (int i = 0; i < n; i++) {
          pred = this.dependingJobs.get(i);
          int s = pred.checkState();
          if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
            break; // a pred is still not completed, continue in WAITING
            // state
          }
          if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
            this.state = Job.DEPENDENT_FAILED;
            this.message = "depending job " + i + " with jobID "
              + pred.getJobID() + " failed. " + pred.getMessage();
            break;
          }
          // pred must be in success state
          if (i == n - 1) {
            this.state = Job.READY;
          }
        }
        return this.state;
      }
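    To see the "domino effect" in isolation, the sketch below re-creates the WAITING branch of checkState() on a tiny hypothetical DagNode class (no Hadoop, no RUNNING handling): a node becomes READY only when every predecessor has succeeded, and DEPENDENT_FAILED as soon as one predecessor has failed or is itself DEPENDENT_FAILED.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical simplified job: only the dependency logic of Job.checkState(). */
class DagNode {
    static final int SUCCESS = 0, WAITING = 1, RUNNING = 2, READY = 3, FAILED = 4, DEPENDENT_FAILED = 5;

    final String name;
    int state = WAITING;
    final List<DagNode> deps = new ArrayList<>();

    DagNode(String name, DagNode... predecessors) {
        this.name = name;
        Collections.addAll(this.deps, predecessors);
    }

    int checkState() {
        if (state != WAITING) {
            return state;                   // only WAITING jobs are re-evaluated
        }
        for (DagNode pred : deps) {
            int s = pred.checkState();      // recursive, as in the original
            if (s == FAILED || s == DEPENDENT_FAILED) {
                state = DEPENDENT_FAILED;   // failure propagates downstream
                return state;
            }
            if (s != SUCCESS) {
                return state;               // a predecessor is unfinished: stay WAITING
            }
        }
        state = READY;                      // no predecessors, or all of them succeeded
        return state;
    }

    public static void main(String[] args) {
        DagNode extract = new DagNode("extract");
        DagNode prior = new DagNode("classPrior", extract);
        DagNode cond = new DagNode("conditional", extract);
        DagNode predict = new DagNode("predict", prior, cond);
        extract.state = FAILED;                   // pretend the first job failed
        System.out.println(predict.checkState()); // prints 5 (DEPENDENT_FAILED)
    }
}
```

    In main(), predict never looks at extract directly: the failure reaches it through classPrior, which is exactly the cascade described in the design section.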
        

    Finally, submit() submits the job to the cluster:

      /**
       * Submit this job to mapred. The state becomes RUNNING if submission 
       * is successful, FAILED otherwise.  
       */
      protected synchronized void submit() {
        try {
          if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
            FileSystem fs = FileSystem.get(theJobConf);
            Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
            for (int i = 0; i < inputPaths.length; i++) {
              if (!fs.exists(inputPaths[i])) {
                try {
                  fs.mkdirs(inputPaths[i]);
                } catch (IOException e) {
    
                }
              }
            }
          }
          RunningJob running = jc.submitJob(theJobConf);
          this.mapredJobID = running.getID();
          this.state = Job.RUNNING;
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
        }
      }
        
    }
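    submit() has an optional pre-step: when the configuration flag create.empty.dir.if.nonexist is true, it creates every missing input directory before submitting, ignoring mkdir failures. The sketch below shows the same idea on the local file system with java.nio.file instead of Hadoop's FileSystem; EnsureInputDirs is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical local-filesystem analogue of the create.empty.dir.if.nonexist step in submit(). */
class EnsureInputDirs {
    /** Creates each missing input directory and returns how many were created. */
    static int ensure(List<Path> inputs) {
        int created = 0;
        for (Path p : inputs) {
            if (!Files.exists(p)) {
                try {
                    Files.createDirectories(p);   // also creates missing parent directories
                    created++;
                } catch (IOException e) {
                    // ignored, as submit() ignores a failed mkdirs()
                }
            }
        }
        return created;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("jobcontrol-demo");
        List<Path> inputs = List.of(root.resolve("in1"), root.resolve("in2"));
        System.out.println(ensure(inputs) + " directories created"); // prints "2 directories created"
    }
}
```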

    The complete source of the Job class:

    package org.apache.hadoop.mapred.jobcontrol;
    
    import java.io.IOException;
    import java.util.ArrayList;
    
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.util.StringUtils;
    
    /** This class encapsulates a MapReduce job and its dependency. It monitors 
     *  the states of the depending jobs and updates the state of this job.
     *  A job starts in the WAITING state. If it does not have any depending jobs, or
     *  all of the depending jobs are in SUCCESS state, then the job state will become
     *  READY. If any depending jobs fail, the job will fail too. 
     *  When in READY state, the job can be submitted to Hadoop for execution, with
     *  the state changing into RUNNING state. From RUNNING state, the job can get into 
     *  SUCCESS or FAILED state, depending the status of the job execution.
     *  
     */
    
    public class Job {
    
      // A job will be in one of the following states
      final public static int SUCCESS = 0;
      final public static int WAITING = 1;
      final public static int RUNNING = 2;
      final public static int READY = 3;
      final public static int FAILED = 4;
      final public static int DEPENDENT_FAILED = 5;
        
        
      private JobConf theJobConf;
      private int state;
      private String jobID;         // assigned and used by JobControl class
      private JobID mapredJobID; // the job ID assigned by map/reduce
      private String jobName;        // external name, assigned/used by client app
      private String message;        // some info for human consumption, 
      // e.g. the reason why the job failed
      private ArrayList<Job> dependingJobs;    // the jobs the current job depends on
        
      private JobClient jc = null;        // the map reduce job client
        
      /** 
       * Construct a job.
       * @param jobConf a mapred job configuration representing a job to be executed.
       * @param dependingJobs an array of jobs the current job depends on
       */
      public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
        this.theJobConf = jobConf;
        this.dependingJobs = dependingJobs;
        this.state = Job.WAITING;
        this.jobID = "unassigned";
        this.mapredJobID = null; //not yet assigned 
        this.jobName = "unassigned";
        this.message = "just initialized";
        this.jc = new JobClient(jobConf);
      }
      
      /**
       * Construct a job.
       * 
       * @param jobConf mapred job configuration representing a job to be executed.
       * @throws IOException
       */
      public Job(JobConf jobConf) throws IOException {
        this(jobConf, null);
      }
        
      @Override
      public String toString() {
        StringBuffer sb = new StringBuffer();
    sb.append("job name:\t").append(this.jobName).append("\n");
    sb.append("job id:\t").append(this.jobID).append("\n");
    sb.append("job state:\t").append(this.state).append("\n");
    sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" 
        : this.mapredJobID).append("\n");
    sb.append("job message:\t").append(this.message).append("\n");
        
    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
      sb.append("job has no depending job:\t").append("\n");
    } else {
      sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
      for (int i = 0; i < this.dependingJobs.size(); i++) {
        sb.append("\t depending job ").append(i).append(":\t");
        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
          }
        }
        return sb.toString();
      }
        
      /**
       * @return the job name of this job
       */
      public String getJobName() {
        return this.jobName;
      }
        
      /**
       * Set the job name for  this job.
       * @param jobName the job name
       */
      public void setJobName(String jobName) {
        this.jobName = jobName;
      }
        
      /**
       * @return the job ID of this job assigned by JobControl
       */
      public String getJobID() {
        return this.jobID;
      }
        
      /**
       * Set the job ID for  this job.
       * @param id the job ID
       */
      public void setJobID(String id) {
        this.jobID = id;
      }
        
      /**
       * @return the mapred ID of this job
       * @deprecated use {@link #getAssignedJobID()} instead
       */
      @Deprecated
      public String getMapredJobID() {
        return this.mapredJobID.toString();
      }
        
      /**
       * Set the mapred ID for this job.
       * @param mapredJobID the mapred job ID for this job.
       * @deprecated use {@link #setAssignedJobID(JobID)} instead
       */
      @Deprecated
      public void setMapredJobID(String mapredJobID) {
        this.mapredJobID = JobID.forName(mapredJobID);
      }
        
      /**
       * @return the mapred ID of this job as assigned by the 
       * mapred framework.
       */
      public JobID getAssignedJobID() {
        return this.mapredJobID;
      }
      
      /**
       * Set the mapred ID for this job as assigned by the 
       * mapred framework.
       * @param mapredJobID the mapred job ID for this job.
       */
      public void setAssignedJobID(JobID mapredJobID) {
        this.mapredJobID = mapredJobID;
      }
      
      /**
       * @return the mapred job conf of this job
       */
      public JobConf getJobConf() {
        return this.theJobConf;
      }
        
    
      /**
       * Set the mapred job conf for this job.
       * @param jobConf the mapred job conf for this job.
       */
      public void setJobConf(JobConf jobConf) {
        this.theJobConf = jobConf;
      }
        
      /**
       * @return the state of this job
       */
      public synchronized int getState() {
        return this.state;
      }
        
      /**
       * Set the state for this job.
       * @param state the new state for this job.
       */
      protected synchronized void setState(int state) {
        this.state = state;
      }
        
      /**
       * @return the message of this job
       */
      public String getMessage() {
        return this.message;
      }
        
      /**
       * Set the message for this job.
       * @param message the message for this job.
       */
      public void setMessage(String message) {
        this.message = message;
      }
        
    
      /**
       * @return the job client of this job
       */
      public JobClient getJobClient(){
              return this.jc;
      }
    
      /**
       * @return the depending jobs of this job
       */
      public ArrayList<Job> getDependingJobs() {
        return this.dependingJobs;
      }
      
      /**
       * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
       * is waiting to run, not during or afterwards.
       * 
       * @param dependingJob Job that this Job depends on.
       * @return <tt>true</tt> if the Job was added.
       */
      public synchronized boolean addDependingJob(Job dependingJob) {
        if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
          if (this.dependingJobs == null) {
            this.dependingJobs = new ArrayList<Job>();
          }
          return this.dependingJobs.add(dependingJob);
        } else {
          return false;
        }
      }
        
      /**
       * @return true if this job is in a complete state
       */
      public boolean isCompleted() {
        return this.state == Job.FAILED || 
          this.state == Job.DEPENDENT_FAILED ||
          this.state == Job.SUCCESS;
      }
        
      /**
       * @return true if this job is in READY state
       */
      public boolean isReady() {
        return this.state == Job.READY;
      }
        
      /**
       * Check the state of this running job. The state may 
       * remain the same, become SUCCESS or FAILED.
       */
      private void checkRunningState() {
        RunningJob running = null;
        try {
          running = jc.getJob(this.mapredJobID);
          if (running.isComplete()) {
            if (running.isSuccessful()) {
              this.state = Job.SUCCESS;
            } else {
              this.state = Job.FAILED;
              this.message = "Job failed!";
              try {
                running.killJob();
              } catch (IOException e1) {
    
              }
              try {
                this.jc.close();
              } catch (IOException e2) {
    
              }
            }
          }
    
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
          try {
            if (running != null)
              running.killJob();
          } catch (IOException e1) {
    
          }
          try {
            this.jc.close();
          } catch (IOException e1) {
    
          }
        }
      }
        
      /**
       * Check and update the state of this job. The state changes  
       * depending on its current state and the states of the depending jobs.
       */
       synchronized int checkState() {
        if (this.state == Job.RUNNING) {
          checkRunningState();
        }
        if (this.state != Job.WAITING) {
          return this.state;
        }
        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
          this.state = Job.READY;
          return this.state;
        }
        Job pred = null;
        int n = this.dependingJobs.size();
        for (int i = 0; i < n; i++) {
          pred = this.dependingJobs.get(i);
          int s = pred.checkState();
          if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
            break; // a pred is still not completed, continue in WAITING
            // state
          }
          if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
            this.state = Job.DEPENDENT_FAILED;
            this.message = "depending job " + i + " with jobID "
              + pred.getJobID() + " failed. " + pred.getMessage();
            break;
          }
          // pred must be in success state
          if (i == n - 1) {
            this.state = Job.READY;
          }
        }
    
        return this.state;
      }
        
      /**
       * Submit this job to mapred. The state becomes RUNNING if submission 
       * is successful, FAILED otherwise.  
       */
      protected synchronized void submit() {
        try {
          if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
            FileSystem fs = FileSystem.get(theJobConf);
            Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
            for (int i = 0; i < inputPaths.length; i++) {
              if (!fs.exists(inputPaths[i])) {
                try {
                  fs.mkdirs(inputPaths[i]);
                } catch (IOException e) {
    
                }
              }
            }
          }
          RunningJob running = jc.submitJob(theJobConf);
          this.mapredJobID = running.getID();
          this.state = Job.RUNNING;
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
        }
      }
        
    }

    A closer look at the JobControl class

    The JobControl class begins by declaring its fields, including the state of its thread and one job table per state:

      // The thread can be in one of the following state
      private static final int RUNNING = 0;
      private static final int SUSPENDED = 1;
      private static final int STOPPED = 2;
      private static final int STOPPING = 3;
      private static final int READY = 4;
        
      private int runnerState;            // the thread state
        
      private Map<String, Job> waitingJobs;
      private Map<String, Job> readyJobs;
      private Map<String, Job> runningJobs;
      private Map<String, Job> successfulJobs;
      private Map<String, Job> failedJobs;
        
      private long nextJobID;
      private String groupName;

    Next comes the constructor:

      /** 
       * Construct a job control for a group of jobs.
       * @param groupName a name identifying this group
       */
      public JobControl(String groupName) {
        this.waitingJobs = new Hashtable<String, Job>();
        this.readyJobs = new Hashtable<String, Job>();
        this.runningJobs = new Hashtable<String, Job>();
        this.successfulJobs = new Hashtable<String, Job>();
        this.failedJobs = new Hashtable<String, Job>();
        this.nextJobID = -1;
        this.groupName = groupName;
        this.runnerState = JobControl.READY;
      }

    Next is a helper, toArrayList(), that copies the jobs stored in a Map into an ArrayList while holding the map's lock:

    private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
        ArrayList<Job> retv = new ArrayList<Job>();
        synchronized (jobs) {
          for (Job job : jobs.values()) {
            retv.add(job);
          }
        }
        return retv;
    }

    The class also has a few getters, plus a private helper, getNextJobID(), that generates job IDs:

      /**
       * @return the jobs in the success state
       */
      public ArrayList<Job> getSuccessfulJobs() {
        return JobControl.toArrayList(this.successfulJobs);
      }
      public ArrayList<Job> getFailedJobs() {
        return JobControl.toArrayList(this.failedJobs);
      }
      private String getNextJobID() {
        nextJobID += 1;
        return this.groupName + this.nextJobID;
      }
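    getNextJobID() builds an ID by appending a counter to the group name. Because the counter starts at -1 and is incremented before use, the first job of the group "Naive Bayes" gets the ID "Naive Bayes0". The counter itself is not synchronized, but in the code shown here its only caller, addJob(), is a synchronized method. A stand-alone sketch of the scheme (JobIdSketch is a hypothetical name):

```java
/** Hypothetical stand-alone copy of JobControl's ID scheme. */
class JobIdSketch {
    private final String groupName;
    private long nextJobID = -1;   // -1 so that the first ID ends in 0

    JobIdSketch(String groupName) {
        this.groupName = groupName;
    }

    /** Same logic as JobControl.getNextJobID(); synchronized here because no synchronized addJob() wraps it. */
    synchronized String getNextJobID() {
        nextJobID += 1;
        return groupName + nextJobID;
    }

    public static void main(String[] args) {
        JobIdSketch ids = new JobIdSketch("Naive Bayes");
        System.out.println(ids.getNextJobID()); // prints "Naive Bayes0"
        System.out.println(ids.getNextJobID()); // prints "Naive Bayes1"
    }
}
```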

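    There are also methods that insert a job into a queue; the second overload picks the queue from the job's current state: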

     private static void addToQueue(Job aJob, Map<String, Job> queue) {
        synchronized(queue) {
          queue.put(aJob.getJobID(), aJob);
        }        
      }
        
      private void addToQueue(Job aJob) {
        Map<String, Job> queue = getQueue(aJob.getState());
        addToQueue(aJob, queue);    
      }

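    The counterpart is getQueue(), which returns the queue that corresponds to a job state; FAILED and DEPENDENT_FAILED share the failed-jobs queue: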

      private Map<String, Job> getQueue(int state) {
        Map<String, Job> retv = null;
        if (state == Job.WAITING) {
          retv = this.waitingJobs;
        } else if (state == Job.READY) {
          retv = this.readyJobs;
        } else if (state == Job.RUNNING) {
          retv = this.runningJobs;
        } else if (state == Job.SUCCESS) {
          retv = this.successfulJobs;
        } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
          retv = this.failedJobs;
        } 
        return retv;
      }
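    getQueue() is a fixed mapping from job state to queue. For any state outside the six constants it returns null, and addToQueue() would then throw a NullPointerException at synchronized(queue). The hypothetical sketch below (QueueRouter is not a Hadoop class) expresses the mapping as a switch and rejects unknown states explicitly; for brevity it stores job IDs in sets rather than Job objects in Hashtables.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the state-to-queue routing in getQueue()/addToQueue(). */
class QueueRouter {
    static final int SUCCESS = 0, WAITING = 1, RUNNING = 2, READY = 3, FAILED = 4, DEPENDENT_FAILED = 5;

    final Map<String, Set<String>> queues = new HashMap<>();

    QueueRouter() {
        for (String q : new String[] {"waiting", "ready", "running", "successful", "failed"}) {
            queues.put(q, new HashSet<>());
        }
    }

    static String queueName(int state) {
        switch (state) {
            case WAITING: return "waiting";
            case READY:   return "ready";
            case RUNNING: return "running";
            case SUCCESS: return "successful";
            case FAILED:
            case DEPENDENT_FAILED: return "failed"; // both failure kinds share one queue
            default: throw new IllegalArgumentException("unknown job state: " + state);
        }
    }

    void addToQueue(String jobID, int state) {
        queues.get(queueName(state)).add(jobID);
    }

    public static void main(String[] args) {
        QueueRouter router = new QueueRouter();
        router.addToQueue("Naive Bayes0", READY);
        router.addToQueue("Naive Bayes1", DEPENDENT_FAILED);
        System.out.println(router.queues.get("failed")); // prints [Naive Bayes1]
    }
}
```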

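    Adding a new job assigns it an ID, sets its state to WAITING, and puts it in the matching queue; addJobs() adds a whole collection: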

      /**
       * Add a new job.
       * @param aJob the new job
       */
      synchronized public String addJob(Job aJob) {
        String id = this.getNextJobID();
        aJob.setJobID(id);
        aJob.setState(Job.WAITING);
        this.addToQueue(aJob);
        return id;    
      }
        
      /**
       * Add a collection of jobs
       * 
       * @param jobs
       */
      public void addJobs(Collection<Job> jobs) {
        for (Job job : jobs) {
          addJob(job);
        }
      }

    Methods that return the thread state and stop, suspend, or resume the thread:

    /**
       * @return the thread state
       */
      public int getState() {
        return this.runnerState;
      }
        
      /**
       * set the thread state to STOPPING so that the 
       * thread will stop when it wakes up.
       */
      public void stop() {
        this.runnerState = JobControl.STOPPING;
      }
        
      /**
       * suspend the running thread
       */
      public void suspend () {
        if (this.runnerState == JobControl.RUNNING) {
          this.runnerState = JobControl.SUSPENDED;
        }
      }
        
      /**
       * resume the suspended thread
       */
      public void resume () {
        if (this.runnerState == JobControl.SUSPENDED) {
          this.runnerState = JobControl.RUNNING;
        }
      }
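    In the listing, runnerState is a plain int field: stop(), suspend() and resume() write it from the caller's thread while run() reads it in its loop. Without volatile or synchronization, the Java memory model does not guarantee that the loop ever sees such a write. suspend() and resume() are also check-then-act sequences, so a stop() that lands between the check and the write in suspend() can be overwritten with SUSPENDED. A hypothetical thread-safe variant (RunnerFlags is not a Hadoop class) can use an AtomicInteger, whose compareAndSet() performs the check and the update atomically; unlike the original, its suspend() and resume() also report whether they changed anything.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical thread-safe variant of JobControl's runner-state handling. */
class RunnerFlags {
    static final int RUNNING = 0, SUSPENDED = 1, STOPPED = 2, STOPPING = 3, READY = 4;

    // AtomicInteger gives cross-thread visibility and an atomic check-then-set
    private final AtomicInteger runnerState = new AtomicInteger(READY);

    int getState() { return runnerState.get(); }

    void start() { runnerState.set(RUNNING); }      // what run() does on entry

    void stop() { runnerState.set(STOPPING); }

    /** RUNNING -> SUSPENDED; returns false and changes nothing in any other state. */
    boolean suspend() { return runnerState.compareAndSet(RUNNING, SUSPENDED); }

    /** SUSPENDED -> RUNNING; returns false and changes nothing in any other state. */
    boolean resume() { return runnerState.compareAndSet(SUSPENDED, RUNNING); }

    public static void main(String[] args) {
        RunnerFlags flags = new RunnerFlags();
        System.out.println(flags.suspend()); // prints false: not running yet
        flags.start();
        System.out.println(flags.suspend()); // prints true
        flags.stop();
        System.out.println(flags.resume());  // prints false: stop() wins
    }
}
```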
        

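    The next three methods check the running jobs and the waiting jobs, moving each job to the queue that matches its updated state, and submit the jobs that are ready: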

      synchronized private void checkRunningJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.runningJobs;
        this.runningJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
            if (state != Job.RUNNING) {
            System.out.println("The state of the running job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
            }
          */
          this.addToQueue(nextJob);
        }
      }
        
      synchronized private void checkWaitingJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.waitingJobs;
        this.waitingJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
            if (state != Job.WAITING) {
            System.out.println("The state of the waiting job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
            }
          */
          this.addToQueue(nextJob);
        }
      }
        
      synchronized private void startReadyJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.readyJobs;
        this.readyJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
          nextJob.submit();
          //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
          this.addToQueue(nextJob);
        }    
      }
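    All three methods follow the same pattern: swap in a fresh map for the queue, then walk the detached old map and file each job under the queue for its (possibly new) state via addToQueue(). Because the loop iterates over the old map, jobs can be re-inserted, even into the fresh map of the same state, without modifying a map while iterating over it. A generic sketch of the pattern, with hypothetical names (Rebucket, recheck) and Integer state keys:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

/** Hypothetical generic form of checkRunningJobs()/checkWaitingJobs(). */
class Rebucket {
    /**
     * Detaches the queue for sourceState, re-evaluates every job in it with check,
     * and files each job under the queue for the state it reports.
     */
    static <J> void recheck(Map<Integer, Map<String, J>> queues, int sourceState,
                            ToIntFunction<J> check) {
        Map<String, J> old = queues.put(sourceState, new HashMap<>()); // swap in a fresh map
        if (old == null) {
            return;                                   // nothing queued in this state
        }
        for (Map.Entry<String, J> e : old.entrySet()) {
            int newState = check.applyAsInt(e.getValue());
            queues.computeIfAbsent(newState, k -> new HashMap<>()).put(e.getKey(), e.getValue());
        }
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, String>> queues = new HashMap<>();
        Map<String, String> waiting = new HashMap<>();
        waiting.put("g0", "extract");
        waiting.put("g1", "predict");
        queues.put(1, waiting);                       // 1 = WAITING
        // pretend only "extract" became READY (3)
        recheck(queues, 1, job -> job.equals("extract") ? 3 : 1);
        System.out.println(queues.get(3));            // prints {g0=extract}
    }
}
```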

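    A method that checks whether all jobs have finished, i.e. whether the waiting, ready, and running queues are all empty: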

      synchronized public boolean allFinished() {
        return this.waitingJobs.size() == 0 &&
          this.readyJobs.size() == 0 &&
          this.runningJobs.size() == 0;
      }
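allFinished() ignores successfulJobs and failedJobs. It therefore returns true as soon as nothing is waiting, ready, or running, even if every job failed. It also returns true for a group with no jobs at all. A driver has to inspect getFailedJobs() afterwards.

The sketch is a hypothetical model that uses plain maps (id → job name). It adds an illustrative `allSucceeded()` helper that is not part of JobControl.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of allFinished(): only the three "unfinished" tables are consulted.
class FinishCheck {
    final Map<String, String> waiting = new HashMap<>();
    final Map<String, String> ready = new HashMap<>();
    final Map<String, String> running = new HashMap<>();
    final Map<String, String> successful = new HashMap<>();
    final Map<String, String> failed = new HashMap<>();

    // Same condition as JobControl.allFinished()
    synchronized boolean allFinished() {
        return waiting.isEmpty() && ready.isEmpty() && running.isEmpty();
    }

    // Illustrative helper (not in JobControl): finished AND nothing landed in the failed table
    synchronized boolean allSucceeded() {
        return allFinished() && failed.isEmpty();
    }

    public static void main(String[] args) {
        FinishCheck fc = new FinishCheck();
        fc.successful.put("bayes0", "extract");
        fc.failed.put("bayes1", "classPrior");
        System.out.println(fc.allFinished());   // true
        System.out.println(fc.allSucceeded());  // false
    }
}
```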
        

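    The run() method. Its loop checks the states of the running jobs, updates the states of the waiting jobs, and submits the jobs that are ready: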

    /**
       *  The main loop for the thread.
       *  The loop does the following:
       *      Check the states of the running jobs
       *      Update the states of waiting jobs
       *      Submit the jobs in ready state
       */
      public void run() {
        this.runnerState = JobControl.RUNNING;
        while (true) {
          while (this.runnerState == JobControl.SUSPENDED) {
            try {
              Thread.sleep(5000);
            }
            catch (Exception e) {
              // an interrupt is swallowed; the loop keeps polling runnerState
            }
          }
          checkRunningJobs();    
          checkWaitingJobs();        
          startReadyJobs();        
          if (this.runnerState != JobControl.RUNNING && 
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
          try {
            Thread.sleep(5000);
          }
          catch (Exception e) {
            // an interrupt is swallowed; the state check below still runs
          }
          if (this.runnerState != JobControl.RUNNING && 
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
        }
        this.runnerState = JobControl.STOPPED;
      }
    
    }
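Putting the pieces together, each iteration runs one pass (running, then waiting, then ready) and sleeps. The loop exits once stop() has moved the state to STOPPING. A driver usually starts JobControl in its own thread (it implements Runnable), polls allFinished(), and then calls stop().

The toy below replays this for the Bayesian example. It assumes predict depends on both classPrior and conditionalProbability, as the example's DAG implies. Everything in it is hypothetical:

- MiniJobControl and MiniJob are not Hadoop classes.
- A submitted job is treated as finished by the next pass.
- suspend/resume are omitted.
- The poll interval is 10 ms instead of 5000 ms.

One difference is deliberate: runnerState is `volatile` here. In the listing it is a plain `int` that stop() and suspend() write from the client thread without synchronization. The Java memory model therefore does not strictly guarantee that the runner thread observes the change.

```java
import java.util.*;

// Toy re-implementation of JobControl's polling loop (hypothetical; no Hadoop classes involved).
class MiniJobControl implements Runnable {
    enum JobState { WAITING, READY, RUNNING, SUCCESS }
    enum RunnerState { READY, RUNNING, STOPPING, STOPPED }

    static class MiniJob {
        final String name;
        final List<MiniJob> deps = new ArrayList<>();
        JobState state = JobState.WAITING;
        MiniJob(String name) { this.name = name; }
        void dependsOn(MiniJob d) { deps.add(d); }
    }

    private final List<MiniJob> jobs = new ArrayList<>();
    final List<String> submitted = Collections.synchronizedList(new ArrayList<>());
    private volatile RunnerState runnerState = RunnerState.READY; // written by stop() from another thread
    private final long pollMillis;

    MiniJobControl(long pollMillis) { this.pollMillis = pollMillis; }

    synchronized void addJob(MiniJob j) { jobs.add(j); }
    void stop() { runnerState = RunnerState.STOPPING; }

    synchronized boolean allFinished() {
        for (MiniJob j : jobs)
            if (j.state != JobState.SUCCESS) return false; // toy jobs never fail
        return true;
    }

    // One pass in the same order as run(): running -> waiting -> ready.
    private synchronized void pass() {
        for (MiniJob j : jobs)        // "checkRunningJobs": every submitted job has finished by now
            if (j.state == JobState.RUNNING) j.state = JobState.SUCCESS;
        for (MiniJob j : jobs) {      // "checkWaitingJobs": promote once all dependencies succeeded
            if (j.state != JobState.WAITING) continue;
            boolean ready = true;
            for (MiniJob d : j.deps) ready &= d.state == JobState.SUCCESS;
            if (ready) j.state = JobState.READY;
        }
        for (MiniJob j : jobs)        // "startReadyJobs": record the submission, mark RUNNING
            if (j.state == JobState.READY) { submitted.add(j.name); j.state = JobState.RUNNING; }
    }

    @Override
    public void run() {
        runnerState = RunnerState.RUNNING;
        while (runnerState == RunnerState.RUNNING) {
            pass();
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // unlike the original, treat an interrupt as a stop request
                break;
            }
        }
        runnerState = RunnerState.STOPPED;
    }
}

class BayesDagDemo {
    // Builds the four-job DAG, runs the loop in its own thread, and returns the submission order.
    static List<String> submissionOrder() {
        MiniJobControl jc = new MiniJobControl(10);
        MiniJobControl.MiniJob extract = new MiniJobControl.MiniJob("extract");
        MiniJobControl.MiniJob prior = new MiniJobControl.MiniJob("classPrior");
        MiniJobControl.MiniJob cond = new MiniJobControl.MiniJob("conditionalProbability");
        MiniJobControl.MiniJob predict = new MiniJobControl.MiniJob("predict");
        prior.dependsOn(extract);
        cond.dependsOn(extract);
        predict.dependsOn(prior);
        predict.dependsOn(cond);
        for (MiniJobControl.MiniJob j : Arrays.asList(extract, prior, cond, predict)) jc.addJob(j);

        Thread t = new Thread(jc);
        t.start();
        try {
            while (!jc.allFinished()) Thread.sleep(5); // driver: poll allFinished(), then stop()
            jc.stop();
            t.join();
        } catch (InterruptedException e) {
            jc.stop();
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(jc.submitted);
    }

    public static void main(String[] args) {
        System.out.println(submissionOrder()); // [extract, classPrior, conditionalProbability, predict]
    }
}
```

The first pass submits extract alone. The second pass submits the two independent jobs together. predict follows once both have succeeded, which is the ordering a hand-written serial runJob() chain would also have to respect.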

    The complete JobControl class:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.mapred.jobcontrol;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Hashtable;
    import java.util.Map;
    
    /** This class encapsulates a set of MapReduce jobs and its dependency. It tracks 
     *  the states of the jobs by placing them into different tables according to their 
     *  states. 
     *  
     *  This class provides APIs for the client app to add a job to the group and to get 
     *  the jobs in the group in different states. When a 
     *  job is added, an ID unique to the group is assigned to the job. 
     *  
     *  This class has a thread that submits jobs when they become ready, monitors the
     *  states of the running jobs, and updates the states of jobs based on the state changes 
     *  of their depending jobs states. The class provides APIs for suspending/resuming
     *  the thread,and for stopping the thread.
     *  
     */
    public class JobControl implements Runnable{
    
      // The thread can be in one of the following state
      private static final int RUNNING = 0;
      private static final int SUSPENDED = 1;
      private static final int STOPPED = 2;
      private static final int STOPPING = 3;
      private static final int READY = 4;
        
      private int runnerState;            // the thread state
        
      private Map<String, Job> waitingJobs;
      private Map<String, Job> readyJobs;
      private Map<String, Job> runningJobs;
      private Map<String, Job> successfulJobs;
      private Map<String, Job> failedJobs;
        
      private long nextJobID;
      private String groupName;
        
      /** 
       * Construct a job control for a group of jobs.
       * @param groupName a name identifying this group
       */
      public JobControl(String groupName) {
        this.waitingJobs = new Hashtable<String, Job>();
        this.readyJobs = new Hashtable<String, Job>();
        this.runningJobs = new Hashtable<String, Job>();
        this.successfulJobs = new Hashtable<String, Job>();
        this.failedJobs = new Hashtable<String, Job>();
        this.nextJobID = -1;
        this.groupName = groupName;
        this.runnerState = JobControl.READY;
      }
        
      private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
        ArrayList<Job> retv = new ArrayList<Job>();
        synchronized (jobs) {
          for (Job job : jobs.values()) {
            retv.add(job);
          }
        }
        return retv;
      }
        
      /**
       * @return the jobs in the waiting state
       */
      public ArrayList<Job> getWaitingJobs() {
        return JobControl.toArrayList(this.waitingJobs);
      }
        
      /**
       * @return the jobs in the running state
       */
      public ArrayList<Job> getRunningJobs() {
        return JobControl.toArrayList(this.runningJobs);
      }
        
      /**
       * @return the jobs in the ready state
       */
      public ArrayList<Job> getReadyJobs() {
        return JobControl.toArrayList(this.readyJobs);
      }
        
      /**
       * @return the jobs in the success state
       */
      public ArrayList<Job> getSuccessfulJobs() {
        return JobControl.toArrayList(this.successfulJobs);
      }
        
      public ArrayList<Job> getFailedJobs() {
        return JobControl.toArrayList(this.failedJobs);
      }
        
      private String getNextJobID() {
        nextJobID += 1;
        return this.groupName + this.nextJobID;
      }
        
      private static void addToQueue(Job aJob, Map<String, Job> queue) {
        synchronized(queue) {
          queue.put(aJob.getJobID(), aJob);
        }        
      }
        
      private void addToQueue(Job aJob) {
        Map<String, Job> queue = getQueue(aJob.getState());
        addToQueue(aJob, queue);    
      }
        
      private Map<String, Job> getQueue(int state) {
        Map<String, Job> retv = null;
        if (state == Job.WAITING) {
          retv = this.waitingJobs;
        } else if (state == Job.READY) {
          retv = this.readyJobs;
        } else if (state == Job.RUNNING) {
          retv = this.runningJobs;
        } else if (state == Job.SUCCESS) {
          retv = this.successfulJobs;
        } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
          retv = this.failedJobs;
        } 
        return retv;
      }
    
      /**
       * Add a new job.
       * @param aJob the new job
       */
      synchronized public String addJob(Job aJob) {
        String id = this.getNextJobID();
        aJob.setJobID(id);
        aJob.setState(Job.WAITING);
        this.addToQueue(aJob);
        return id;    
      }
        
      /**
       * Add a collection of jobs
       * 
       * @param jobs
       */
      public void addJobs(Collection<Job> jobs) {
        for (Job job : jobs) {
          addJob(job);
        }
      }
        
      /**
       * @return the thread state
       */
      public int getState() {
        return this.runnerState;
      }
        
      /**
       * set the thread state to STOPPING so that the 
       * thread will stop when it wakes up.
       */
      public void stop() {
        this.runnerState = JobControl.STOPPING;
      }
        
      /**
       * suspend the running thread
       */
      public void suspend () {
        if (this.runnerState == JobControl.RUNNING) {
          this.runnerState = JobControl.SUSPENDED;
        }
      }
        
      /**
       * resume the suspended thread
       */
      public void resume () {
        if (this.runnerState == JobControl.SUSPENDED) {
          this.runnerState = JobControl.RUNNING;
        }
      }
        
      synchronized private void checkRunningJobs() {
            
        Map<String, Job> oldJobs = null;
        oldJobs = this.runningJobs;
        this.runningJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
            if (state != Job.RUNNING) {
            System.out.println("The state of the running job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
            }
          */
          this.addToQueue(nextJob);
        }
      }
        
      synchronized private void checkWaitingJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.waitingJobs;
        this.waitingJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
            if (state != Job.WAITING) {
            System.out.println("The state of the waiting job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
            }
          */
          this.addToQueue(nextJob);
        }
      }
        
      synchronized private void startReadyJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.readyJobs;
        this.readyJobs = new Hashtable<String, Job>();
            
        for (Job nextJob : oldJobs.values()) {
          //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
          nextJob.submit();
          //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
          this.addToQueue(nextJob);
        }    
      }
        
      synchronized public boolean allFinished() {
        return this.waitingJobs.size() == 0 &&
          this.readyJobs.size() == 0 &&
          this.runningJobs.size() == 0;
      }
        
      /**
       *  The main loop for the thread.
       *  The loop does the following:
       *      Check the states of the running jobs
       *      Update the states of waiting jobs
       *      Submit the jobs in ready state
       */
      public void run() {
        this.runnerState = JobControl.RUNNING;
        while (true) {
          while (this.runnerState == JobControl.SUSPENDED) {
            try {
              Thread.sleep(5000);
            }
            catch (Exception e) {
                        
            }
          }
          checkRunningJobs();    
          checkWaitingJobs();        
          startReadyJobs();        
          if (this.runnerState != JobControl.RUNNING && 
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
          try {
            Thread.sleep(5000);
          }
          catch (Exception e) {
                    
          }
          if (this.runnerState != JobControl.RUNNING && 
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
        }
        this.runnerState = JobControl.STOPPED;
      }
    
    }
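In addJob(), the ID returned to the caller comes from getNextJobID(). nextJobID starts at -1 and is incremented before use, so the first job added to a group named `bayes` gets the ID `bayes0`. These IDs are the keys of the per-state tables. They are separate from the Hadoop job ID a job receives on submission (the `getMapredJobID()` mentioned in the commented-out debug line).

A standalone sketch of just that counter follows. The class name `JobIdDemo` and the group name are hypothetical.

```java
// Hypothetical stand-alone copy of the ID scheme used by JobControl.getNextJobID()/addJob().
class JobIdDemo {
    private long nextJobID = -1;          // same starting value as the JobControl constructor
    private final String groupName;

    JobIdDemo(String groupName) { this.groupName = groupName; }

    synchronized String getNextJobID() {
        nextJobID += 1;                   // incremented before use: the first ID ends in 0
        return groupName + nextJobID;
    }

    public static void main(String[] args) {
        JobIdDemo ids = new JobIdDemo("bayes");
        System.out.println(ids.getNextJobID()); // bayes0
        System.out.println(ids.getNextJobID()); // bayes1
    }
}
```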

    References

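    《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: An In-Depth Understanding of MapReduce Architecture Design and Implementation Principles)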

    Original article: https://www.cnblogs.com/wuyudong/p/hadoop-jobcontrol.html