zoukankan      html  css  js  c++  java
  • (一)MapReduce篇之InputFormat,InputSplit,RecordReader(转)

        平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class);来保证输入文件按照我们想要的格式被读取。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。

     

      不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的map task作为数据源。下面我们先看看这些输入分片(inputSplit)是什么样的。

    InputSplit:

      我们知道Mappers的输入是一个一个的输入分片,称InputSplit。InputSplit是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。

    1. public abstract class InputSplit {  
    2.   public abstract long getLength() throws IOException, InterruptedException;  
    3.   
    4.   public abstract   
    5.     String[] getLocations() throws IOException, InterruptedException;  
    6. }  

      getLength()用来获取InputSplit的大小,以支持对InputSplits进行排序,而getLocations()则用来获取存储分片的位置列表。
      我们来看一个简单InputSplit子类:FileSplit。

    1. public class FileSplit extends InputSplit implements Writable {  
    2.   private Path file;  
    3.   private long start;  
    4.   private long length;  
    5.   private String[] hosts;  
    6.   
    7.   FileSplit() {}  
    8.   
    9.   public FileSplit(Path file, long start, long length, String[] hosts) {  
    10.     this.file = file;  
    11.     this.start = start;  
    12.     this.length = length;  
    13.     this.hosts = hosts;  
    14.   }  
    15.  //序列化、反序列化方法,获得hosts等等……  
    16. }  


      从上面的源码我们可以看到,一个FileSplit是由文件路径,分片开始位置,分片大小和存储分片数据的hosts列表组成,由这些信息我们就可以从输入文件中切分出提供给单个Mapper的输入数据。这些属性会在Constructor设置,我们在后面会看到这会在InputFormat的getSplits()中构造这些分片。

      我们再看CombineFileSplit:

    1. public class CombineFileSplit extends InputSplit implements Writable {  
    2.   
    3.   private Path[] paths;  
    4.   private long[] startoffset;  
    5.   private long[] lengths;  
    6.   private String[] locations;  
    7.   private long totLength;  
    8.   
    9.   public CombineFileSplit() {}  
    10.   public CombineFileSplit(Path[] files, long[] start,   
    11.                           long[] lengths, String[] locations) {  
    12.     initSplit(files, start, lengths, locations);  
    13.   }  
    14.   
    15.   public CombineFileSplit(Path[] files, long[] lengths) {  
    16.     long[] startoffset = new long[files.length];  
    17.     for (int i = 0; i < startoffset.length; i++) {  
    18.       startoffset[i] = 0;  
    19.     }  
    20.     String[] locations = new String[files.length];  
    21.     for (int i = 0; i < locations.length; i++) {  
    22.       locations[i] = "";  
    23.     }  
    24.     initSplit(files, startoffset, lengths, locations);  
    25.   }  
    26.     
    27.   private void initSplit(Path[] files, long[] start,   
    28.                          long[] lengths, String[] locations) {  
    29.     this.startoffset = start;  
    30.     this.lengths = lengths;  
    31.     this.paths = files;  
    32.     this.totLength = 0;  
    33.     this.locations = locations;  
    34.     for(long length : lengths) {  
    35.       totLength += length;  
    36.     }  
    37.   }  
    38.   //一些getter和setter方法,和序列化方法  
    39. }  


      与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和存储分片数据的host列表,由于CombineFileSplit是针对小文件的,它把很多小文件包在一个InputSplit内,这样一个Mapper就可以处理很多小文件。要知道我们上面的FileSplit是对应一个输入文件的,也就是说如果用FileSplit对应的FileInputFormat来作为输入格式,那么即使文件特别小,也是单独计算成一个输入分片来处理的。当我们的输入是由大量小文件组成的,就会导致有同样大量的InputSplit,从而需要同样大量的Mapper来处理,这将很慢,想想有一堆map task要运行!!这是不符合Hadoop的设计理念的,Hadoop是为处理大文件优化的。

      最后介绍TagInputSplit,这个类就是封装了一个InputSplit,然后加了一些tags在里面满足我们需要这些tags数据的情况,我们从下面就可以一目了然。

    1. class TaggedInputSplit extends InputSplit implements Configurable, Writable {  
    2.   
    3.   private Class<? extends InputSplit> inputSplitClass;  
    4.   
    5.   private InputSplit inputSplit;  
    6.   
    7.   @SuppressWarnings("unchecked")  
    8.   private Class<? extends InputFormat> inputFormatClass;  
    9.   
    10.   @SuppressWarnings("unchecked")  
    11.   private Class<? extends Mapper> mapperClass;  
    12.   
    13.   private Configuration conf;  
    14.   //getters and setters,序列化方法,getLocations()、getLength()等  
    15. }  

         现在我们对InputSplit的概念有了一些了解,我们继续看它是怎么被使用和计算出来的。

    InputFormat:

      通过使用InputFormat,MapReduce框架可以做到:

      1、验证作业的输入的正确性

      2、将输入文件切分成逻辑的InputSplits,一个InputSplit将被分配给一个单独的Mapper task

      3、提供RecordReader的实现,这个RecordReader会从InputSplit中正确读出一条一条的K-V对供Mapper使用。

    1. public abstract class InputFormat<K, V> {  
    2.   
    3.   public abstract   
    4.     List<InputSplit> getSplits(JobContext context  
    5.                                ) throws IOException, InterruptedException;  
    6.     
    7.   public abstract   
    8.     RecordReader<K,V> createRecordReader(InputSplit split,  
    9.                                          TaskAttemptContext context  
    10.                                         ) throws IOException,   
    11.                                                  InterruptedException;  
    12.   
    13. }  

      上面是InputFormat的源码,getSplits用来获取由输入文件计算出来的InputSplits,我们在后面会看到计算InputSplits的时候会考虑到输入文件是否可分割、文件存储时分块的大小和文件大小等因素;而createRecordReader()提供了前面第三点所说的RecordReader的实现,以将K-V对从InputSplit中正确读出来,比如LineRecordReader就以偏移值为key,一行的数据为value,这就使得所有其createRecordReader()返回了LineRecordReader的InputFormat都是以偏移值为key,一行数据为value的形式读取输入分片的。

    FileInputFormat:

      PathFilter被用来进行文件筛选,这样我们就可以控制哪些文件要作为输入,哪些不作为输入。PathFilter有一个accept(Path)方法,当接收的Path要被包含进来,就返回true,否则返回false。可以通过设置mapred.input.pathFilter.class来设置用户自定义的PathFilter。

    1. public interface PathFilter {  
    2.   boolean accept(Path path);  
    3. }  

      FileInputFormat是InputFormat的子类,它包含了一个MultiPathFilter,这个MultiPathFilter由一个过滤隐藏文件(名字前缀为'-'或'.')的PathFilter和一些可能存在的用户自定义的PathFilters组成,MultiPathFilter会在listStatus()方法中使用,而listStatus()方法又被getSplits()方法用来获取输入文件,也就是说实现了在获取输入分片前先进行文件过滤。

    1. private static class MultiPathFilter implements PathFilter {  
    2.   private List<PathFilter> filters;  
    3.   
    4.   public MultiPathFilter(List<PathFilter> filters) {  
    5.     this.filters = filters;  
    6.   }  
    7.   
    8.   public boolean accept(Path path) {  
    9.     for (PathFilter filter : filters) {  
    10.       if (!filter.accept(path)) {  
    11.         return false;  
    12.       }  
    13.     }  
    14.     return true;  
    15.   }  
    16. }  

       这些PathFilter会在listStatus()方法中用到,listStatus()是用来获取输入数据列表的。

      下面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:mapreduce.input.num.files。

    1. public List<InputSplit> getSplits(JobContext job  
    2.                                   ) throws IOException {  
    3.   long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
    4.   long maxSize = getMaxSplitSize(job);  
    5.   
    6.   // generate splits  
    7.   List<InputSplit> splits = new ArrayList<InputSplit>();  
    8.   List<FileStatus>files = listStatus(job);  
    9.   for (FileStatus file: files) {  
    10.     Path path = file.getPath();  
    11.     FileSystem fs = path.getFileSystem(job.getConfiguration());  
    12.     long length = file.getLen();  
    13.     BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);  
    14.     if ((length != 0) && isSplitable(job, path)) {   
    15.       long blockSize = file.getBlockSize();  
    16.       long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
    17.   
    18.       long bytesRemaining = length;  
    19.       while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
    20.         int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
    21.         splits.add(new FileSplit(path, length-bytesRemaining, splitSize,   
    22.                                  blkLocations[blkIndex].getHosts()));  
    23.         bytesRemaining -= splitSize;  
    24.       }  
    25.         
    26.       if (bytesRemaining != 0) {  
    27.         splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,   
    28.                    blkLocations[blkLocations.length-1].getHosts()));  
    29.       }  
    30.     } else if (length != 0) {  
    31.       splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));  
    32.     } else {   
    33.       //Create empty hosts array for zero length files  
    34.       splits.add(new FileSplit(path, 0, length, new String[0]));  
    35.     }  
    36.   }  
    37.     
    38.   // Save the number of input files in the job-conf  
    39.   job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
    40.   
    41.   LOG.debug("Total # of splits: " + splits.size());  
    42.   return splits;  
    43. }  
    44. //……setters and getters  


      就这样,利用FileInputFormat 的getSplits方法,我们就计算出了我们的作业的所有输入分片了。

      那这些计算出来的分片是怎么被map读取出来的呢?就是InputFormat中的另一个方法createRecordReader(),FileInputFormat并没有对这个方法做具体的要求,而是交给子类自行去实现它。
    RecordReader
      RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从类图中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。

      我们再深入看看上面提到的RecordReader的一个子类:LineRecordReader。

      LineRecordReader由一个FileSplit构造出来,start是这个FileSplit的起始位置,pos是当前读取分片的位置,end是分片结束位置,in是打开的一个读取这个分片的输入流,它是使用这个FileSplit对应的文件名来打开的。key和value则分别是每次读取的K-V对。然后我们还看到可以利用getProgress()来跟踪读取分片的进度,这个函数就是根据已经读取的K-V对占总K-V对的比例来显示进度的。

    1. public class LineRecordReader extends RecordReader<LongWritable, Text> {  
    2.   private static final Log LOG = LogFactory.getLog(LineRecordReader.class);  
    3.   
    4.   private CompressionCodecFactory compressionCodecs = null;  
    5.   private long start;  
    6.   private long pos;  
    7.   private long end;  
    8.   private LineReader in;  
    9.   private int maxLineLength;  
    10.   private LongWritable key = null;  
    11.   private Text value = null;  
    12.   
    13.   //我们知道LineRecordReader是读取一个InputSplit的,它从InputSplit中不断以其定义的格式读取K-V对  
    14.   //initialize函数主要是计算分片的始末位置,以及打开想要的输入流以供读取K-V对,输入流另外处理分片经过压缩的情况  
    15.   public void initialize(InputSplit genericSplit,  
    16.                          TaskAttemptContext context) throws IOException {  
    17.     FileSplit split = (FileSplit) genericSplit;  
    18.     Configuration job = context.getConfiguration();  
    19.     this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",  
    20.                                     Integer.MAX_VALUE);  
    21.     start = split.getStart();  
    22.     end = start + split.getLength();  
    23.     final Path file = split.getPath();  
    24.     compressionCodecs = new CompressionCodecFactory(job);  
    25.     final CompressionCodec codec = compressionCodecs.getCodec(file);  
    26.   
    27.     // open the file and seek to the start of the split  
    28.     FileSystem fs = file.getFileSystem(job);  
    29.     FSDataInputStream fileIn = fs.open(split.getPath());  
    30.     boolean skipFirstLine = false;  
    31.     if (codec != null) {  
    32.       in = new LineReader(codec.createInputStream(fileIn), job);  
    33.       end = Long.MAX_VALUE;  
    34.     } else {  
    35.       if (start != 0) {  
    36.         skipFirstLine = true;  
    37.         --start;  
    38.         fileIn.seek(start);  
    39.       }  
    40.       in = new LineReader(fileIn, job);  
    41.     }  
    42.     if (skipFirstLine) {  // skip first line and re-establish "start".  
    43.       start += in.readLine(new Text(), 0,  
    44.                            (int)Math.min((long)Integer.MAX_VALUE, end - start));  
    45.     }  
    46.     this.pos = start;  
    47.   }  
    48.     
    49.   public boolean nextKeyValue() throws IOException {  
    50.     if (key == null) {  
    51.       key = new LongWritable();  
    52.     }  
    53.     key.set(pos); //对于LineRecordReader来说,它以偏移值为key,以一行为value  
    54.     if (value == null) {  
    55.       value = new Text();  
    56.     }  
    57.     int newSize = 0;  
    58.     while (pos < end) {  
    59.       newSize = in.readLine(value, maxLineLength,  
    60.                             Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),  
    61.                                      maxLineLength));  
    62.       if (newSize == 0) {  
    63.         break;  
    64.       }  
    65.       pos += newSize;  
    66.       if (newSize < maxLineLength) {  
    67.         break;  
    68.       }  
    69.   
    70.       // line too long. try again  
    71.       LOG.info("Skipped line of size " + newSize + " at pos " +   
    72.                (pos - newSize));  
    73.     }  
    74.     if (newSize == 0) {  
    75.       key = null;  
    76.       value = null;  
    77.       return false;  
    78.     } else {  
    79.       return true;  
    80.     }  
    81.   }  
    82.   
    83.   @Override  
    84.   public LongWritable getCurrentKey() {  
    85.     return key;  
    86.   }  
    87.   
    88.   @Override  
    89.   public Text getCurrentValue() {  
    90.     return value;  
    91.   }  
    92.   
    93.   /**  
    94.    * Get the progress within the split  
    95.    */  
    96.   public float getProgress() {  
    97.     if (start == end) {  
    98.       return 0.0f;  
    99.     } else {  
    100.       return Math.min(1.0f, (pos - start) / (float)(end - start));//读取进度由已读取InputSplit大小比总InputSplit大小  
    101.     }  
    102.   }  
    103.     
    104.   public synchronized void close() throws IOException {  
    105.     if (in != null) {  
    106.       in.close();   
    107.     }  
    108.   }  
    109. }  

    其它的一些RecordReader如SequenceFileRecordReaderCombineFileRecordReader.java等则对应不同的InputFormat。

      下面继续看看这些RecordReader是如何被MapReduce框架使用的。

      我们先看看Mapper.class是什么样的:

    1. public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {  
    2.   
    3.   public class Context   
    4.     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {  
    5.     public Context(Configuration conf, TaskAttemptID taskid,  
    6.                    RecordReader<KEYIN,VALUEIN> reader,  
    7.                    RecordWriter<KEYOUT,VALUEOUT> writer,  
    8.                    OutputCommitter committer,  
    9.                    StatusReporter reporter,  
    10.                    InputSplit split) throws IOException, InterruptedException {  
    11.       super(conf, taskid, reader, writer, committer, reporter, split);  
    12.     }  
    13.   }  
    14.     
    15.   /**  
    16.    * Called once at the beginning of the task.  
    17.    */  
    18.   protected void setup(Context context  
    19.                        ) throws IOException, InterruptedException {  
    20.     // NOTHING  
    21.   }  
    22.   
    23.   /**  
    24.    * Called once for each key/value pair in the input split. Most applications  
    25.    * should override this, but the default is the identity function.  
    26.    */  
    27.   @SuppressWarnings("unchecked")  
    28.   protected void map(KEYIN key, VALUEIN value,   
    29.                      Context context) throws IOException, InterruptedException {  
    30.     context.write((KEYOUT) key, (VALUEOUT) value);  
    31.   }  
    32.   
    33.   /**  
    34.    * Called once at the end of the task.  
    35.    */  
    36.   protected void cleanup(Context context  
    37.                          ) throws IOException, InterruptedException {  
    38.     // NOTHING  
    39.   }  
    40.     
    41.   /**  
    42.    * Expert users can override this method for more complete control over the  
    43.    * execution of the Mapper.  
    44.    * @param context  
    45.    * @throws IOException  
    46.    */  
    47.   public void run(Context context) throws IOException, InterruptedException {  
    48.     setup(context);  
    49.     while (context.nextKeyValue()) {  
    50.       map(context.getCurrentKey(), context.getCurrentValue(), context);  
    51.     }  
    52.     cleanup(context);  
    53.   }  


      我们写MapReduce程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);

        最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从text他.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。

       我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext

    1. public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>   
    2.   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {  
    3.   private RecordReader<KEYIN,VALUEIN> reader;  
    4.   private InputSplit split;  
    5.   
    6.   public MapContext(Configuration conf, TaskAttemptID taskid,  
    7.                     RecordReader<KEYIN,VALUEIN> reader,  
    8.                     RecordWriter<KEYOUT,VALUEOUT> writer,  
    9.                     OutputCommitter committer,  
    10.                     StatusReporter reporter,  
    11.                     InputSplit split) {  
    12.     super(conf, taskid, writer, committer, reporter);  
    13.     this.reader = reader;  
    14.     this.split = split;  
    15.   }  
    16.   
    17.   /**  
    18.    * Get the input split for this map.  
    19.    */  
    20.   public InputSplit getInputSplit() {  
    21.     return split;  
    22.   }  
    23.   
    24.   @Override  
    25.   public KEYIN getCurrentKey() throws IOException, InterruptedException {  
    26.     return reader.getCurrentKey();  
    27.   }  
    28.   
    29.   @Override  
    30.   public VALUEIN getCurrentValue() throws IOException, InterruptedException {  
    31.     return reader.getCurrentValue();  
    32.   }  
    33.   
    34.   @Override  
    35.   public boolean nextKeyValue() throws IOException, InterruptedException {  
    36.     return reader.nextKeyValue();  
    37.   }  
    38.   
    39. }  
    40.       


      我们可以看到MapContext直接是使用传入的RecordReader来进行K-V对的读取了。

     到现在,我们已经知道输入文件是如何被读取、过滤、分片、读出K-V对,然后交给我们的Mapper类来处理的了。

     最后,我们来看看FileInputFormat的几个子类。

    TextInputFormat:

      TextInputFormat是FileInputFormat的子类,其createRecordReader()方法返回的就是LineRecordReader。

    1. public class TextInputFormat extends FileInputFormat<LongWritable, Text> {  
    2.   
    3.   @Override  
    4.   public RecordReader<LongWritable, Text>   
    5.     createRecordReader(InputSplit split,  
    6.                        TaskAttemptContext context) {  
    7.     return new LineRecordReader();  
    8.   }  
    9.   
    10.   @Override  
    11.   protected boolean isSplitable(JobContext context, Path file) {  
    12.     CompressionCodec codec =   
    13.       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);  
    14.     return codec == null;  
    15.   }  
    16. }  


      我们还看到isSplitable()方法,当文件使用压缩的形式,这个文件就不可分割,否则就读取不到正确的数据了。这从某种程度上将影响分片的计算。有时我们希望一个文件只被一个Mapper处理的时候,我们就可以重写isSplitable()方法,告诉MapReduce框架,我哪些文件可以分割,哪些文件不能分割而只能作为一个分片。

      

    NLineInputFormat;

      NLineInputFormat也是FileInputFormat的子类,与名字一致,它是根据行数来划分InputSplits而不是像TextInputFormat那样依赖分片大小和行的长度的。也就是说,TextInputFormat当一行很长或分片比较小时,获取的分片可能只包含很少的K-V对,这样一个map task处理的K-V对就很少,这可能很不理想。因此我们可以使用NLineInputFormat来控制一个map task处理的K-V对,这是通过分割InputSplits时按行数分割的方法来实现的,这我们在代码中可以看出来。我们可以设置mapreduce.input.lineinputformat.linespermap来设置这个行数。
    1. public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {   
    2.   public static final String LINES_PER_MAP =   
    3.     "mapreduce.input.lineinputformat.linespermap";  
    4.   
    5.   public RecordReader<LongWritable, Text> createRecordReader(  
    6.       InputSplit genericSplit, TaskAttemptContext context)   
    7.       throws IOException {  
    8.     context.setStatus(genericSplit.toString());  
    9.     return new LineRecordReader();  
    10.   }  
    11.   
    12.   /**   
    13.    * Logically splits the set of input files for the job, splits N lines  
    14.    * of the input as one split.  
    15.    *   
    16.    * @see FileInputFormat#getSplits(JobContext)  
    17.    */  
    18.   public List<InputSplit> getSplits(JobContext job)  
    19.   throws IOException {  
    20.     List<InputSplit> splits = new ArrayList<InputSplit>();  
    21.     int numLinesPerSplit = getNumLinesPerSplit(job);  
    22.     for (FileStatus status : listStatus(job)) {  
    23.       splits.addAll(getSplitsForFile(status,  
    24.         job.getConfiguration(), numLinesPerSplit));  
    25.     }  
    26.     return splits;  
    27.   }  
    28.     
    29.   public static List<FileSplit> getSplitsForFile(FileStatus status,  
    30.       Configuration conf, int numLinesPerSplit) throws IOException {  
    31.     List<FileSplit> splits = new ArrayList<FileSplit> ();  
    32.     Path fileName = status.getPath();  
    33.     if (status.isDir()) {  
    34.       throw new IOException("Not a file: " + fileName);  
    35.     }  
    36.     FileSystem  fs = fileName.getFileSystem(conf);  
    37.     LineReader lr = null;  
    38.     try {  
    39.       FSDataInputStream in  = fs.open(fileName);  
    40.       lr = new LineReader(in, conf);  
    41.       Text line = new Text();  
    42.       int numLines = 0;  
    43.       long begin = 0;  
    44.       long length = 0;  
    45.       int num = -1;  
    46.       while ((num = lr.readLine(line)) > 0) {  
    47.         numLines++;  
    48.         length += num;  
    49.         if (numLines == numLinesPerSplit) {  
    50.           // NLineInputFormat uses LineRecordReader, which always reads  
    51.           // (and consumes) at least one character out of its upper split  
    52.           // boundary. So to make sure that each mapper gets N lines, we  
    53.           // move back the upper split limits of each split   
    54.           // by one character here.  
    55.           if (begin == 0) {  
    56.             splits.add(new FileSplit(fileName, begin, length - 1,  
    57.               new String[] {}));  
    58.           } else {  
    59.             splits.add(new FileSplit(fileName, begin - 1, length,  
    60.               new String[] {}));  
    61.           }  
    62.           begin += length;  
    63.           length = 0;  
    64.           numLines = 0;  
    65.         }  
    66.       }  
    67.       if (numLines != 0) {  
    68.         splits.add(new FileSplit(fileName, begin, length, new String[]{}));  
    69.       }  
    70.     } finally {  
    71.       if (lr != null) {  
    72.         lr.close();  
    73.       }  
    74.     }  
    75.     return splits;   
    76.   }  
    77.     
    78.   /**  
    79.    * Set the number of lines per split  
    80.    * @param job the job to modify  
    81.    * @param numLines the number of lines per split  
    82.    */  
    83.   public static void setNumLinesPerSplit(Job job, int numLines) {  
    84.     job.getConfiguration().setInt(LINES_PER_MAP, numLines);  
    85.   }  
    86.   
    87.   /**  
    88.    * Get the number of lines per split  
    89.    * @param job the job  
    90.    * @return the number of lines per split  
    91.    */  
    92.   public static int getNumLinesPerSplit(JobContext job) {  
    93.     return job.getConfiguration().getInt(LINES_PER_MAP, 1);  
    94.   }  


      现在,我们对Hadoop的输入格式和其在MapReduce中如何被使用有了具体的了解了。
     

  • 相关阅读:
    poj 1113 Wall 凸包的应用
    NYOJ 78 圈水池 (入门级凸包)
    Monotone Chain Convex Hull(单调链凸包)
    poj Sudoku(数独) DFS
    poj 3009 Curling 2.0(dfs)
    poj 3083 Children of the Candy Corn
    Python join()方法
    通过FISH和下一代测序检测肺腺癌ALK基因融合比较
    华大病原微生物检测
    NGS检测ALK融合大起底--转载
  • 原文地址:https://www.cnblogs.com/xuepei/p/3664698.html
Copyright © 2011-2022 走看看