zoukankan      html  css  js  c++  java
  • ${mapred.local.dir}选择策略--Map Task存放中间结果

      上篇说了block在DataNode配置有多个${dfs.data.dir}时的存储策略,本文主要介绍TaskTracker在配置有多个${mapred.local.dir}时的选择策略。

    1 mapred-site.xml
    2 <property>
    3   <name>mapred.local.dir</name>
    4   <value>/mnt/localdir1/local,/mnt/localdir2/local,/mnt/localdir3/local</value>
    5 </property>

      当${mapred.local.dir}配置有多个目录分别用来挂载不同的硬盘时,Map Task的结果应该存放在哪个目录中?首先还是看一下方法的调用层次,如下图所示:

      下面分析这两个方法:

     1    /** Get a path from the local FS. If size is known, we go
     2      *  round-robin over the set of disks (via the configured dirs) and return
     3      *  the first complete path which has enough space.
     4      *  
     5      *  If size is not known, use roulette selection -- pick directories
     6      *  with probability proportional to their available space.
     7      */
     8     public synchronized 
     9     Path getLocalPathForWrite(String pathStr, long size, 
    10                               Configuration conf, boolean checkWrite
    11                               ) throws IOException {
    12         //检查task目录是否有变化
    13       confChanged(conf);
    14       int numDirs = localDirsPath.length;    //获取${mapred.local.dir}目录的个数
    15       int numDirsSearched = 0;    //表示已经搜索过的次数
    16       //remove the leading slash from the path (to make sure that the uri
    17       //resolution results in a valid path on the dir being checked)
    18       if (pathStr.startsWith("/")) {    //是指output/spill0.out文件
    19         pathStr = pathStr.substring(1);
    20       }
    21       Path returnPath = null;
    22       Path path = new Path(pathStr);
    23       
    24       //当要写入的数据量大小未知时
    25       if(size == SIZE_UNKNOWN) {  //do roulette selection: pick dir with probability 
    26                     //proportional to available size
    27         long[] availableOnDisk = new long[dirDF.length];
    28         long totalAvailable = 0;
    29         
    30             //build the "roulette wheel"
    31         for(int i =0; i < dirDF.length; ++i) { 
    32             //分别计算每一个${mapred.local.dir}目录可用大小,并计算总的可用大小
    33           availableOnDisk[i] = dirDF[i].getAvailable();
    34           totalAvailable += availableOnDisk[i];
    35         }
    36 
    37         // Keep rolling the wheel till we get a valid path
    38         Random r = new java.util.Random();
    39         while (numDirsSearched < numDirs && returnPath == null) {
    40           long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
    41           int dir = 0;
    42           while (randomPosition > availableOnDisk[dir]) {
    43             randomPosition -= availableOnDisk[dir];
    44             dir++;
    45           }
    46           dirNumLastAccessed = dir;    //表示上次访问过的目录
    47           //从${mapred.local.dir}中选择一个目录,在其下创建output/spill0.out文件
    48           returnPath = createPath(path, checkWrite); 
    49           if (returnPath == null) { //如果创建失败(可能存在disk read-only的情况)
    50             totalAvailable -= availableOnDisk[dir];
    51             availableOnDisk[dir] = 0; // skip this disk
    52             numDirsSearched++;
    53           }
    54         }
    55       } else {  //写入的数据量已知
    56         while (numDirsSearched < numDirs && returnPath == null) {
    57           long capacity = dirDF[dirNumLastAccessed].getAvailable();
    58           if (capacity > size) {
    59               returnPath = createPath(path, checkWrite);
    60           }
    61           //使用轮流的方式来选择${mapred.local.dir}
    62           dirNumLastAccessed++;
    63           dirNumLastAccessed = dirNumLastAccessed % numDirs; 
    64           numDirsSearched++;
    65         } 
    66       }
    67       if (returnPath != null) {
    68         return returnPath;
    69       }
    70       
    71       //no path found
    72       throw new DiskErrorException("Could not find any valid local " +
    73           "directory for " + pathStr);
    74     }

      confChanged(conf)方法首先检查原来的目录配置是否改变,这个下面说;然后给numDirs赋值,它表示总的${mapred.local.dir}目录个数,localDirsPath数组变量在confChanged(conf)方法中被更新了;接着在准备创建output/spill0.out文件,这个文件就是Map Task的运算结果在缓冲区写满之后spill到disk生成的文件,序号0代表序号,最后会将多个spill文件合成一个file.out文件;接下来就要选择${mapred.local.dir}目录了。其过程如下:

      1、如果要写入的数据量大小未知时:

      a) 计算dirDF数组中每个元素的剩余大小,并计算所有元素的总大小totalAvailable;

      b) (循环)生成一个Long类型随机正数,该随机数对总大小totalAvailable取余后得randomPosition。

            (内层循环)若randomPosition > 某个disk剩余量,则randomPosition减去该disk剩余量,并与下一个disk剩余量比较……

      c) 选择了某个disk之后,如果这个disk不能创建文件,则排除这个disk,重新选择disk(总共尝试localDirsPath.length次)

      2、要写入的数据量大小已知时:将${mapred.local.dir}组织成一个数组,轮流的使用数组中的目录。dirNumLastAccessed表示上次访问过的目录;

      下面反过来分析下confChanged()方法。

       实际上该方法中的获取到的localDirs数组所代表的目录,是Map Task或Reduce Task的工作目录(即attempt_jobid_taskid_m_attemptid*)。因为每次不同的Task会使用不同的工作目录。所以每次不同的Task来read/write数据时,该方法都会为他们构造工作目录。具体代码如下: 

     1 /** This method gets called everytime before any read/write to make sure
     2      * that any change to localDirs is reflected immediately.
     3      */
     4     private synchronized void confChanged(Configuration conf
     5                                           ) throws IOException {
     6         //contextCfgItemName="mapred.local.dir"
     7       String newLocalDirs = conf.get(contextCfgItemName);
     8       if (!newLocalDirs.equals(savedLocalDirs)) { //savedLocalDirs代表上个task的工作目录
     9         String[] localDirs = conf.getStrings(contextCfgItemName);
    10         localFS = FileSystem.getLocal(conf);
    11         int numDirs = localDirs.length;  //${mapred.local.dir}目录的个数
    12         ArrayList<String> dirs = new ArrayList<String>(numDirs);
    13         ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
    14         for (int i = 0; i < numDirs; i++) {
    15           try {
    16             // filter problematic directories
    17             Path tmpDir = new Path(localDirs[i]);
    18             //检查task的工作目录(attempt....)是否存在,如果不存在,则新建
    19             if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) {
    20               try {
    21                 DiskChecker.checkDir(new File(localDirs[i]));
    22                 dirs.add(localDirs[i]);
    23                 dfList.add(new DF(new File(localDirs[i]), 30000));
    24               } catch (DiskErrorException de) {
    25                 LOG.warn( localDirs[i] + "is not writable
    " +
    26                     StringUtils.stringifyException(de));
    27               }
    28             } else {
    29               LOG.warn( "Failed to create " + localDirs[i]);
    30             }
    31           } catch (IOException ie) { 
    32             LOG.warn( "Failed to create " + localDirs[i] + ": " +
    33                 ie.getMessage() + "
    " + StringUtils.stringifyException(ie));
    34           } //ignore
    35         }
    36         localDirsPath = new Path[dirs.size()];
    37         for(int i=0;i<localDirsPath.length;i++) {
    38           localDirsPath[i] = new Path(dirs.get(i));
    39         }
    40         dirDF = dfList.toArray(new DF[dirs.size()]);
    41         savedLocalDirs = newLocalDirs;  //保存此次的task工作目录
    42         
    43         // randomize the first disk picked in the round-robin selection 
    44         //因为该task所有的工作目录都遍历过了,所以随机选择一个目录作为最后访问过的目录
    45         dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size());
    46       }
    47     }

       上面代码中的localDirsPath变量的内容如下所示: 

    /mapred/local/dir1/taskTracker/hadoop/jobcache/job_local1424926029_0001/attempt_local1424926029_0001_m_000000_0 

    /mapred/local/dir2/taskTracker/hadoop/jobcache/job_local1424926029_0001/attempt_local1424926029_0001_m_000000_0 

    /mapred/local/dir3/taskTracker/hadoop/jobcache/job_local1424926029_0001/attempt_local1424926029_0001_m_000000_0

     

      可以看到,这些路径中就只有${mapred.local.dir}不同,其下的目录结构都完全一样。

      说一下Task的工作目录。TaskTracker会在${mapred.local.dir}下生成相同的目录结构用来存放Map Task处理的结果数据,然后在Job完成时清理掉这些数据和目录。

      Task的工作目录就是指:${mapred.local.dir}/taskTracker/${user}/jobcache/jobID/taskID目录。在这个目录下的output文件夹中就存放着Map Task的结果,并以上述方式使用这些目录。

      才开始时,output目录下只有spill0.out文件(0代表序号),之后可能会产生多个spill文件。当Map Task执行完毕后会把所有属于该Task(即同一个taskid目录下)的spill文件合并成file.out文件。

       

      变量dirDF代表了一个DF数组,DF类代表了disk的使用情况(使用"df -k"命令得到),包含的属性如下:

     1 /**
     2  * Filesystem disk space usage statistics. Uses the unix 'df' program to get
     3  * mount points, and java.io.File for space utilization. Tested on Linux,
     4  * FreeBSD, Cygwin.
     5  */
     6 public class DF extends Shell {
     7 
     8   private final String dirPath;
     9   private final File dirFile;
    10   private String filesystem;
    11   private String mount;

      分析完写数据的部分后,读数据的部分就很简单了。使用getLocalPathToRead()方法,从整个${mapred.local.dir}/taskTracker/${user}/jobcache/jobID/taskID中寻找所需要的文件,找到后返回其路径信息即可。

      ${mapred.local.dir}的选择策略也有以下问题: 

    1、disk是只读的 

    2、Disk没有足够空间了(多个线程共享disk)

      

      本文基于hadoop1.2.1

      如有错误,还请指正

      转载请注明出处:http://www.cnblogs.com/gwgyk/p/4124980.html

  • 相关阅读:
    013.ES6 -对象字面量增强型写法
    012. ES6
    011. ES6 语法
    10. 9. Vue 计算属性的setter和getter 以及 计算属性的缓存讲解
    4. Spring MVC 数据响应方式
    3. SpringMVC 组件解析
    9. Vue 计算属性
    【洛谷 2984】给巧克力
    【洛谷 1821】捉迷藏 Hide and Seek
    【洛谷 1821】银牛派对Silver Cow Party
  • 原文地址:https://www.cnblogs.com/gwgyk/p/4124980.html
Copyright © 2011-2022 走看看