zoukankan      html  css  js  c++  java
  • hadoop LineRecordReader

    /**
       * Creates a reader that returns the lines of one split of a file,
       * transparently decompressing the file when a codec matches its path.
       *
       * <p>If anything fails after the file has been opened, the opened stream
       * is closed and any decompressor borrowed from {@link CodecPool} is
       * returned before the exception propagates, so a failed construction
       * does not leak them.
       *
       * @param job  configuration; supplies the maximum line length, the codec
       *             factory settings and the file system
       * @param split  the portion of the file (path, start offset, length) to read
       * @param recordDelimiter  delimiter bytes handed to the underlying line reader
       * @throws IOException if the file cannot be opened, positioned or read
       */
      public LineRecordReader(Configuration job, FileSplit split,
          byte[] recordDelimiter) throws IOException {
        // When no maximum line length is configured, fall back to
        // Integer.MAX_VALUE.
        this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
          LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        // The codec (if any) is looked up from the file path.
        compressionCodecs = new CompressionCodecFactory(job);
        codec = compressionCodecs.getCodec(file);

        // open the file and seek to the start of the split
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        // From here on the stream (and possibly a pooled decompressor) is held;
        // track success so a failure part-way through can release them.
        boolean initialized = false;
        try {
          if (isCompressedInput()) {
            decompressor = CodecPool.getDecompressor(codec);
            if (codec instanceof SplittableCompressionCodec) {
              final SplitCompressionInputStream cIn =
                ((SplittableCompressionCodec)codec).createInputStream(
                  fileIn, decompressor, start, end,
                  SplittableCompressionCodec.READ_MODE.BYBLOCK);
              in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
              // The codec may move the split boundaries; use its values.
              start = cIn.getAdjustedStart();
              end = cIn.getAdjustedEnd();
              filePosition = cIn; // take pos from compressed stream
            } else {
              in = new SplitLineReader(codec.createInputStream(fileIn,
                  decompressor), job, recordDelimiter);
              filePosition = fileIn;
            }
          } else {
            // Position the opened file at the start of this split.
            fileIn.seek(start);
            in = new SplitLineReader(fileIn, job, recordDelimiter);
            filePosition = fileIn;
          }
          // If this is not the first split, we always throw away first record
          // because we always (except the last split) read one extra line in
          // next() method.
          // The skipped line's length is added to start, since that line has
          // already been read as part of the previous split.
          if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
          }
          this.pos = start;
          initialized = true;
        } finally {
          if (!initialized) {
            closeQuietlyAfterFailedInit();
          }
        }
      }

      /**
       * Releases what the constructor acquired when it fails part-way: closes
       * the line reader if one was created, otherwise the raw file stream, and
       * returns a borrowed decompressor to the pool. Close errors are ignored
       * so the original failure is the one that propagates.
       */
      private void closeQuietlyAfterFailedInit() {
        try {
          if (in != null) {
            // NOTE(review): relies on the line reader's close() also closing
            // the stream it wraps -- confirm against LineReader.
            in.close();
          } else if (fileIn != null) {
            fileIn.close();
          }
        } catch (IOException ignored) {
          // Best effort only; the exception that aborted construction wins.
        } finally {
          if (decompressor != null) {
            CodecPool.returnDecompressor(decompressor);
          }
        }
      }
  • 相关阅读:
    谈谈架构层级的“开闭原则”
    将MySQL数据库中的表结构导入excel 或word
    淘宝网-软件质量属性分析
    架构漫谈阅读有感
    机器学习-分类算法之决策树、随机森林
    机器学习-分类算法之逻辑回归
    机器学习-朴素贝叶斯算法
    机器学习-分类算法之k-近邻
    机器学习-模型选择
    机器学习-scikit-learn数据集
  • 原文地址:https://www.cnblogs.com/fantiantian/p/9346309.html
Copyright © 2011-2022 走看看