zoukankan      html  css  js  c++  java
  • 【转】[Hadoop源码解读](一)MapReduce篇之InputFormat

    本文来源于:http://www.cnblogs.com/lucius/p/3449902.html

    平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如 job.setInputFormatClass(KeyValueTextInputFormat.class);来保证输入文件按照我们想要的格式被 读取。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库 的DBInputFormat等等。

      其实,一个输入格式InputFormat,主要无非就是要解决如何将数据分割成分片[比如多少行为一个分片],以及如何读取分片中的数据[比如按行读取]。前者由getSplits()完成,后者由RecordReader完成。

      不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的map task作为数据源。下面我们先看看这些输入分片(inputSplit)是什么样的。

    InputSplit:

      我们知道Mappers的输入是一个一个的输入分片,称InputSplit。InputSplit是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。

    1 public abstract class InputSplit {
    2   public abstract long getLength() throws IOException, InterruptedException;
    3 
    4   public abstract 
    5     String[] getLocations() throws IOException, InterruptedException;
    6 }

    getLength()用来获取InputSplit的大小,以支持对InputSplits进行排序,而getLocations()则用来获取存储分片的位置列表。
      我们来看一个简单InputSplit子类:FileSplit。

    复制代码
     1 public class FileSplit extends InputSplit implements Writable {
     2   private Path file;
     3   private long start;
     4   private long length;
     5   private String[] hosts;
     6 
     7   FileSplit() {}
     8 
     9   public FileSplit(Path file, long start, long length, String[] hosts) {
    10     this.file = file;
    11     this.start = start;
    12     this.length = length;
    13     this.hosts = hosts;
    14   }
    15  //序列化、反序列化方法,获得hosts等等……
    16 }
    复制代码

     从上面的源码我们可以看到,一个FileSplit是由文件路径,分片开始位置,分片大小和存储分片数据的hosts列表组成,由这 些信息我们就可以从输入文件中切分出提供给单个Mapper的输入数据。这些属性会在Constructor设置,我们在后面会看到这会在 InputFormat的getSplits()中构造这些分片。

      我们再看CombineFileSplit:

    复制代码
     1 public class CombineFileSplit extends InputSplit implements Writable {
     2 
     3   private Path[] paths;
     4   private long[] startoffset;
     5   private long[] lengths;
     6   private String[] locations;
     7   private long totLength;
     8 
     9   public CombineFileSplit() {}
    10   public CombineFileSplit(Path[] files, long[] start, 
    11                           long[] lengths, String[] locations) {
    12     initSplit(files, start, lengths, locations);
    13   }
    14 
    15   public CombineFileSplit(Path[] files, long[] lengths) {
    16     long[] startoffset = new long[files.length];
    17     for (int i = 0; i < startoffset.length; i++) {
    18       startoffset[i] = 0;
    19     }
    20     String[] locations = new String[files.length];
    21     for (int i = 0; i < locations.length; i++) {
    22       locations[i] = "";
    23     }
    24     initSplit(files, startoffset, lengths, locations);
    25   }
    26   
    27   private void initSplit(Path[] files, long[] start, 
    28                          long[] lengths, String[] locations) {
    29     this.startoffset = start;
    30     this.lengths = lengths;
    31     this.paths = files;
    32     this.totLength = 0;
    33     this.locations = locations;
    34     for(long length : lengths) {
    35       totLength += length;
    36     }
    37   }
    38   //一些getter和setter方法,和序列化方法
    39 }
    复制代码
    复制代码
     1 public class CombineFileSplit extends InputSplit implements Writable {
     2 
     3   private Path[] paths;
     4   private long[] startoffset;
     5   private long[] lengths;
     6   private String[] locations;
     7   private long totLength;
     8 
     9   public CombineFileSplit() {}
    10   public CombineFileSplit(Path[] files, long[] start, 
    11                           long[] lengths, String[] locations) {
    12     initSplit(files, start, lengths, locations);
    13   }
    14 
    15   public CombineFileSplit(Path[] files, long[] lengths) {
    16     long[] startoffset = new long[files.length];
    17     for (int i = 0; i < startoffset.length; i++) {
    18       startoffset[i] = 0;
    19     }
    20     String[] locations = new String[files.length];
    21     for (int i = 0; i < locations.length; i++) {
    22       locations[i] = "";
    23     }
    24     initSplit(files, startoffset, lengths, locations);
    25   }
    26   
    27   private void initSplit(Path[] files, long[] start, 
    28                          long[] lengths, String[] locations) {
    29     this.startoffset = start;
    30     this.lengths = lengths;
    31     this.paths = files;
    32     this.totLength = 0;
    33     this.locations = locations;
    34     for(long length : lengths) {
    35       totLength += length;
    36     }
    37   }
    38   //一些getter和setter方法,和序列化方法
    39 }
    复制代码

    与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和存储分片数据的host列 表,由于CombineFileSplit是针对小文件的,它把很多小文件包在一个InputSplit内,这样一个Mapper就可以处理很多小文件。 要知道我们上面的FileSplit是对应一个输入文件的,也就是说如果用FileSplit对应的FileInputFormat来作为输入格式,那么 即使文件特别小,也是单独计算成一个输入分片来处理的。当我们的输入是由大量小文件组成的,就会导致有同样大量的InputSplit,从而需要同样大量 的Mapper来处理,这将很慢,想想有一堆map task要运行!!这是不符合Hadoop的设计理念的,Hadoop是为处理大文件优化的。

      最后介绍TagInputSplit,这个类就是封装了一个InputSplit,然后加了一些tags在里面满足我们需要这些tags数据的情况,我们从下面就可以一目了然。

    复制代码
     1 class TaggedInputSplit extends InputSplit implements Configurable, Writable {
     2 
     3   private Class<? extends InputSplit> inputSplitClass;
     4 
     5   private InputSplit inputSplit;
     6 
     7   @SuppressWarnings("unchecked")
     8   private Class<? extends InputFormat> inputFormatClass;
     9 
    10   @SuppressWarnings("unchecked")
    11   private Class<? extends Mapper> mapperClass;
    12 
    13   private Configuration conf;
    14   //getters and setters,序列化方法,getLocations()、getLength()等
    15 }
    复制代码

    现在我们对InputSplit的概念有了一些了解,我们继续看它是怎么被使用和计算出来的。

    InputFormat:

      通过使用InputFormat,MapReduce框架可以做到:

      1、验证作业的输入的正确性

      2、将输入文件切分成逻辑的InputSplits,一个InputSplit将被分配给一个单独的Mapper task

      3、提供RecordReader的实现,这个RecordReader会从InputSplit中正确读出一条一条的K-V对供Mapper使用。

    复制代码
     1 public abstract class InputFormat<K, V> {
     2 
     3   public abstract 
     4     List<InputSplit> getSplits(JobContext context
     5                                ) throws IOException, InterruptedException;
     6   
     7   public abstract 
     8     RecordReader<K,V> createRecordReader(InputSplit split,
     9                                          TaskAttemptContext context
    10                                         ) throws IOException, 
    11                                                  InterruptedException;
    12 
    13 }
    复制代码

      上面是InputFormat的源码,getSplits用来获取由输入文件计算出来的InputSplits,我们在后面会看到计算 InputSplits的时候会考虑到输入文件是否可分割、文件存储时分块的大小和文件大小等因素;而createRecordReader()提供了前 面第三点所说的RecordReader的实现,以将K-V对从InputSplit中正确读出来,比如LineRecordReader就以偏移值为 key,一行的数据为value,这就使得所有其createRecordReader()返回了LineRecordReader的 InputFormat都是以偏移值为key,一行数据为value的形式读取输入分片的。

    FileInputFormat:

      PathFilter被用来进行文件筛选,这样我们就可以控制哪些文件要作为输入,哪些不作为输入。PathFilter有一个accept(Path) 方法,当接收的Path要被包含进来,就返回true,否则返回false。可以通过设置mapred.input.pathFilter.class来 设置用户自定义的PathFilter。

    1 public interface PathFilter {
    2   boolean accept(Path path);
    3 }

      FileInputFormat是InputFormat的子类,它包含了一个MultiPathFilter,这个MultiPathFilter由一 个过滤隐藏文件(名字前缀为'-'或'.')的PathFilter和一些可能存在的用户自定义的PathFilters组 成,MultiPathFilter会在listStatus()方法中使用,而listStatus()方法又被getSplits()方法用来获取输 入文件,也就是说实现了在获取输入分片前先进行文件过滤。

    复制代码
     1   private static class MultiPathFilter implements PathFilter {
     2     private List<PathFilter> filters;
     3 
     4     public MultiPathFilter(List<PathFilter> filters) {
     5       this.filters = filters;
     6     }
     7 
     8     public boolean accept(Path path) {
     9       for (PathFilter filter : filters) {
    10         if (!filter.accept(path)) {
    11           return false;
    12         }
    13       }
    14       return true;
    15     }
    16   }
    复制代码

    这些PathFilter会在listStatus()方法中用到,listStatus()是用来获取输入数据列表的。

      下面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分 片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得 到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过 computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize& lt;=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表 splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后 我们还设置了输入文件数量:mapreduce.input.num.files。

    复制代码
     1   public List<InputSplit> getSplits(JobContext job
     2                                     ) throws IOException {
     3     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     4     long maxSize = getMaxSplitSize(job);
     5 
     6     // generate splits
     7     List<InputSplit> splits = new ArrayList<InputSplit>();
     8     List<FileStatus>files = listStatus(job);
     9     for (FileStatus file: files) {
    10       Path path = file.getPath();
    11       FileSystem fs = path.getFileSystem(job.getConfiguration());
    12       long length = file.getLen();
    13       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    14       if ((length != 0) && isSplitable(job, path)) { 
    15         long blockSize = file.getBlockSize();
    16         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    17 
    18         long bytesRemaining = length;
    19         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    20           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    21           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
    22                                    blkLocations[blkIndex].getHosts()));
    23           bytesRemaining -= splitSize;
    24         }
    25         
    26         if (bytesRemaining != 0) {
    27           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
    28                      blkLocations[blkLocations.length-1].getHosts()));
    29         }
    30       } else if (length != 0) {
    31         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
    32       } else { 
    33         //Create empty hosts array for zero length files
    34         splits.add(new FileSplit(path, 0, length, new String[0]));
    35       }
    36     }
    37     
    38     // Save the number of input files in the job-conf
    39     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    40 
    41     LOG.debug("Total # of splits: " + splits.size());
    42     return splits;
    43   }
    44   //……setters and getters
    45 }
    复制代码

    就这样,利用FileInputFormat 的getSplits方法,我们就计算出了我们的作业的所有输入分片了。

      那这些计算出来的分片是怎么被map读取出来的呢?就是InputFormat中的另一个方法createRecordReader(),FileInputFormat并没有对这个方法做具体的要求,而是交给子类自行去实现它。
    RecordReader:
      RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从类图中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。

      我们再深入看看上面提到的RecordReader的一个子类:LineRecordReader。

      LineRecordReader由一个FileSplit构造出来,start是这个FileSplit的起始位置,pos是当前读取分片的位 置,end是分片结束位置,in是打开的一个读取这个分片的输入流,它是使用这个FileSplit对应的文件名来打开的。key和value则分别是每 次读取的K-V对。然后我们还看到可以利用getProgress()来跟踪读取分片的进度,这个函数就是根据已经读取的K-V对占总K-V对的比例来显 示进度的。

    复制代码
      1 public class LineRecordReader extends RecordReader<LongWritable, Text> {
      2   private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
      3 
      4   private CompressionCodecFactory compressionCodecs = null;
      5   private long start;
      6   private long pos;
      7   private long end;
      8   private LineReader in;
      9   private int maxLineLength;
     10   private LongWritable key = null;
     11   private Text value = null;
     12 
     13   //我们知道LineRecordReader是读取一个InputSplit的,它从InputSplit中不断以其定义的格式读取K-V对
     14   //initialize函数主要是计算分片的始末位置,以及打开想要的输入流以供读取K-V对,输入流另外处理分片经过压缩的情况
     15   public void initialize(InputSplit genericSplit,
     16                          TaskAttemptContext context) throws IOException {
     17     FileSplit split = (FileSplit) genericSplit;
     18     Configuration job = context.getConfiguration();
     19     this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
     20                                     Integer.MAX_VALUE);
     21     start = split.getStart();
     22     end = start + split.getLength();
     23     final Path file = split.getPath();
     24     compressionCodecs = new CompressionCodecFactory(job);
     25     final CompressionCodec codec = compressionCodecs.getCodec(file);
     26 
     27     // open the file and seek to the start of the split
     28     FileSystem fs = file.getFileSystem(job);
     29     FSDataInputStream fileIn = fs.open(split.getPath());
     30     boolean skipFirstLine = false;
     31     if (codec != null) {
     32       in = new LineReader(codec.createInputStream(fileIn), job);
     33       end = Long.MAX_VALUE;
     34     } else {
     35       if (start != 0) {
     36         skipFirstLine = true;
     37         --start;
     38         fileIn.seek(start);
     39       }
     40       in = new LineReader(fileIn, job);
     41     }
     42     if (skipFirstLine) {  // skip first line and re-establish "start".
     43       start += in.readLine(new Text(), 0,
     44                            (int)Math.min((long)Integer.MAX_VALUE, end - start));
     45     }
     46     this.pos = start;
     47   }
     48   
     49   public boolean nextKeyValue() throws IOException {
     50     if (key == null) {
     51       key = new LongWritable();
     52     }
     53     key.set(pos); //对于LineRecordReader来说,它以偏移值为key,以一行为value
     54     if (value == null) {
     55       value = new Text();
     56     }
     57     int newSize = 0;
     58     while (pos < end) {
     59       newSize = in.readLine(value, maxLineLength,
     60                             Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
     61                                      maxLineLength));
     62       if (newSize == 0) {
     63         break;
     64       }
     65       pos += newSize;
     66       if (newSize < maxLineLength) {
     67         break;
     68       }
     69 
     70       // line too long. try again
     71       LOG.info("Skipped line of size " + newSize + " at pos " + 
     72                (pos - newSize));
     73     }
     74     if (newSize == 0) {
     75       key = null;
     76       value = null;
     77       return false;
     78     } else {
     79       return true;
     80     }
     81   }
     82 
     83   @Override
     84   public LongWritable getCurrentKey() {
     85     return key;
     86   }
     87 
     88   @Override
     89   public Text getCurrentValue() {
     90     return value;
     91   }
     92 
     93   /**
     94    * Get the progress within the split
     95    */
     96   public float getProgress() {
     97     if (start == end) {
     98       return 0.0f;
     99     } else {
    100       return Math.min(1.0f, (pos - start) / (float)(end - start));//读取进度由已读取InputSplit大小比总InputSplit大小
    101     }
    102   }
    103   
    104   public synchronized void close() throws IOException {
    105     if (in != null) {
    106       in.close(); 
    107     }
    108   }
    109 }
    复制代码

    其它的一些RecordReader如SequenceFileRecordReader,CombineFileRecordReader.java等则对应不同的InputFormat。

      下面继续看看这些RecordReader是如何被MapReduce框架使用的。

      我们先看看Mapper.class是什么样的:

    复制代码
     1 public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     2 
     3   public class Context 
     4     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
     5     public Context(Configuration conf, TaskAttemptID taskid,
     6                    RecordReader<KEYIN,VALUEIN> reader,
     7                    RecordWriter<KEYOUT,VALUEOUT> writer,
     8                    OutputCommitter committer,
     9                    StatusReporter reporter,
    10                    InputSplit split) throws IOException, InterruptedException {
    11       super(conf, taskid, reader, writer, committer, reporter, split);
    12     }
    13   }
    14   
    15   /**
    16    * Called once at the beginning of the task.
    17    */
    18   protected void setup(Context context
    19                        ) throws IOException, InterruptedException {
    20     // NOTHING
    21   }
    22 
    23   /**
    24    * Called once for each key/value pair in the input split. Most applications
    25    * should override this, but the default is the identity function.
    26    */
    27   @SuppressWarnings("unchecked")
    28   protected void map(KEYIN key, VALUEIN value, 
    29                      Context context) throws IOException, InterruptedException {
    30     context.write((KEYOUT) key, (VALUEOUT) value);
    31   }
    32 
    33   /**
    34    * Called once at the end of the task.
    35    */
    36   protected void cleanup(Context context
    37                          ) throws IOException, InterruptedException {
    38     // NOTHING
    39   }
    40   
    41   /**
    42    * Expert users can override this method for more complete control over the
    43    * execution of the Mapper.
    44    * @param context
    45    * @throws IOException
    46    */
    47   public void run(Context context) throws IOException, InterruptedException {
    48     setup(context);
    49     while (context.nextKeyValue()) {
    50       map(context.getCurrentKey(), context.getCurrentValue(), context);
    51     }
    52     cleanup(context);
    53   }
    复制代码

    我们写MapReduce程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方 法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup() 中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后 map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);

      最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V 对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从text他.nextKeyValue()就是使用了相应的 RecordReader来获取K-V对的。

      我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。

    复制代码
     1 public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> 
     2   extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
     3   private RecordReader<KEYIN,VALUEIN> reader;
     4   private InputSplit split;
     5 
     6   public MapContext(Configuration conf, TaskAttemptID taskid,
     7                     RecordReader<KEYIN,VALUEIN> reader,
     8                     RecordWriter<KEYOUT,VALUEOUT> writer,
     9                     OutputCommitter committer,
    10                     StatusReporter reporter,
    11                     InputSplit split) {
    12     super(conf, taskid, writer, committer, reporter);
    13     this.reader = reader;
    14     this.split = split;
    15   }
    16 
    17   /**
    18    * Get the input split for this map.
    19    */
    20   public InputSplit getInputSplit() {
    21     return split;
    22   }
    23 
    24   @Override
    25   public KEYIN getCurrentKey() throws IOException, InterruptedException {
    26     return reader.getCurrentKey();
    27   }
    28 
    29   @Override
    30   public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    31     return reader.getCurrentValue();
    32   }
    33 
    34   @Override
    35   public boolean nextKeyValue() throws IOException, InterruptedException {
    36     return reader.nextKeyValue();
    37   }
    38 
    39 }
    40     
    复制代码

     我们可以看到MapContext直接是使用传入的RecordReader来进行K-V对的读取了。

      到现在,我们已经知道输入文件是如何被读取、过滤、分片、读出K-V对,然后交给我们的Mapper类来处理的了。

      最后,我们来看看FileInputFormat的几个子类。

    TextInputFormat:

      TextInputFormat是FileInputFormat的子类,其createRecordReader()方法返回的就是LineRecordReader。

    复制代码
     1 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
     2 
     3   @Override
     4   public RecordReader<LongWritable, Text> 
     5     createRecordReader(InputSplit split,
     6                        TaskAttemptContext context) {
     7     return new LineRecordReader();
     8   }
     9 
    10   @Override
    11   protected boolean isSplitable(JobContext context, Path file) {
    12     CompressionCodec codec = 
    13       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    14     return codec == null;
    15   }
    16 }
    复制代码

    我们还看到isSplitable()方法,当文件使用压缩的形式,这个文件就不可分割,否则就读取不到正确的数据了。这从某种程度上 将影响分片的计算。有时我们希望一个文件只被一个Mapper处理的时候,我们就可以重写isSplitable()方法,告诉MapReduce框架, 我哪些文件可以分割,哪些文件不能分割而只能作为一个分片。

      

    NLineInputFormat;

      NLineInputFormat也是FileInputFormat的子类,与名字一致,它是根据行数来划分InputSplits而不是像 TextInputFormat那样依赖分片大小和行的长度的。也就是说,TextInputFormat当一行很长或分片比较小时,获取的分片可能只包 含很少的K-V对,这样一个map task处理的K-V对就很少,这可能很不理想。因此我们可以使用NLineInputFormat来控制一个map task处理的K-V对,这是通过分割InputSplits时按行数分割的方法来实现的,这我们在代码中可以看出来。我们可以设置 mapreduce.input.lineinputformat.linespermap来设置这个行数。

    复制代码
     1 public class NLineInputFormat extends FileInputFormat<LongWritable, Text> { 
     2   public static final String LINES_PER_MAP = 
     3     "mapreduce.input.lineinputformat.linespermap";
     4 
     5   public RecordReader<LongWritable, Text> createRecordReader(
     6       InputSplit genericSplit, TaskAttemptContext context) 
     7       throws IOException {
     8     context.setStatus(genericSplit.toString());
     9     return new LineRecordReader();
    10   }
    11 
    12   /** 
    13    * Logically splits the set of input files for the job, splits N lines
    14    * of the input as one split.
    15    * 
    16    * @see FileInputFormat#getSplits(JobContext)
    17    */
    18   public List<InputSplit> getSplits(JobContext job)
    19   throws IOException {
    20     List<InputSplit> splits = new ArrayList<InputSplit>();
    21     int numLinesPerSplit = getNumLinesPerSplit(job);
    22     for (FileStatus status : listStatus(job)) {
    23       splits.addAll(getSplitsForFile(status,
    24         job.getConfiguration(), numLinesPerSplit));
    25     }
    26     return splits;
    27   }
    28   
    29   public static List<FileSplit> getSplitsForFile(FileStatus status,
    30       Configuration conf, int numLinesPerSplit) throws IOException {
    31     List<FileSplit> splits = new ArrayList<FileSplit> ();
    32     Path fileName = status.getPath();
    33     if (status.isDir()) {
    34       throw new IOException("Not a file: " + fileName);
    35     }
    36     FileSystem  fs = fileName.getFileSystem(conf);
    37     LineReader lr = null;
    38     try {
    39       FSDataInputStream in  = fs.open(fileName);
    40       lr = new LineReader(in, conf);
    41       Text line = new Text();
    42       int numLines = 0;
    43       long begin = 0;
    44       long length = 0;
    45       int num = -1;
    46       while ((num = lr.readLine(line)) > 0) {
    47         numLines++;
    48         length += num;
    49         if (numLines == numLinesPerSplit) {
    50           // NLineInputFormat uses LineRecordReader, which always reads
    51           // (and consumes) at least one character out of its upper split
    52           // boundary. So to make sure that each mapper gets N lines, we
    53           // move back the upper split limits of each split 
    54           // by one character here.
    55           if (begin == 0) {
    56             splits.add(new FileSplit(fileName, begin, length - 1,
    57               new String[] {}));
    58           } else {
    59             splits.add(new FileSplit(fileName, begin - 1, length,
    60               new String[] {}));
    61           }
    62           begin += length;
    63           length = 0;
    64           numLines = 0;
    65         }
    66       }
    67       if (numLines != 0) {
    68         splits.add(new FileSplit(fileName, begin, length, new String[]{}));
    69       }
    70     } finally {
    71       if (lr != null) {
    72         lr.close();
    73       }
    74     }
    75     return splits; 
    76   }
    77   
    78   /**
    79    * Set the number of lines per split
    80    * @param job the job to modify
    81    * @param numLines the number of lines per split
    82    */
    83   public static void setNumLinesPerSplit(Job job, int numLines) {
    84     job.getConfiguration().setInt(LINES_PER_MAP, numLines);
    85   }
    86 
    87   /**
    88    * Get the number of lines per split
    89    * @param job the job
    90    * @return the number of lines per split
    91    */
    92   public static int getNumLinesPerSplit(JobContext job) {
    93     return job.getConfiguration().getInt(LINES_PER_MAP, 1);
    94   }
    复制代码

    现在,我们对Hadoop的输入格式和其在MapReduce中如何被使用有了具体的了解了。

  • 相关阅读:
    牛客网Java刷题知识点之什么是进程、什么是线程、什么是多线程、多线程的好处和弊端、多线程的创建方式、JVM中的多线程解析、多线程运行图解
    [转]OData的初步认识 OData v4 Client Code Generator
    [转]OData – the best way to REST–实例讲解ASP.NET WebAPI OData (V4) Service & Client
    [转]Calling an OData Service From a .NET Client (C#)
    [转]Upgrading to Async with Entity Framework, MVC, OData AsyncEntitySetController, Kendo UI, Glimpse & Generic Unit of Work Repository Framework v2.0
    [转]ASP.NET web API 2 OData enhancements
    [转]Web Api系列教程第2季(OData篇)(二)——使用Web Api创建只读的OData服务
    [转]Creating an OData v3 Endpoint with Web API 2
    [转]Getting started with ASP.NET Web API OData in 3 simple steps
    [转]使用ASP.NET Web API 2创建OData v4 终结点
  • 原文地址:https://www.cnblogs.com/conie/p/3583559.html
Copyright © 2011-2022 走看看