zoukankan      html  css  js  c++  java
  • hadoop LineRecordReader

    /**
     * Creates a line reader over one split of a file, reading through a
     * decompression stream when the file is compressed.
     *
     * <p>When the (possibly codec-adjusted) split start is not 0, the first
     * record is read and discarded here, and the reader's position begins
     * just after it.
     *
     * <p>If anything fails after the file has been opened, the file stream is
     * closed and any borrowed decompressor is returned to the pool before the
     * exception propagates, because the caller never receives an object it
     * could close.
     *
     * @param job  configuration; supplies the maximum line length and is
     *             passed to the codec factory, file system and line reader
     * @param split  the file split to read
     * @param recordDelimiter  record delimiter bytes, handed to the line reader
     * @throws IOException if the file cannot be opened, positioned or read, or
     *         if a split with a non-zero start is requested for a file whose
     *         codec is not a {@code SplittableCompressionCodec}
     */
    public LineRecordReader(Configuration job, FileSplit split,
        byte[] recordDelimiter) throws IOException {
      // When no maximum line length is configured, it defaults to
      // Integer.MAX_VALUE.
      this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();
      // The codec is looked up from the file path. (Original author's note:
      // the io.compression.codecs default covers gzip -- not verified here.)
      compressionCodecs = new CompressionCodecFactory(job);
      codec = compressionCodecs.getCodec(file);

      // open the file and seek to the start of the split
      final FileSystem fs = file.getFileSystem(job);
      fileIn = fs.open(file);
      boolean initialized = false;
      try {
        if (isCompressedInput()) {
          decompressor = CodecPool.getDecompressor(codec);
          if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
              ((SplittableCompressionCodec)codec).createInputStream(
                fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
            in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
            // The codec may move the split boundaries; use its values.
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn; // take pos from compressed stream
          } else {
            if (start != 0) {
              // This branch never positions the stream at the split start,
              // so a non-zero start would decode from the beginning of the
              // file and emit records that do not belong to this split.
              throw new IOException("Cannot seek in "
                  + codec.getClass().getSimpleName() + " compressed stream");
            }
            in = new SplitLineReader(codec.createInputStream(fileIn,
                decompressor), job, recordDelimiter);
            filePosition = fileIn;
          }
        } else {
          // Position the opened file at the start of the split.
          fileIn.seek(start);
          in = new SplitLineReader(fileIn, job, recordDelimiter);
          filePosition = fileIn;
        }
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        // readLine returns the number of bytes consumed, so start advances
        // past the discarded record.
        if (start != 0) {
          start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        initialized = true;
      } finally {
        if (!initialized) {
          // Construction failed: release what was acquired above.
          try {
            fileIn.close();
          } catch (IOException ignored) {
            // The original failure is already propagating and is the more
            // useful error to report.
          } finally {
            if (decompressor != null) {
              CodecPool.returnDecompressor(decompressor);
            }
          }
        }
      }
      this.pos = start;
    }
  • 相关阅读:
    UDP:用户数据报协议(User Datagram Protocol)
    线程池的使用
    SQL Server表和模式空间使用情况http://www.51myit.com/thread2466911.html
    bytetobmp and bmptobyte(Image)
    c# TCP例子转载
    POJ 4047Garden
    NYOJ 102 次方求模
    Sum
    POJ 1094 Sorting It All Out(经典拓扑,唯一排序)
    POJ 2387 Til the Cows Come Home(Dijkstra)
  • 原文地址:https://www.cnblogs.com/fantiantian/p/9346309.html
Copyright © 2011-2022 走看看