zoukankan      html  css  js  c++  java
  • MapReduce :基于 FileInputFormat 的 mapper 数量控制

    本篇分两部分,第一部分分析使用 java 提交 mapreduce 任务时对 mapper 数量的控制,第二部分分析使用 streaming 形式提交 mapreduce 任务时对 mapper 数量的控制。

     

    环境:hadoop-3.0.2

    前言:

    熟悉 hadoop mapreduce 的人可能已经知道,即使在程序里对 conf 显式地设置了 mapred.map.tasks 或 mapreduce.job.maps,程序也并没有运行期望数量的 mapper。

    这是因为,mapper 的数量由输入的大小、HDFS 当前设置的 BlockSize、以及当前配置中的 split min size 和 split max size 等参数共同确定,并不会受到简单的人工设置 mapper num 的影响。

    因此,对于 mapper num 的控制,需要我们理解 hadoop 中对于 FileInputFormat 类中 getSplit() 方法的实现,针对性地配置 BlockSize、split min size、split max size 等参数,才能达到目的。

    重点:

    值得一提并且容易忽略的是,要区分 org.apache.hadoop.mapred.FileInputFormat类和 org.apache.hadoop.mapreduce.lib.input.FileInputFormat类,两者虽然相似,但在getSplit()上的实现是有区别的。

    重要区别是,hadoop streaming 中使用的 InputFormat 类,使用的是 org.apache.hadoop.mapred.FileInputFormat,仅仅需要指定 mapreduce.job.maps ,就能够设置 mapper num了(具体源码分析在第二部分)。而使用JAVA设计的 mapreduce 任务中使用的 InputFormat 类,使用的是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat,则需要通过配置BlockSize、split min size、split max size 等参数来间接性地控制 mapper num。

     

    一、Java 本地提交 mapreduce 任务, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制

    1. 在java本地编辑 mapreduce 任务,(默认)使用 FileInputFormat 类的子类 TextInputFormat

    job.setInputFormatClass(TextInputFormat.class);
    
    
    

    2. mapper 的切分逻辑在 FileInputFormat 类中的 getSplits()实现:

    public List<InputSplit> getSplits(JobContext job) throws IOException {
            StopWatch sw = (new StopWatch()).start();
            long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
            long maxSize = getMaxSplitSize(job);
            List<InputSplit> splits = new ArrayList();
            List<FileStatus> files = this.listStatus(job);
            Iterator var9 = files.iterator();
    
            while(true) {
                while(true) {
                    while(var9.hasNext()) {
                        FileStatus file = (FileStatus)var9.next();
                        Path path = file.getPath();
                        long length = file.getLen();
                        if (length != 0L) {
                            BlockLocation[] blkLocations;
                            if (file instanceof LocatedFileStatus) {
                                blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                            } else {
                                FileSystem fs = path.getFileSystem(job.getConfiguration());
                                blkLocations = fs.getFileBlockLocations(file, 0L, length);
                            }
    
                            if (this.isSplitable(job, path)) {
                                long blockSize = file.getBlockSize();
                                long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
    
                                long bytesRemaining;
                                int blkIndex;
                                for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                                    blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                    splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                                }
    
                                if (bytesRemaining != 0L) {
                                    blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                    splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                                }
                            } else {
                                if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
                                    LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
                                }
    
                                splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                            }
                        } else {
                            splits.add(this.makeSplit(path, 0L, length, new String[0]));
                        }
                    }
    
                    job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
                    sw.stop();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
                    }
    
                    return splits;
                }
            }
        }

    3. 最后确定 mapper 数量在这里:

     1                         if (this.isSplitable(job, path)) {
     2                             long blockSize = file.getBlockSize();
     3                             long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
     4 
     5                             long bytesRemaining;
     6                             int blkIndex;
     7                             for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
     8                                 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
     9                                 splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
    10                             }
    11 
    12                             if (bytesRemaining != 0L) {
    13                                 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
    14                                 splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
    15                             }

    含义:

    a. 当 this.isSplitable 开启时,只要当前未分配的大小 bytesRemaining 大于 splitSize 的 1.1 倍,就添加一个 inputSplit, 即一个mapper 被生成。 

    b. 最后,不足 1.1 倍splitSize 的残余,补充为一个 mapper。因此,经常发现实际分配的 mapper 数比自己定义的会多 1 个。

    c. 为什么设置1.1倍?避免将不足 0.1 倍 splitSize 的量分配为一个 mapper, 避免浪费。

    4.  重要的两个量:BlockSize 和 splitSize

    long blockSize = file.getBlockSize();
    long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

    其中,blockSize 是 hdfs 设置的,一般是 64MB 或 128MB,我的 hdfs 中为 128 MB = 132217728L。这个量可认为静态,我们不宜修改。

    观察 splitSize 的获得:

    1     protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    2         return Math.max(minSize, Math.min(maxSize, blockSize));
    3     }

    在 getSplits()中找到 minSize, maxSize, blockSize 的赋值:

    long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    找到这些量的赋值、默认值:

    maxSize 的 setter/getter:    除非用户重新设置,否则 maxSize 的默认值为 Long 的最大值 

    1 public static void setMaxInputSplitSize(Job job, long size) {
    2         job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", size);
    3     }
    4 
    5 public static long getMaxSplitSize(JobContext context) {
    6         return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
    7     }

    minSize 的 setter/getter:  除非用户重新设置,否则 minSize 的默认值为 1L

    protected long getFormatMinSplitSize() {
            return 1L;
        }
    
    public static void setMinInputSplitSize(Job job, long size) {
            job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", size);
        }
    
    public static long getMinSplitSize(JobContext job) {
            return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
        }

    因此容易算出,默认情况下,

    long splitSize = this.computeSplitSize(blockSize, minSize, maxSize) = Math.max(Math.max(1L,1L), Math.min(9223372036854775807L, 128M=132217728L)) = 132217728L = 128M

    5. 控制 mapper 数量

    知道了上面的计算过程,我们要控制 mapper,在 BlockSize 不能动的情况下,就必须控制 minSize 和 maxSize 了。这里主要控制 maxSize。 

    TextInputFormat.setMinInputSplitSize(job, 1L);//设置minSize
    TextInputFormat.setMaxInputSplitSize(job, 10 * 1024 * 1024);//设置maxSize

    测试输入文件大小为 40MB, 很小, 在默认情况下, 被分配为 1 个或 2 个 mapper 执行成功。

    现在希望分配 4 个mapper:那么设置 maxSize 为10M ,那么 splitSize 计算为 10M。对于 40MB 的输入文件,理应分配 4 个mapper。

    实际运行,运行了 5 个mapper,认为成功摆脱了默认启动 2 个mapper 的限制,额外多出的 1 个 mapper 则猜测是上文提到的,对残余量的补充 mapper。

    6. 至此,对Java 本地提交 mapreduce 任务, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制方法如上。接下来讨论 streaming 使用的 org.apache.hadoop.mapred.FileInputFormat 的 mapper 控制。

    二、streaming 提交 mapreduce 任务, org.apache.hadoop.mapred.FileInputFormat 的 mapper num 控制

    1. 可通过 mapreduce.job.maps 直接控制,即使不是绝对精确。原因在下面的源码分析中可以看到。

    1 hadoop dfs -rm -r -f /output && 
    2 
    3 hadoop jar /opt/hadoop-3.0.2/share/hadoop/tools/lib/hadoop-streaming-3.0.2.jar 
    4 -D mapreduce.reduce.tasks=0 
    5 -D mapreduce.job.maps=7 
    6 -input /input 
    7 -output /output 
    8 -mapper "cat" 
    9 -inputformat TextInputFormat

    2. 将 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 中的 maxSize,尝试通过 streaming 的 -D 设置,是无效的。因为 streaming 使用的是 org.apache.hadoop.mapred.FileInputFormat,在下面的源码分析中可以看到。

    3. 查看 FileInputFormat 的 getSplits 源码

     1     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     2         StopWatch sw = (new StopWatch()).start();
     3         FileStatus[] files = this.listStatus(job);
     4         job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length);
     5         long totalSize = 0L;
     6         FileStatus[] var7 = files;
     7         int var8 = files.length;
     8 
     9         for(int var9 = 0; var9 < var8; ++var9) {
    10             FileStatus file = var7[var9];
    11             if (file.isDirectory()) {
    12                 throw new IOException("Not a file: " + file.getPath());
    13             }
    14 
    15             totalSize += file.getLen();
    16         }
    17 
    18         long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
    19         long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
    20         ArrayList<FileSplit> splits = new ArrayList(numSplits);
    21         NetworkTopology clusterMap = new NetworkTopology();
    22         FileStatus[] var13 = files;
    23         int var14 = files.length;
    24 
    25         for(int var15 = 0; var15 < var14; ++var15) {
    26             FileStatus file = var13[var15];
    27             Path path = file.getPath();
    28             long length = file.getLen();
    29             if (length == 0L) {
    30                 splits.add(this.makeSplit(path, 0L, length, new String[0]));
    31             } else {
    32                 FileSystem fs = path.getFileSystem(job);
    33                 BlockLocation[] blkLocations;
    34                 if (file instanceof LocatedFileStatus) {
    35                     blkLocations = ((LocatedFileStatus)file).getBlockLocations();
    36                 } else {
    37                     blkLocations = fs.getFileBlockLocations(file, 0L, length);
    38                 }
    39 
    40                 if (!this.isSplitable(fs, path)) {
    41                     if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
    42                         LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
    43                     }
    44 
    45                     String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap);
    46                     splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1]));
    47                 } else {
    48                     long blockSize = file.getBlockSize();
    49                     long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
    50 
    51                     long bytesRemaining;
    52                     String[][] splitHosts;
    53                     for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
    54                         splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
    55                         splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
    56                     }
    57 
    58                     if (bytesRemaining != 0L) {
    59                         splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
    60                         splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
    61                     }
    62                 }
    63             }
    64         }
    65 
    66         sw.stop();
    67         if (LOG.isDebugEnabled()) {
    68             LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    69         }
    70 
    71         return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
    72     }

    与 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 相似,但不同之处还是很重要的。主要在

    long blockSize = file.getBlockSize();
    long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);

    赋值:

    long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);

    4. 追溯这些量

    protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }
    private long minSplitSize = 1L;

    5. 分析

    minSize 默认为1L,blocksize是当前目标文件的块大小,而 splitSize 就是 BlockSize 和 goalSize 的小值。

    goalSize 的计算,就是输入文件总大小与 numSplits 的比值。而 numSplits 就是我们在streaming 中设置的 -D mapreduce.job.maps

    因此,在streaming中才可以简单地直接设置 mapper 的数量了。

    但是,只有当 goalsize 小于 blocksize 时,mapreduce.job.maps 才会生效!

    当 goalsize < blocksize,splitsize = goalsize,此时你设置的 mapreduce.job.maps 数量一般大于输入块的数量,因此配置生效。

    当 goalsize > blocksize,splitsize = blocksize,此时你设置的 mapreduce.job.maps 不足,一般少于输入块的数量,因此配置不生效。

    换句话说,如果输入只有一个文件,那么只要 -D mapreduce.job.maps > 1,配置大多数会生效。

     

  • 相关阅读:
    C# if else 使物体在X轴循环移动
    Jmeter之csv、用户自定义变量以及Query Type分析
    jmeter实战之笔记整理
    Badboy参数化
    Jmeter之Badboy录制脚本及简化脚本http请求
    jmeter之jdbc请求
    性能测试
    接口自动化学习--testNG
    接口自动化学习--mock
    Git工具使用小结
  • 原文地址:https://www.cnblogs.com/PigeonNoir/p/9229611.html
Copyright © 2011-2022 走看看