zoukankan      html  css  js  c++  java
  • mapreduce job提交流程源码级分析(二)(原创)

    上一小节(http://www.cnblogs.com/lxf20061900/p/3643581.html)讲到Job. submit()方法中的:

    info = jobClient.submitJobInternal(conf)方法用来上传资源提交Job的,这一节就讲讲这个方法。

    一、首先jobClient在构造函数中会构造了和JobTracker通信的对象jobSubmitClient,jobSubmitClient是JobSubmissionProtocol类型的动态代理类。JobSubmissionProtocol协议是JobClient与JobTracker通信专用协议。代码如下:

     private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
          Configuration conf) throws IOException {
        return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
            JobSubmissionProtocol.versionID, addr, 
            UserGroupInformation.getCurrentUser(), conf,
            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
      }

       getProxy方法的关键是Invoker类,Invoker类实现了 InvocationHandler接口,主要有两个成员变量,remoteId是Client.ConnectionId类型,保存连接地址和用户的 ticket,客户端连接服务器由<remoteAddress,protocol,ticket>唯一标识。

    Invoker类的invoke方法最重要的操作是:ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId)。Invocation实现了Writable接口,并封装了method和args,使得可以通过RPC传输;Client.call方法将Writable参数封装到一个Call中,并且连接JobTracker后将封装后call发送过去,同步等待call执行完毕,返回value。

     public Writable call(Writable param, ConnectionId remoteId)  
                           throws InterruptedException, IOException {
        Call call = new Call(param);
        Connection connection = getConnection(remoteId, call);
        connection.sendParam(call);                 // send the parameter
        boolean interrupted = false;
        synchronized (call) {
          while (!call.done) {
            try {
              call.wait();                           // wait for the result
            } catch (InterruptedException ie) {
              // save the fact that we were interrupted
              interrupted = true;
            }
          }
    
          if (interrupted) {
            // set the interrupt flag now that we are done waiting
            Thread.currentThread().interrupt();
          }
    
          if (call.error != null) {
            if (call.error instanceof RemoteException) {
              call.error.fillInStackTrace();
              throw call.error;
            } else { // local exception
              // use the connection because it will reflect an ip change, unlike
              // the remoteId
              throw wrapException(connection.getRemoteAddress(), call.error);
            }
          } else {
            return call.value;
          }
        }
      }

       上面的第四行代码用于建立同JobTracker的连接。而Client.getConnection方法中connection.setupIOstreams()才是真正建立连接的地方,其中的socket是通过默认的SocketFactory .createSocket(),而这个默认的SocketFactory是org.apache.hadoop.net. StandardSocketFactory。

    二、jobClient.submitJobInternal(conf)初始化staging目录(这是job提交的根目录):Path jobStagingArea=JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy),这个方法最终会调用jobTracker.getStagingAreaDirInternal()方法,代码如下:

      private String getStagingAreaDirInternal(String user) throws IOException {
        final Path stagingRootDir =
          new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
                "/tmp/hadoop/mapred/staging"));
        final FileSystem fs = stagingRootDir.getFileSystem(conf);
        return fs.makeQualified(new Path(stagingRootDir,
                                  user+"/.staging")).toString();
      }

    三、从JobTracker获取JobID。JobID jobId = jobSubmitClient.getNewJobId()。最终调用的是JobTracker.getNewJobId()方法。然后执行Path submitJobDir = new Path(jobStagingArea, jobId.toString());获得该job提交的路径,也就是在stagingDir目录下建一个以jobId为文件名的目录,可以查看配置文件中的"mapreduce.job.dir"来查看此完整目录。有了 submitJobDir之后就可以将job运行所需的全部文件上传到对应的目录下了,具体是调用 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)这个方法。

    四、copyAndConfigureFiles(jobCopy, submitJobDir)实现上传文件,包括-tmpfiles(外部文件)、tmpjars(第三方jar包)、tmparchives(一些归档文件)以及job.jar拷贝到HDFS中,这个方法最终调用jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication);这个方法实现文件上传。而前三种文件(tmpfiles(外部文件)、tmpjars(第三方jar包)、tmparchives(一些归档文件))的实际上传过程在copyRemoteFiles方法中,通过FileUtil.copy完成拷贝,这三种文件都是先分割文件列表后分别上传(每一类文件可以有多个)。然后是:

       // First we check whether the cached archives and files are legal.
        TrackerDistributedCacheManager.validate(job);
        //  set the timestamps of the archives and files
        TrackerDistributedCacheManager.determineTimestamps(job);
        //  set the public/private visibility of the archives and files
        TrackerDistributedCacheManager.determineCacheVisibilities(job);
        // get DelegationTokens for cache files
        TrackerDistributedCacheManager.getDelegationTokens(job,job.getCredentials());

    上面的代码是进行一些cached archives and files的校验和保存其时间戳和权限内容

      Job.jar通过fs.copyFromLocalFile方法拷贝到HDFS中。而job.jar(这是打包后的作业)文件则是直接通过fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);上传完成。我们在提交作业的时候会在本地先打包成jar文件然后将配置文件中的"mapred.jar"设置为本地jar包路径,当在这里拷贝到HDFS中后在重新将"mapred.jar"设置为HDFS对应job.jar包的路径。

      同时这四个文件都会设置replication个副本,防止热点出现。

    五、然后就会根据我们设置的outputFormat类执行output.checkOutputSpecs(context),进行输出路径的检验,主要是保证输出路径不存在,存在会抛出异常。

    六、对输入文件进行分片操作了,int maps = writeSplits(context, submitJobDir)。writeSplits方法会根据是否使用了新API选择不同的方法写:

    private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
          Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
        JobConf jConf = (JobConf)job.getConfiguration();
        int maps;
        if (jConf.getUseNewMapper()) {
          maps = writeNewSplits(job, jobSubmitDir);
        } else {
          maps = writeOldSplits(jConf, jobSubmitDir);
        }
        return maps;
      }
    使用了新API后,会调用writeNewSplits(job, jobSubmitDir)方法,这个方法代码如下:
     private <T extends InputSplit>
      int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
        Configuration conf = job.getConfiguration();
        InputFormat<?, ?> input =
          ReflectionUtils.newInstance(job.getInputFormatClass(), conf);//默认是TextInputFormat
    
        List<InputSplit> splits = input.getSplits(job);
        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    
        // sort the splits into order based on size, so that the biggest
        // go first,大文件优先处理
        Arrays.sort(array, new SplitComparator());
        JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
            jobSubmitDir.getFileSystem(conf), array);
        return array.length;//这是mapper的数量
      }
    
    

    可以看出该方法首先获取splits数组信息后,排序,将会优先处理大文件。JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array)方法会将split信息和SplitMetaInfo都写入HDFS中,其代码如下:

    public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, 
          Configuration conf, FileSystem fs, T[] splits) 
      throws IOException, InterruptedException {
        FSDataOutputStream out = createFile(fs, 
            JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
        SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
        out.close();
        writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
            info);
      }

    如上writeNewSplits会将信息写入job.split文件,然后返回SplitMetaInfo数组信息,再通过writeJobSplitMetaInfo方法SplitMetaInfo信息写入job.splitmetainfo中。

    七、然后将配置文件写入:jobCopy.writeXml(out);//写"job.xml"。

    八、通过 jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials())提交job,最终调用的是JobTracker.submitJob。

    九、返回一个NetworkedJob(status, prof, jobSubmitClient)对象,它实现了RunningJob接口。这个对象可以在JobClient端(比如eclipse,不断的打印运行日志)。

    ps:

    一、hadoop版本是1.0.0;

    二、上述文件的提交目录可以在web ui中打开相应作业的配置文件查找"mapreduce.job.dir",就可以看到文件的上传目录。比如:hdfs://XXXX:8020/user/hadoop/.staging/job_201403141637_0160

    下一节关注上述的步骤八。

    错误之处还望大伙指点

    参考:

    http://www.kankanews.com/ICkengine/archives/87415.shtml

  • 相关阅读:
    跳出iframe
    leetcode 225. Implement Stack using Queues
    leetcode 206. Reverse Linked List
    leetcode 205. Isomorphic Strings
    leetcode 203. Remove Linked List Elements
    leetcode 198. House Robber
    leetcode 190. Reverse Bits
    leetcode leetcode 783. Minimum Distance Between BST Nodes
    leetcode 202. Happy Number
    leetcode 389. Find the Difference
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/3655727.html
Copyright © 2011-2022 走看看