zoukankan      html  css  js  c++  java
  • 【hadoop代码笔记】Hadoop作业提交中EagerTaskInitializationListener的作用

    在整理FairScheduler实现的task调度逻辑时,注意到EagerTaskInitializationListener类。差不多应该是job提交相关的逻辑代码中最简单清楚的一个了。

    todo:标红文字表示要加前向链接,待相关文字草稿提交后。

    一、概述

    继承自JobInProgressListener,实现了jobAdded,jobRemoved,jobUpdated方法。哦,不能说实现,应该说继承,JobInProgressListener居然是个抽象类,看着怎么这样的listener也应该是个interface。

    在该listener被注册后,就响应jobAdded,jobRemoved,jobUpdated动作。在EagerTaskInitializationListener中,响应这三种动作来维护内部的一个job列表(List<JobInProgress> jobInitQueue),并启动线程对job列表中的job异步的进行初始化。

    二、主要代码逻辑

    1. 在job被添加到JobTracker时,注册的Lister会响应该方法。即当有作业提交到JobTracker时,该方法会把JIP加到jobInitQueue列表中,并且根据作业优先级和启动时间来调整其顺序。
    2. jobInitManagerThread会一直产看jobInitManagerThread列表中的job,逐一取出来初始化其task。

    三、主要成员

    1   private JobInitManager jobInitManager = new JobInitManager(); //一个job初始化线程,关注job队列jobInitQueue,取出进行初始化
    2   private Thread jobInitManagerThread; // JobInitManager线程
    3   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>(); //响应lister的几种方法,维护的job队列
    4   private ExecutorService threadPool; //一个线程池,里面的一个线程取一个job进行初始化
    5   private int numThreads; //线程池的线程数,可配置

    四、主要方法

         1. EagerTaskInitializationListener的jobAdded方法 :

    首先关注的代码片段是该listener的jobAdded方法,前面说过,在FairScheduler的start方法中(taskTrackerManager.addJobInProgressListener(eagerInitListener))会把EagerTaskInitializationListener注册到JobTracker,在jobTracker中加入job的时候(addJob被调用),触发其上所有的jobListener的jobAdded方法。

           在EagerTaskInitializationListener中,jobAdded只是简单的把job加入到一个List<JobInProgress>类型的 jobInitQueue中。并不直接对其进行初始化,对其中的job的处理由另外线程来做。

      

    @Override
      public void jobAdded(JobInProgress job) {
        synchronized (jobInitQueue) {
          jobInitQueue.add(job);
          resortInitQueue();
          jobInitQueue.notifyAll();
        }
    
      }

        2. JobInitManager类:

    一个线程,对jobInitQueue上保存的每个Job启动一个线程来执行初始化工作。在其run方法中会一直检查jobInitQueue是否有作业,有则拿出来从线程池中取一个线程处理。

    class JobInitManager implements Runnable {
       
        public void run() {
          JobInProgress job = null;
          while (true) {
            try {
              synchronized (jobInitQueue) {
                while (jobInitQueue.isEmpty()) {
                  jobInitQueue.wait();
                }
                job = jobInitQueue.remove(0);
              }
              threadPool.execute(new InitJob(job));
            } catch (InterruptedException t) {
              LOG.info("JobInitManagerThread interrupted.");
              break;
            } 
          }
          LOG.info("Shutting down thread pool");
          threadPool.shutdownNow();
        }
      }
    JobInitManager

       3. InitJob

     一个线程类定义,真正处理每一个job的初始化。其实调用的是job的初始化方法(JobInProgress initTasks

    static class InitJob implements Runnable {
        private JobInProgress job;
        public InitJob(JobInProgress job) {
          this.job = job;
        }
        
        public void run() 
       {
          job.initTasks();            
        }
      }

     完。

  • 相关阅读:
    人生转折点:弃文从理
    人生第一站:大三暑假实习僧
    监听器启动顺序和java常见注解
    java常识和好玩的注释
    182. Duplicate Emails (Easy)
    181. Employees Earning More Than Their Managers (Easy)
    180. Consecutive Numbers (Medium)
    178. Rank Scores (Medium)
    177. Nth Highest Salary (Medium)
    176. Second Highest Salary(Easy)
  • 原文地址:https://www.cnblogs.com/douba/p/EagerTaskInitializationListener.html
Copyright © 2011-2022 走看看