zoukankan      html  css  js  c++  java
  • 旧版API的TextInputFormat源码分析

    TextInputFormat类
    package org.apache.hadoop.mapred;
    
    import java.io.*;
    
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.*;
    
    /**
     * An {@link InputFormat} for plain text files.  Files are broken into lines.
     * Either linefeed or carriage-return are used to signal end of line.  Keys are
     * the position in the file, and values are the line of text.
     */
    public class TextInputFormat extends FileInputFormat<LongWritable, Text>
      implements JobConfigurable {

      // Maps file-name suffixes to compression codecs; populated by configure().
      private CompressionCodecFactory compressionCodecs = null;

      /**
       * Builds the codec factory from the job configuration. Must run before
       * {@link #isSplitable(FileSystem, Path)} is used.
       *
       * @param conf job configuration from which the available codecs are read
       */
      public void configure(JobConf conf) {
        compressionCodecs = new CompressionCodecFactory(conf);
      }

      /**
       * Decides whether {@code file} may be cut into several input splits.
       *
       * <p>A file is splittable exactly when no compression codec matches it
       * ({@code getCodec} returns {@code null}, i.e. the file is plain text).
       * Compressed files are not split; their record reader decompresses them
       * from the beginning as a single stream.
       *
       * @param fs   file system holding the file (unused here)
       * @param file path whose suffix selects the codec
       * @return {@code true} for uncompressed files, {@code false} otherwise
       * @throws IllegalStateException if {@link #configure(JobConf)} has not
       *         been called yet
       */
      protected boolean isSplitable(FileSystem fs, Path file) {
        if (compressionCodecs == null) {
          throw new IllegalStateException(
              "TextInputFormat.configure(JobConf) must be called before isSplitable()");
        }
        // No codec for this suffix -> uncompressed -> safe to split.
        return compressionCodecs.getCodec(file) == null;
      }

      /**
       * Publishes the split as the task status and returns a line-oriented
       * reader for it; the reader locates record boundaries and produces the
       * (byte offset, line) key/value pairs.
       *
       * @param genericSplit the split to read; must be a {@link FileSplit}
       * @param job          job configuration handed to the record reader
       * @param reporter     used to report the split being processed
       * @return a {@link LineRecordReader} over {@code genericSplit}
       * @throws IOException if the reader cannot open the split's file
       */
      public RecordReader<LongWritable, Text> getRecordReader(
                                              InputSplit genericSplit, JobConf job,
                                              Reporter reporter)
        throws IOException {

        reporter.setStatus(genericSplit.toString());
        return new LineRecordReader(job, (FileSplit) genericSplit);
      }
    }
    
    在TextInputFormat类中最后一句
    return new LineRecordReader(job, (FileSplit) genericSplit);
    进入LineRecordReader类里面,使用以下构造函数:
    /**
     * Opens the file behind {@code split} and positions the reader at the first
     * line that belongs to this split.
     *
     * <p>For a compressed file the whole stream is decompressed from its start
     * and {@code end} is set to {@link Long#MAX_VALUE}. For an uncompressed file
     * whose split does not begin at offset 0, the (possibly partial) first line
     * is skipped. That line is left to the reader of the preceding split, whose
     * {@code next()} keeps reading while {@code pos < end}, and whose
     * {@code readLine} may run past {@code end} to finish a line.
     *
     * @param job   configuration; {@code mapred.linerecordreader.maxlength}
     *              caps the bytes stored per line (default
     *              {@code Integer.MAX_VALUE})
     * @param split the file split to read
     * @throws IOException if the file cannot be opened, positioned or read; a
     *         stream opened here is closed before the exception propagates
     */
    public LineRecordReader(Configuration job, FileSplit split) throws IOException {
      this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                      Integer.MAX_VALUE);
      start = split.getStart();           // file offset of the split's first byte
      end = start + split.getLength();    // file offset one past the split's last byte
      final Path file = split.getPath();
      compressionCodecs = new CompressionCodecFactory(job);
      // Selected from the file name; null means the file is not compressed.
      final CompressionCodec codec = compressionCodecs.getCodec(file);

      // open the file and seek to the start of the split
      FileSystem fs = file.getFileSystem(job);
      FSDataInputStream fileIn = fs.open(split.getPath());
      boolean initialized = false;
      try {
        boolean skipFirstLine = false;
        if (codec != null) {
          // Wrap the raw stream so reads return decompressed bytes, and read
          // until end of stream rather than until a split boundary.
          in = new LineReader(codec.createInputStream(fileIn), job);
          end = Long.MAX_VALUE;
        } else {
          if (start != 0) {
            skipFirstLine = true;
            // Back up one byte: if the split starts exactly at the beginning of
            // a line, the "line" skipped below is just the preceding line
            // terminator, so the line at the original start is not lost.
            --start;
            fileIn.seek(start);
          }
          in = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
          // Discard the first (partial) line and move start past it.
          start += in.readLine(new Text(), 0,
                               (int)Math.min((long)Integer.MAX_VALUE, end - start));
        }
        this.pos = start;  // next() begins reading here
        initialized = true;
      } finally {
        if (!initialized) {
          try {
            fileIn.close();  // don't leak the file handle when construction fails
          } catch (IOException ignored) {
            // keep the original failure as the one that propagates
          }
        }
      }
    }
    LineRecordReader类里面另一个函数,将解析出的结果存入key/value:
      /**
       * Reads the next line of this split into {@code key}/{@code value}.
       *
       * <p>Reading continues while the current position is before {@code end}.
       * The byte budget handed to {@code readLine} is at least
       * {@code maxLineLength} and is only a hint, so a line that starts before
       * {@code end} may be read past it. A line whose consumed size (text plus
       * terminator) reaches {@code maxLineLength} is logged and skipped.
       *
       * @param key   set to the byte offset at which the line starts
       * @param value receives the line text, without its terminator
       * @return {@code true} if a line was produced; {@code false} at the end of
       *         the split or of the stream
       * @throws IOException if the underlying reader fails
       */
      public synchronized boolean next(LongWritable key, Text value)
        throws IOException {
        while (pos < end) {
          key.set(pos);
          final int budget =
              Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
          final int consumed = in.readLine(value, maxLineLength, budget);
          if (consumed == 0) {
            break;  // end of stream
          }
          final long lineStart = pos;
          pos += consumed;
          if (consumed >= maxLineLength) {
            // line too long: note it and move on to the following line
            LOG.info("Skipped line of size " + consumed + " at pos " + lineStart);
            continue;
          }
          return true;
        }
        return false;
      }
    

    LineReader中的readLine()方法,用于从输入流中读取一行:它负责定位记录边界,并返回本次调用消耗的字节数(包括换行符):

      /**
       * Read one line from the InputStream into the given Text.  A line
       * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
       * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
       * line.
       *
       * <p>The fields {@code in}, {@code buffer}, {@code bufferPosn},
       * {@code bufferLength}, {@code LF} and {@code CR} belong to the
       * enclosing LineReader and are declared outside this excerpt.
       *
       * @param str the object to store the given line (without newline)
       * @param maxLineLength the maximum number of bytes to store into str;
       *  the rest of the line is silently discarded.
       * @param maxBytesToConsume the maximum number of bytes to consume
       *  in this call.  This is only a hint, because if the line cross
       *  this threshold, we allow it to happen.  It can overshoot
       *  potentially by as much as one buffer length.
       *
       * @return the number of bytes read including the (longest) newline
       * found.
       *
       * @throws IOException if the underlying stream throws, or if more than
       *  Integer.MAX_VALUE bytes were consumed
       */
      public int readLine(Text str, int maxLineLength,
                          int maxBytesToConsume) throws IOException {
        /* We're reading data from in, but the head of the stream may be
         * already buffered in buffer, so we have several cases:
         * 1. No newline characters are in the buffer, so we need to copy
         *    everything and read another buffer from the stream.
         * 2. An unambiguously terminated line is in buffer, so we just
         *    copy to str.
         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
         *    in CR.  In this case we copy everything up to CR to str, but
         *    we also need to see what follows CR: if it's LF, then we
         *    need consume LF as well, so next call to readLine will read
         *    from after that.
         * We use a flag prevCharCR to signal if previous character was CR
         * and, if it happens to be at the end of the buffer, delay
         * consuming it until we have a chance to look at the char that
         * follows.
         */
        str.clear();
        int txtLength = 0; //tracks str.getLength(), as an optimization
        int newlineLength = 0; //length of terminating newline
        boolean prevCharCR = false; //true if prev char was CR
        long bytesConsumed = 0;
        do {
          int startPosn = bufferPosn; //starting from where we left off the last time
          // Buffer exhausted: refill it from the stream before scanning.
          if (bufferPosn >= bufferLength) {
            startPosn = bufferPosn = 0;
            if (prevCharCR)
              ++bytesConsumed; //account for CR from previous read
            bufferLength = in.read(buffer);
            if (bufferLength <= 0)
              break; // EOF
          }
          for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
            if (buffer[bufferPosn] == LF) {
              newlineLength = (prevCharCR) ? 2 : 1;
              ++bufferPosn; // at next invocation proceed from following byte
              break;
            }
            // Lone CR terminator: stop without advancing, so the current byte
            // starts the next line.
            if (prevCharCR) { //CR + notLF, we are at notLF
              newlineLength = 1;
              break;
            }
            prevCharCR = (buffer[bufferPosn] == CR);
          }
          // Bytes scanned in this chunk, including any terminator found.
          int readLength = bufferPosn - startPosn;
          // A CR at the very end of the buffer is not counted yet; it is added
          // to bytesConsumed when the next buffer is read.
          if (prevCharCR && newlineLength == 0)
            --readLength; //CR at the end of the buffer
          bytesConsumed += readLength;
          // Terminator bytes are consumed but never copied into str.
          int appendLength = readLength - newlineLength;
          // Text beyond maxLineLength is dropped from str but still counted
          // in bytesConsumed.
          if (appendLength > maxLineLength - txtLength) {
            appendLength = maxLineLength - txtLength;
          }
          if (appendLength > 0) {
            str.append(buffer, startPosn, appendLength);
            txtLength += appendLength;
          }
          // Keep going until a terminator is found or the byte budget is used up.
        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
    
        // The count is returned as an int, so a larger value is an error.
        if (bytesConsumed > (long)Integer.MAX_VALUE)
          throw new IOException("Too many bytes before newline: " + bytesConsumed);    
        return (int)bytesConsumed;	// bytes consumed by this call, newline included
      }

    
    由上,还有两点说明:
    

    1.CompressionCodecFactory提供了getCodec()方法,用于将文件扩展名映射到相应的CompressionCodec;getCodec()方法接受一个Path对象作为参数。要想对从输入流读取而来的数据进行解压缩,则调用createInputStream(InputStream in)方法,获得一个CompressionInputStream,从而从底层的流读取未压缩的数据;另一方面,CompressionCodec还提供压缩功能,如果想对一个正在被写入的输出流的数据进行压缩,我们可以使用createOutputStream(OutputStream out)方法创建一个CompressionOutputStream,将数据以压缩格式写入底层的流。

    2.synchronized:把synchronized当作方法修饰符时,锁定的是调用这个同步方法的对象。也就是说,当多个线程在同一个对象P1上调用这个同步方法时,它们之间会形成互斥,达到同步的效果。

  • 相关阅读:
    EXT2
    Ext.DomQuery

    spring.net+nhibernate
    网址
    一句代码就解决一个开发中常见的小bug....
    Swift 已经正式开源了! 你了解到了吗?
    [知识点随笔] UIView--transform 属性
    懒加载(延迟加载)之后,在使用数据过程中容易出现的bug
    java设计模式
  • 原文地址:https://www.cnblogs.com/eva_sj/p/3971167.html
Copyright © 2011-2022 走看看