zoukankan      html  css  js  c++  java
  • Hadoop2.x Yarn作业提交(客户端)

    转自:http://blog.csdn.net/lihm0_1/article/details/22186833

    YARN作业提交的客户端仍然使用RunJar类,和MR1一样,可参考 
    http://blog.csdn.net/lihm0_1/article/details/13629375
    在1.x中是向JobTracker提交,而在2.x中换成了ResourceManager,客户端的代理对象也有所变动,换成了YarnRunner,但大致流程和1类似,主要的流程集中在JobSubmitter.submitJobInternal中,包括检测输出目录合法性,设置作业提交信息(主机和用户),获得JobID,向HDFS中拷贝作业所需文件(Job.jar Job.xml split文件等)最有执行作业提交。这里仍然以WordCount为例介绍提交流程.

    1. public static void main(String[] args) throws Exception {    
    2.     // 创建一个job    
    3.     Configuration conf = new Configuration();   
    4.     conf.set("mapreduce.job.queuename", "p1");  
    5.       
    6.     @SuppressWarnings("deprecation")  
    7.         Job job = new Job(conf, "WordCount");    
    8.     job.setJarByClass(WordCount.class);   
    9.     job.setJar("/root/wordcount-2.3.0.jar");  
    10.   
    11.   
    12.     // 设置输入输出类型    
    13.     job.setOutputKeyClass(Text.class);    
    14.     job.setOutputValueClass(IntWritable.class);    
    15.   
    16.   
    17.     // 设置map和reduce类    
    18.     job.setMapperClass(WordCountMapper.class);    
    19.     job.setReducerClass(WordCountReduce.class);    
    20.   
    21.   
    22.     // 设置输入输出流    
    23.     FileInputFormat.addInputPath(job, new Path("/tmp/a.txt"));    
    24.     FileOutputFormat.setOutputPath(job, new Path("/tmp/output"));    
    25.   
    26.   
    27.     job.waitForCompletion(true);//此处进入作业提交流程,然后循环监控作业状态  
    28. }    

    此处和1.x相同,提交作业,循环监控作业状态

    1. public boolean waitForCompletion(boolean verbose  
    2.                                  ) throws IOException, InterruptedException,  
    3.                                           ClassNotFoundException {  
    4.   if (state == JobState.DEFINE) {  
    5.     submit();//提交  
    6.   }  
    7.   if (verbose) {  
    8.     monitorAndPrintJob();  
    9.   } else {  
    10.     // get the completion poll interval from the client.  
    11.     int completionPollIntervalMillis =   
    12.       Job.getCompletionPollInterval(cluster.getConf());  
    13.     while (!isComplete()) {  
    14.       try {  
    15.         Thread.sleep(completionPollIntervalMillis);  
    16.       } catch (InterruptedException ie) {  
    17.       }  
    18.     }  
    19.   }  
    20.   return isSuccessful();  
    21. }  

    主要分析submit函数,来看作业是如何提交的,此处已经和1.x不同。但仍分为两个阶段1、连接master 2、作业提交

    1. public void submit()   
    2.        throws IOException, InterruptedException, ClassNotFoundException {  
    3.   ensureState(JobState.DEFINE);  
    4.   setUseNewAPI();  
    5.   //连接RM  
    6.   connect();  
    7.   final JobSubmitter submitter =   
    8.       getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  
    9.   status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {  
    10.     public JobStatus run() throws IOException, InterruptedException,   
    11.     ClassNotFoundException {  
    12.         //提交作业  
    13.       return submitter.submitJobInternal(Job.this, cluster);  
    14.     }  
    15.   });  
    16.   state = JobState.RUNNING;  
    17.   LOG.info("The url to track the job: " + getTrackingURL());  
    18. }  

    连接master时会建立Cluster实例,下面是Cluster构造函数,其中重点初始化部分

    1. public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)   
    2.     throws IOException {  
    3.   this.conf = conf;  
    4.   this.ugi = UserGroupInformation.getCurrentUser();  
    5.   initialize(jobTrackAddr, conf);  
    6. }  

    创建客户端代理阶段用到了java.util.ServiceLoader,目前2.3.0版本包含两个LocalClientProtocolProvider(本地作业) YarnClientProtocolProvider(Yarn作业),此处会根据mapreduce.framework.name的配置创建相应的客户端

    1. private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)  
    2.     throws IOException {  
    3.   
    4.   
    5.   synchronized (frameworkLoader) {  
    6.     for (ClientProtocolProvider provider : frameworkLoader) {  
    7.       LOG.debug("Trying ClientProtocolProvider : "  
    8.           + provider.getClass().getName());  
    9.       ClientProtocol clientProtocol = null;   
    10.       try {  
    11.         if (jobTrackAddr == null) {  
    12.             //创建YARNRunner对象  
    13.           clientProtocol = provider.create(conf);  
    14.         } else {  
    15.           clientProtocol = provider.create(jobTrackAddr, conf);  
    16.         }  
    17.                 //初始化Cluster内部成员变量  
    18.         if (clientProtocol != null) {  
    19.           clientProtocolProvider = provider;  
    20.           client = clientProtocol;  
    21.           LOG.debug("Picked " + provider.getClass().getName()  
    22.               + " as the ClientProtocolProvider");  
    23.           break;  
    24.         }  
    25.         else {  
    26.           LOG.debug("Cannot pick " + provider.getClass().getName()  
    27.               + " as the ClientProtocolProvider - returned null protocol");  
    28.         }  
    29.       }   
    30.       catch (Exception e) {  
    31.         LOG.info("Failed to use " + provider.getClass().getName()  
    32.             + " due to error: " + e.getMessage());  
    33.       }  
    34.     }  
    35.   }  
    36.     //异常处理,如果此处出现异常,则证明加载的jar包有问题,YarnRunner所处的jar不存在?  
    37.   if (null == clientProtocolProvider || null == client) {  
    38.     throw new IOException(  
    39.         "Cannot initialize Cluster. Please check your configuration for "  
    40.             + MRConfig.FRAMEWORK_NAME  
    41.             + " and the correspond server addresses.");  
    42.   }  
    43. }  

    provider创建代理实际是创建了一个YranRunner对象,因为我们这里提交的不是Local的,而是Yarn作业

    1. @Override  
    2. public ClientProtocol create(Configuration conf) throws IOException {  
    3.   if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {  
    4.     return new YARNRunner(conf);  
    5.   }  
    6.   return null;  
    7. }  

    创建客户端代理的流程如下:
    Cluster->ClientProtocol(YarnRunner)->ResourceMgrDelegate->client(YarnClientImpl)->rmClient(ApplicationClientProtocol)
    在YarnClientImpl的serviceStart阶段会创建RPC代理,注意其中的协议。

    1. protected void serviceStart() throws Exception {  
    2.   try {  
    3.     rmClient = ClientRMProxy.createRMProxy(getConfig(),  
    4.           ApplicationClientProtocol.class);  
    5.   } catch (IOException e) {  
    6.     throw new YarnRuntimeException(e);  
    7.   }  
    8.   super.serviceStart();  
    9. }  

    YarnRunner的构造函数如下:

    1. public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,  
    2.     ClientCache clientCache) {  
    3.   this.conf = conf;  
    4.   try {  
    5.     this.resMgrDelegate = resMgrDelegate;  
    6.     this.clientCache = clientCache;  
    7.     this.defaultFileContext = FileContext.getFileContext(this.conf);  
    8.   } catch (UnsupportedFileSystemException ufe) {  
    9.     throw new RuntimeException("Error in instantiating YarnClient", ufe);  
    10.   }  
    11. }  

    下面看最核心的提交部分JobSubmitter.submitJobInternal

    1. JobStatus submitJobInternal(Job job, Cluster cluster)   
    2. throws ClassNotFoundException, InterruptedException, IOException {  
    3.   
    4.   
    5.   //检测输出目录合法性,是否已存在,或未设置  
    6.   checkSpecs(job);  
    7.   
    8.   
    9.   Configuration conf = job.getConfiguration();  
    10.   addMRFrameworkToDistributedCache(conf);  
    11.     //获得登录区,用以存放作业执行过程中用到的文件,默认位置/tmp/hadoop-yarn/staging/root/.staging ,可通过yarn.app.mapreduce.am.staging-dir修改  
    12.   Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);  
    13.   //主机名和地址设置  
    14.   InetAddress ip = InetAddress.getLocalHost();  
    15.   if (ip != null) {  
    16.     submitHostAddress = ip.getHostAddress();  
    17.     submitHostName = ip.getHostName();  
    18.     conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);  
    19.     conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);  
    20.   }  
    21.   //获取新的JobID,此处需要RPC调用  
    22.   JobID jobId = submitClient.getNewJobID();  
    23.   job.setJobID(jobId);  
    24.   //获取提交目录:/tmp/hadoop-yarn/staging/root/.staging/job_1395778831382_0002  
    25.   Path submitJobDir = new Path(jobStagingArea, jobId.toString());  
    26.   JobStatus status = null;  
    27.   try {  
    28.     conf.set(MRJobConfig.USER_NAME,  
    29.         UserGroupInformation.getCurrentUser().getShortUserName());  
    30.     conf.set("hadoop.http.filter.initializers",   
    31.         "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");  
    32.     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());  
    33.     LOG.debug("Configuring job " + jobId + " with " + submitJobDir   
    34.         + " as the submit dir");  
    35.     // get delegation token for the dir  
    36.     TokenCache.obtainTokensForNamenodes(job.getCredentials(),  
    37.         new Path[] { submitJobDir }, conf);  
    38.       
    39.     populateTokenCache(conf, job.getCredentials());  
    40.   
    41.   
    42.     // generate a secret to authenticate shuffle transfers  
    43.     if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {  
    44.       KeyGenerator keyGen;  
    45.       try {  
    46.         keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);  
    47.         keyGen.init(SHUFFLE_KEY_LENGTH);  
    48.       } catch (NoSuchAlgorithmException e) {  
    49.         throw new IOException("Error generating shuffle secret key", e);  
    50.       }  
    51.       SecretKey shuffleKey = keyGen.generateKey();  
    52.       TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),  
    53.           job.getCredentials());  
    54.     }  
    55.     //向集群中拷贝所需文件,下面会单独分析(1)  
    56.     copyAndConfigureFiles(job, submitJobDir);  
    57.     Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);  
    58.       
    59.     // 写分片文件job.split job.splitmetainfo,具体写入过程与MR1相同,可参考以前文章  
    60.     LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));  
    61.     int maps = writeSplits(job, submitJobDir);  
    62.     conf.setInt(MRJobConfig.NUM_MAPS, maps);  
    63.     LOG.info("number of splits:" + maps);  
    64.   
    65.   
    66.     // write "queue admins of the queue to which job is being submitted"  
    67.     // to job file.  
    68.     //设置队列名  
    69.     String queue = conf.get(MRJobConfig.QUEUE_NAME,  
    70.         JobConf.DEFAULT_QUEUE_NAME);  
    71.     AccessControlList acl = submitClient.getQueueAdmins(queue);  
    72.     conf.set(toFullPropertyName(queue,  
    73.         QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());  
    74.   
    75.   
    76.     // removing jobtoken referrals before copying the jobconf to HDFS  
    77.     // as the tasks don't need this setting, actually they may break  
    78.     // because of it if present as the referral will point to a  
    79.     // different job.  
    80.     TokenCache.cleanUpTokenReferral(conf);  
    81.   
    82.   
    83.     if (conf.getBoolean(  
    84.         MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,  
    85.         MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {  
    86.       // Add HDFS tracking ids  
    87.       ArrayList<String> trackingIds = new ArrayList<String>();  
    88.       for (Token<? extends TokenIdentifier> t :  
    89.           job.getCredentials().getAllTokens()) {  
    90.         trackingIds.add(t.decodeIdentifier().getTrackingId());  
    91.       }  
    92.       conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,  
    93.           trackingIds.toArray(new String[trackingIds.size()]));  
    94.     }  
    95.   
    96.   
    97.     // Write job file to submit dir  
    98.     //写入job.xml  
    99.     writeConf(conf, submitJobFile);  
    100.       
    101.     //  
    102.     // Now, actually submit the job (using the submit name)  
    103.     //这里才开始真正提交,见下面分析(2)  
    104.     printTokens(jobId, job.getCredentials());  
    105.     status = submitClient.submitJob(  
    106.         jobId, submitJobDir.toString(), job.getCredentials());  
    107.     if (status != null) {  
    108.       return status;  
    109.     } else {  
    110.       throw new IOException("Could not launch job");  
    111.     }  
    112.   } finally {  
    113.     if (status == null) {  
    114.       LOG.info("Cleaning up the staging area " + submitJobDir);  
    115.       if (jtFs != null && submitJobDir != null)  
    116.         jtFs.delete(submitJobDir, true);  
    117.   
    118.   
    119.     }  
    120.   }  
    121. }  

    (1)文件拷贝过程如下,默认副本数为10

    1. private void copyAndConfigureFiles(Job job, Path jobSubmitDir)   
    2. throws IOException {  
    3.   Configuration conf = job.getConfiguration();  
    4.   short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);  
    5.   //开始拷贝  
    6.   copyAndConfigureFiles(job, jobSubmitDir, replication);  
    7.   
    8.   
    9.   // Set the working directory  
    10.   if (job.getWorkingDirectory() == null) {  
    11.     job.setWorkingDirectory(jtFs.getWorkingDirectory());            
    12.   }  
    13.   
    14.   
    15. }  

    下面是具体文件拷贝过程,注释里写的也比较清楚了

    1. // configures -files, -libjars and -archives.  
    2. private void copyAndConfigureFiles(Job job, Path submitJobDir,  
    3.     short replication) throws IOException {  
    4.   Configuration conf = job.getConfiguration();  
    5.   if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {  
    6.     LOG.warn("Hadoop command-line option parsing not performed. " +  
    7.              "Implement the Tool interface and execute your application " +  
    8.              "with ToolRunner to remedy this.");  
    9.   }  
    10.   
    11.   
    12.   // get all the command line arguments passed in by the user conf  
    13.   String files = conf.get("tmpfiles");  
    14.   String libjars = conf.get("tmpjars");  
    15.   String archives = conf.get("tmparchives");  
    16.   String jobJar = job.getJar();  
    17.   
    18.   
    19.   //  
    20.   // Figure out what fs the JobTracker is using.  Copy the  
    21.   // job to it, under a temporary name.  This allows DFS to work,  
    22.   // and under the local fs also provides UNIX-like object loading   
    23.   // semantics.  (that is, if the job file is deleted right after  
    24.   // submission, we can still run the submission to completion)  
    25.   //  
    26.   
    27.   
    28.   // Create a number of filenames in the JobTracker's fs namespace  
    29.   LOG.debug("default FileSystem: " + jtFs.getUri());  
    30.   if (jtFs.exists(submitJobDir)) {  
    31.     throw new IOException("Not submitting job. Job directory " + submitJobDir  
    32.         +" already exists!! This is unexpected.Please check what's there in" +  
    33.         " that directory");  
    34.   }  
    35.   submitJobDir = jtFs.makeQualified(submitJobDir);  
    36.   submitJobDir = new Path(submitJobDir.toUri().getPath());  
    37.   FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);  
    38.   //创建工作目录  
    39.   FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);  
    40.   Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);  
    41.   Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);  
    42.   Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);  
    43.   // add all the command line files/ jars and archive  
    44.   // first copy them to jobtrackers filesystem   
    45.   //建立上述所需目录    
    46.   if (files != null) {  
    47.     FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);  
    48.     String[] fileArr = files.split(",");  
    49.     for (String tmpFile: fileArr) {  
    50.       URI tmpURI = null;  
    51.       try {  
    52.         tmpURI = new URI(tmpFile);  
    53.       } catch (URISyntaxException e) {  
    54.         throw new IllegalArgumentException(e);  
    55.       }  
    56.       Path tmp = new Path(tmpURI);  
    57.       Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);  
    58.       try {  
    59.         URI pathURI = getPathURI(newPath, tmpURI.getFragment());  
    60.         DistributedCache.addCacheFile(pathURI, conf);  
    61.       } catch(URISyntaxException ue) {  
    62.         //should not throw a uri exception   
    63.         throw new IOException("Failed to create uri for " + tmpFile, ue);  
    64.       }  
    65.     }  
    66.   }  
    67.       
    68.   if (libjars != null) {  
    69.     FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);  
    70.     String[] libjarsArr = libjars.split(",");  
    71.     for (String tmpjars: libjarsArr) {  
    72.       Path tmp = new Path(tmpjars);  
    73.       Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);  
    74.       DistributedCache.addFileToClassPath(  
    75.           new Path(newPath.toUri().getPath()), conf);  
    76.     }  
    77.   }  
    78.       
    79.   if (archives != null) {  
    80.     FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);   
    81.     String[] archivesArr = archives.split(",");  
    82.     for (String tmpArchives: archivesArr) {  
    83.       URI tmpURI;  
    84.       try {  
    85.         tmpURI = new URI(tmpArchives);  
    86.       } catch (URISyntaxException e) {  
    87.         throw new IllegalArgumentException(e);  
    88.       }  
    89.       Path tmp = new Path(tmpURI);  
    90.       Path newPath = copyRemoteFiles(archivesDir, tmp, conf,  
    91.         replication);  
    92.       try {  
    93.         URI pathURI = getPathURI(newPath, tmpURI.getFragment());  
    94.         DistributedCache.addCacheArchive(pathURI, conf);  
    95.       } catch(URISyntaxException ue) {  
    96.         //should not throw an uri excpetion  
    97.         throw new IOException("Failed to create uri for " + tmpArchives, ue);  
    98.       }  
    99.     }  
    100.   }  
    101.   
    102.   
    103.   if (jobJar != null) {   // copy jar to JobTracker's fs  
    104.     // use jar name if job is not named.   
    105.     if ("".equals(job.getJobName())){  
    106.       job.setJobName(new Path(jobJar).getName());  
    107.     }  
    108.     Path jobJarPath = new Path(jobJar);  
    109.     URI jobJarURI = jobJarPath.toUri();  
    110.     // If the job jar is already in fs, we don't need to copy it from local fs  
    111.     if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null  
    112.             || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme())   
    113.                 && jobJarURI.getAuthority().equals(  
    114.                                           jtFs.getUri().getAuthority()))) {  
    115.       //拷贝wordcount.jar,注意拷贝过去后会重命名为job.jar,副本数为10  
    116.       copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),   
    117.           replication);  
    118.       job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());  
    119.     }  
    120.   } else {  
    121.     LOG.warn("No job jar file set.  User classes may not be found. "+  
    122.     "See Job or Job#setJar(String).");  
    123.   }  
    124.   
    125.   
    126.   //  set the timestamps of the archives and files  
    127.   //  set the public/private visibility of the archives and files  
    128.   ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);  
    129.   // get DelegationToken for each cached file  
    130.   ClientDistributedCacheManager.getDelegationTokens(conf, job  
    131.       .getCredentials());  
    132. }  

    (2)真正的作业提交部分

      1. @Override  
      2. public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)  
      3. throws IOException, InterruptedException {  
      4.     
      5.   addHistoryToken(ts);  
      6.     
      7.   // Construct necessary information to start the MR AM  
      8.   ApplicationSubmissionContext appContext =  
      9.     createApplicationSubmissionContext(conf, jobSubmitDir, ts);  
      10.   
      11.   
      12.   // Submit to ResourceManager  
      13.   try {  
      14.     ApplicationId applicationId =  
      15.         resMgrDelegate.submitApplication(appContext);  
      16.   
      17.   
      18.     ApplicationReport appMaster = resMgrDelegate  
      19.         .getApplicationReport(applicationId);  
      20.     String diagnostics =  
      21.         (appMaster == null ?  
      22.             "application report is null" : appMaster.getDiagnostics());  
      23.     if (appMaster == null  
      24.         || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED  
      25.         || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {  
      26.       throw new IOException("Failed to run job : " +  
      27.           diagnostics);  
      28.     }  
      29.     return clientCache.getClient(jobId).getJobStatus(jobId);  
      30.   } catch (YarnException e) {  
      31.     throw new IOException(e);  
      32.   }  
      33. }  
     
  • 相关阅读:
    无锁数据结构(Lock-Free Data Structures)
    Grouping Sets:CUBE和ROLLUP从句
    SQL Server里Grouping Sets的威力
    第18/24周 乐观并发控制(Optimistic Concurrency)
    SQL Server里PIVOT运算符的”红颜祸水“
    数据库收缩:NOTRUNCATE与TRUNCATEONLY
    在SQL Server里为什么我们需要更新锁
    SQL Server里的自旋锁介绍
    SQL Server里的闩锁介绍
    配置内存中OLTP文件组提高性能
  • 原文地址:https://www.cnblogs.com/cxzdy/p/5030940.html
Copyright © 2011-2022 走看看