zoukankan      html  css  js  c++  java
  • TaskTracker执行map或reduce任务的过程2

    TaskTracker执行map或reduce任务的过程(二)

    上次说到,当MapLauncher或ReduceLancher(用于执行任务的线程,它们扩展自TaskLauncher),从它们所维护的LinkedList也即队列中获取到TaskInProgress,并且TaskTracker有空闲的slot时,该线程就调用了TaskTracker的startNewTask(tip)方法,如下所示: 

    复制代码
     public void run() {
          while (!Thread.interrupted()) {
            try {
              TaskInProgress tip;
              Task task;
              synchronized (tasksToLaunch) {
                while (tasksToLaunch.isEmpty()) {
                  tasksToLaunch.wait();//当队列为空时呗阻塞,知道有新的tip到来才会被唤醒
                }
                //get the TIP
                tip = tasksToLaunch.remove(0);
                task = tip.getTask();
          ......//当有空闲的slot时执行启动一个任务
              startNewTask(tip);
          ......
          }
        }
    复制代码

      接下了来就让我们看下startNewTask(tip)的神秘面纱吧,由于在其内部通过实习Runnable创建了一个线程,我们只需分析线程体的run方法即可,关键代码如下,为便于说明,给3个核心语句分别标识为**1,**2:

    复制代码
    public void run() {
            try {
              RunningJob rjob = localizeJob(tip);        //**1
              tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString()); 
              // task本地化已经完成,此刻如果rjob.jobConf或者rjob.ugi为空的话,会抛出异常
          launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); //**2
    ......

    } }
    复制代码

      **1的源码如下,

        Task t = tip.getTask();
        JobID jobId = t.getJobID();
        RunningJob rjob = addTaskToJob(jobId, tip);
        InetSocketAddress ttAddr = getTaskTrackerReportAddress();

      从中我们可以看出,首先创建了一个该任务所属的RunningJob,并把它放入到一个该TaskTracker所维护的TreeMap<jobId,RunningJob>中,同时在RunningJob中记录将要执行的task,也即把tip放入到RunningJob.tasks(一个HashSet<TaskInProgress>)中。由此,我们可以知道,每个TaskTracker都维护者一个TreeMap用以记录它正在执行的哪个作业的哪些任务(map、reduce任务)。

      接下来localizeJob(tip)要做的就是调用initializeJob(t, rjob, ttAddr)初始化工作目录,并下载相应的job.xml以及job.jar(TaskController负责)文件,TaskController最后调用RunJar.unJar()将包解压到相应的工作目录,,至此初始化工作完成,调用launchTaskForJob开始执行Task。

      **2的核心代码为:

    复制代码
     protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,RunningJob rjob) throws IOException {
        synchronized (tip) {
          jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
                      localStorage.getDirsString());
          tip.setJobConf(jobConf);
          tip.setUGI(rjob.ugi);
          tip.launchTask(rjob);
        }
      }
    复制代码

      由此看出,它主要是调用TaskTracker.TaskInProgress的launchTask()方法,在该方法中它创建了一个TaskRunner线程,并启这个线程执行这个task,其run方法核心代码如下:

    复制代码
    public final void run() {
        //设置工作目录
    final File workDir = new File(new Path(localdirs[rand.nextInt(localdirs.length)], TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), taskid.toString(), t.isTaskCleanupTask())).toString());
    ......

    // 设置环境变量 List<String> classPaths = getClassPaths(conf, workDir,taskDistributedCacheManager); .......

        //启动Task子进程 launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir); } }
    复制代码

      未完待续...... 

     
     
     
    标签: Hadoop
  • 相关阅读:
    完全二分图生成树计数
    [luogu 1880]石子合并
    [vijos 1770]大内密探
    母函数入门笔记(施工中…
    【补】20160816训练记录
    20160819训练记录
    20160817训练记录
    POJ 2228 naptime
    POJ 3585 Accumulation Degree
    POJ 2182 Lost Cows
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3289210.html
Copyright © 2011-2022 走看看