    public abstract class InputFormat<K, V> {

      public abstract
        List<InputSplit> getSplits(JobContext context
                                   ) throws IOException, InterruptedException;

      public abstract
        RecordReader<K,V> createRecordReader(InputSplit split,
                                             TaskAttemptContext context
                                            ) throws IOException,
                                                     InterruptedException;

    }







    public class TextInputFormat extends FileInputFormat<LongWritable, Text>
    public abstract class FileInputFormat<K, V> extends InputFormat<K, V>
    public abstract class InputFormat<K, V>

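    The top-level parent class InputFormat has only two abstract methods without an implementation, getSplits() and createRecordReader(). FileInputFormat contains many more methods, as shown in the figure below: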



    /** An {@link InputFormat} for plain text files.  Files are broken into lines.
     * Either linefeed or carriage-return are used to signal end of line.  Keys are
     * the position in the file, and values are the line of text. */
    public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

      @Override
      public RecordReader<LongWritable, Text>
        createRecordReader(InputSplit split,
                           TaskAttemptContext context) {
        return new LineRecordReader();
      }

      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec =
          new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (null == codec) {
          return true;    // uncompressed files can always be split
        }
        // compressed files are splittable only if the codec supports it
        return codec instanceof SplittableCompressionCodec;
      }

    }




      /**
       * Generate the list of files and make them into FileSplits.
       */
      public List<InputSplit> getSplits(JobContext job
                                        ) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);    // Long.MAX_VALUE by default

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          long length = file.getLen();    // length of the whole file
          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
          // isSplitable is true by default; false for a non-splittable compression codec
          if ((length != 0) && isSplitable(job, path)) {
            long blockSize = file.getBlockSize();    // e.g. 64 MB = 67108864 bytes
            // split size = Math.max(minSize, Math.min(maxSize, blockSize))
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);

            long bytesRemaining = length;
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              // getHosts() returns hostnames; getNames() returns IP:port pairs
              splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                       blkLocations[blkIndex].getHosts()));
              bytesRemaining -= splitSize;    // bytes not yet assigned to a split
            }

            if (bytesRemaining != 0) {    // the last (tail) split
              splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                         blkLocations[blkLocations.length-1].getHosts()));
            }
          } else if (length != 0) {    // isSplitable(job, path) returned false
            splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
          } else {
            //Create empty hosts array for zero length files
            splits.add(new FileSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files in the job-conf
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

        LOG.debug("Total # of splits: " + splits.size());
        return splits;
      }









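    (6): Set bytesRemaining (the number of bytes not yet assigned to a split) to the length of the whole file. The file is divided into multiple InputSplits only if bytesRemaining exceeds the split size splitSize by a certain margin: while bytesRemaining/splitSize > SPLIT_SLOP (SPLIT_SLOP is a constant, 1.1), the while loop calls getBlockIndex(blkLocations, length-bytesRemaining) to obtain the index of a block. The second argument is the offset within the file at which the new split starts; it begins at 0 and grows with each iteration. The method's code is as follows: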

     protected int getBlockIndex(BlockLocation[] blkLocations,
                                  long offset) {
        for (int i = 0 ; i < blkLocations.length; i++) {
          // is the offset inside this block?
          if ((blkLocations[i].getOffset() <= offset) &&
              (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
            return i;
          }
        }
        // no block contains the offset: report the valid range and fail
        BlockLocation last = blkLocations[blkLocations.length -1];
        long fileLength = last.getOffset() + last.getLength() -1;
        throw new IllegalArgumentException("Offset " + offset +
                                           " is outside of file (0.." +
                                           fileLength + ")");
     }


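    (7) Wrap the hosts of the block at that index, together with the file path, the starting offset, and the split size splitSize, into an InputSplit and add it to List<InputSplit> splits.

    (8) bytesRemaining -= splitSize updates the number of remaining bytes.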




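    (12) Otherwise, i.e. when the file length is 0, splits.add(new FileSplit(path, 0, length, new String[0])) is executed: there is no block, so the host array is empty, and both the start offset and the length are 0.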


    (14) Return the list of splits, splits.
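
    The split loop described in the steps above can be tried out without a cluster. The following is a minimal sketch, not the Hadoop implementation: SplitSketch and planSplits are hypothetical names, blocks are modelled as plain arrays of offsets and lengths instead of BlockLocation objects, and every block except the last is assumed to be exactly blockSize bytes long. Only SPLIT_SLOP, computeSplitSize, and the loop structure are taken from the code shown earlier.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of the FileInputFormat split loop; names are hypothetical. */
class SplitSketch {
    static final double SPLIT_SLOP = 1.1; // same constant as in FileInputFormat

    // Mirrors computeSplitSize: clamp the block size between minSize and maxSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Mirrors getBlockIndex: linear scan for the block containing the offset.
    static int getBlockIndex(long[] offsets, long[] lengths, long offset) {
        for (int i = 0; i < offsets.length; i++) {
            if (offsets[i] <= offset && offset < offsets[i] + lengths[i]) {
                return i;
            }
        }
        throw new IllegalArgumentException("Offset " + offset + " is outside of file");
    }

    // Returns one {offset, length, blockIndex} triple per split.
    static List<long[]> planSplits(long length, long blockSize,
                                   long minSize, long maxSize, boolean splittable) {
        List<long[]> splits = new ArrayList<>();
        if (length == 0) {
            splits.add(new long[] {0, 0, -1}); // zero-length file: empty split, no block
            return splits;
        }
        if (!splittable) {
            splits.add(new long[] {0, length, 0}); // whole file, hosts of the first block
            return splits;
        }
        // Assumed block layout: full-sized blocks, with a shorter last block if needed.
        int blockCount = (int) ((length + blockSize - 1) / blockSize);
        long[] offsets = new long[blockCount];
        long[] lengths = new long[blockCount];
        for (int i = 0; i < blockCount; i++) {
            offsets[i] = i * blockSize;
            lengths[i] = Math.min(blockSize, length - offsets[i]);
        }
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            long offset = length - bytesRemaining;
            splits.add(new long[] {offset, splitSize, getBlockIndex(offsets, lengths, offset)});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // The tail split uses the last block, as in the original code.
            splits.add(new long[] {length - bytesRemaining, bytesRemaining, blockCount - 1});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        for (long[] s : planSplits(130 * mb, 64 * mb, 1, Long.MAX_VALUE, true)) {
            System.out.println("offset=" + s[0] / mb + "MB length=" + s[1] / mb
                + "MB block=" + s[2]);
        }
    }
}
```

    For a 130 MB file with 64 MB blocks the sketch produces two splits rather than three: after the first 64 MB split, 66 MB remain, and 66/64 is about 1.03, which does not exceed SPLIT_SLOP, so the remainder becomes a single 66 MB tail split.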







      /**
       * Read one line from the InputStream into the given Text.  A line
       * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
       * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
       * line.
       *
       * @param str the object to store the given line (without newline)
       * @param maxLineLength the maximum number of bytes to store into str;
       *  the rest of the line is silently discarded.
       * @param maxBytesToConsume the maximum number of bytes to consume
       *  in this call.  This is only a hint, because if the line cross
       *  this threshold, we allow it to happen.  It can overshoot
       *  potentially by as much as one buffer length.
       *
       * @return the number of bytes read including the (longest) newline
       * found.
       *
       * @throws IOException if the underlying stream throws
       */
      public int readLine(Text str, int maxLineLength,
                          int maxBytesToConsume) throws IOException {
        /* We're reading data from in, but the head of the stream may be
         * already buffered in buffer, so we have several cases:
         * 1. No newline characters are in the buffer, so we need to copy
         *    everything and read another buffer from the stream.
         * 2. An unambiguously terminated line is in buffer, so we just
         *    copy to str.
         * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
         *    in CR.  In this case we copy everything up to CR to str, but
         *    we also need to see what follows CR: if it's LF, then we
         *    need consume LF as well, so next call to readLine will read
         *    from after that.
         * We use a flag prevCharCR to signal if previous character was CR
         * and, if it happens to be at the end of the buffer, delay
         * consuming it until we have a chance to look at the char that
         * follows.
         */
        str.clear();
        int txtLength = 0; //tracks str.getLength(), as an optimization
        int newlineLength = 0; //length of terminating newline
        boolean prevCharCR = false; //true of prev char was CR
        long bytesConsumed = 0;
        do {
          int startPosn = bufferPosn; //starting from where we left off the last time
          if (bufferPosn >= bufferLength) {
            startPosn = bufferPosn = 0;
            if (prevCharCR)
              ++bytesConsumed; //account for CR from previous read
            // refill: reads up to buffer.length bytes and returns the number actually read
            bufferLength = in.read(buffer);
            if (bufferLength <= 0)    // no more data
              break; // EOF
          }
          // '\n': ASCII 10, line feed (LF); '\r': ASCII 13, carriage return (CR)
          for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
            if (buffer[bufferPosn] == LF) {    // current byte is a line feed
              newlineLength = (prevCharCR) ? 2 : 1;
              ++bufferPosn; // at next invocation proceed from following byte (skip the LF)
              break;
            }
            if (prevCharCR) { //CR + notLF, we are at notLF (previous byte was a lone CR)
              newlineLength = 1;
              break;
            }
            prevCharCR = (buffer[bufferPosn] == CR);
          }
          int readLength = bufferPosn - startPosn;
          if (prevCharCR && newlineLength == 0)    // no terminator decided yet and the buffer ends in CR
            --readLength; //CR at the end of the buffer
          bytesConsumed += readLength;
          int appendLength = readLength - newlineLength;    // newlineLength is the terminator length
          if (appendLength > maxLineLength - txtLength) {
            appendLength = maxLineLength - txtLength;
          }
          if (appendLength > 0) {
            str.append(buffer, startPosn, appendLength);    // append the data to str
            txtLength += appendLength;
          }
        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);  // loop while no terminator was found and the limit is not reached

        if (bytesConsumed > (long)Integer.MAX_VALUE)
          throw new IOException("Too many bytes before newline: " + bytesConsumed);
        return (int)bytesConsumed;
      }

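    The purpose of this method is to read one line and write it into str; bytesConsumed records the total number of bytes read, including the line terminator. bufferLength = in.read(buffer) reads up to buffer.length bytes from the input stream into buffer and returns the number actually read. The if statement at the start of the do-while loop ensures that all bufferLength bytes already in the buffer are processed before the next batch is read from the input stream. newlineLength is the length of the line terminator (0, 1, or 2). Terminators differ between systems, and there are three: \r (carriage return), \n (line feed), and \r\n (\n is the line ending on Unix, \r\n on Windows, and \r on classic Mac OS).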

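    Next comes the for loop, which checks each byte in turn for \r or \n. If the byte is a carriage return \r, prevCharCR is set to true. If the current byte is a line feed \n, then when prevCharCR == true (the previous byte was a carriage return) newlineLength = 2 (the terminator is \r\n), and when prevCharCR == false (the previous byte was not a carriage return) newlineLength = 1 (the terminator is \n); in both cases the for loop exits. If the current byte is not \n and prevCharCR == true (the terminator is a lone \r), newlineLength = 1 and the for loop exits. With the terminator found, the method computes the data length appendLength (excluding the terminator) and appends appendLength bytes of buffer, starting at startPosn, to str (which here is the value). txtLength tracks the length of str, that is, the length of the value so far.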



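    (2): The number of bytes read has not exceeded the limit, i.e. bytesConsumed < maxBytesToConsume.

    Both conditions must hold for the do-while loop to continue: no terminator has been found yet (newlineLength == 0) and the byte limit has not been reached. One subtle case arises when the line terminator is \r\n but the two bytes do not arrive in the same read: \r is the last byte of one buffer and \n is in the next batch. This is not a problem, because the for loop above checks whether the byte following \r is \n before it sets newlineLength. The method also shows that a record spanning a split or block boundary is still read as one complete line.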
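
    The buffer-boundary case is easy to reproduce with a deliberately tiny buffer. The following is a simplified sketch, not the Hadoop class: TinyLineReader and readAll are hypothetical names, the maxLineLength and maxBytesToConsume limits are left out, and the input is assumed to be ASCII. The prevCharCR logic follows the readLine code above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical re-implementation of the readLine loop (no length limits). */
class TinyLineReader {
    private static final byte CR = '\r';
    private static final byte LF = '\n';
    private final InputStream in;
    private final byte[] buffer;
    private int bufferLength = 0;
    private int bufferPosn = 0;

    TinyLineReader(InputStream in, int bufferSize) {
        this.in = in;
        this.buffer = new byte[bufferSize];
    }

    /** Appends one line (without terminator) to out; returns bytes consumed, 0 at EOF. */
    int readLine(StringBuilder out) throws IOException {
        int newlineLength = 0;
        boolean prevCharCR = false;
        int bytesConsumed = 0;
        do {
            int startPosn = bufferPosn;
            if (bufferPosn >= bufferLength) {
                startPosn = bufferPosn = 0;
                if (prevCharCR) {
                    ++bytesConsumed; // CR left pending at the end of the previous buffer
                }
                bufferLength = in.read(buffer);
                if (bufferLength <= 0) {
                    break; // EOF
                }
            }
            for (; bufferPosn < bufferLength; ++bufferPosn) {
                if (buffer[bufferPosn] == LF) {
                    newlineLength = prevCharCR ? 2 : 1; // "\r\n" or plain "\n"
                    ++bufferPosn;
                    break;
                }
                if (prevCharCR) { // lone "\r" followed by ordinary data
                    newlineLength = 1;
                    break;
                }
                prevCharCR = (buffer[bufferPosn] == CR);
            }
            int readLength = bufferPosn - startPosn;
            if (prevCharCR && newlineLength == 0) {
                --readLength; // buffer ends in CR: decide after seeing the next byte
            }
            bytesConsumed += readLength;
            int appendLength = readLength - newlineLength;
            if (appendLength > 0) {
                out.append(new String(buffer, startPosn, appendLength,
                                      StandardCharsets.US_ASCII));
            }
        } while (newlineLength == 0);
        return bytesConsumed;
    }

    /** Reads every line of text using a buffer of the given size. */
    static List<String> readAll(String text, int bufferSize) {
        TinyLineReader reader = new TinyLineReader(
            new ByteArrayInputStream(text.getBytes(StandardCharsets.US_ASCII)), bufferSize);
        List<String> lines = new ArrayList<>();
        StringBuilder line = new StringBuilder();
        try {
            while (reader.readLine(line) > 0) {
                lines.add(line.toString());
                line.setLength(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        // With a 3-byte buffer, "\r" ends the first read and "\n" starts the second.
        System.out.println(readAll("ab\r\ncd", 3));
        System.out.println(readAll("ab\r\ncd", 64));
    }
}
```

    Both calls yield the same two lines, ab and cd. With the 3-byte buffer the first read ends in \r, so the method holds that byte back, refills the buffer, sees \n, and only then settles on a terminator length of 2.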


    public boolean nextKeyValue() throws IOException {
        if (key == null) {
          key = new LongWritable();
        }
        key.set(pos);    // the key is the byte offset at which the line starts
        if (value == null) {
          value = new Text();
        }
        int newSize = 0;
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        while (getFilePosition() <= end) {
          newSize = in.readLine(value, maxLineLength,
              Math.max(maxBytesToConsume(pos), maxLineLength));
          if (newSize == 0) {
            break;    // EOF
          }
          pos += newSize;
          if (newSize < maxLineLength) {
            break;    // a complete line within the length limit was read
          }

          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " +
                   (pos - newSize));
        }
        if (newSize == 0) {
          key = null;
          value = null;
          return false;
        } else {
          return true;
        }
    }




