zoukankan      html  css  js  c++  java
  • FileInputFormat 的实现之TextInputFormat

    说明

    TextInputFormat 默认按行切分记录(record)。本篇旨在理解:当同一条记录被切分到不同的 split 中时,是如何处理的。首先,getSplits 只是在逻辑上划分,并没有物理切分,也就是只记录每个 split 从文件的哪个位置读到哪个位置,文件本身仍是一个整体。因此 LineRecordReader 的处理方式是:每个 split 多读一行,也就是读到下一个 split 的第一行;同时,除了每个文件的第一个 split 之外,其他 split 都跳过自己的第一行,从而避免重复读取。
    

    FileInputFormat 之 getSplits

    TextInputFormat 继承 FileInputFormat,并没有重写 getSplits,而是沿用父类的 getSplits 方法。下面看一下该方法的源码:
    
    /**
     * Computes the logical input splits for every file returned by listStatus(job).
     *
     * <p>A split is only a byte range (path, start offset, length) plus preferred hosts;
     * files are not physically cut, and split boundaries are not aligned to line/record
     * boundaries. Dealing with a record that straddles two splits is left to the
     * RecordReader (LineRecordReader.initialize / nextKeyValue).
     *
     * @param job the job context whose configuration supplies the min/max split sizes
     * @return several splits per splittable file, one split per non-splittable file,
     *         and one split with an empty hosts array per zero-length file
     * @throws IOException propagated from file listing / file-system calls
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = new StopWatch().start();
        // getFormatMinSplitSize() == 1; getMinSplitSize(job) is the user-configured minimum split size, default 1:
        // job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        // getMaxSplitSize(job) is the user-configured maximum split size:
        // context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", Long.MAX_VALUE);
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            BlockLocation[] blkLocations;
            // A LocatedFileStatus already carries its block locations; otherwise ask the file system.
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            // Only files that isSplitable() accepts are carved into several splits.
            if (isSplitable(job, path)) {
              long blockSize = file.getBlockSize();
              // Effective split size: computeSplitSize returns Math.max(minSize, Math.min(maxSize, blockSize)),
              // so with the defaults (minSize = 1, maxSize = Long.MAX_VALUE) it equals the block size.
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
              long bytesRemaining = length;
              // Cut full splitSize chunks while the remainder exceeds SPLIT_SLOP * splitSize.
              // NOTE(review): SPLIT_SLOP is defined outside this excerpt (1.1 in Apache Hadoop -- confirm);
              // with a value > 1 the last split may be slightly larger than splitSize instead of
              // leaving a tiny tail split.
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                // Hosts come from the block containing this split's start offset
                // (presumably for locality-aware task placement).
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
    
              // Whatever is left becomes the final split of this file.
              if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
              }
            } else { // not splittable
              if (LOG.isDebugEnabled()) {
                // Log only if the file is big enough to have been split
                if (length > Math.min(file.getBlockSize(), minSize)) {
                  LOG.debug("File is not splittable so no parallelization "
                      + "is possible: " + file.getPath());
                }
              }
              // The whole file is one split, placed on the first block's hosts.
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
        }
        return splits;
      }
    

    TextInputFormat 之 createRecordReader,主要是看 LineRecordReader

    /**
     * Builds the LineRecordReader that TextInputFormat uses for a split.
     *
     * <p>When "textinputformat.record.delimiter" is configured, its UTF-8 bytes are handed
     * to the reader as the record delimiter; otherwise the reader receives null (no custom
     * delimiter).
     *
     * @param split   the split to be read (not inspected here)
     * @param context task context supplying the configuration
     * @return a new LineRecordReader for the configured delimiter
     */
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                                                               TaskAttemptContext context) {
        final String customDelimiter =
            context.getConfiguration().get("textinputformat.record.delimiter");
        // null means "no custom delimiter" for the reader.
        final byte[] delimiterBytes =
            (customDelimiter == null) ? null : customDelimiter.getBytes(Charsets.UTF_8);
        return new LineRecordReader(delimiterBytes);
      }
    

    LineRecordReader 的 initialize 和 nextKeyValue 方法

    /**
     * Prepares this reader for one split: opens the file, picks the line reader
     * (splittable-codec, non-splittable-codec or uncompressed), and positions the
     * reader at the first record that belongs to this split.
     *
     * <p>Split boundaries are arbitrary byte offsets, so every split whose start is not 0
     * throws away its first (possibly partial) line. That line is read by the previous
     * split's reader, whose nextKeyValue loop keeps reading while the position is
     * {@code <= end}, i.e. it reads one line past its own end.
     *
     * @param genericSplit the split to read; cast unconditionally to FileSplit
     * @param context      task context supplying the configuration
     * @throws IOException if opening, seeking or reading the file fails
     */
    public void initialize(InputSplit genericSplit,
                             TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
    
        // open the file and seek to the start of the split
        // (the explicit seek only happens in the uncompressed branch below)
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        
        // Pick the SplitLineReader variant according to whether (and how) the file is compressed.
        CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
        if (null!=codec) {
          isCompressedInput = true;	
          decompressor = CodecPool.getDecompressor(codec);
          if (codec instanceof SplittableCompressionCodec) {
            // The codec may move the split boundaries (READ_MODE.BYBLOCK), so start/end are
            // replaced by the adjusted values and positions are reported by the codec stream.
            final SplitCompressionInputStream cIn =
              ((SplittableCompressionCodec)codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
            in = new CompressedSplitLineReader(cIn, job,
                this.recordDelimiterBytes);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
          } else {
            // Non-splittable codec: no seek, so decompression starts at the beginning of the file.
            // NOTE(review): this presumes such files always get a single split starting at 0 --
            // confirm against isSplitable().
            in = new SplitLineReader(codec.createInputStream(fileIn,
                decompressor), job, this.recordDelimiterBytes);
            filePosition = fileIn;
          }
        } else {
          // Plain file: seek straight to the split's start offset.
          fileIn.seek(start);
          in = new UncompressedSplitLineReader(
              fileIn, job, this.recordDelimiterBytes, split.getLength());
          filePosition = fileIn;
        }
        // This is the key step. getSplits cannot guarantee that a record is not cut across
        // two splits, so every split reads one extra line past its end; to avoid reading
        // that line twice, every split that does not start at offset 0 skips its first line.
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
          // readLine's return value (bytes consumed) moves start past the discarded line.
          start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
      }
    

    接下来是nextKeyValue

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
          key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
          value = new Text();
        }
        int newSize = 0;
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        //这个in具体看是CompressedSplitLineReader还是UncompressedSplitLineReader,重写了其中的readerLine方法
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
          if (pos == 0) {
            //跳过utf的开头
            newSize = skipUtfByteOrderMark();
          } else {
            //readerLine有两种实现方法,一种readCustomLine这种是自己定义了record的分隔符,还有一种是readDefaultLine,这种是没有自定义分隔符,默认的读取数据的方式,用
    ,
    或者
    分割
            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
            pos += newSize;
          }
    
          if ((newSize == 0) || (newSize < maxLineLength)) {
            break;
          }
    
          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " + 
                   (pos - newSize));
        }
        if (newSize == 0) {
          key = null;
          value = null;
          return false;
        } else {
          return true;
        }
      }
    
  • 相关阅读:
    jvm基本结构和解析
    多态的意思
    java中对象的简单解读
    double类型和int类型的区别
    python 解析xml文件
    win10不能映射Ubuntu共享文件
    Qt程序打包
    Ubuntu boot分区文件误删,系统无法启动,怎么解
    ubuntu Boot空间不够问题“The volume boot has only 5.1MB disk space remaining”
    Ubuntu 分辨率更改 xrandr Failed to get size of gamma for output default
  • 原文地址:https://www.cnblogs.com/hts-technology/p/11266130.html
Copyright © 2011-2022 走看看