zoukankan      html  css  js  c++  java
  • FileInputFormat 的实现之TextInputFormat

    说明

    TextInputFormat默认按行切分记录(record)。本篇旨在理解:当同一条记录被切分到不同的split时是如何处理的。首先,getSplits只是逻辑上的划分,并没有物理切分,即只记录每个split从文件的哪个位置读到哪个位置,文件本身仍是一个整体。因此LineRecordReader的处理方式是:每个split多读一行,也就是读到下一个split的第一行;同时,除了每个文件的第一个split,其他split都跳过第一行,从而避免重复读取。
    

    FileInputFormat 之 getSplits

    TextInputFormat 继承自 FileInputFormat,并没有重写getSplits,而是沿用父类的getSplits方法,下面看下该方法的源码
    
    /**
     * Computes the logical input splits for every file returned by {@code listStatus(job)}.
     *
     * <p>A split is only a byte range (path, offset, length, preferred hosts); no data is
     * moved or copied. For each file:
     * <ul>
     *   <li>an empty file yields one zero-length split with no hosts;</li>
     *   <li>a non-splittable file yields one split covering the whole file, placed on the
     *       hosts of its first block;</li>
     *   <li>a splittable file is cut into chunks of
     *       {@code computeSplitSize(blockSize, minSize, maxSize)} bytes while more than
     *       {@code SPLIT_SLOP} split-sizes remain, and the leftover (if any) becomes one
     *       final split. Each split is placed on the hosts of the block containing its
     *       first byte.</li>
     * </ul>
     *
     * <p>Side effect: stores the number of input files under {@code NUM_INPUT_FILES} in the
     * job configuration.
     *
     * @param job the job whose inputs are listed and whose configuration supplies limits
     * @return the generated splits, in file order
     * @throws IOException if listing files or fetching block locations fails
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      StopWatch timer = new StopWatch().start();
      // Lower bound: the larger of the format minimum and the user setting
      // "mapreduce.input.fileinputformat.split.minsize" (default 1).
      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
      // Upper bound from "mapreduce.input.fileinputformat.split.maxsize"
      // (default Long.MAX_VALUE).
      long maxSize = getMaxSplitSize(job);

      List<FileStatus> inputs = listStatus(job);
      List<InputSplit> result = new ArrayList<InputSplit>();

      for (FileStatus status : inputs) {
        Path filePath = status.getPath();
        long fileLen = status.getLen();

        if (fileLen == 0) {
          // Empty file: a single zero-length split with no preferred hosts.
          result.add(makeSplit(filePath, 0, fileLen, new String[0]));
          continue;
        }

        // A LocatedFileStatus already carries its block locations; otherwise
        // ask the file system for them.
        BlockLocation[] locations;
        if (status instanceof LocatedFileStatus) {
          locations = ((LocatedFileStatus) status).getBlockLocations();
        } else {
          locations = filePath.getFileSystem(job.getConfiguration())
              .getFileBlockLocations(status, 0, fileLen);
        }

        if (!isSplitable(job, filePath)) {
          // Only worth a debug line when the file is large enough that it
          // could otherwise have been split.
          if (LOG.isDebugEnabled()
              && fileLen > Math.min(status.getBlockSize(), minSize)) {
            LOG.debug("File is not splittable so no parallelization "
                + "is possible: " + status.getPath());
          }
          result.add(makeSplit(filePath, 0, fileLen, locations[0].getHosts(),
              locations[0].getCachedHosts()));
          continue;
        }

        // Effective size: max(minSize, min(maxSize, blockSize)).
        long splitSize = computeSplitSize(status.getBlockSize(), minSize, maxSize);
        long remaining = fileLen;
        // Cut full-size splits while more than SPLIT_SLOP split-sizes remain,
        // so the tail split is at most SPLIT_SLOP * splitSize bytes.
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
          long offset = fileLen - remaining;
          int blockIdx = getBlockIndex(locations, offset);
          result.add(makeSplit(filePath, offset, splitSize,
              locations[blockIdx].getHosts(), locations[blockIdx].getCachedHosts()));
          remaining -= splitSize;
        }
        if (remaining != 0) {
          long offset = fileLen - remaining;
          int blockIdx = getBlockIndex(locations, offset);
          result.add(makeSplit(filePath, offset, remaining,
              locations[blockIdx].getHosts(), locations[blockIdx].getCachedHosts()));
        }
      }

      // Save the number of input files for metrics/loadgen.
      job.getConfiguration().setLong(NUM_INPUT_FILES, inputs.size());
      timer.stop();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + result.size()
            + ", TimeTaken: " + timer.now(TimeUnit.MILLISECONDS));
      }
      return result;
    }
    

    TextInputFormat 之 createRecordReader,主要是看LineRecordReader

    /**
     * Creates the record reader for a split.
     *
     * <p>The result is a LineRecordReader whose record delimiter is the UTF-8 encoding of
     * the "textinputformat.record.delimiter" property. When that property is unset, null
     * is passed, and LineRecordReader falls back to its default line handling.
     *
     * @param split the split to be read (not used by this method)
     * @param context supplies the job configuration
     * @return a new LineRecordReader
     */
    public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) {
      String configured =
          context.getConfiguration().get("textinputformat.record.delimiter");
      return new LineRecordReader(
          configured == null ? null : configured.getBytes(Charsets.UTF_8));
    }
    

    LineRecordReader的initialize和nextKeyValue方法

    /**
     * Prepares this reader for one split.
     *
     * <p>Records the split's start offset and its end offset ({@code start + length}),
     * opens the file, and chooses a line reader:
     * <ul>
     *   <li>uncompressed file: seek to {@code start} and wrap it in an
     *       UncompressedSplitLineReader;</li>
     *   <li>codec implementing SplittableCompressionCodec: wrap it in a
     *       CompressedSplitLineReader and replace {@code start}/{@code end} with the
     *       codec-adjusted boundaries;</li>
     *   <li>any other codec: wrap the decompressed stream in a plain SplitLineReader
     *       (no seek is done in this branch).</li>
     * </ul>
     *
     * <p>If the (possibly adjusted) {@code start} is not 0, the first, possibly partial,
     * line is read and discarded. The previous split's reader reads one line past its own
     * end, so that line is not lost. {@code pos} is then set to the resulting
     * {@code start}.
     *
     * <p>NOTE(review): the non-splittable-codec branch never seeks, so it presumably relies
     * on such files producing a single split with start == 0. Confirm against isSplitable.
     *
     * @param genericSplit the split to read; cast to FileSplit
     * @param context supplies the job configuration
     * @throws IOException if the file cannot be opened or the first line cannot be read
     */
    public void initialize(InputSplit genericSplit,
                             TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
    
        // open the file and seek to the start of the split
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        
        // Choose the matching SplitLineReader depending on whether (and how) the
        // file is compressed.
        CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
        if (null!=codec) {
          isCompressedInput = true;	
          decompressor = CodecPool.getDecompressor(codec);
          if (codec instanceof SplittableCompressionCodec) {
            // Block-splittable codec: the codec may move the split boundaries to
            // compressed-block boundaries, so adopt its adjusted start/end.
            final SplitCompressionInputStream cIn =
              ((SplittableCompressionCodec)codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
            in = new CompressedSplitLineReader(cIn, job,
                this.recordDelimiterBytes);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
          } else {
            in = new SplitLineReader(codec.createInputStream(fileIn,
                decompressor), job, this.recordDelimiterBytes);
            filePosition = fileIn;
          }
        } else {
          fileIn.seek(start);
          in = new UncompressedSplitLineReader(
              fileIn, job, this.recordDelimiterBytes, split.getLength());
          filePosition = fileIn;
        }
        // Key step: getSplits cannot guarantee that a record is not cut across two
        // splits. Every split therefore reads one line past its end, and every split
        // except the first of each file skips its first line so that no line is read
        // twice.
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
          start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
      }
    

    接下来是nextKeyValue

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
          key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
          value = new Text();
        }
        int newSize = 0;
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        //这个in具体看是CompressedSplitLineReader还是UncompressedSplitLineReader,重写了其中的readerLine方法
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
          if (pos == 0) {
            //跳过utf的开头
            newSize = skipUtfByteOrderMark();
          } else {
            //readerLine有两种实现方法,一种readCustomLine这种是自己定义了record的分隔符,还有一种是readDefaultLine,这种是没有自定义分隔符,默认的读取数据的方式,用
    ,
    或者
    分割
            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
            pos += newSize;
          }
    
          if ((newSize == 0) || (newSize < maxLineLength)) {
            break;
          }
    
          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " + 
                   (pos - newSize));
        }
        if (newSize == 0) {
          key = null;
          value = null;
          return false;
        } else {
          return true;
        }
      }
    
  • 相关阅读:
    WIN7右下角的声音图标不见了
    无法解决 equal to 运算中 "Chinese_PRC_BIN" 和 "Chinese_PRC_CI_AS" 之间的排序规则冲突
    查看表空间信息SQL集合
    Oracle分区表
    Oracle数据库的创建、数据导入导出
    Oracle查询出最最近一次的一条记录
    adb命令
    synergy在Windows和ubuntu 多台PC共享一套键盘鼠标
    git add 之后因为没提交正确文件需要撤销
    make clean-kernel && make kernel
  • 原文地址:https://www.cnblogs.com/hts-technology/p/11266130.html
Copyright © 2011-2022 走看看