  • [Hadoop Code Notes] Hadoop Job Submission: TaskTracker Launches a Task

    I. Overview

    The previous post described how the TaskTracker obtains the tasks it needs to execute from the JobTracker. After receiving a LaunchTaskAction from the JobTracker, it calls the addToTaskQueue method to add the task to a queue. In this post we look at how the TaskTracker handles those tasks from that point on.

    In fact, when the TaskTracker initializes it creates and starts two threads of type TaskLauncher: mapLauncher and reduceLauncher. When the TaskTracker receives tasks from the JobTracker, it adds them to the queue of the corresponding TaskLauncher, which is really a list the TaskLauncher maintains: List<TaskInProgress> tasksToLaunch.
    Each TaskLauncher thread keeps checking whether the TaskTracker has a free slot in which to run a new task, and launches one when it does. During this process it first unpacks the files the task needs onto the local machine and then creates a TaskRunner thread according to the task type (Map or Reduce); inside the TaskRunner, JvmManager goes through JvmManagerForType and JvmRunner to start a Java process that runs the map or reduce task.

    This post only goes as far as starting that Java process. What exactly that Java process is, and how it runs a map task or a reduce task, will be covered in more detail in the later posts on the Child launching a map task and the Child launching a reduce task.
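
    To make that pattern concrete, here is a minimal, self-contained sketch of the producer/consumer structure just described. It is deliberately simplified and is not the actual Hadoop code; the class and field names only mirror the ones examined below.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Simplified stand-in for TaskLauncher: the heartbeat path enqueues tasks,
    // and this thread dequeues them once a slot is free.
    class LauncherSketch extends Thread {
      private final List<String> tasksToLaunch = new ArrayList<String>();
      private final AtomicInteger numFreeSlots;

      LauncherSketch(int slots) { this.numFreeSlots = new AtomicInteger(slots); }

      // Called from the heartbeat path when a LaunchTaskAction arrives.
      void addToTaskQueue(String task) {
        synchronized (tasksToLaunch) {
          tasksToLaunch.add(task);
          tasksToLaunch.notifyAll();
        }
      }

      // Called when a task finishes, mirroring the TaskTracker freeing a slot.
      void releaseSlot() {
        synchronized (numFreeSlots) {
          numFreeSlots.set(numFreeSlots.get() + 1);
          numFreeSlots.notifyAll();
        }
      }

      public void run() {
        try {
          while (!Thread.interrupted()) {
            String task;
            synchronized (tasksToLaunch) {
              while (tasksToLaunch.isEmpty()) {
                tasksToLaunch.wait();        // wait until the heartbeat enqueues a task
              }
              task = tasksToLaunch.remove(0);
            }
            synchronized (numFreeSlots) {
              while (numFreeSlots.get() == 0) {
                numFreeSlots.wait();         // wait for a map/reduce slot to free up
              }
              numFreeSlots.set(numFreeSlots.get() - 1);
            }
            System.out.println("launching " + task);  // stands in for startNewTask(tip)
          }
        } catch (InterruptedException e) {
          // shutting down
        }
      }
    }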

    II. Flow Description

    1. After the TaskTracker's offerService method obtains the tasks to execute, it calls addToTaskQueue, which in effect calls the TaskLauncher's addToTaskQueue method.
    2. TaskLauncher internally maintains a List<TaskInProgress> tasksToLaunch; this step simply adds the task to that collection.
    3. TaskLauncher is a thread; in its run method it takes a task out of the tasksToLaunch collection and calls the TaskTracker's startNewTask method to launch it.
    4. startNewTask calls localizeJob to copy the job's configuration and the jar to be run to the TaskTracker's local disk, then calls the TaskInProgress's launchTask method to launch the task.
    5. TaskInProgress's launchTask method first calls localizeTask(task) to fetch the task-related configuration to the local machine, then creates a TaskRunner thread to launch the task.
    6. In TaskRunner's run method the pieces of a java command are assembled (classpath, working directory, and so on), with Child as the entry class. It then calls JvmManager's launchJvm method.
    7. JvmManager in turn invokes JvmManagerForType's reapJvm and spawnNewJvm methods.
    8. In JvmManagerForType's spawnNewJvm method a JvmRunner thread is created to carry out the launch.
    9. The JvmRunner thread's run method calls runChild to execute the command line.

    III. Code Details

    1. TaskTracker's addToTaskQueue method.

    Picking up from the last method of the previous post: in heartbeat, the tasks that the JobTracker instructs the TaskTracker to launch are handed to the addToTaskQueue method and added to the task queue.

    // Add the task to a different launcher depending on the task type.
    private void addToTaskQueue(LaunchTaskAction action) {
      if (action.getTask().isMapTask()) {
        mapLauncher.addToTaskQueue(action);
      } else {
        reduceLauncher.addToTaskQueue(action);
      }
    }

    2. TaskLauncher's addToTaskQueue method, which simply adds the task to be launched to the list that TaskLauncher maintains, List<TaskInProgress> tasksToLaunch.

    public void addToTaskQueue(LaunchTaskAction action) {
          synchronized (tasksToLaunch) {
            TaskInProgress tip = registerTask(action, this);
            tasksToLaunch.add(tip);
            tasksToLaunch.notifyAll();
          }
    }

    3. The TaskLauncher thread's run method. TaskLauncher is a thread: it keeps checking whether the task list has entries and takes one out to run.

    public void run() {
      while (!Thread.interrupted()) {
        try {
          TaskInProgress tip;
          synchronized (tasksToLaunch) {
            while (tasksToLaunch.isEmpty()) {
              tasksToLaunch.wait();
            }
            //get the TIP
            tip = tasksToLaunch.remove(0);
          }
          //wait for a slot to run
          synchronized (numFreeSlots) {
            while (numFreeSlots.get() == 0) {
              numFreeSlots.wait();
            }
            LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
                " and trying to launch "+tip.getTask().getTaskID());
            numFreeSlots.set(numFreeSlots.get() - 1);
            assert (numFreeSlots.get() >= 0);
          }
          //got a free slot. launch the task
          startNewTask(tip);
        } catch (InterruptedException e) {
          return; // ALL DONE
        }
      }
    }

    4. TaskTracker's startNewTask method launches a new task. Its main code is this single call:

     localizeJob(tip);

    5. TaskTracker's localizeJob method: sets up the job's local directories.

    private void localizeJob(TaskInProgress tip) throws IOException {
            Path localJarFile = null;
            Task t = tip.getTask();
            JobID jobId = t.getJobID();
            Path jobFile = new Path(t.getJobFile());
            FileStatus status = null;
            long jobFileSize = -1;
            status = systemFS.getFileStatus(jobFile);
            jobFileSize = status.getLen();
    
            Path localJobFile = lDirAlloc.getLocalPathForWrite(
                    getLocalJobDir(jobId.toString())
                    + Path.SEPARATOR + "job.xml",
                    jobFileSize, fConf);
            RunningJob rjob = addTaskToJob(jobId, tip);
            synchronized (rjob) {
                if (!rjob.localized) {
    
                    FileSystem localFs = FileSystem.getLocal(fConf);
    
                    systemFS.copyToLocalFile(jobFile, localJobFile);
                    JobConf localJobConf = new JobConf(localJobFile);
                    Path workDir = lDirAlloc.getLocalPathForWrite(
                            (getLocalJobDir(jobId.toString())
                                    + Path.SEPARATOR + "work"), fConf);
    
                    System.setProperty("job.local.dir", workDir.toString());
                    localJobConf.set("job.local.dir", workDir.toString());
    
    
                    // copy the job's jar file to the local file system and unpack it
                    String jarFile = localJobConf.getJar();
    
                    Path jarFilePath = new Path(jarFile);
                    status = systemFS.getFileStatus(jarFilePath);
                    long jarFileSize = status.getLen();
    
                    // make sure the allocated directory has room for 5 times the jar file size
                    localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
                            getLocalJobDir(jobId.toString())
                            + Path.SEPARATOR + "jars",
                            5 * jarFileSize, fConf), "job.jar");
    
                    // copy the jar file to the local machine
                    systemFS.copyToLocalFile(jarFilePath, localJarFile);
                    localJobConf.setJar(localJarFile.toString());
                    OutputStream out = localFs.create(localJobFile);
                    localJobConf.writeXml(out);
    
                    // also unjar the job.jar files 
                    RunJar.unJar(new File(localJarFile.toString()),
                            new File(localJarFile.getParent().toString()));
                    rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                            localJobConf.getKeepFailedTaskFiles());
                    rjob.localized = true;
                    rjob.jobConf = localJobConf;
                }
            }
            launchTaskForJob(tip, new JobConf(rjob.jobConf));
    }

    6. TaskTracker's addTaskToJob method: it simply records the job-task relationship in runningJobs.

    private RunningJob addTaskToJob(JobID jobId, 
                                      TaskInProgress tip) {
        synchronized (runningJobs) {
          RunningJob rJob = null;
          if (!runningJobs.containsKey(jobId)) {
            rJob = new RunningJob(jobId);
            rJob.localized = false;
            rJob.tasks = new HashSet<TaskInProgress>();
            runningJobs.put(jobId, rJob);
          } else {
            rJob = runningJobs.get(jobId);
          }
          synchronized (rJob) {
            rJob.tasks.add(tip);
          }
          runningJobs.notify(); //notify the fetcher thread
          return rJob;
        }
      }

    7. TaskTracker's launchTaskForJob method: calls the TaskInProgress's launchTask method.

    private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) {
        synchronized (tip) {
          tip.setJobConf(jobConf);
          tip.launchTask();
        }
      }

    8. TaskInProgress's launchTask method.

        public synchronized void launchTask() throws IOException {
            if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
                    this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
                    this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
                localizeTask(task);
                if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
                    this.taskStatus.setRunState(TaskStatus.State.RUNNING);
                }
                // create a runner to run the task
                this.runner = task.createRunner(TaskTracker.this, this);
                this.runner.start();
                this.taskStatus.setStartTime(System.currentTimeMillis());
            }
        }

    9. TaskInProgress's localizeTask method: copies the task-related files to the local machine.

    private void localizeTask(Task task) throws IOException{
    
          Path localTaskDir = 
            lDirAlloc.getLocalPathForWrite(
              TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
                task.getTaskID().toString(), task.isTaskCleanupTask()), 
              defaultJobConf );
          
          FileSystem localFs = FileSystem.getLocal(fConf);
          // create symlink for ../work if it already doesnt exist
          String workDir = lDirAlloc.getLocalPathToRead(
                             TaskTracker.getLocalJobDir(task.getJobID().toString())
                             + Path.SEPARATOR  
                             + "work", defaultJobConf).toString();
          String link = localTaskDir.getParent().toString() 
                          + Path.SEPARATOR + "work";
          File flink = new File(link);
          if (!flink.exists())
            FileUtil.symLink(workDir, link);
          
          // create the task's working directory
          Path cwd = lDirAlloc.getLocalPathForWrite(
                       getLocalTaskDir(task.getJobID().toString(), 
                          task.getTaskID().toString(), task.isTaskCleanupTask()) 
                       + Path.SEPARATOR + MRConstants.WORKDIR,
                       defaultJobConf);
    
          Path localTaskFile = new Path(localTaskDir, "job.xml");
          task.setJobFile(localTaskFile.toString());
          localJobConf.set("mapred.local.dir",
                           fConf.get("mapred.local.dir")); 
                
          localJobConf.set("mapred.task.id", task.getTaskID().toString());     
          }
                OutputStream out = localFs.create(localTaskFile);
                 localJobConf.writeXml(out);
         
          task.setConf(localJobConf);
        }

    10. Task is an abstract class with two subclasses, MapTask and ReduceTask. Let's look at the map side's runner first: MapTask.createRunner returns a MapTaskRunner.

    public TaskRunner createRunner(TaskTracker tracker, 
          TaskTracker.TaskInProgress tip) {
        return new MapTaskRunner(tip, tracker, this.conf);
      }
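
    For comparison, the reduce side is symmetric: ReduceTask's createRunner returns a ReduceTaskRunner. The snippet below is quoted from memory rather than verbatim, so treat it as a sketch:

    public TaskRunner createRunner(TaskTracker tracker,
          TaskTracker.TaskInProgress tip) throws IOException {
        return new ReduceTaskRunner(tip, tracker, this.conf);
      }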

    11. The TaskRunner thread's run method, all 420 lines of it! Its main job is to build a java command from the configuration and start a Java process.

    It pieces together a java command and starts a separate Java process to run each map or reduce task. The main class of that java command is Child, so the Java process it launches ultimately enters through the main method of the Child class.

    public final void run() {
        try {
          
          //before preparing the job localize 
          //all the archives
          TaskAttemptID taskid = t.getTaskID();
          LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
          File jobCacheDir = null;
          if (conf.getJar() != null) {
            jobCacheDir = new File(
                              new Path(conf.getJar()).getParent().toString());
          }
          File workDir = new File(lDirAlloc.getLocalPathToRead(
                                    TaskTracker.getLocalTaskDir( 
                                      t.getJobID().toString(), 
                                      t.getTaskID().toString(),
                                      t.isTaskCleanupTask())
                                    + Path.SEPARATOR + MRConstants.WORKDIR,
                                    conf). toString());
    
          URI[] archives = DistributedCache.getCacheArchives(conf);
          URI[] files = DistributedCache.getCacheFiles(conf);
          FileStatus fileStatus;
          FileSystem fileSystem;
          Path localPath;
          String baseDir;
    
          if ((archives != null) || (files != null)) {
            if (archives != null) {
              String[] archivesTimestamps = 
                                   DistributedCache.getArchiveTimestamps(conf);
              Path[] p = new Path[archives.length];
              for (int i = 0; i < archives.length;i++){
                fileSystem = FileSystem.get(archives[i], conf);
                fileStatus = fileSystem.getFileStatus(
                                          new Path(archives[i].getPath()));
                String cacheId = DistributedCache.makeRelative(archives[i],conf);
                String cachePath = TaskTracker.getCacheSubdir() + 
                                     Path.SEPARATOR + cacheId;
                
                localPath = lDirAlloc.getLocalPathForWrite(cachePath,
                                          fileStatus.getLen(), conf);
                baseDir = localPath.toString().replace(cacheId, "");
                p[i] = DistributedCache.getLocalCache(archives[i], conf, 
                                                      new Path(baseDir),
                                                      fileStatus,
                                                      true, Long.parseLong(
                                                            archivesTimestamps[i]),
                                                      new Path(workDir.
                                                            getAbsolutePath()), 
                                                      false);
                
              }
              DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
            }
            if ((files != null)) {
              String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
              Path[] p = new Path[files.length];
              for (int i = 0; i < files.length;i++){
                fileSystem = FileSystem.get(files[i], conf);
                fileStatus = fileSystem.getFileStatus(
                                          new Path(files[i].getPath()));
                String cacheId = DistributedCache.makeRelative(files[i], conf);
                String cachePath = TaskTracker.getCacheSubdir() +
                                     Path.SEPARATOR + cacheId;
                
                localPath = lDirAlloc.getLocalPathForWrite(cachePath,
                                          fileStatus.getLen(), conf);
                baseDir = localPath.toString().replace(cacheId, "");
                p[i] = DistributedCache.getLocalCache(files[i], conf, 
                                                      new Path(baseDir),
                                                      fileStatus,
                                                      false, Long.parseLong(
                                                               fileTimestamps[i]),
                                                      new Path(workDir.
                                                            getAbsolutePath()), 
                                                      false);
              }
              DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
            }
            Path localTaskFile = new Path(t.getJobFile());
            FileSystem localFs = FileSystem.getLocal(conf);
            localFs.delete(localTaskFile, true);
            OutputStream out = localFs.create(localTaskFile);
            try {
              conf.writeXml(out);
            } finally {
              out.close();
            }
          }
              
          if (!prepare()) {
            return;
          }
    
          String sep = System.getProperty("path.separator");
          StringBuffer classPath = new StringBuffer();
          // start with same classpath as parent process
          classPath.append(System.getProperty("java.class.path"));
          classPath.append(sep);
          if (!workDir.mkdirs()) {
            if (!workDir.isDirectory()) {
              LOG.fatal("Mkdirs failed to create " + workDir.toString());
            }
          }
          
          String jar = conf.getJar();
          if (jar != null) {       
            // if the jar exists, add its lib jars, classes directory and the jar directory itself to the classpath
            File[] libs = new File(jobCacheDir, "lib").listFiles();
            if (libs != null) {
              for (int i = 0; i < libs.length; i++) {
                classPath.append(sep);            // add libs from jar to classpath
                classPath.append(libs[i]);
              }
            }
            classPath.append(sep);
            classPath.append(new File(jobCacheDir, "classes"));
            classPath.append(sep);
            classPath.append(jobCacheDir);
           
          }
    
          // include the user specified classpath
              
          //archive paths
          Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
          if (archiveClasspaths != null && archives != null) {
            Path[] localArchives = DistributedCache
              .getLocalCacheArchives(conf);
            if (localArchives != null){
              for (int i=0;i<archives.length;i++){
                for(int j=0;j<archiveClasspaths.length;j++){
                  if (archives[i].getPath().equals(
                                                   archiveClasspaths[j].toString())){
                    classPath.append(sep);
                    classPath.append(localArchives[i]
                                     .toString());
                  }
                }
              }
            }
          }
          //file paths
          Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
          if (fileClasspaths!=null && files != null) {
            Path[] localFiles = DistributedCache
              .getLocalCacheFiles(conf);
            if (localFiles != null) {
              for (int i = 0; i < files.length; i++) {
                for (int j = 0; j < fileClasspaths.length; j++) {
                  if (files[i].getPath().equals(
                                                fileClasspaths[j].toString())) {
                    classPath.append(sep);
                    classPath.append(localFiles[i].toString());
                  }
                }
              }
            }
          }
    
          classPath.append(sep);
          classPath.append(workDir);
          //  Build exec child JVM args.
          Vector<String> vargs = new Vector<String>(8);
          File jvm =                                  // use same jvm as parent
            new File(new File(System.getProperty("java.home"), "bin"), "java");
    
          vargs.add(jvm.toString());
    
          // Add child (task) java-vm options.
          //
          // The following symbols if present in mapred.child.java.opts value are
          // replaced:
          // + @taskid@ is interpolated with value of TaskID.
          // Other occurrences of @ will not be altered.
          //
          // Example with multiple arguments and substitutions, showing
          // jvm GC logging, and start of a passwordless JVM JMX agent so can
          // connect with jconsole and the likes to watch child memory, threads
          // and get thread dumps.
          //
          //  <property>
          //    <name>mapred.child.java.opts</name>
          //    <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc 
          //           -Dcom.sun.management.jmxremote.authenticate=false 
          //           -Dcom.sun.management.jmxremote.ssl=false 
          //    </value>
          //  </property>
          //
          String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
          javaOpts = javaOpts.replace("@taskid@", taskid.toString());
          String [] javaOptsSplit = javaOpts.split(" ");
          
          // Add java.library.path; necessary for loading native libraries.
          //
          // 1. To support native-hadoop library i.e. libhadoop.so, we add the 
          //    parent processes' java.library.path to the child. 
          // 2. We also add the 'cwd' of the task to it's java.library.path to help 
          //    users distribute native libraries via the DistributedCache.
          // 3. The user can also specify extra paths to be added to the 
          //    java.library.path via mapred.child.java.opts.
          //
          String libraryPath = System.getProperty("java.library.path");
          if (libraryPath == null) {
            libraryPath = workDir.getAbsolutePath();
          } else {
            libraryPath += sep + workDir;
          }
          boolean hasUserLDPath = false;
          for(int i=0; i<javaOptsSplit.length ;i++) { 
            if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
              javaOptsSplit[i] += sep + libraryPath;
              hasUserLDPath = true;
              break;
            }
          }
          if(!hasUserLDPath) {
            vargs.add("-Djava.library.path=" + libraryPath);
          }
          for (int i = 0; i < javaOptsSplit.length; i++) {
            vargs.add(javaOptsSplit[i]);
          }
    
          // add java.io.tmpdir given by mapred.child.tmp
          String tmp = conf.get("mapred.child.tmp", "./tmp");
          Path tmpDir = new Path(tmp);
          
          // if temp directory path is not absolute 
          // prepend it with workDir.
          if (!tmpDir.isAbsolute()) {
            tmpDir = new Path(workDir.toString(), tmp);
          }
          FileSystem localFs = FileSystem.getLocal(conf);
          if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
            throw new IOException("Mkdirs failed to create " + tmpDir.toString());
          }
          vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
    
          // Add classpath.
          vargs.add("-classpath");
          vargs.add(classPath.toString());
    
          // Setup the log4j prop
          long logSize = TaskLog.getTaskLogLength(conf);
          vargs.add("-Dhadoop.log.dir=" + 
              new File(System.getProperty("hadoop.log.dir")
              ).getAbsolutePath());
          vargs.add("-Dhadoop.root.logger=INFO,TLA");
          vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
          vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
    
          if (conf.getProfileEnabled()) {
            if (conf.getProfileTaskRange(t.isMapTask()
                                         ).isIncluded(t.getPartition())) {
              File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
              vargs.add(String.format(conf.getProfileParams(), prof.toString()));
            }
          }
    
          // Add main class and its arguments 
          vargs.add(Child.class.getName());  // main of Child
          // pass umbilical address
          InetSocketAddress address = tracker.getTaskTrackerReportAddress();
          vargs.add(address.getAddress().getHostAddress()); 
          vargs.add(Integer.toString(address.getPort())); 
          vargs.add(taskid.toString());                      // pass task identifier
    
          String pidFile = lDirAlloc.getLocalPathForWrite(
                (TaskTracker.getPidFile(t.getJobID().toString(), 
                 taskid.toString(), t.isTaskCleanupTask())),
                this.conf).toString();
          t.setPidFile(pidFile);
          tracker.addToMemoryManager(t.getTaskID(), conf, pidFile);
    
          // set memory limit using ulimit if feasible and necessary ...
          String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
          List<String> setup = null;
          if (ulimitCmd != null) {
            setup = new ArrayList<String>();
            for (String arg : ulimitCmd) {
              setup.add(arg);
            }
          }
    
          // Set up the redirection of the task's stdout and stderr streams
          File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
          File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
          stdout.getParentFile().mkdirs();
          tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
    
          Map<String, String> env = new HashMap<String, String>();
          StringBuffer ldLibraryPath = new StringBuffer();
          ldLibraryPath.append(workDir.toString());
          String oldLdLibraryPath = null;
          oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
          if (oldLdLibraryPath != null) {
            ldLibraryPath.append(sep);
            ldLibraryPath.append(oldLdLibraryPath);
          }
          env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
          jvmManager.launchJvm(this, 
              jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
                  workDir, env, pidFile, conf));
          synchronized (lock) {
            while (!done) {
              lock.wait();
            }
          }
          tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
          if (exitCodeSet) {
            if (!killed && exitCode != 0) {
              if (exitCode == 65) {
                tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
              }
              throw new IOException("Task process exit with nonzero status of " +
                  exitCode + ".");
            }
          }
        } catch (FSError e) {
          LOG.fatal("FSError", e);
          try {
            tracker.fsError(t.getTaskID(), e.getMessage());
          } catch (IOException ie) {
            LOG.fatal(t.getTaskID()+" reporting FSError", ie);
          }
        } catch (Throwable throwable) {
          LOG.warn(t.getTaskID()+" Child Error", throwable);
          ByteArrayOutputStream baos = new ByteArrayOutputStream();
          throwable.printStackTrace(new PrintStream(baos));
          try {
            tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
          } catch (IOException e) {
            LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
          }
        } finally {
          try{
            URI[] archives = DistributedCache.getCacheArchives(conf);
            URI[] files = DistributedCache.getCacheFiles(conf);
            if (archives != null){
              for (int i = 0; i < archives.length; i++){
                DistributedCache.releaseCache(archives[i], conf);
              }
            }
            if (files != null){
              for(int i = 0; i < files.length; i++){
                DistributedCache.releaseCache(files[i], conf);
              }
            }
          }catch(IOException ie){
            LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
          }
          tip.reportTaskFinished();
        }
      }
    (End of TaskRunner's run method.)
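
    To make the result of all this tangible, the sketch below shows roughly the shape of the command vector the method ends up with. Every concrete value in it (paths, host, port, task attempt id) is made up for illustration; the real values come from the configuration and the TaskTracker at runtime.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: roughly what TaskRunner.run() assembles before handing
    // it to JvmManager. None of the literal values below are real.
    public class ChildCommandSketch {
      public static void main(String[] args) {
        List<String> vargs = new ArrayList<String>();
        vargs.add(System.getProperty("java.home") + "/bin/java"); // same JVM as the TaskTracker
        vargs.add("-Xmx200m");                                    // from mapred.child.java.opts
        vargs.add("-Djava.library.path=/tmp/mapred/local/jobcache/job_x/work"); // made up
        vargs.add("-Djava.io.tmpdir=./tmp");                      // from mapred.child.tmp
        vargs.add("-classpath");
        vargs.add("<parent classpath>:<job jar libs and classes>:<workDir>");   // placeholder
        vargs.add("-Dhadoop.log.dir=/var/log/hadoop");            // made up
        vargs.add("-Dhadoop.root.logger=INFO,TLA");
        vargs.add("org.apache.hadoop.mapred.Child");              // entry point of the child process
        vargs.add("127.0.0.1");                                   // umbilical (TaskTracker report) host
        vargs.add("50050");                                       // umbilical port (made up)
        vargs.add("attempt_200901010000_0001_m_000000_0");        // task attempt id (made up)

        StringBuilder cmd = new StringBuilder();
        for (String v : vargs) {
          cmd.append(v).append(' ');
        }
        System.out.println(cmd.toString().trim());
      }
    }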

    12. JvmManager's launchJvm method. TaskRunner's run method builds the arguments for a java command and then hands them to JvmManager's launchJvm method for execution.

    public void launchJvm(TaskRunner t, JvmEnv env) {
        if (t.getTask().isMapTask()) {
          mapJvmManager.reapJvm(t, env);
        } else {
          reduceJvmManager.reapJvm(t, env);
        }
      }

    13. JvmManagerForType's reapJvm method.

    private synchronized void reapJvm( 
            TaskRunner t, JvmEnv env) {
          if (t.getTaskInProgress().wasKilled()) {
            // if the task has been killed, just return
            return;
          }
          boolean spawnNewJvm = false;
          JobID jobId = t.getTask().getJobID();
          // Check whether a JVM slot is free: if fewer than the maximum number of JVMs are running, spawn a new one;
          // otherwise try to reuse an idle JVM belonging to this job, or kill an idle JVM of another job
          // (or one of this job that has run all its allotted tasks) and spawn a fresh one.
          int numJvmsSpawned = jvmIdToRunner.size();
          JvmRunner runnerToKill = null;
          if (numJvmsSpawned >= maxJvms) {
            //go through the list of JVMs for all jobs.
            Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = 
              jvmIdToRunner.entrySet().iterator();
            
            while (jvmIter.hasNext()) {
              JvmRunner jvmRunner = jvmIter.next().getValue();
              JobID jId = jvmRunner.jvmId.getJobId();
              //look for a free JVM for this job; if one exists then just break
              if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
                setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
                LOG.info("No new JVM spawned for jobId/taskid: " + 
                         jobId+"/"+t.getTask().getTaskID() +
                         ". Attempting to reuse: " + jvmRunner.jvmId);
                return;
              }
          
              if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
                  (!jId.equals(jobId) && !jvmRunner.isBusy())) {
                runnerToKill = jvmRunner;
                spawnNewJvm = true;
              }
            }
          } else {
            spawnNewJvm = true;
          }
    
          if (spawnNewJvm) {
            if (runnerToKill != null) {
              LOG.info("Killing JVM: " + runnerToKill.jvmId);
              runnerToKill.kill();
            }
            spawnNewJvm(jobId, env, t);
            return;
          }
    }

    14. JvmManagerForType's spawnNewJvm method: spawns a new JVM.

     private void spawnNewJvm(JobID jobId, JvmEnv env,  
            TaskRunner t) {
          JvmRunner jvmRunner = new JvmRunner(env,jobId);
          jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
          jvmRunner.setDaemon(true);
          jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
          setRunningTaskForJvm(jvmRunner.jvmId, t);
          LOG.info(jvmRunner.getName());
          jvmRunner.start();
        }

    15. The JvmRunner thread's run method.

      public void run() {
            runChild(env);
          }

    16. The JvmRunner thread's runChild method, which calls ShellCommandExecutor's execute method. ShellCommandExecutor wraps shell execution: the JvmEnv built up in the earlier steps is packed into a list of strings, and that list is used to construct a ShellCommandExecutor that runs the command.

    public void runChild(JvmEnv env) {
      env.vargs.add(Integer.toString(jvmId.getId()));
      List<String> wrappedCommand =
          TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
              env.logSize, env.pidFile);
      shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
          env.workDir, env.env);
      shexec.execute();
    }
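
    As a standalone illustration of what ShellCommandExecutor does, the sketch below runs a trivial command the same way runChild runs the wrapped child-JVM command (a working directory plus an environment map). It assumes the constructor, execute(), getExitCode() and getOutput() of org.apache.hadoop.util.Shell.ShellCommandExecutor behave as in this Hadoop generation; the command and paths are made up.

    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.util.Shell.ShellCommandExecutor;

    public class ShellExecSketch {
      public static void main(String[] args) throws IOException {
        Map<String, String> env = new HashMap<String, String>();
        env.put("LD_LIBRARY_PATH", "/tmp/work");   // made-up value, mirrors TaskRunner's env setup
        ShellCommandExecutor shexec = new ShellCommandExecutor(
            new String[] {"bash", "-c", "echo hello from the child"},
            new File("/tmp"), env);                // working directory + environment
        shexec.execute();                          // blocks until the process exits
        System.out.println("exit code = " + shexec.getExitCode());
        System.out.println(shexec.getOutput());
      }
    }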

    Done.

    To keep reposted content consistent, traceable, and promptly corrected, please credit reposts to: http://www.cnblogs.com/douba/p/hadoop_mapreduce_tasktracker_launch_task.html. Thanks!
