zoukankan      html  css  js  c++  java
  • hadoop运行原理之Job运行(二) Job提交及初始化

      本篇主要介绍Job从客户端提交到JobTracker及其被初始化的过程。

      以WordCount为例,以前的程序都是通过JobClient.runJob()方法来提交Job,但是现在大多用Job.waitForCompletion(true)方法来提交(true表示打印出运行过程),但其本质都是一样的,最终都是通过JobClient的submitJobInternal()方法来提交Job。 

     1 public 
     2   RunningJob submitJobInternal(final JobConf job
     3                                ) throws FileNotFoundException, 
     4                                         ClassNotFoundException,
     5                                         InterruptedException,
     6                                         IOException {
     7           ......
     8           //为job获取id
     9         JobID jobId = jobSubmitClient.getNewJobId();
    10         Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    11         jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    12       
    13         ......
    14         
    15         printTokens(jobId, jobCopy.getCredentials());
    16           status = jobSubmitClient.submitJob(
    17               jobId, submitJobDir.toString(), jobCopy.getCredentials());
    18         ......
    19   }

       submitJobInternal()方法主要完成这么几个工作:得到授权令牌;检查输出目录是否已存在;创建分片;将运行作业所需的资源复制到JobTracker的文件系统中。最后调用JobSubmissionProtocol的submitJob()方法。JobTracker继承了JobSubmissionProtocol接口,所以会转到去调用JobTracker的submitJob()方法。

      这里插一句,JobSubmissionProtocol接口有两个默认的子类实现:JobTracker和LocalJobRunner。如果使用的是hadoop的默认配置,在mapred-site.xml文件中{mapred.job.tracker}的值为“local”,此时JobSubmissionProtocol的实现使用LocalJobRunner,即使用的是本地文件系统。否则的话使用HDFS。这也是为什么我们在mapred-site.xml文件要配置{mapred.job.tracker}的原因。具体使用哪个JobSubmissionProtocol是在JobClient初始化的时候决定的。从下面JobClient的init()方法代码可以清晰的看到:

     1 public void init(JobConf conf) throws IOException {
     2     String tracker = conf.get("mapred.job.tracker", "local");
     3     tasklogtimeout = conf.getInt(
     4       TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
     5     this.ugi = UserGroupInformation.getCurrentUser();
     6     if ("local".equals(tracker)) {
     7       conf.setNumMapTasks(1);
     8       this.jobSubmitClient = new LocalJobRunner(conf);
     9     } else {
    10       this.rpcJobSubmitClient = 
    11           createRPCProxy(JobTracker.getAddress(conf), conf);
    12       this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
    13     }        
    14   }

      

      接着上面来说。看看JobTracker的submit()方法。

     1  JobStatus submitJob(JobID jobId, String jobSubmitDir,
     2       UserGroupInformation ugi, Credentials ts, boolean recovered)
     3       throws IOException {
     4     // Check for safe-mode
     5     checkSafeMode();
     6     ......
     7     JobInProgress job = null;    
     8    
     9     // Submit the job
    10       JobStatus status;
    11       try {
    12         status = addJob(jobId, job);
    13       } catch (IOException ioe) {
    14         LOG.info("Job " + jobId + " submission failed!", ioe);
    15         status = job.getStatus();
    16         status.setFailureInfo(StringUtils.stringifyException(ioe));
    17         failJob(job);
    18         throw ioe;
    19       }
    20       return status;
    21     }
    22   }

      首先检查系统是否处于安全模式。接着会创建JobInProgress对象,这个对象用来维护了Job运行的相关信息。然后来检查用户的队列权限,并检查内存的使用情况。最终调用addJob()方法来提交job。

    1     synchronized (jobs) {
    2       synchronized (taskScheduler) {
    3         jobs.put(job.getProfile().getJobID(), job);
    4         for (JobInProgressListener listener : jobInProgressListeners) {
    5           listener.jobAdded(job);
    6         }
    7       }
    8     }

      这里用到了观察者模式,jobInProgressListeners是一个List<JobInProgressListener>,代表所有已注册的监听器(观察者)。listener.jobAdded(job);这行语句则分别调用所有已注册listener的jobAdded()方法。从上一篇文章中我们知道,最主要的listener就是EagerTaskInitializationListener和JobQueueJobInProgressListener。

      JobQueueJobInProgressListener的jobAdded()方法比较简单,只有一句话,就是先构建一个JobSchedulingInfo对象,然后和JobInProgress对应起来放入jobQueue中。

      下面是EagerTaskInitializationListener的jobAdded()方法:

    1   @Override
    2   public void jobAdded(JobInProgress job) {
    3     synchronized (jobInitQueue) {
    4       jobInitQueue.add(job);
    5       resortInitQueue();
    6       jobInitQueue.notifyAll();
    7     }
    8   }

       这个方法首先将job(JobInProgress)添加到初始化队列中;然后按优先级对队列中的JobInProcess进行排序。上篇文件中介绍了,在EagerTaskInitializationListener中监听到有新的job(JobInProgress)添加到队列中时,则会对其进行初始化工作。最终是调用了JobTracker的initJob()方法来对job进行初始化,这部分过程在下一篇文章再写吧。

      最后画个流程图来总结一下,画的不好,将就看一下吧。

      本文基于hadoop1.2.1

      如有错误,还请指正

      参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成

      转载请注明出处:http://www.cnblogs.com/gwgyk/p/3999128.html 

  • 相关阅读:
    67 个拯救前端开发者的工具、库和资源
    js常用的工具函数
    npm快捷键
    给bootstrap table设置行列单元格样式
    script标签中type为"text/x-template"或"text/html"
    【转载】Ogre3d 2.1 源码编译安装教程
    【转载】OGRE 2.1 Windows 编译
    【转载】DXUT11框架浅析(4)--调试相关
    【转载】GitHub详解
    【转载】3D/2D中的D3DXMatrixPerspectiveFovLH和D3DXMatrixOrthoLH投影函数详解
  • 原文地址:https://www.cnblogs.com/gwgyk/p/3999128.html
Copyright © 2011-2022 走看看