zoukankan      html  css  js  c++  java
  • MapReduce源代码分析之JobSubmitter(一)

            JobSubmitter。顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外。对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的全部业务逻辑。

    本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter。

            首先,我们先看下JobSubmitter的类成员变量。例如以下:

      // 文件系统FileSystem实例
      private FileSystem jtFs;
      // client通信协议ClientProtocol实例
      private ClientProtocol submitClient;
      // 提交作业的主机名
      private String submitHostName;
      // 提交作业的主机地址
      private String submitHostAddress;
            它一共同拥有四个类成员变量,分别为:

            1、文件系统FileSystem实例jtFs:用于操作作业执行须要的各种文件等。

            2、client通信协议ClientProtocol实例submitClient:用于与集群交互。完毕作业提交、作业状态查询等;

            3、提交作业的主机名submitHostName。

            4、提交作业的主机地址submitHostAddress。

            当中,client通信协议ClientProtocol实例submitClient是通过Cluster的client通信协议ClientProtocol实例client来赋值的,我们在《MapReduce源代码分析之新API作业提交(二):连接集群》一文中以前提到过。它依据MapReduce中參数mapreduce.framework.name的配置为yarn或local。有Yarn模式的YARNRunner和Local模式的LocalJobRunner两种情况。


            接下来,我们再看下JobSubmitter的构造函数,例如以下:

      JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) 
      throws IOException {
    	  
    	// 依据入參赋值成员变量submitClient、jtFs
        this.submitClient = submitClient;
        this.jtFs = submitFs;
      }
            非常easy,依据入參赋值成员变量submitClient、jtFs而已。

            关键的来了,我们看下JobSubmitter唯一的对外核心功能方法submitJobInternal(),它被用于提交作业至集群,代码例如以下:

      /**
       * Internal method for submitting jobs to the system.
       * 
       * <p>The job submission process involves:
       * <ol>
       *   <li>
       *   Checking the input and output specifications of the job.
       *   </li>
       *   <li>
       *   Computing the {@link InputSplit}s for the job.
       *   </li>
       *   <li>
       *   Setup the requisite accounting information for the 
       *   {@link DistributedCache} of the job, if necessary.
       *   </li>
       *   <li>
       *   Copying the job's jar and configuration to the map-reduce system
       *   directory on the distributed file-system. 
       *   </li>
       *   <li>
       *   Submitting the job to the <code>JobTracker</code> and optionally
       *   monitoring it's status.
       *   </li>
       * </ol></p>
       * @param job the configuration to submit
       * @param cluster the handle to the Cluster
       * @throws ClassNotFoundException
       * @throws InterruptedException
       * @throws IOException
       */
      JobStatus submitJobInternal(Job job, Cluster cluster) 
      throws ClassNotFoundException, InterruptedException, IOException {
    
        //validate the jobs output specs 
    	// 调用checkSpecs()方法,校验作业输出路径是否配置,且是否已存在,
    	// 正确的情况应该是已配置且未存在,输出路径配置參数为mapreduce.output.fileoutputformat.outputdir,
    	// 之前WordCount作业的输出路径配置为hdfs://nameservice1/output/output
        checkSpecs(job);
    
        // 从作业job中获取配置信息conf
        Configuration conf = job.getConfiguration();
        
        // 调用addMRFrameworkToDistributedCache()方法加入应用框架路径到分布式缓存中
        addMRFrameworkToDistributedCache(conf);
    
        // 通过JobSubmissionFiles的getStagingDir()静态方法获取作业运行时阶段区域路径jobStagingArea
        // 取參数yarn.app.mapreduce.am.staging-dir。參数未配置默觉得/tmp/hadoop-yarn/staging
    	// 然后后面是/提交作业username/.staging
    	// 通过之前的WordCount任务的运行,我们查看历史记录,得知參数yarn.app.mapreduce.am.staging-dir配置的为/user。
    	// 而提交作业username为hdfs,所以完整的路径应该为/user/hdfs/.staging
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
        
        //configure the command line options correctly on the submitting dfs
        // 获取当前本机地址
        InetAddress ip = InetAddress.getLocalHost();
        
        // 确定提交作业的主机地址、主机名,并设置入配置信息conf,相应參数分别为
        // mapreduce.job.submithostname
        // mapreduce.job.submithostaddress
        if (ip != null) {
          submitHostAddress = ip.getHostAddress();
          submitHostName = ip.getHostName();
          conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
          conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
        }
        
        // 生成作业ID。即JobID实例jobId
        JobID jobId = submitClient.getNewJobID();
        
        // 将jobId设置入job
        job.setJobID(jobId);
        
        // 构造提交作业路径Path实例submitJobDir,jobStagingArea后接/jobId。比方/job_1459913635503_0005
        // 之前WordCount作业的完整路径为/user/hdfs/.staging/job_1459913635503_0005
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        JobStatus status = null;
        
        // 设置作业一些參数:
        try {
        	
          // 设置mapreduce.job.user.name为当前用户。之前的WordCount演示样例配置的为hdfs用户
          conf.set(MRJobConfig.USER_NAME,
              UserGroupInformation.getCurrentUser().getShortUserName());
          
          // 设置hadoop.http.filter.initializers为AmFilterInitializer
          conf.set("hadoop.http.filter.initializers", 
              "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
          
          // 设置mapreduce.job.dir为submitJobDir,比方/user/hdfs/.staging/job_1459913635503_0005
          conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
          LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
              + " as the submit dir");
          
          // get delegation token for the dir
          // 获取路径的授权令牌:调用TokenCache的obtainTokensForNamenodes()静态方法
          TokenCache.obtainTokensForNamenodes(job.getCredentials(),
              new Path[] { submitJobDir }, conf);
          
          // 获取密钥和令牌。并将它们存储到令牌缓存TokenCache中
          populateTokenCache(conf, job.getCredentials());
    
          // generate a secret to authenticate shuffle transfers
          if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
            KeyGenerator keyGen;
            try {
             
              int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
                  ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
                      MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
                  : SHUFFLE_KEY_LENGTH;
              keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
              keyGen.init(keyLen);
            } catch (NoSuchAlgorithmException e) {
              throw new IOException("Error generating shuffle secret key", e);
            }
            SecretKey shuffleKey = keyGen.generateKey();
            TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
                job.getCredentials());
          }
    
          // 复制而且配置相关文件
          copyAndConfigureFiles(job, submitJobDir);
    
          // 获取配置文件路径:job.xml
          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          
          // Create the splits for the job
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
          
          // 调用writeSplits()方法,写分片数据文件job.split和分片元数据文件job.splitmetainfo,
          // 并获得计算得到的map任务数目maps
          int maps = writeSplits(job, submitJobDir);
          
          // 配置信息中设置map任务数目mapreduce.job.maps为上面得到的maps
          conf.setInt(MRJobConfig.NUM_MAPS, maps);
          
          LOG.info("number of splits:" + maps);
    
          // write "queue admins of the queue to which job is being submitted"
          // to job file.
          
          // 获取作业队列名queue,取參数mapreduce.job.queuename,參数未配置默觉得default,
          // 之前的WordCount任务演示样例中,作业队列名queue就为default
          String queue = conf.get(MRJobConfig.QUEUE_NAME,
              JobConf.DEFAULT_QUEUE_NAME);
          
          // 获取队列的訪问权限控制列表AccessControlList实例acl,通过client通信协议ClientProtocol实例submitClient的getQueueAdmins()方法。传入队列名queue。
          // 实际上之前的WordCount任务演示样例中,这里获取的是*
          AccessControlList acl = submitClient.getQueueAdmins(queue);
          
          // 配置信息中设置队列參数mapred.queue.default.acl-administer-jobs
          // 之前的WordCount任务演示样例中。该參数被设置成为*
          conf.set(toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
    
          // removing jobtoken referrals before copying the jobconf to HDFS
          // as the tasks don't need this setting, actually they may break
          // because of it if present as the referral will point to a
          // different job.
          // 清空缓存的令牌
          TokenCache.cleanUpTokenReferral(conf);
    
          // 依据參数确定是否须要追踪令牌ID
          // 取參数mapreduce.job.token.tracking.ids.enabled,參数未配置默觉得false
          if (conf.getBoolean(
              MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
              MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
            
        	// 通过job获取令牌ID。并存储到trackingIds列表中
        	// Add HDFS tracking ids
            ArrayList<String> trackingIds = new ArrayList<String>();
            for (Token<? extends TokenIdentifier> t :
                job.getCredentials().getAllTokens()) {
              trackingIds.add(t.decodeIdentifier().getTrackingId());
            }
            
            // 将trackingIds列表中的内容设置到參数mapreduce.job.token.tracking.ids中
            conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
                trackingIds.toArray(new String[trackingIds.size()]));
          }
    
          // Set reservation info if it exists
          // 如有必要,设置存在的预订信息
          // 參数为mapreduce.job.reservation.id
          ReservationId reservationId = job.getReservationId();
          if (reservationId != null) {
            conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
          }
    
          // Write job file to submit dir
          // 调用writeConf()方法,写入作业配置信息至文件job.xml
          writeConf(conf, submitJobFile);
          
          //
          // Now, actually submit the job (using the submit name)
          // 调用printTokens()方法打印令牌信息到Log文件
          printTokens(jobId, job.getCredentials());
          
          // 通过client通信协议ClientProtocol实例submitClient的submitJob()方法提交作业,
          // 并获取作业状态JobStatus实例status
          // 由集群连接一文的分析我们能够知道,这个submitClient实际上是YARNRunner或LocalJobRunner对象,
          // 终于调用的是二者的submitJob()方法。我们留待以后分析
          status = submitClient.submitJob(
              jobId, submitJobDir.toString(), job.getCredentials());
          
          // 假设作业状态JobStatus实例status不为null。直接返回,否则抛出无法载入作业的IO异常
          if (status != null) {
            return status;
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
        	
          // 终于。抛出无法载入作业的IO异常前,调用文件系统FileSystem实例jtFs的delete()方法。
          // 删除作业提交的相关文件夹或文件submitJobDir
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (jtFs != null && submitJobDir != null)
              jtFs.delete(submitJobDir, true);
    
          }
        }
      }
            submitJobInternal()方法篇幅比較长,逻辑也非常复杂,本文先介绍下它的大体逻辑。兴许分文会介绍各个环节的具体内容,且以下涉及到的之前WordCount作业演示样例在《Hadoop2.6.0版本号MapReudce演示样例之WordCount(一)》及其姊妹篇中。敬请注意!submitJobInternal()方法大体逻辑例如以下:

            1、调用checkSpecs()方法,校验作业输出路径是否配置。且是否已存在:

                  正确的情况应该是已配置且未存在。输出路径配置參数为mapreduce.output.fileoutputformat.outputdir,之前WordCount作业的输出路径配置为hdfs://nameservice1/output/output。

            2、从作业job中获取配置信息conf;

            3、调用addMRFrameworkToDistributedCache()方法加入应用框架路径到分布式缓存中;

            4、通过JobSubmissionFiles的getStagingDir()静态方法获取作业运行时阶段区域路径jobStagingArea:

                  取參数yarn.app.mapreduce.am.staging-dir,參数未配置默觉得/tmp/hadoop-yarn/staging,然后后面是/提交作业username/.staging,通过之前的WordCount任务的运行,我们查看历史记录,得知參数yarn.app.mapreduce.am.staging-dir配置的为/user,而提交作业username为hdfs,所以完整的路径应该为/user/hdfs/.staging;

            5、获取当前本机地址ip。

            6、确定提交作业的主机地址、主机名。并设置入配置信息conf,相应參数分别为mapreduce.job.submithostname、mapreduce.job.submithostaddress;

            7、生成作业ID。即JobID实例jobId:

                  通过client通信协议ClientProtocol实例submitClient的getNewJobID()方法生成作业ID。即JobID实例jobId;

            8、 将jobId设置入job;

            9、构造提交作业路径Path实例submitJobDir:

                   jobStagingArea后接/jobId。比方/job_1459913635503_0005,之前WordCount作业的完整路径为/user/hdfs/.staging/job_1459913635503_0005;

            10、设置作业一些參数:

                     10.1、设置mapreduce.job.user.name为当前用户。之前的WordCount演示样例配置的为hdfs用户。

                     10.2、设置hadoop.http.filter.initializers为AmFilterInitializer;

                     10.3、设置mapreduce.job.dir为submitJobDir,比方/user/hdfs/.staging/job_1459913635503_0005。

            11、获取路径的授权令牌:调用TokenCache的obtainTokensForNamenodes()静态方法;

            12、通过populateTokenCache()方法获取密钥和令牌。并将它们存储到令牌缓存TokenCache中;

            14、复制而且配置相关文件:通过copyAndConfigureFiles()方法实现;

            15、获取配置文件路径:job.xml;

            16、调用writeSplits()方法。写分片数据文件job.split和分片元数据文件job.splitmetainfo,并获得计算得到的map任务数目maps;

            17、配置信息中设置map任务数目mapreduce.job.maps为上面得到的maps。

            18、获取作业队列名queue。取參数mapreduce.job.queuename。參数未配置默觉得default。之前的WordCount任务演示样例中。作业队列名queue就为default;

            19、获取队列的訪问权限控制列表AccessControlList实例acl:

                    通过client通信协议ClientProtocol实例submitClient的getQueueAdmins()方法,传入队列名queue。实际上之前的WordCount任务演示样例中,这里获取的是*。

            20、配置信息中设置队列參数mapred.queue.default.acl-administer-jobs,之前的WordCount任务演示样例中,该參数被设置成为*。

            21、清空缓存的令牌:通过TokenCache的cleanUpTokenReferral()方法实现;

            22、依据參数确定是否须要追踪令牌ID,假设须要的话:

                    取參数mapreduce.job.token.tracking.ids.enabled,參数未配置默觉得false。通过job获取令牌ID,并存储到trackingIds列表中。将trackingIds列表中的内容设置到參数mapreduce.job.token.tracking.ids中;

            23、如有必要。设置存在的预订信息:參数为mapreduce.job.reservation.id;

            24、调用writeConf()方法,写入作业配置信息至文件job.xml;

            25、调用printTokens()方法打印令牌信息到Log文件;

            26、通过client通信协议ClientProtocol实例submitClient的submitJob()方法提交作业。并获取作业状态JobStatus实例status:

                    由集群连接一文的分析我们能够知道,这个submitClient实际上是YARNRunner或LocalJobRunner对象。终于调用的是二者的submitJob()方法。我们留待以后分析。

            27、假设作业状态JobStatus实例status不为null,直接返回,否则抛出无法载入作业的IO异常:

                    终于,抛出无法载入作业的IO异常前,调用文件系统FileSystem实例jtFs的delete()方法,删除作业提交的相关文件夹或文件submitJobDir。


            总体流程如上,对于关键步骤的主要细节。限于篇幅。敬请关注《MapReduce源代码分析之JobSubmitter(二)》!





  • 相关阅读:
    Socket通信
    浏览器调用打印机
    python dict操作
    python list操作
    python 模块Example链接
    python random模块
    python configparser模块
    python unittest模块
    python timeit模块
    python datetime模块
  • 原文地址:https://www.cnblogs.com/gavanwanggw/p/7278827.html
Copyright © 2011-2022 走看看