zoukankan      html  css  js  c++  java
  • Hadoop TextInputFormat源码分析

    from:http://blog.csdn.net/lzm1340458776/article/details/42707047

    InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能:

    (1).数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split。

    (2).为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,共Mapper使用。

    InputFormat抽象类中只有两个方法,分别对应上面两个功能,源码如下:

     1 public abstract class InputFormat<K, V> {
     2 
     3 
     4   public abstract 
     5     List<InputSplit> getSplits(JobContext context
     6                                ) throws IOException, InterruptedException;
     7   
     8 
     9   public abstract 
    10     RecordReader<K,V> createRecordReader(InputSplit split,
    11                                          TaskAttemptContext context
    12                                         ) throws IOException, 
    13                                                  InterruptedException;
    14 
    15 }

    getSplits()方法返回的是InputSplit类型的集合,InputSplit分片由两个特点:

    (1):逻辑分片即只是在逻辑上对数据进行分片,并不进行物理切分,这点和block是不同的,它只记录一些元信息,比如起始位置、长度以及所在的节点列表等;

    (2):必须可序列化,分片信息要上传到HDFS文件系统,还会被JobTracker读取,序列化可以方便进程通信以及永久存储。

    createRecordReader()方法返回的是RecordReader对象,该对象可以将输入数据,即InputSplit对应的数据解析成众多的key/value对,会作为MapTask的map方法的输入。

    我们本节就一最常用的TextInputFormat为例讲解分片和读取分片数据。

    我们先来看看,TextInputFormat、FileInputFormat和InputFormat三者之间的关系,如下:

    public class TextInputFormat extends FileInputFormat;
    public abstract class FileInputFormat<K, V> extends InputFormat;
    public abstract class InputFormat。

    最顶的父类InputFormat只有两个未实现的抽象方法 getSplits()和createRecordReader();而FileInputFormat包含的方法比较多,如下图:

    我们在自己的MapReduce程序中设置输入目录就是调用这里面的方法。

    TextInputFormat这个类只有两个方法,源码如下:

    /** An {@link InputFormat} for plain text files.  Files are broken into lines.
     * Either linefeed or carriage-return are used to signal end of line.  Keys are
     * the position in the file, and values are the line of text.. */
    public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
    
      @Override
      public RecordReader<LongWritable, Text> 
        createRecordReader(InputSplit split,
                           TaskAttemptContext context) {
        return new LineRecordReader();
      }
    
      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = 
          new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (null == codec) {
          return true;
        }
        return codec instanceof SplittableCompressionCodec;
      }
    
    }

    isSplitable方法表示是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。

    接下来,我们只关注那两个主要方法,首先来看:

    一:getSplits()方法,这个方法在FileInputFormat类中,它的子类一般只需要实现TextInputFormat中的两个方法而已,getSplits()方法代码如下:

    /** 
       * Generate the list of files and make them into FileSplits.
       */ 
      public List<InputSplit> getSplits(JobContext job
                                        ) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);    //Long.MAX_VALUE
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus>files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          long length = file.getLen();    //整个文件的长度
          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
          if ((length != 0) && isSplitable(job, path)) {    //默认是true,但是如果是压缩的,则是false
            long blockSize = file.getBlockSize();        //64M,67108864B
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);    //计算split大小  Math.max(minSize, Math.min(maxSize, blockSize))
    
            long bytesRemaining = length;            
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                                       blkLocations[blkIndex].getHosts()));        //hosts是主机名,name是IP
              bytesRemaining -= splitSize;        //剩余块的大小
            }
            
            if (bytesRemaining != 0) {    //最后一个
              splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                         blkLocations[blkLocations.length-1].getHosts()));
            }
          } else if (length != 0) {    //isSplitable(job, path)等于false
            splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
          } else {
            //Create empty hosts array for zero length files
            splits.add(new FileSplit(path, 0, length, new String[0]));
          }
        }
        
        // Save the number of input files in the job-conf
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    
        LOG.debug("Total # of splits: " + splits.size());
        return splits;
      }

    代码分析:

    (1):minSize=Math.max(getFormatMinSplitSize(),getMinSplitSize(job)):

    getFormatMinSplitSize()返回了一个固定值即1;getMinSplitSize(job)是获取在mapred-site.xml文件中"mapred.min.split.size"属性配置的值,默认是0。

    (2):maxSize=getMaxSplitSize(job):

    getMaxSplitSize(job)是获取"mapred.max.split.size"属性配置的值,默认是Long.MAX_VALUE,即Long类型的最大值(注:这个属性在mapred-site.xml文件中并没有,所以不推荐配置)。

    (3):接下来是遍历输入目录下所有文件的FileStatus信息列表。

    (4):然后对每一个文件获取它的目录、文件的长度、文件对应的所有块信息(可能有多个块,每个块对应3个副本);

    (5):然后如果文件长度不为0且支持分割(isSplitable方法等于true):获取block大小,默认是64MB,通过方法computeSplitSize(blockSize,minSize,maxSize)计算分片的大小,这个方法的源码返回Max.max(minSize,Math.min(maxSize,blockSize))。

    (6):将bytesRemaiining(剩余分片字节数)设置为整个文件的长度。如果bytesRemaining超过分片大小splitSize一定量才会将文件分成多个InputSplit即当bytesRemaing/splitSize>SPLIT_SLOP(SPLIT_SLOP是固定值为1.1)时进入while循环执行getBlockIndex(blkLocations, length-bytesRemaining)获取block的索引,第二个参数就是这个block在整个文件中的偏移量,在循环中会从0越来越大,该方法代码如下:

     protected int getBlockIndex(BlockLocation[] blkLocations, 
                                  long offset) {
        for (int i = 0 ; i < blkLocations.length; i++) {
          // is the offset inside this block?
          if ((blkLocations[i].getOffset() <= offset) &&
              (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
            return i;
          }
        }
        BlockLocation last = blkLocations[blkLocations.length -1];
        long fileLength = last.getOffset() + last.getLength() -1;
        throw new IllegalArgumentException("Offset " + offset + 
                                           " is outside of file (0.." +
                                           fileLength + ")");
      }

    这个方法中的if语句的条件会限制获取到这个偏移量对应的block的索引。

    (7)将这个索引对应的block信息的主机节点以及文件的路径名、开始的偏移量、分片大小splitSize封装到一个InputSplit中加入List<InputSplit> splits中。

    (8)bytesRemaining -= splitSize是修改剩余字节大小。

    (9)判断循环是否继续,如果满足条件则继续执行循环条件,否则跳出循环。

    (10)跳出循环之后如果剩余bytesRemaining还不为0,表示还有未分配的数据,将剩余的数据及最后一个block加入到splits集合中。

    (11)自此,我们已经走完了getSplits()方法中的第一个if条件,下面说第二个if条件,当不允许分割即isSplitable==false,则将第一个block、文件目录、开始位置0,长度为整个文件的长度封装到一个InputSplit中,加入到splits中。

    (12)执行else条件即当文件的长度==0时,则会splits.add(new FileSplit(path,0,length,new String[0]))没有block,并且初始和长度都为0。

    (13)将输入目录下文件的个数赋值给"mapreduce.input.num.files",方便以后的校对。

    (14) 返回分片信息splits。

    以上就是getSplits获取分片的过程,当使用基于FileInputFormat继承InputFormat时,为了提高MapTask的数据本地性,应尽量使InputSplit大小与block大小相同,因为当一个分片包含多个block的时候,总会从其他节点读取数据,也就是做不到所有的计算都是本地化,为了发挥计算本地化性能,应该尽量使InputSplit大小和块大小相当。

    特殊问题:就是如果分片大小超过block大小,但是InputSplit中封装了单个block的所在主机信息啊,这样能读取多个block数据吗?这个问题留到最后讲解。

    二:createRecordReader()方法,该方法返回一个RecordReader对象,实现了类似的迭代功能,将某个InputSplit解析成一个个key/value对。RecordReader应该注意两点:

    (1):定位记录边界:为了能识别一条完整的记录,应该添加一些同步标示,TextInputFormat的标示是换行符;SequenceFileInputFormat的标示是每隔若干条记录会添加固定长度的同步字符串。为了解决InputSplit中第一条或者最后一条可能跨InputSplit的情况,RecordReader规定每个InputSplit的第一条不完整记录划给前一个InputSplit。

    (2):解析key/value:将每个记录分解成key和value两个部分,TextInputFormat每一行的内容是value,该行在整个文件中的偏移量为key;SequenceFileInputFormat的记录共有四个字段组成:前两个字段分别是整个记录的长度和key的长度,均为4字节,后两个字段分别是key和value的内容。

    TextInputFormat使用的RecordReader是org.apache.hadoop.mapreduce.lib.input.LineRecordReader。这个类中的方法:首先是initialize()方法,该方法主要是获取分片信息的初始化位置和结束位置,以及输入流(若没有则是压缩流);mapper的key/value是通过LineRecordReader.nextKeyValue()方法将key和value读取到key和value中的,在这个方法中key被设置为在文件中的偏移量,value通过LineReader.readLine(value,maxLineLength,Math.max((int)Math.min(Integer.Max_VALUE,end-pos),maxLineLength))这个方法读取一行数据放入value中,方法代码如下:

    /**
       * Read one line from the InputStream into the given Text.  A line
       * can be terminated by one of the following: '
    ' (LF) , '
    ' (CR),
       * or '
    ' (CR+LF).  EOF also terminates an otherwise unterminated
       * line.
       *
       * @param str the object to store the given line (without newline)
       * @param maxLineLength the maximum number of bytes to store into str;
       *  the rest of the line is silently discarded.
       * @param maxBytesToConsume the maximum number of bytes to consume
       *  in this call.  This is only a hint, because if the line cross
       *  this threshold, we allow it to happen.  It can overshoot
       *  potentially by as much as one buffer length.
       *
       * @return the number of bytes read including the (longest) newline
       * found.
       *
       * @throws IOException if the underlying stream throws
       */
      public int readLine(Text str, int maxLineLength,
                          int maxBytesToConsume) throws IOException {
        /* We're reading data from in, but the head of the stream may be
         * already buffered in buffer, so we have several cases:
         * 1. No newline characters are in the buffer, so we need to copy
         *    everything and read another buffer from the stream.
         * 2. An unambiguously terminated line is in buffer, so we just
         *    copy to str.
         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
         *    in CR.  In this case we copy everything up to CR to str, but
         *    we also need to see what follows CR: if it's LF, then we
         *    need consume LF as well, so next call to readLine will read
         *    from after that.
         * We use a flag prevCharCR to signal if previous character was CR
         * and, if it happens to be at the end of the buffer, delay
         * consuming it until we have a chance to look at the char that
         * follows.
         */
        str.clear();
        int txtLength = 0; //tracks str.getLength(), as an optimization
        int newlineLength = 0; //length of terminating newline
        boolean prevCharCR = false; //true of prev char was CR
        long bytesConsumed = 0;
        do {
          int startPosn = bufferPosn; //starting from where we left off the last time
          if (bufferPosn >= bufferLength) {
            startPosn = bufferPosn = 0;
            if (prevCharCR)
              ++bytesConsumed; //account for CR from previous read
            bufferLength = in.read(buffer);    //从输入流中读取一定数量的字节,并将其存储在缓冲区数组 b 中。以整数形式返回实际读取的字节数。
            if (bufferLength <= 0)        //结束了,没数据了
              break; // EOF
          }
          //'
    ',ASCII码:10,意义:换行NL;;;;'
    ' ,ASCII码:13,意义: 回车CR
          for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
            if (buffer[bufferPosn] == LF) {            //如果是换行字符
    
              newlineLength = (prevCharCR) ? 2 : 1;
              ++bufferPosn; // at next invocation proceed from following byte,越过换行字符
              break;
            }
            if (prevCharCR) { //CR + notLF, we are at notLF,如果是回车字符
    
              newlineLength = 1;
              break;
            }
            prevCharCR = (buffer[bufferPosn] == CR);
          }
          int readLength = bufferPosn - startPosn;
          if (prevCharCR && newlineLength == 0)    //表示还没遇到换行,有回车字符,且缓存最后一个是
    
            --readLength; //CR at the end of the buffer
          bytesConsumed += readLength;
          int appendLength = readLength - newlineLength;    //newlineLength换行符个数
          if (appendLength > maxLineLength - txtLength) {
            appendLength = maxLineLength - txtLength;
          }
          if (appendLength > 0) {
            str.append(buffer, startPosn, appendLength);    //将数据加入str
            txtLength += appendLength;
          }
        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);//循环条件没有换行并且没超过上限
    
        if (bytesConsumed > (long)Integer.MAX_VALUE)
          throw new IOException("Too many bytes before newline: " + bytesConsumed);    
        return (int)bytesConsumed;
      }

    这个方法的目的就是读取一行记录写入str中,bytesConsumed记录为读取的字节总数;bufferLength=in.read(buffer)从输入流读取bufferLength长度的字节数据放入buffer中;do-while中开始部分的if语句要保证将bufferLength个字节数据处理完毕之后再从输入流中读取下一行数据;newlineLength表示换行的标记符长度(0、1、2三种值),因为不同的系统换行标记可能不同,有三种: (回车符)、 (换行符)、 ( :Unix系统换行符末结束符; :window系统行末结束符; :MAC OS系统行末结束符)。

    接下来是进入for循环,for循环会挨个检查字符是否是 ,如果是回车符 ,还会将prevCharCR设置为true,当前字符如果是换行符 ,prevCharR==true时(表示上一个字符是回车符)则newlineLength=2(这表明当前系统的换行标记是 ),prevcharCR==false(表示上一个字符不是回车符)则newlineLength=1(这表明当前系统的换行标记是 ),并退出for循环;如果当前字符不是换行符 且prevCharCR==true(表明当前系统的换行标记是 )则newlineLength=1并退出for循环;这样就找到了换行标记,然后计算数据的长度appendLength(不包含换行符),将buffer中指定位置开始长度为appendLength的数据追加到str(这里其实就是value)中;txtLength表示的是str(这里其实是value中值的长度)。

    do-while循环的条件是:

    (1):没有发现换行标记即newlineLength==0;

    (2):读取的字节数量没有超过上限即bytesconsumed <maxBytesToConsume。

    当这两个条件要同时满足。这其中有个问题就是当前系统的换行标记是 ,但是这两个字符没有同时出现在这次读取的数据之中, 在下一个批次中,这没有关系,上面的for循环会检查 出现之后的下一个字符是否是 再对newliineLength进行设置的。从这个方法可以看出,即使是记录跨split、跨block也不能阻止它完整读取一行数据的决心啊。

    我们再回来看看LineRecordReader.nextKeyValue()方法,这个方法的代码如下:

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
          key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
          value = new Text();
        }
        int newSize = 0;
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        while (getFilePosition() <= end) {
          newSize = in.readLine(value, maxLineLength,
              Math.max(maxBytesToConsume(pos), maxLineLength));
          if (newSize == 0) {
            break;
          }
          pos += newSize;
          if (newSize < maxLineLength) {
            break;
          }
    
          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " + 
                   (pos - newSize));
        }
        if (newSize == 0) {
          key = null;
          value = null;
          return false;
        } else {
          return true;
        }
      }

    这个方法会控制split读取数据的结束位置,上面的readLine()方法只关注输入流不会管split的大小的。需要注意的是其中的while循环,其中的pos和end表示当前在文件中的偏移量和split的结束位置,即使这个split的最后一行跨split也会完整的读取一行。也就保证了一个记录的完整性。mapper获取key/value会通过getCurrentKey()和getCurrentValue()来达到。但是调用这两个方法前得先调用nextKeyValue()方法才能实现key和value的赋值。

    到这里我们回头看看上面那个特殊问题,就是split的大小超过block的大小数据读取的问题,我们前面已经讲过split是逻辑分片,不是物理分片,当MapTask的数据本地性发挥作用时,会从本机的block开始读取,超过这个block的部分可能还在本机也可能不再本机,如果是后者的话就要从别的节点拉数据过来,因为实际数据是一个输入流,这个输入流面向整个问加你,不受什么block啊、split的影响,split越大可能需要从别的节点拉的数据也越大,从而效率也会越慢,拉数据的多少是由getSplits()方法中splitSize决定的。

    至此,TextInputFormat的分片和数据读取过程就讲完了。这只是一个例子,其他InputFormat可以参考这个。

  • 相关阅读:
    【LeetCode】Validate Binary Search Tree
    【LeetCode】Search in Rotated Sorted Array II(转)
    【LeetCode】Search in Rotated Sorted Array
    【LeetCode】Set Matrix Zeroes
    【LeetCode】Sqrt(x) (转载)
    【LeetCode】Integer to Roman
    贪心算法
    【LeetCode】Best Time to Buy and Sell Stock III
    【LeetCode】Best Time to Buy and Sell Stock II
    CentOS 6 上安装 pip、setuptools
  • 原文地址:https://www.cnblogs.com/wq920/p/6114040.html
Copyright © 2011-2022 走看看