zoukankan      html  css  js  c++  java
  • Hadoop map任务数量的计算

    Hadoop中决定map个数的因素有几个,由于版本的不同,决定因素也不一样,掌握这些因素对了解hadoop分片的划分有很大帮助,

    并且对优化hadoop性能也有很大的益处。

    旧API中getSplits方法:

     // OLD-API (org.apache.hadoop.mapred.FileInputFormat) split calculation.
     // The caller-supplied numSplits feeds into goalSize = totalSize/numSplits,
     // so a larger requested map count shrinks the per-split size and thereby
     // raises the number of splits (and map tasks) actually produced.
     1 public InputSplit[] getSplits(JobConf job, int numSplits)
     2     throws IOException {
     3     FileStatus[] files = listStatus(job);
     4     
     5 // Save the number of input files in the job-conf
     6 job.setLong(NUM_INPUT_FILES, files.length);
     7 long totalSize = 0;                           // compute total size
     8 for (FileStatus file: files) {                // check we have valid files
     9       if (file.isDir()) {
    10         throw new IOException("Not a file: "+ file.getPath());
    11       }
    12       totalSize += file.getLen();
    13     }
    14 
       // Guard against division by zero when the framework asks for 0 splits.
    15     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
       // Floor on the split size: the larger of the configured
       // "mapred.min.split.size" and this format's minSplitSize field.
    16     long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
    17                             minSplitSize);
    18 
    19     // generate splits
    20     ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    21     NetworkTopology clusterMap = new NetworkTopology();
    22     for (FileStatus file: files) {
    23       Path path = file.getPath();
    24       FileSystem fs = path.getFileSystem(job);
    25       long length = file.getLen();
    26       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    27       if ((length != 0) && isSplitable(fs, path)) { 
    28         long blockSize = file.getBlockSize();
    29         long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    30 
         // Emit full-size splits while more than SPLIT_SLOP (slack factor,
         // declared elsewhere in this class) splits' worth of bytes remain.
    31         long bytesRemaining = length;
    32         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    33           String[] splitHosts = getSplitHosts(blkLocations, 
    34               length-bytesRemaining, splitSize, clusterMap);
    35           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
    36               splitHosts));
    37           bytesRemaining -= splitSize;
    38         }
    39         
         // Any remainder becomes one final split, placed on the hosts of the
         // file's last block.
    40         if (bytesRemaining != 0) {
    41           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
    42                      blkLocations[blkLocations.length-1].getHosts()));
    43         }
       // Non-splittable, non-empty file: one split covering the whole file.
    44       } else if (length != 0) {
    45         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
    46         splits.add(new FileSplit(path, 0, length, splitHosts));
    47       } else { 
    48         //Create empty hosts array for zero length files
    49         splits.add(new FileSplit(path, 0, length, new String[0]));
    50       }
    51     }
    52     LOG.debug("Total # of splits: " + splits.size());
    53     return splits.toArray(new FileSplit[splits.size()]);
    54   }
    55 
    56   protected long computeSplitSize(long goalSize, long minSize,
    57                                        long blockSize) {
    58     return Math.max(minSize, Math.min(goalSize, blockSize));
    59   }
    View Code

    新API中getSplits方法:

     // NEW-API (org.apache.hadoop.mapreduce.lib.input.FileInputFormat) split
     // calculation. There is no numSplits parameter: splitSize depends only
     // on blockSize clamped to [minSize, maxSize], so with default min/max
     // settings the HDFS block size decides the number of map tasks.
     1 public List<InputSplit> getSplits(JobContext job
     2                                     ) throws IOException {
     3     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     4     long maxSize = getMaxSplitSize(job);
     5 
     6     // generate splits
     7     List<InputSplit> splits = new ArrayList<InputSplit>();
     8     List<FileStatus>files = listStatus(job);
     9     for (FileStatus file: files) {
    10       Path path = file.getPath();
    11       FileSystem fs = path.getFileSystem(job.getConfiguration());
    12       long length = file.getLen();
    13       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    14       if ((length != 0) && isSplitable(job, path)) { 
    15         long blockSize = file.getBlockSize();
    16         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    17 
         // Emit full-size splits while more than SPLIT_SLOP (slack factor,
         // declared elsewhere in this class) splits' worth of bytes remain;
         // each split is placed on the hosts of the block containing its start.
    18         long bytesRemaining = length;
    19         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    20           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    21           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
    22                                    blkLocations[blkIndex].getHosts()));
    23           bytesRemaining -= splitSize;
    24         }
    25         
         // Any remainder becomes one final split on the last block's hosts.
    26         if (bytesRemaining != 0) {
    27           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
    28                      blkLocations[blkLocations.length-1].getHosts()));
    29         }
       // Non-splittable, non-empty file: one split covering the whole file.
    30       } else if (length != 0) {
    31         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
    32       } else { 
    33         //Create empty hosts array for zero length files
    34         splits.add(new FileSplit(path, 0, length, new String[0]));
    35       }
    36     }
    37     
    38     // Save the number of input files in the job-conf
    39     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    40 
    41     LOG.debug("Total # of splits: " + splits.size());
    42     return splits;
    43   }
    44 
    45   protected long computeSplitSize(long blockSize, long minSize,
    46                                   long maxSize) {
    47     return Math.max(minSize, Math.min(maxSize, blockSize));
    48   }
    View Code

     测试一个输入文件大小为:0.52 KB 日志如下:

    new :
    blockSize:67108864 minSize:1 maxSize:9223372036854775807
    splitSize:67108864

    决定因素为 blockSize的大小.这个很容易理解

    old:
    blockSize:67108864 totalSize:529 numSplits:2 goalSize:264 minSplitSize:1 minSize:1
    splitSize:264

    numSplits为2,这个是在调用getSplits时传入的,这个地方要注意。经过查找发现这个参数为job.getNumMapTasks()的值,如下:

    JobConf: public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }

    mapred-default.xml中:

    <property>
    <name>mapred.map.tasks</name>
    <value>2</value>
    <description>The default number of map tasks per job.
    Ignored when mapred.job.tracker is "local".
    </description>
    </property>

    所以使用旧的API编写的MR程序,会产生2个map,而使用新的API则会产生1个map.

  • 相关阅读:
    MySQL补充
    不同操作系统下虚拟环境的搭建
    Python导学基础(二)变量与基础数据类型
    Python导学基础(一)介绍
    KM 算法
    题解-CF1065E Side Transmutations
    题解-CF1140E Palindrome-less Arrays
    题解-CF677D Vanya and Treasure
    splay文艺平衡树
    splay区间操作(bzoj1500)
  • 原文地址:https://www.cnblogs.com/lvfeilong/p/23849jffdslkfjd.html
Copyright © 2011-2022 走看看