zoukankan      html  css  js  c++  java
  • MR 的 mapper 数量问题


    看到群里面一篇文章涨了贱识

    http://www.cnblogs.com/xuxm2007/archive/2011/09/01/2162011.html

    之前关注过 reduceer 的数量问题,还没注意到 mapper 的数量怎么确定的

    文章中可以提炼出三点:

    1.block和split的关系;2.mapper数量是怎么确定的;3.一个split不会包含两个File的Block,不会跨越File边界

    还好自己手贱去翻了一下源码


     在hadoop2.2.0 的源码中关于mapper数量确定的核心代码为:

     1 for (FileStatus file: files) {
     2       Path path = file.getPath();
     3       long length = file.getLen();
     4       if (length != 0) {
     5         BlockLocation[] blkLocations;
     6         if (file instanceof LocatedFileStatus) {
     7           blkLocations = ((LocatedFileStatus) file).getBlockLocations();
     8         } else {
     9           FileSystem fs = path.getFileSystem(job.getConfiguration());
    10           blkLocations = fs.getFileBlockLocations(file, 0, length);
    11         }
    12         if (isSplitable(job, path)) {
    13           long blockSize = file.getBlockSize();
    14           long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    15 
    16           long bytesRemaining = length;
    17           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    18             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    19             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
    20                                      blkLocations[blkIndex].getHosts()));
    21             bytesRemaining -= splitSize;
    22           }
    23 
    24           if (bytesRemaining != 0) {
    25             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    26             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
    27                        blkLocations[blkIndex].getHosts()));
    28           }
    29         } else { // not splitable
    30           splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
    31         }
    32       } else { 
    33         //Create empty hosts array for zero length files
    34         splits.add(makeSplit(path, 0, length, new String[0]));
    35       }
    36     }

    并没有看到文章中提到的goalSize,读一下源码就会发现和自己设定的mapper数量变量一点关系都没有

    再看hadoop1.x的代码,一样没有,又翻到以前下载的hadoop0.x的源代码,才找到了,读了一下就会发现文章中博主的观点都是对的,不过已经过时了而已

    那么现在的mapper数量是怎么确定的?


    想要自己设定mapper数量并不像设定reduceer数量那么简单直接调用Job.setNumReduceTasks(int)就可以了,Job类没有setNumMapTasks方法

    但可以通过Configuration.set(JobContext.NUM_MAPS, int)和在hadoop jar命令行提交时加参数-Dmapreduce.job.maps

    但测试并没有效果

    根据hadoop作业提交过程跟读源码发现在hadoop通过JobSubmitter类的submitJobInternal(Jobjob, Cluster cluster)方法向系统提交作业时有跟mapper数量有关的代码

        // Create the splits for the job
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
          int maps = writeSplits(job, submitJobDir);
          conf.setInt(MRJobConfig.NUM_MAPS, maps);
          LOG.info("number of splits:" + maps);

    mapper的数量通过writeSplits方法返回,该方法相关源代码:

      private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
          Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
         JobConf jConf = (JobConf)job.getConfiguration();
          int maps;
          if (jConf.getUseNewMapper()) {
            maps = writeNewSplits(job, jobSubmitDir);
          } else {
            maps = writeOldSplits(jConf, jobSubmitDir);
          }
          return maps;
      }

    新旧版本的jobcontext暂且不论,现在一般都是新的,所以由writeNewSplits方法确定

     1   @SuppressWarnings("unchecked")
     2   private <T extends InputSplit>
     3   int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
     4       InterruptedException, ClassNotFoundException {
     5      Configuration conf = job.getConfiguration();
     6       InputFormat<?, ?> input =
     7       ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
     8 
     9       List<InputSplit> splits = input.getSplits(job);
    10       T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    11 
    12       // sort the splits into order based on size, so that the biggest
    13       // go first
    14       Arrays.sort(array, new SplitComparator());
    15       JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
    16         jobSubmitDir.getFileSystem(conf), array);
    17       return array.length;
    18   }

    从代码中可以读出1.map的数量就是split的数量;2.map的数量是由反射出的inputformat类算出的;

    inputfomat是一个接口,最常用的是它的实现类FileInputFormat和其子类TextInputFormat,在MR中如果不指定则默认为TextInputFormat

    split的计算方法在TextInputFormat中没有,在其父类FileInputFormat中

    在这里看到最开始贴出的问题源头源代码

     1 for (FileStatus file: files) {
     2       Path path = file.getPath();
     3       long length = file.getLen();
     4       if (length != 0) {
     5         BlockLocation[] blkLocations;
     6         if (file instanceof LocatedFileStatus) {
     7           blkLocations = ((LocatedFileStatus) file).getBlockLocations();
     8         } else {
     9           FileSystem fs = path.getFileSystem(job.getConfiguration());
    10           blkLocations = fs.getFileBlockLocations(file, 0, length);
    11         }
    12         if (isSplitable(job, path)) {
    13           long blockSize = file.getBlockSize();
    14           long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    15 
    16           long bytesRemaining = length;
    17           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    18             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    19             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
    20                                      blkLocations[blkIndex].getHosts()));
    21             bytesRemaining -= splitSize;
    22           }
    23 
    24           if (bytesRemaining != 0) {
    25             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    26             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
    27                        blkLocations[blkIndex].getHosts()));
    28           }
    29         } else { // not splitable
    30           splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
    31         }
    32       } else { 
    33         //Create empty hosts array for zero length files
    34         splits.add(makeSplit(path, 0, length, new String[0]));
    35       }
    36     }

    其中核心是

     long blockSize = file.getBlockSize();
     long splitSize = computeSplitSize(blockSize, minSize, maxSize);

      protected long computeSplitSize(long blockSize, long minSize,
        long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }

    其中的getMinSplitSize和getMaxSplitSize方法分别用于获取最小InputSplit和最大InputSplit的值,对应的配置参数分别为mapreduce.input.fileinputformat.split.minsize,默认值为1L和mapreduce.input.fileinputformat.split.maxsize,默认值为Long.MAX_VALUE,十六进制数值为 0x7fffffffffffffffL,对应的十进制为9223372036854775807,getFormatMinSplitSize方法返回该输入格式下InputSplit的下限。以上数字的单位都是byte。由此得出minSize的大小为1L,maxSize的大小为Long.MAX_VALUE,而blockSize就是hadoop块的大小,hadoop2.x后一般为128M,结合代码可以发现splitSize一般就是块的大小

    所以,结论来了,一般mapper的数量就是文件块的数量。

    不过这样设计也很有道理,因为块都是分散和副本存储的,所以可以参考块在哪个主机上就跟哪个主机分配map任务(不是唯一因素),实现本地性,提高效率。


    不过还存在三个问题值得思考一下

    1.如果根据特殊情况的需要非要自定义mapper的数量怎么办?

    那就只有修改块的大小、split的最小值和最大值来影响mapper的数量;

    2.如果多文件呢?

    这里还发现源头文章中说在FileInputFormat的getSplits方法中计算单位都是块的数量,这个结论是不正确的,单位还是byte

    代码隐藏的一个规律就是split是按文件划分的,虽然划的时候采用了SPLIT_SLOP(默认1.1),也难免会有大于0.1*blockSize小于blockSize的split

    不过没有往下个文件推,所以 一个split不会包含两个File的Block,不会跨越File边界

    (一个split也不一定就是blockSize的大小,除了最小split和最大split的值影响外还可能小于blockSize和大于blockSize小于1.1*blockSize)

    3.如果通过Configuration.set(JobContext.NUM_MAPS, int)自定义了mapper的数量会出现什么情况?

    结合hadoop作业的提交过程可发现这个值会被计算后的NUM_MAPS覆盖,所以设置了也没用


    都是手贱惹的祸

    欲为大树,何与草争;心若不动,风又奈何。
  • 相关阅读:
    查找算法之——符号表(引入篇)
    排序算法之——优先队列经典实现(基于二叉堆)
    C# Timer和多线程编程、委托、异步、Func/Action
    Tomcat汇总-部署多个项目(不同端口)
    数据库汇总(MySQL教材)
    基础知识
    常用工具&网址
    Phython开发
    单元测试
    软件项目过程和文档
  • 原文地址:https://www.cnblogs.com/admln/p/hadoop-mapper-numbers-question.html
Copyright © 2011-2022 走看看