zoukankan      html  css  js  c++  java
  • 【Hadoop代码笔记】Hadoop作业提交之客户端作业提交

    1.      概要描述
    仅仅描述向Hadoop提交作业的第一步,即调用Jobclient的submitJob方法,向Hadoop提交作业。

    2.      详细描述
    Jobclient使用内置的JobSubmissionProtocol 实例jobSubmitClient 和JobTracker交互,最主要是提交作业、获取作业执行信息等。

    在JobClient中作业提交的主要过程如下:

    1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
    2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
    3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
    4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormatcheckOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
    5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
    6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
    7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
    8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。

      

    引用下Hadoop: The Definitive Guide, Second Edition中的一张经典图。这里仅仅描述上图中的左上角第一个框部分内容,即本步骤的最终输出仅仅是将作业提交到JobTracker。其他后续文章会继续描述。

    3.      涉及主要类介绍:

    Jobclient :JobClient是向JobTracker提交作业的接口,可以理解为Hadoop的Mapreduce作业框架向用户开放的作业提交入口。可以提交作业,监视作业状态等

    JobSubmissionProtocol(为什么0.20.1的javadoc中找不到这个接口,虽然0.20.1 0.20.2代码中都是相同的用法,知道2.2.0貌似重命名为被ClientProtocol替换):JobClient和JobTracker进行通信的一个协议。JobClient实际上是用这个句柄来提交锁业并且监视作业的执行状况。

    这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是 JobTracker的对象,表示分布式执 行。

    详细可参照JobClient中 的初始化代码:

      /**
       *如果是非local的就会 连接到指定的JobTracker  
       */
      public void init(JobConf conf) throws IOException {
        String tracker = conf.get("mapred.job.tracker", "local");
        if ("local".equals(tracker)) {
          this.jobSubmitClient = new LocalJobRunner(conf);
        } else {
          this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
        }        
      }
    
     /*
      * RPC不是本次主题重点,可参照后续发表的专题内容
      */
      private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
          Configuration conf) throws IOException {
        return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
            JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
      }
    初始化JobSubmissionProtocol

    InputFormat 重要,但暂不展开(此处会有链接)

    Split  重要,但暂不展开(此处会有链接)

    RowSplit 重要,但暂不展开(此处会有链接)

     4.     主要代码
    通过代码来了解流程,了解如何调用JobClient向Hadoop集群提交作业。

      public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                                      IOException {
        try {
          return submitJobInternal(job);
        } catch (InterruptedException ie) {
          throw new IOException("interrupted", ie);
        } catch (ClassNotFoundException cnfe) {
          throw new IOException("class not found", cnfe);
        }
      }
    JobClient submitJob

    实际方法的执行是submitJobInternal方法。着重看下这个方法的内部执行。主要的逻辑部分比较详细的进行了注释。(有些想继续展开,感觉太细了,后面的文章中部分重要的会有涉及,不想深度遍历了,到时会回过头来互相链接) 

     1 public RunningJob submitJobInternal(JobConf job)
     2             throws FileNotFoundException, ClassNotFoundException,
     3             InterruptedException, IOException {
     4 
     5         // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
     6         JobID jobId = jobSubmitClient.getNewJobId();
     7         // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
     8         // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
     9 
    10         Path submitJobDir = new Path(getSystemDir(), jobId.toString());
    11         Path submitJarFile = new Path(submitJobDir, "job.jar");
    12         Path submitSplitFile = new Path(submitJobDir, "job.split");
    13         configureCommandLineOptions(job, submitJobDir, submitJarFile);
    14         Path submitJobFile = new Path(submitJobDir, "job.xml");
    15         int reduces = job.getNumReduceTasks();
    16         JobContext context = new JobContext(job, jobId);
    17 
    18         // Check the output specification
    19         // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
    20 
    21         if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
    22             org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
    23                     .newInstance(context.getOutputFormatClass(), job);
    24             output.checkOutputSpecs(context);
    25         } else {
    26             job.getOutputFormat().checkOutputSpecs(fs, job);
    27         }
    28 
    29         // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。
    30         // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile
    31         // job.split名称。
    32         // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
    33 
    34         // Create the splits for the job
    35         LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    36         int maps;
    37         if (job.getUseNewMapper()) {
    38             maps = writeNewSplits(context, submitSplitFile);
    39         } else {
    40             maps = writeOldSplits(job, submitSplitFile);
    41         }
    42         job.set("mapred.job.split.file", submitSplitFile.toString());
    43         job.setNumMapTasks(maps);
    44 
    45         // Write job file to JobTracker's fs
    46         FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
    47                 new FsPermission(JOB_FILE_PERMISSION));
    48 
    49         try {
    50             job.writeXml(out);
    51         } finally {
    52             out.close();
    53         }
    54 
    55         // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。
    56 
    57         JobStatus status = jobSubmitClient.submitJob(jobId);
    58         if (status != null) {
    59             return new NetworkedJob(status);
    60         } else {
    61             throw new IOException("Could not launch job");
    62         }
    63     }
     /**
       * JobTracker.submitJob() kicks off a new job.  
       *
       * Create a 'JobInProgress' object, which contains both JobProfile
       * and JobStatus.  Those two sub-objects are sometimes shipped outside
       * of the JobTracker.  But JobInProgress adds info that's useful for
       * the JobTracker alone.
       */
      public synchronized JobStatus submitJob(JobID jobId) throws IOException {
        if(jobs.containsKey(jobId)) {
          //job already running, don't start twice
          return jobs.get(jobId).getStatus();
        }
        
        JobInProgress job = new JobInProgress(jobId, this, this.conf);
        
        String queue = job.getProfile().getQueueName();
        if(!(queueManager.getQueues().contains(queue))) {      
          new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
          throw new IOException("Queue "" + queue + "" does not exist");        
        }
    
        // check for access
        try {
          checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
        } catch (IOException ioe) {
           LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                    + ". Ignoring job " + jobId, ioe);
          new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
          throw ioe;
        }
    
       return addJob(jobId, job); 
      }
    JobTracker submitJob

    为了转载内容的一致性、可追溯性和保证及时更新纠错,转载时请注明来自:http://www.cnblogs.com/douba/p/hadoop_jobclient_submit.html。谢谢!

  • 相关阅读:
    codevs 1115 开心的金明
    POJ 1125 Stockbroker Grapevine
    POJ 2421 constructing roads
    codevs 1390 回文平方数 USACO
    codevs 1131 统计单词数 2011年NOIP全国联赛普及组
    codevs 1313 质因数分解
    洛谷 绕钉子的长绳子
    洛谷 P1276 校门外的树(增强版)
    codevs 2627 村村通
    codevs 1191 数轴染色
  • 原文地址:https://www.cnblogs.com/douba/p/hadoop_jobclient_submit.html
Copyright © 2011-2022 走看看