zoukankan      html  css  js  c++  java
  • MapReduce job在JobTracker初始化源码级分析

      mapreduce job提交流程源码级分析(三)中已经说明用户最终调用JobTracker.submitJob方法来向JobTracker提交作业。而这个方法的核心提交方法是JobTracker.addJob(JobID jobId, JobInProgress job)方法,这个addJob方法会把Job提交到调度器(默认是JobQueueTaskScheduler)的监听器JobQueueJobInProgressListener和EagerTaskInitializationListener(本文只讨论默认调度器)中,使用方法jobAdded(JobInProgress job),JobQueueJobInProgressListener任务是监控各个JobInProcess生命周期中的变化;EagerTaskInitializationListener是发现有新Job后对其初始化的。

      一、JobQueueJobInProgressListener.jobAdded(JobInProgress job)方法。就一句代码jobQueue.put(new JobSchedulingInfo(job.getStatus()), job),先构建一个JobSchedulingInfo对象,然后和JobInProgress对应起来放入jobQueue中。JobSchedulingInfo类维护这调度这个job必备的一些信息,比如优先级(默认是NORMAL)、JobID以及开始时间startTime。 

      二、EagerTaskInitializationListener.jobAdded(JobInProgress job)方法。  

     1 /**
     2    * We add the JIP to the jobInitQueue, which is processed 
     3    * asynchronously to handle split-computation and build up
     4    * the right TaskTracker/Block mapping.
     5    */
     6   @Override
     7   public void jobAdded(JobInProgress job) {
     8     synchronized (jobInitQueue) {
     9       jobInitQueue.add(job);  //添加进List<JobInProgress> jobInitQueue
    10       resortInitQueue();
    11       jobInitQueue.notifyAll();  //唤醒阻塞的进程
    12     }
    13 
    14   }

      上面方法中resortInitQueue()方法主要是对jobInitQueue中JobInProcess进行排序,先按照优先级排序,相同的再按开始时间。EagerTaskInitializationListener.start()在调度器初始化时JobQueueTaskScheduler.start()就调用了,所以先于jobAdded方法调用。EagerTaskInitializationListener.start()代码如下:

    1 public void start() throws IOException {
    2     this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
    3     jobInitManagerThread.setDaemon(true);
    4     this.jobInitManagerThread.start();
    5   }

      start()方法会启动一个线程:JobInitManager。

     1 /////////////////////////////////////////////////////////////////
     2   //  Used to init new jobs that have just been created
     3   /////////////////////////////////////////////////////////////////
     4   class JobInitManager implements Runnable {
     5    
     6     public void run() {
     7       JobInProgress job = null;
     8       while (true) {
     9         try {
    10           synchronized (jobInitQueue) {
    11             while (jobInitQueue.isEmpty()) {
    12               jobInitQueue.wait();
    13             }
    14             job = jobInitQueue.remove(0);
    15           }
    16           threadPool.execute(new InitJob(job));
    17         } catch (InterruptedException t) {
    18           LOG.info("JobInitManagerThread interrupted.");
    19           break;
    20         } 
    21       }
    22       LOG.info("Shutting down thread pool");
    23       threadPool.shutdownNow();
    24     }
    25   }
    26   
    27   class InitJob implements Runnable {
    28   
    29     private JobInProgress job;
    30     
    31     public InitJob(JobInProgress job) {
    32       this.job = job;
    33     }
    34     
    35     public void run() {
    36       ttm.initJob(job);//对应JobTracker的对应方法
    37     }
    38   }

      JobInitManager线程的run方法是一个死循环始终监控jobInitQueue是否为空,不为空的话就取出0位置的JobInProgress,在InitJob线程中初始化:TaskTrackerManager.initJob(job)对应JobTracker的initJob方法。这里为什么会另起线程来初始化Job呢?原因很简单,就是可能jobInitQueue中同时会有很多JobInProgress,一个一个的初始化会比较慢,所以采用多线程的方式初始化。来看initJob方法的代码:

     1   public void initJob(JobInProgress job) {
     2     if (null == job) {
     3       LOG.info("Init on null job is not valid");
     4       return;
     5     }
     6             
     7     try {
     8       JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     9       LOG.info("Initializing " + job.getJobID());
    10       job.initTasks();    //调用该实例的initTasks方 法,对job进行初始化
    11       // Inform the listeners if the job state has changed
    12       // Note : that the job will be in PREP state.
    13       JobStatus newStatus = (JobStatus)job.getStatus().clone();
    14       if (prevStatus.getRunState() != newStatus.getRunState()) {
    15         JobStatusChangeEvent event = 
    16           new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
    17               newStatus);
    18         synchronized (JobTracker.this) {
    19           updateJobInProgressListeners(event);
    20         }
    21       }
    22     } catch (KillInterruptedException kie) {
    23       //   If job was killed during initialization, job state will be KILLED
    24       LOG.error("Job initialization interrupted:
    " +
    25           StringUtils.stringifyException(kie));
    26       killJob(job);
    27     } catch (Throwable t) {
    28       String failureInfo = 
    29         "Job initialization failed:
    " + StringUtils.stringifyException(t);
    30       // If the job initialization is failed, job state will be FAILED
    31       LOG.error(failureInfo);
    32       job.getStatus().setFailureInfo(failureInfo);
    33       failJob(job);
    34     }
    35      }

      首先是获取初始化前的状态prevStatus;然后是job.initTasks()初始化;在获取初始化的后的状态newStatus;

      job.initTasks()方法代码比较多,主要的工作是检查之后获取输入数据的分片信息TaskSplitMetaInfo[] splits = createSplits(jobId)这是去读的上传到HDFS中的文件job.splitmetainfo和job.split,要确保numMapTasks == splits.length,然后构建numMapTasks个TaskInProgress作为MapTask,

  • 相关阅读:
    java回调函数这样说,应该明确了吧!
    Aamazon Web Service EC2 Ubuntu 新建用户而且用ssh连接host
    Html animation by css(Sequence Js Tutorial)
    shell脚本一键安装mysql5.7.x
    HDU 4544 湫湫系列故事――消灭兔子
    简明python教程 --C++程序员的视角(八):标准库
    简明python教程 --C++程序员的视角(七):异常
    简明python教程 --C++程序员的视角(六):输入输出IO
    简明python教程 --C++程序员的视角(五):面向对象的编程
    简明python教程 --C++程序员的视角(四):容器类型(字符串、元组、列表、字典)和参考
  • 原文地址:https://www.cnblogs.com/lxf20061900/p/3737263.html
Copyright © 2011-2022 走看看