zoukankan      html  css  js  c++  java
  • Hadoop-1.2.1学习之Job创建和提交源码分析

           在Hadoop中,MapReduce的Java作业通常由编写Mapper和Reducer開始。接着创建Job对象。然后使用该对象的set方法设置Mapper和Reducer以及诸如输入输出等參数,最后调用Job对象的waitForCompletion(true)方法提交作业并等待作业的完毕。虽然使用了寥寥数语就描写叙述了作业的创建和提交,但实际情况要复杂的多。本篇文章将通过分析源码来深入学习该过程。

           通常使用public Job(Configuration conf, String jobName)创建Job作业对象,都会指定作业名称,hadoop代码仅仅是将jobName设置为參数mapred.job.name的值。

    除了设置作业名称外,Job的构造函数还会使用Configuration对象初始化org.apache.hadoop.mapred.JobConf对象conf,以及使用UserGroupInformation.getCurrentUser()获取当前用户ugi。当中JobConf是描写叙述MapReduce作业的主要接口,包括设置作业名称在内的很多方法都是由该类完毕的。

    UserGroupInformation类用包括了用户和组的信息。该类封装了JAAS(Java Authentication AuthorizationService。Java认证和授权服务),并提供方法确定username和组。

           当创建了Job对象后一般会设置Mapper和Reducer。比方job.setMapperClass,正像上面提到的,该操作实际是由JobConf对象完毕的,详细代码例如以下,其他的设置方法类似:

    public void setMapperClass(Class<?

    extends Mapper> cls) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class); }

           在设置完作业运行须要的參数后。运行job.waitForCompletion(true)向集群提交作业并等待作业运行完毕。当中的boolean类型的參数用于决定是否向用户打印作业的运行进度。该方法的详细代码例如以下:

    public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {
        if (state == JobState.DEFINE) {
          submit();
        }
        if (verbose) {
          jobClient.monitorAndPrintJob(conf, info);
        } else {
          info.waitForCompletion();
        }
        return isSuccessful();
    }
    

           当新创建一个作业时,该作业的JobState state = JobState.DEFINE,所以上面的代码中会运行submit方法。当在submit返回后会依据參数verbose为true或false运行不同的方法。如今详细submit的实现:

    public void submit() throws IOException, InterruptedException, ClassNotFoundException {
        ensureState(JobState.DEFINE);
        setUseNewAPI();//默认使用新版本号中的API,除非显示设置了老版本号的API
        
        // Connect to the JobTracker and submit the job
        connect();
        info = jobClient.submitJobInternal(conf);
        super.setJobID(info.getID());
        state = JobState.RUNNING;
    }
    

           在submit中,先确认Job的state为JobState.DEFINE。并最后在将作业提交后设置为JobState.RUNNING。connect方法用于打开到JobTracker的连接,该方法的代码为:

    private void connect() throws IOException, InterruptedException {
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
          public Object run() throws IOException {
            jobClient = new JobClient((JobConf) getConfiguration());    
            return null;
          }
        });
    }
    

           在进一步分析之前,须要先了解两个对象。各自是JobClient jobClient和RunningJobinfo,当中jobClient是用户作业与JobTracker交互的主要接口,该类具有提交作业,跟踪作业进度,訪问任务日志和获取MapReduce集群状态信息等功能。RunningJob是接口,用于查询正在执行的MapReduce作业的细节,当调用jobClient的submitJobInternal时。返回的是jobClient的内部类NetworkedJob(该类实现了RunningJob)。在connect方法中,主要是实例化了jobClient对象,而ugi的doAs方法的返回值为run方法的返回值,后面还会使用该方法(实际情况是该方法被大量使用)。在JobClient的构造方法中,主要完毕了连接JobTracker的工作,该工作又交给了init方法,该方法的详细实现为:

    public void init(JobConf conf) throws IOException {
    String tracker = conf.get("mapred.job.tracker", "local");
    // mapreduce.client.tasklog.timeout
        tasklogtimeout = conf.getInt(
          TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
        this.ugi = UserGroupInformation.getCurrentUser();
        if ("local".equals(tracker)) {
          conf.setNumMapTasks(1);
          this.jobSubmitClient = new LocalJobRunner(conf);
        } else {
          this.rpcJobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
          this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
        }        
    }
    

           在该方法中着重分析非单机模式下的情况。即mapred.job.tracker的值不是local。也即else语句中的代码。rpcJobSubmitClient和jobSubmitClient是类型为JobSubmissionProtocol的两个对象,JobClient和JobTracker使用该接口通信,JobClient使用该接口的方法提交作业及了解当前系统的状态。方法createRPCProxy和createProxy用于创建实现JobSubmissionProtocol的client对象。

           在连接到JobTracker后,接着使用jobClient的submitJobInternal向JobTracker提交作业。

    在该方法中首先确定存放作业文件的路径,该路径为${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging设置,若未设置mapreduce.jobtracker.staging.root.dir则使用/tmp/hadoop/mapred/staging/${user-name}/.staging。然后在上述文件夹创建名为作业Id的文件夹,并将參数mapreduce.job.dir设置为该值。即${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging/jobId,上面的文件夹均是相对于fs.default.name设置的值。接下来将作业的jar文件复制到${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging/jobId中,并重命名为job.jar文件。该工作由copyAndConfigureFiles方法完毕。接着须要在上述文件夹中创建job.xml文件。获取Reduce任务的数量,切割输入文件并依据切割所得块数设置Map任务的数量。做完上述工作后,使用以下的代码提交作业:

    status = jobSubmitClient.submitJob( jobId, submitJobDir.toString(), jobCopy.getCredentials());

           当将作业提交到JobTracker后。作业的运行将由JobTracker负责,而做为提交作业的client能够选择是否打印作业运行进度。

           综上在Hadoop-1.2.1中作业的创建和提交包含例如以下的一些过程:

    • 设置作业的输入输出參数
    • 拷贝作业文件和配置文件到特定文件夹中
    • 计算作业的分片并设置Map任务的数量
    •  向JobTracker提交作业并可选的监控作业执行进度

  • 相关阅读:
    R语言数据集合
    转:EXCEL中如何获取从某一字符开始到最右边字符串
    转:EXCEL打乱顺序
    转:excel中怎样做柱状图
    转:linux复制/剪切文件到另一个文件夹
    转:Linux常用命令
    转:怎么在一张PPT里设置很多步骤出现的内容呀
    禅道分析
    转:BUG的严重级别分类 BUG状态标准
    转:Bug的严重等级和优先级
  • 原文地址:https://www.cnblogs.com/mthoutai/p/7338625.html
Copyright © 2011-2022 走看看