zoukankan      html  css  js  c++  java
  • FileInputFormat

    MapReduce框架要处理数据的文件类型 FileInputFormat这个类决定。
    TextInputFormat是框架默认的文件类型,可以处理Text文件类型,如果你要处理的文件类型不是Text,
    譬如说是Xml或DB,你就需要自己实现或用库中已有的类型。
    FileInputFormat的主要方法之一getSplits完成的功能是获取job要处理的路径文件所在的block信息。
    数据结构:FileInputSplit 存储了文件的位置信息,如Host,所属文件信息,开始offset,还有长度信息。
    public class FileSplit extends InputSplit implements Writable {
      private Path file;
      private long start;
      private long length;
      private String[] hosts;
      private SplitLocationInfo[] hostInfos;
    …
    }
    方法介绍:
    blockSize:块大小
    minSize:最小分片大小,由参数mapred.min.split.size设置,默认为1
    maxSize:最大分片大小,由参数mapred.max.split.size设置,默认Long.MAX-VALUE
    计算splitsize的方法:Math.max(minSize,Math.min(maxSize,blockSize)
    FileInputFormat的另一个重要方法是CreateRecordReader.在这个方法里面会用到前面方法所获取到的InpustSplit.这个RecordReader会用来去读取数据,传递给maptask去执行处理。

    当InputSplit尺寸大于block并且其对应的所有block(包含副本)不在同一个节点上时,Map Task不可能完全实现数据的本地化,

    也就是说,总有一部分数据需要从远程节点上读取,因此得出,当使用基于FileInputFormat实现InputFormat时,为了提高数据本地性,应该尽量使InputSplit大小与block大小一致。

    因为不同的文件,在上传的时候可以具体指定blocksize,若不指定则使用系统默认的blocksize,所以在代码中它使用的是file.getblocksize().

    若文件的blocksize是32M,我们的文件是70M,而且文件是可以切分的,则系统是如何分片的呢?(根据源代码进行分析)

    如果我们的minsize=1,maxsize=128,则计算得到的splitsize=32M,每一个block一个inputsplit.

    如果我们的minsize=64,maxsize=128,则计算得到的splitsize=64M, 但因为不满足70/64>1.1的情况,所以还是只会分成一个fileinputsplit,这一个inputsplit包含了两个block的信息。

    试想一下,如果还拆分成两个inputsplit让两个map task去做,第二个maptask只获取一点点的数据,利用率不高。

    若我们的文件是xml文件类型,不管我们的文件是多大,都只能分给一个InputSplit去处理,因为它的isSplitable=false,xml不能切开处理,那样数据就会乱掉。

    /** 
       * Generate the list of files and make them into FileSplits.
       * @param job the job context
       * @throws IOException
       */
      public List<InputSplit> getSplits(JobContext job) throws IOException {
        Stopwatch sw = new Stopwatch().start();
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
              long blockSize = file.getBlockSize();
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
              long bytesRemaining = length;
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
    
              if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
              }
            } else { // not splitable
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.elapsedMillis());
        }
        return splits;
      }
    Looking for a job working at Home about MSBI
  • 相关阅读:
    hdu1430 魔板(康拓展开 bfs预处理)
    网络流EdmondsKarp算法模板理解
    poj3020 建信号塔(匈牙利算法 最小覆盖边集)
    bzoj 2465 小球
    bzoj 1822 冷冻波
    bzoj 1040 骑士
    Codeforces Round #460 (Div. 2)
    bzoj 1072 排列perm
    Codeforces Round #459 (Div. 2)
    bzoj 1087 互不侵犯King
  • 原文地址:https://www.cnblogs.com/huaxiaoyao/p/4297178.html
Copyright © 2011-2022 走看看