/** * * @param job 配置信息 * @param split split * @param recordDelimiter 分列的字符 * @throws IOException */ public LineRecordReader(Configuration job, FileSplit split, byte[] recordDelimiter) throws IOException { //如果没有配置每一行的最大长度,那么每行默认的最大长度为Integer类型的最大值 this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input. LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); //默认io.compression.codecs gzip compressionCodecs = new CompressionCodecFactory(job); codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); if (isCompressedInput()) { decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); in = new CompressedSplitLineReader(cIn, job, recordDelimiter); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; // take pos from compressed stream } else { in = new SplitLineReader(codec.createInputStream(fileIn, decompressor), job, recordDelimiter); filePosition = fileIn; } } else { //从打开的文件找到指定位置 fileIn.seek(start); in = new SplitLineReader(fileIn, job, recordDelimiter); filePosition = fileIn; } // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. //如果不是第一个dplit,那么将start设置成当前位置+第一行的长度。 //跨过第一行,因为第一行数据已经在上一个split中读取了 if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; }