zoukankan      html  css  js  c++  java
  • MapReduce获取分片数目

    问题

    MapReduce Application中mapper的数目和分片的数目是一样的,可是分片数目和什么有关呢?

    1. 默认情况下。分片和输入文件的分块数是相等的。也不全然相等,假设block size大小事128M,文件大小为128.1M,文件的block数目为2。可是application执行过程中。你会发现分片数目是1,而不是2,当中的机理,后面会分析
    2. 有的程序会设置map的数目,那么map数目是如何影响分片的数目的呢?
    3. 假设文件大小为0,是否会作为一个分片传给map任务?

    流程

    FileInputFormat.getSplits返回文件的分片数目,这部分将介绍其执行流程,后面将粘贴其源代码并给出凝视
    1. 通过listStatus()获取输入文件列表files,当中会遍历输入文件夹的子文件夹,并过滤掉部分文件。如文件_SUCCESS
    2. 获取全部的文件大小totalSIze
    3. goalSIze=totalSize/numMaps。

      numMaps是用户指定的map数目

    4. files中取出一个文件file
    5. 计算splitSize。splitSize=max(minSplitSize,min(file.blockSize,goalSize)),当中minSplitSize是同意的最小分片大小。默觉得1B
    6. 后面依据splitSize大小将file分片。在分片的时候,假设剩余的大小不大于splitSize*1.1,且大于0B的时候。会将该区域整个作为一个分片

      这样做是为了防止一个mapper处理的数据太小

    7. 将file的分片增加到splits中
    8. 返回4。直到将files遍历完
    9. 结束。返回splits

    源代码

     public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
    	  //获取输入文件列表files,当中会遍历输入文件夹的子文件夹,并过滤掉部分文件,如文件_SUCCESS
        FileStatus[] files = listStatus(job);
        
        // Save the number of input files for metrics/loadgen
        job.setLong(NUM_INPUT_FILES, files.length);
        long totalSize = 0;                           // compute total size
        for (FileStatus file: files) {                // check we have valid files
          if (file.isDirectory()) {
            throw new IOException("Not a file: "+ file.getPath());
          }
          totalSize += file.getLen();
        }
    
        /*
         * numSplits为设置的map数目
         * 期待的分片大小
         */
        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
        /*
         * FileInputFormat.SPLIT_MINSIZE为參数值:mapreduce.input.fileinputformat.split.minsize,默觉得0
         * minSplitSize默觉得1
         */
         long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
          FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
    
        // generate splits
        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
        NetworkTopology clusterMap = new NetworkTopology();
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            FileSystem fs = path.getFileSystem(job);
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(fs, path)) {
              long blockSize = file.getBlockSize();
              /*
               * 计算分片的大小,每个文件都要计算一次
               *computeSplitSize的计算公式为 Math.max(minSize, Math.min(goalSize, blockSize));
               */
              long splitSize = computeSplitSize(goalSize, minSize, blockSize);
    
              long bytesRemaining = length;
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                String[] splitHosts = getSplitHosts(blkLocations,
                    length-bytesRemaining, splitSize, clusterMap);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                    splitHosts));
                bytesRemaining -= splitSize;
              }
    
              if (bytesRemaining != 0) {
                String[] splitHosts = getSplitHosts(blkLocations, length
                    - bytesRemaining, bytesRemaining, clusterMap);
                splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                    splitHosts));
              }
            } else {
              String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
              splits.add(makeSplit(path, 0, length, splitHosts));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        LOG.debug("Total # of splits: " + splits.size());
        return splits.toArray(new FileSplit[splits.size()]);
      }

    总结

    看源代码还是非常实用的。非常多时候,博客或者书介绍的不是非常中肯,或者会有错误。看源代码就不会出现这些问题。





  • 相关阅读:
    阿里巴巴的云原生应用开源探索与实践
    Helm 3 发布 | 云原生生态周报 Vol. 27
    带你上手一款下载超 10 万次的 IDEA 插件
    最强CP!阿里云联手支付宝小程序如何助力双11?
    媲美5G的Wifi网速、“备战”资产一键领……揭秘双11小二背后的保障力量
    dubbo-go 的开发、设计与功能介绍
    饿了么交付中心语言栈转型总结
    数据一致性检测的应用场景与最佳实践
    2684亿!阿里CTO张建锋:不是任何一朵云都撑得住双11
    《DNS稳定保障系列3--快如闪电,域名解析秒级生效》
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/8514629.html
Copyright © 2011-2022 走看看