zoukankan      html  css  js  c++  java
  • hadoop InputFormat getSplits

     /**
      * Splits the files returned by {@link #listStatus(JobConf)} into
      * {@link FileSplit}s, cutting splittable files that are larger than the
      * computed split size into several pieces.
      *
      * <p>For each splittable file the split size is
      * {@code computeSplitSize(goalSize, minSize, blockSize)}, where
      * {@code goalSize} is the combined length of all input files divided by
      * {@code numSplits} (a value of 0 is treated as 1), {@code minSize} is the
      * larger of the {@code SPLIT_MINSIZE} setting (default 1) and
      * {@code minSplitSize}, and {@code blockSize} is the file's block size.
      * Full-size splits are cut while the unread remainder is more than
      * {@code SPLIT_SLOP} split sizes; any non-zero remainder becomes one final
      * split. A non-splittable file yields a single split covering the whole
      * file, and a zero-length file yields a single split with an empty hosts
      * array.
      *
      * @param job the job configuration; {@code NUM_INPUT_FILES} is set on it
      *     to the number of listed input files
      * @param numSplits the requested number of splits; it influences the goal
      *     split size and the initial capacity of the result list
      * @return the generated splits, in input-file order
      * @throws IOException if a listed input is a directory; any IOException
      *     raised while listing inputs or looking up file systems and block
      *     locations is propagated unchanged
      */
      public InputSplit[] getSplits(JobConf job, int numSplits)
        throws IOException {
        // Time the whole computation; the elapsed time is logged at debug level.
        StopWatch timer = new StopWatch().start();
        FileStatus[] inputs = listStatus(job);

        // Record the number of input files (used for metrics/loadgen).
        job.setLong(NUM_INPUT_FILES, inputs.length);

        // Every input must be a regular file; add up their lengths.
        long totalBytes = 0;
        for (FileStatus status : inputs) {
          if (status.isDirectory()) {
            throw new IOException("Not a file: "+ status.getPath());
          }
          totalBytes += status.getLen();
        }

        // Target bytes per split; numSplits == 0 is treated as 1 to avoid dividing by zero.
        long divisor = (numSplits == 0) ? 1 : numSplits;
        long goalSize = totalBytes / divisor;

        // Lower bound on split size: the configured minimum (default 1) or
        // minSplitSize, whichever is larger.
        long configuredMinSize = job.getLong(org.apache.hadoop.mapreduce.lib.input.
          FileInputFormat.SPLIT_MINSIZE, 1);
        long minSize = Math.max(configuredMinSize, minSplitSize);

        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
        // A single topology instance is passed to every host lookup below.
        NetworkTopology clusterMap = new NetworkTopology();

        for (FileStatus status : inputs) {
          Path path = status.getPath();
          long fileLength = status.getLen();

          if (fileLength == 0) {
            // Zero-length file: emit one split with an empty hosts array.
            splits.add(makeSplit(path, 0, fileLength, new String[0]));
            continue;
          }

          FileSystem fs = path.getFileSystem(job);
          // Use the block locations already attached to the listing entry when
          // present; otherwise query the file system for the whole file's range.
          BlockLocation[] blkLocations = (status instanceof LocatedFileStatus)
              ? ((LocatedFileStatus) status).getBlockLocations()
              : fs.getFileBlockLocations(status, 0, fileLength);

          if (!isSplitable(fs, path)) {
            // Non-splittable file: a single split spanning the entire file.
            String[][] hosts =
                getSplitHostsAndCachedHosts(blkLocations, 0, fileLength, clusterMap);
            splits.add(makeSplit(path, 0, fileLength, hosts[0], hosts[1]));
            continue;
          }

          long splitSize = computeSplitSize(goalSize, minSize, status.getBlockSize());

          // Cut full-size splits while the unread remainder is more than
          // SPLIT_SLOP split sizes; the final piece therefore holds at most
          // SPLIT_SLOP split sizes.
          long remaining = fileLength;
          while (((double) remaining) / splitSize > SPLIT_SLOP) {
            long start = fileLength - remaining;
            String[][] hosts =
                getSplitHostsAndCachedHosts(blkLocations, start, splitSize, clusterMap);
            splits.add(makeSplit(path, start, splitSize, hosts[0], hosts[1]));
            remaining -= splitSize;
          }

          // Whatever is left becomes the last split of this file.
          if (remaining != 0) {
            long start = fileLength - remaining;
            String[][] hosts =
                getSplitHostsAndCachedHosts(blkLocations, start, remaining, clusterMap);
            splits.add(makeSplit(path, start, remaining, hosts[0], hosts[1]));
          }
        }

        timer.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + timer.now(TimeUnit.MILLISECONDS));
        }
        return splits.toArray(new FileSplit[0]);
      }
  • 相关阅读:
    域名、主机名、网站名以及 URL 基础概念
    c# oracle 数据库连接以及参数化查询
    c# 委托(Func、Action)
    xcode pod install 安装失败,提示缺少文件
    一个服务器的IIS只能绑定一个HTTPS也就是443端口
    APP UI设计及切图规范
    Day7 字符串和常用数据结构
    Day6 函数和模块的使用
    Day5 练习
    python 疑难杂症
  • 原文地址:https://www.cnblogs.com/fantiantian/p/9346782.html
Copyright © 2011-2022 走看看