zoukankan      html  css  js  c++  java
  • 【Hadoop代码笔记】Hadoop作业提交之客户端作业提交

    1.      概要描述
    仅仅描述向Hadoop提交作业的第一步,即调用Jobclient的submitJob方法,向Hadoop提交作业。

    2.      详细描述
    Jobclient使用内置的JobSubmissionProtocol 实例jobSubmitClient 和JobTracker交互,最主要是提交作业、获取作业执行信息等。

    在JobClient中作业提交的主要过程如下:

    1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
    2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
    3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
    4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormatcheckOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
    5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
    6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
    7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
    8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。

      

    引用下Hadoop: The Definitive Guide, Second Edition中的一张经典图。这里仅仅描述上图中的左上角第一个框部分内容,即本步骤的最终输出仅仅是将作业提交到JobTracker。其他后续文章会继续描述。

    3.      涉及主要类介绍:

    Jobclient :JobClient是向JobTracker提交作业的接口,可以理解为Hadoop的Mapreduce作业框架向用户开放的作业提交入口。可以提交作业,监视作业状态等

    JobSubmissionProtocol(为什么0.20.1的javadoc中找不到这个接口,虽然0.20.1 0.20.2代码中都是相同的用法,知道2.2.0貌似重命名为被ClientProtocol替换):JobClient和JobTracker进行通信的一个协议。JobClient实际上是用这个句柄来提交锁业并且监视作业的执行状况。

    这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是 JobTracker的对象,表示分布式执 行。

    详细可参照JobClient中 的初始化代码:

      /**
       *如果是非local的就会 连接到指定的JobTracker  
       */
      public void init(JobConf conf) throws IOException {
        String tracker = conf.get("mapred.job.tracker", "local");
        if ("local".equals(tracker)) {
          this.jobSubmitClient = new LocalJobRunner(conf);
        } else {
          this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
        }        
      }
    
     /*
      * RPC不是本次主题重点,可参照后续发表的专题内容
      */
      private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
          Configuration conf) throws IOException {
        return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
            JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
      }
    初始化JobSubmissionProtocol

    InputFormat 重要,但暂不展开(此处会有链接)

    Split  重要,但暂不展开(此处会有链接)

    RowSplit 重要,但暂不展开(此处会有链接)

     4.     主要代码
    通过代码来了解流程,了解如何调用JobClient向Hadoop集群提交作业。

      public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                                      IOException {
        try {
          return submitJobInternal(job);
        } catch (InterruptedException ie) {
          throw new IOException("interrupted", ie);
        } catch (ClassNotFoundException cnfe) {
          throw new IOException("class not found", cnfe);
        }
      }
    JobClient submitJob

    实际方法的执行是submitJobInternal方法。着重看下这个方法的内部执行。主要的逻辑部分比较详细的进行了注释。(有些想继续展开,感觉太细了,后面的文章中部分重要的会有涉及,不想深度遍历了,到时会回过头来互相链接) 

     1 public RunningJob submitJobInternal(JobConf job)
     2             throws FileNotFoundException, ClassNotFoundException,
     3             InterruptedException, IOException {
     4 
     5         // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
     6         JobID jobId = jobSubmitClient.getNewJobId();
     7         // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
     8         // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
     9 
    10         Path submitJobDir = new Path(getSystemDir(), jobId.toString());
    11         Path submitJarFile = new Path(submitJobDir, "job.jar");
    12         Path submitSplitFile = new Path(submitJobDir, "job.split");
    13         configureCommandLineOptions(job, submitJobDir, submitJarFile);
    14         Path submitJobFile = new Path(submitJobDir, "job.xml");
    15         int reduces = job.getNumReduceTasks();
    16         JobContext context = new JobContext(job, jobId);
    17 
    18         // Check the output specification
    19         // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
    20 
    21         if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
    22             org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
    23                     .newInstance(context.getOutputFormatClass(), job);
    24             output.checkOutputSpecs(context);
    25         } else {
    26             job.getOutputFormat().checkOutputSpecs(fs, job);
    27         }
    28 
    29         // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。
    30         // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile
    31         // job.split名称。
    32         // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
    33 
    34         // Create the splits for the job
    35         LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    36         int maps;
    37         if (job.getUseNewMapper()) {
    38             maps = writeNewSplits(context, submitSplitFile);
    39         } else {
    40             maps = writeOldSplits(job, submitSplitFile);
    41         }
    42         job.set("mapred.job.split.file", submitSplitFile.toString());
    43         job.setNumMapTasks(maps);
    44 
    45         // Write job file to JobTracker's fs
    46         FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
    47                 new FsPermission(JOB_FILE_PERMISSION));
    48 
    49         try {
    50             job.writeXml(out);
    51         } finally {
    52             out.close();
    53         }
    54 
    55         // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。
    56 
    57         JobStatus status = jobSubmitClient.submitJob(jobId);
    58         if (status != null) {
    59             return new NetworkedJob(status);
    60         } else {
    61             throw new IOException("Could not launch job");
    62         }
    63     }
     /**
       * JobTracker.submitJob() kicks off a new job.  
       *
       * Create a 'JobInProgress' object, which contains both JobProfile
       * and JobStatus.  Those two sub-objects are sometimes shipped outside
       * of the JobTracker.  But JobInProgress adds info that's useful for
       * the JobTracker alone.
       */
      public synchronized JobStatus submitJob(JobID jobId) throws IOException {
        if(jobs.containsKey(jobId)) {
          //job already running, don't start twice
          return jobs.get(jobId).getStatus();
        }
        
        JobInProgress job = new JobInProgress(jobId, this, this.conf);
        
        String queue = job.getProfile().getQueueName();
        if(!(queueManager.getQueues().contains(queue))) {      
          new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
          throw new IOException("Queue "" + queue + "" does not exist");        
        }
    
        // check for access
        try {
          checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
        } catch (IOException ioe) {
           LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                    + ". Ignoring job " + jobId, ioe);
          new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
          throw ioe;
        }
    
       return addJob(jobId, job); 
      }
    JobTracker submitJob

    为了转载内容的一致性、可追溯性和保证及时更新纠错,转载时请注明来自:http://www.cnblogs.com/douba/p/hadoop_jobclient_submit.html。谢谢!

  • 相关阅读:
    reset内容
    如何在鼠标悬停时图片旁边出现详情说明模块
    从零学习Entity Framework
    一款程序员用的小说下载器
    实习笔记(数据库相关)-2014
    ASP.NET json数据的序列化与反序列化
    不使用服务器控件的ASP.NET
    win7 下先装SQL2005 后装SQL2000 解决方案
    Jquery 插件封装成seajs的模块
    前端模块化实践——seajs的使用
  • 原文地址:https://www.cnblogs.com/douba/p/hadoop_jobclient_submit.html
Copyright © 2011-2022 走看看