zoukankan      html  css  js  c++  java
  • hadoop运行原理之Job运行(一) JobTracker启动及初始化

      这部分的计划是这样的,首先解释JobTracker的启动过程和作业从JobClient提交到JobTracker上;然后分析TaskTracker和heartbeat;最后将整个流程debug一遍来加深映象。

      在看JobTracker源代码的时候就会发现,它里边有main()方法,这就说明了它是一个独立的java进程。在hadoop根目录下的bin文件夹中的hadoop脚本中可以看到,它指定了JobTracker类。如下图所示:

      JobTracker的main()方法中最主要的是以下两条语句:

     1 public static void main(String argv[]
     2                           ) throws IOException, InterruptedException {
     3     StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
     4     
     5     try {
     6       if(argv.length == 0) {
     7         JobTracker tracker = startTracker(new JobConf());//用来生成JobTracker对象
     8         tracker.offerService();//初始化JobTracker,并启动作业调度器
     9       }
    10       else {
    11         if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
    12           dumpConfiguration(new PrintWriter(System.out));
    13         }
    14         else {
    15           System.out.println("usage: JobTracker [-dumpConfiguration]");
    16           System.exit(-1);
    17         }
    18       }
    19     } catch (Throwable e) {
    20       LOG.fatal(StringUtils.stringifyException(e));
    21       System.exit(-1);
    22     }
    23   }

      startTracker()方法比较简单,通过几次方法调用最终生成JobTracker对象。下面重点分析offerService()方法。由于篇幅限制,只列出了最重要的部分:

     1 public void offerService() throws InterruptedException, IOException {
     2      ......
     3 
     4     // Initialize the JobTracker FileSystem within safemode
     5     setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
     6     initializeFilesystem();
     7     setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
     8     
     9     // Initialize JobTracker
    10     initialize();
    11     
    12      ......
    13     taskScheduler.start();

      首先进入安全模式下(SAFEMODE_ENTER),初始化文件系统,然后退出安全模式(SAFEMODE_LEAVE)。然后初始化JobTracker。最后启动作业调度器(TaskScheduler)。默认的作业调度器是JobQueueTaskScheduler,在mapred-default.xml中配置。所以taskScheduler.start()会调用JobQueueTaskScheduler的start()方法。如下所示:

      JobQueueTaskScheduler使用FIFO来对job进行调度。下面来进入到JobQueueTaskScheduler来分析start()方法。

    1 @Override
    2   public synchronized void start() throws IOException {
    3     super.start();
    4     taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
    5     eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
    6     eagerTaskInitializationListener.start();
    7     taskTrackerManager.addJobInProgressListener(
    8         eagerTaskInitializationListener);
    9   }

      这里用到了观察者模式,JobQueueTaskScheduler向JobTracker注册了两个JobInProgressListener:EagerTaskInitializationListener和JobQueueJobInProgressListener,分别用于作业初始化和作业排序。

      这里的taskTrackerManager实际上是JobTracker,因为JobTracker的父类就是TaskTrackerManager。在JobTracker的startTracker()方法中,将JobTracker实例传递给TaskTrackerManager。如下所示:  

    1 public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize) 
    2   throws IOException, InterruptedException {
    3     DefaultMetricsSystem.initialize("JobTracker");
    4     JobTracker result = null;
    5     while (true) {
    6       try {
    7         result = new JobTracker(conf, identifier);
    8         result.taskScheduler.setTaskTrackerManager(result);
    9        ......

      在eagerTaskInitializationListener.start()方法启动一个线程JobInitManager,这个线程用来监控jobInitQueue,即List<JobInProgress>。当有新的job(JobInProgress)加入到队列中时,JobInitManager线程就对其进行初始化。

     1 class JobInitManager implements Runnable {
     2    
     3     public void run() {
     4       JobInProgress job = null;
     5       while (true) {
     6         try {
     7           synchronized (jobInitQueue) {
     8             while (jobInitQueue.isEmpty()) {
     9               jobInitQueue.wait();
    10             }
    11             job = jobInitQueue.remove(0); //从队列中拿出一个job
    12           }
    13           threadPool.execute(new InitJob(job)); //对job进行初始化
    14         } catch (InterruptedException t) {
    15           LOG.info("JobInitManagerThread interrupted.");
    16           break;
    17         } 
    18       }
    19       LOG.info("Shutting down thread pool");
    20       threadPool.shutdownNow();
    21     }
    22   }
    23 
    24 class InitJob implements Runnable {
    25   
    26     private JobInProgress job;
    27     
    28     public InitJob(JobInProgress job) {
    29       this.job = job;
    30     }
    31     
    32     public void run() {
    33       ttm.initJob(job); //实质上调用JobTracker的initJob()方法进行初始化
    34     }
    35   }

      这里JobInitManager线程最终调用了JobTracker的initJob()方法来对job进行初始化。具体过程下篇文章中再写。

       最后画个流程图来总结一下,画的不好,将就看一下吧。

       本文基于hadoop1.2.1

       如有错误,还请指正

       参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成

      转载请注明出处:http://www.cnblogs.com/gwgyk/p/3998753.html 

  • 相关阅读:
    Java实现 LeetCode 34 在排序数组中查找元素的第一个和最后一个位置
    Java实现 LeetCode 34 在排序数组中查找元素的第一个和最后一个位置
    MFC的消息映射机制揭秘
    vc++窗口的创建过程(MFC消息机制的经典文章)
    映射窗口句柄对象
    评侯捷的<深入浅出MFC>和李久进的<MFC深入浅出>
    主函数 main WinMain _tmain _tWinMain 的区别
    深入分析MFC文档视图结构(项目实践)
    深入解析MFC -- 句柄与对象的关系
    深入浅出Win32多线程设计之MFC的多线程-线程与消息队列(经典)
  • 原文地址:https://www.cnblogs.com/gwgyk/p/3998753.html
Copyright © 2011-2022 走看看