zoukankan      html  css  js  c++  java
  • Job类

    当我们在写MapReduce程序的时候,通常,在main函数里,我们会像下面这样做。建立一个Job对象,设置它的JobName,然后配置输入输出路径,设置我们的Mapper类和Reducer类,设置InputFormat和正确的输出类型等等。然后我们会使用job.waitForCompletion()提交到JobTracker,等待job运行并返回,这就是一般的Job设置过程。JobTracker会初始化这个Job,获取输入分片,然后将一个一个的task任务分配给TaskTrackers执行。TaskTracker获取task是通过心跳的返回值得到的,然后TaskTracker就会为收到的task启动一个JVM来运行。

      

    1. Configuration conf = getConf();    
    2. Job job = new Job(conf, "SelectGradeDriver");    
    3. job.setJarByClass(SelectGradeDriver.class);     
    4.     
    5. Path in = new Path(args[0]);    
    6. Path out = new Path(args[1]);    
    7.     
    8. FileInputFormat.setInputPaths(job, in);    
    9. FileOutputFormat.setOutputPath(job, out);    
    10.     
    11. job.setMapperClass(SelectGradeMapper.class);    
    12. job.setReducerClass(SelectGradeReducer.class);    
    13.     
    14. job.setInputFormatClass(TextInputFormat.class);    
    15. job.setOutputFormatClass(TextOutputFormat.class);    
    16.     
    17. job.setMapOutputKeyClass(InstituteAndGradeWritable.class);    
    18. job.setMapOutputValueClass(Text.class);    
    19.     
    20. job.setOutputKeyClass(InstituteAndGradeWritable.class);    
    21. job.setOutputValueClass(Text.class);    
    22.     
    23. System.exit(job.waitForCompletion(true)? 0 : 1);    


      Job其实就是提供配置作业、获取作业配置、以及提交作业的功能,以及跟踪作业进度和控制作业。Job类继承于JobContext类。JobContext提供了获取作业配置的功能,如作业ID,作业的Mapper类,Reducer类,输入格式,输出格式等等,它们除了作业ID之外,都是只读的。 Job类在JobContext的基础上,提供了设置作业配置信息的功能、跟踪进度,以及提交作业的接口和控制作业的方法。

    1. public class Job extends JobContext {    
    2.   public static enum JobState {DEFINE, RUNNING};  
    3.   private JobState state = JobState.DEFINE;  
    4.   private JobClient jobClient;  
    5.   private RunningJob info;  
    6.   public float setupProgress() throws IOException {  
    7.     ensureState(JobState.RUNNING);  
    8.     return info.setupProgress();  
    9.   }  
    10.   
    11.   
    12.   public float mapProgress() throws IOException {  
    13.     ensureState(JobState.RUNNING);  
    14.     return info.mapProgress();  
    15.   }  
    16.   public float reduceProgress() throws IOException {  
    17.     ensureState(JobState.RUNNING);  
    18.     return info.reduceProgress();  
    19.   }  
    20.   public boolean isComplete() throws IOException {  
    21.     ensureState(JobState.RUNNING);  
    22.     return info.isComplete();  
    23.   }  
    24.   public boolean isSuccessful() throws IOException {  
    25.     ensureState(JobState.RUNNING);  
    26.     return info.isSuccessful();  
    27.   }  
    28.   public void killJob() throws IOException {  
    29.     ensureState(JobState.RUNNING);  
    30.     info.killJob();  
    31.   }  
    32.  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom  
    33.                                                        ) throws IOException {  
    34.     ensureState(JobState.RUNNING);  
    35.     return info.getTaskCompletionEvents(startFrom);  
    36.   }  
    37.   
    38.   public void killTask(TaskAttemptID taskId) throws IOException {  
    39.     ensureState(JobState.RUNNING);  
    40.     info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),   
    41.                   false);  
    42.   }  
    43.   public void failTask(TaskAttemptID taskId) throws IOException {  
    44.     ensureState(JobState.RUNNING);  
    45.     info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),   
    46.                   true);  
    47.   }  
    48.   
    49.   public Counters getCounters() throws IOException {  
    50.     ensureState(JobState.RUNNING);  
    51.     return new Counters(info.getCounters());  
    52.   }  
    53.   public void submit() throws IOException, InterruptedException,   
    54.                               ClassNotFoundException {  
    55.     ensureState(JobState.DEFINE);  
    56.     setUseNewAPI();  
    57.       
    58.     // Connect to the JobTracker and submit the job  
    59.     connect();  
    60.     info = jobClient.submitJobInternal(conf);  
    61.     super.setJobID(info.getID());  
    62.     state = JobState.RUNNING;  
    63.    }  
    64.   private void connect() throws IOException, InterruptedException {  
    65.     ugi.doAs(new PrivilegedExceptionAction<Object>() {  
    66.       public Object run() throws IOException {  
    67.         jobClient = new JobClient((JobConf) getConfiguration());      
    68.         return null;  
    69.       }  
    70.     });  
    71.   }  
    72.   public boolean waitForCompletion(boolean verbose  
    73.                                    ) throws IOException, InterruptedException,  
    74.                                             ClassNotFoundException {  
    75.     if (state == JobState.DEFINE) {  
    76.       submit();  
    77.     }  
    78.     if (verbose) {  
    79.       jobClient.monitorAndPrintJob(conf, info);  
    80.     } else {  
    81.       info.waitForCompletion();  
    82.     }  
    83.     return isSuccessful();  
    84.   }  
    85.   //lots of setters and others  
    86. }  

     

      一个Job对象有两种状态,DEFINE和RUNNING,Job对象被创建时的状态时DEFINE,当且仅当Job对象处于DEFINE状态,才可以用来设置作业的一些配置,如Reduce task的数量、InputFormat类、工作的Mapper类,Partitioner类等等,这些设置是通过设置配置信息conf来实现的;当作业通过submit()被提交,就会将这个Job对象的状态设置为RUNNING,这时候作业以及提交了,就不能再设置上面那些参数了,作业处于调度运行阶段。处于RUNNING状态的作业我们可以获取作业、map task和reduce task的进度,通过代码中的*Progress()获得,这些函数是通过info来获取的,info是RunningJob对象,它是实际在运行的作业的一组获取作业情况的接口,如Progress。

    在waitForCompletion()中,首先用submit()提交作业,然后等待info.waitForCompletion()返回作业执行完毕。verbose参数用来决定是否将运行进度等信息输出给用户。submit()首先会检查是否正确使用了new API,这通过setUseNewAPI()检查旧版本的属性是否被设置来实现的,接着就connect()连接JobTracker并提交。实际提交作业的是一个JobClient对象,提交作业后返回一个RunningJob对象,这个对象可以跟踪作业的进度以及含有由JobTracker设置的作业ID。

      getCounter()函数是用来返回这个作业的计数器列表的,计数器被用来收集作业的统计信息,比如失败的map task数量,reduce输出的记录数等等。它包括内置计数器和用户定义的计数器,用户自定义的计数器可以用来收集用户需要的特定信息。计数器首先被每个task定期传输到TaskTracker,最后TaskTracker再传到JobTracker收集起来。这就意味着,计数器是全局的。

    转自:

  • 相关阅读:
    SDWebImage缓存下载图片
    NSMutableUrlRequest自定义封装网络请求
    第152题:乘积最大子序列
    第142题:环形链表II
    第17题:电话号码的组合
    第129题:求根到叶子节点数字之和
    第125题:验证回文串
    第122题:买卖股票的最佳时机II
    第121题:买卖股票的最佳时机
    第120题:三角形最小路径和
  • 原文地址:https://www.cnblogs.com/xuepei/p/3667206.html
Copyright © 2011-2022 走看看