zoukankan      html  css  js  c++  java
  • 【转】[Hadoop源码解读](三)MapReduce篇之Job类

    转自:http://www.cnblogs.com/lucius/p/3449914.html

    下面,我们只涉及MapReduce 1,而不涉及YARN。

                                              

      当我们在写MapReduce程序的时候,通常,在main函数里,我们会像下面这样做。建立一个Job对象,设置它的JobName,然后配置输入输出 路径,设置我们的Mapper类和Reducer类,设置InputFormat和正确的输出类型等等。然后我们会使用 job.waitForCompletion()提交到JobTracker,等待job运行并返回,这就是一般的Job设置过程。JobTracker 会初始化这个Job,获取输入分片,然后将一个一个的task任务分配给TaskTrackers执行。TaskTracker获取task是通过心跳的 返回值得到的,然后TaskTracker就会为收到的task启动一个JVM来运行。

    复制代码
     1         Configuration conf = getConf();  
     2         Job job = new Job(conf, "SelectGradeDriver");  
     3         job.setJarByClass(SelectGradeDriver.class);   
     4           
     5         Path in = new Path(args[0]);  
     6         Path out = new Path(args[1]);  
     7           
     8         FileInputFormat.setInputPaths(job, in);  
     9         FileOutputFormat.setOutputPath(job, out);  
    10           
    11         job.setMapperClass(SelectGradeMapper.class);  
    12         job.setReducerClass(SelectGradeReducer.class);  
    13           
    14         job.setInputFormatClass(TextInputFormat.class);  
    15         job.setOutputFormatClass(TextOutputFormat.class);  
    16           
    17         job.setMapOutputKeyClass(InstituteAndGradeWritable.class);  
    18         job.setMapOutputValueClass(Text.class);  
    19           
    20         job.setOutputKeyClass(InstituteAndGradeWritable.class);  
    21         job.setOutputValueClass(Text.class);  
    22           
    23         System.exit(job.waitForCompletion(true)? 0 : 1);  
    复制代码

      Job其实就是提供配置作业、获取作业配置、以及提交作业的功能,以及跟踪作业进度和控制作业。Job类继承于JobContext类。 JobContext提供了获取作业配置的功能,如作业ID,作业的Mapper类,Reducer类,输入格式,输出格式等等,它们除了作业ID之外, 都是只读的。 Job类在JobContext的基础上,提供了设置作业配置信息的功能、跟踪进度,以及提交作业的接口和控制作业的方法。

    复制代码
     1 public class Job extends JobContext {  
     2   public static enum JobState {DEFINE, RUNNING};
     3   private JobState state = JobState.DEFINE;
     4   private JobClient jobClient;
     5   private RunningJob info;
     6   public float setupProgress() throws IOException {
     7     ensureState(JobState.RUNNING);
     8     return info.setupProgress();
     9   }
    10 
    11 
    12   public float mapProgress() throws IOException {
    13     ensureState(JobState.RUNNING);
    14     return info.mapProgress();
    15   }
    16   public float reduceProgress() throws IOException {
    17     ensureState(JobState.RUNNING);
    18     return info.reduceProgress();
    19   }
    20   public boolean isComplete() throws IOException {
    21     ensureState(JobState.RUNNING);
    22     return info.isComplete();
    23   }
    24   public boolean isSuccessful() throws IOException {
    25     ensureState(JobState.RUNNING);
    26     return info.isSuccessful();
    27   }
    28   public void killJob() throws IOException {
    29     ensureState(JobState.RUNNING);
    30     info.killJob();
    31   }
    32  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
    33                                                        ) throws IOException {
    34     ensureState(JobState.RUNNING);
    35     return info.getTaskCompletionEvents(startFrom);
    36   }
    37 
    38   public void killTask(TaskAttemptID taskId) throws IOException {
    39     ensureState(JobState.RUNNING);
    40     info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
    41                   false);
    42   }
    43   public void failTask(TaskAttemptID taskId) throws IOException {
    44     ensureState(JobState.RUNNING);
    45     info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
    46                   true);
    47   }
    48 
    49   public Counters getCounters() throws IOException {
    50     ensureState(JobState.RUNNING);
    51     return new Counters(info.getCounters());
    52   }
    53   public void submit() throws IOException, InterruptedException, 
    54                               ClassNotFoundException {
    55     ensureState(JobState.DEFINE);
    56     setUseNewAPI();
    57     
    58     // Connect to the JobTracker and submit the job
    59     connect();
    60     info = jobClient.submitJobInternal(conf);
    61     super.setJobID(info.getID());
    62     state = JobState.RUNNING;
    63    }
    64   private void connect() throws IOException, InterruptedException {
    65     ugi.doAs(new PrivilegedExceptionAction<Object>() {
    66       public Object run() throws IOException {
    67         jobClient = new JobClient((JobConf) getConfiguration());    
    68         return null;
    69       }
    70     });
    71   }
    72   public boolean waitForCompletion(boolean verbose
    73                                    ) throws IOException, InterruptedException,
    74                                             ClassNotFoundException {
    75     if (state == JobState.DEFINE) {
    76       submit();
    77     }
    78     if (verbose) {
    79       jobClient.monitorAndPrintJob(conf, info);
    80     } else {
    81       info.waitForCompletion();
    82     }
    83     return isSuccessful();
    84   }
    85   //lots of setters and others
    86 }
    复制代码

      一个Job对象有两种状态,DEFINE和RUNNING,Job对象被创建时的状态时DEFINE,当且仅当Job对象处于DEFINE状态,才可以用 来设置作业的一些配置,如Reduce task的数量、InputFormat类、工作的Mapper类,Partitioner类等等,这些设置是通过设置配置信息conf来实现的;当作业 通过submit()被提交,就会将这个Job对象的状态设置为RUNNING,这时候作业以及提交了,就不能再设置上面那些参数了,作业处于调度运行阶 段。处于RUNNING状态的作业我们可以获取作业、map task和reduce task的进度,通过代码中的*Progress()获得,这些函数是通过info来获取的,info是RunningJob对象,它是实际在运行的作业 的一组获取作业情况的接口,如Progress。

      在waitForCompletion()中,首先用submit()提交作业,然后等待info.waitForCompletion()返回作业执行 完毕。verbose参数用来决定是否将运行进度等信息输出给用户。submit()首先会检查是否正确使用了new API,这通过setUseNewAPI()检查旧版本的属性是否被设置来实现的[设置是否使用newAPI是因为执行Task时要根据使用的API版本 来执行不同版本的MapReduce,在后面讲MapTask时会说到],接着就connect()连接JobTracker并提交。实际提交作业的是一 个JobClient对象,提交作业后返回一个RunningJob对象,这个对象可以跟踪作业的进度以及含有由JobTracker设置的作业ID。

      getCounter()函数是用来返回这个作业的计数器列表的,计数器被用来收集作业的统计信息,比如失败的map task数量,reduce输出的记录数等等。它包括内置计数器和用户定义的计数器,用户自定义的计数器可以用来收集用户需要的特定信息。计数器首先被每 个task定期传输到TaskTracker,最后TaskTracker再传到JobTracker收集起来。这就意味着,计数器是全局的。

      关于Counter相关的类,为了保持篇幅简短,放在下一篇讲。

  • 相关阅读:
    centos6升级python
    MySQL的BLOB类型(解决mysql不支持mb4编码的时候存储emoji表情问题)
    librdkafka安装和php扩展php-rdkafka安装
    Mac High Sierra 降级安装Mac Sierra
    mysql常用命令
    PHP_CodeSniffer 安装和phpstorm配置
    SSH登录异常(someone is doing something nasty)
    java并发 —— Lock
    java 并发——线程
    java 并发——内置锁
  • 原文地址:https://www.cnblogs.com/conie/p/3583591.html
Copyright © 2011-2022 走看看