  • MapReduce Source Code Walkthrough (Part 1)

     1 /**
     2  * Licensed to the Apache Software Foundation (ASF) under one
     3  * or more contributor license agreements.  See the NOTICE file
     4  * distributed with this work for additional information
     5  * regarding copyright ownership.  The ASF licenses this file
     6  * to you under the Apache License, Version 2.0 (the
     7  * "License"); you may not use this file except in compliance
     8  * with the License.  You may obtain a copy of the License at
     9  *
    10  *     http://www.apache.org/licenses/LICENSE-2.0
    11  *
    12  * Unless required by applicable law or agreed to in writing, software
    13  * distributed under the License is distributed on an "AS IS" BASIS,
    14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    15  * See the License for the specific language governing permissions and
    16  * limitations under the License.
    17  */
    18 
    19 package org.apache.hadoop.mapreduce.lib.input;
    20 
    21 import org.apache.hadoop.classification.InterfaceAudience;
    22 import org.apache.hadoop.classification.InterfaceStability;
    23 import org.apache.hadoop.fs.Path;
    24 import org.apache.hadoop.io.LongWritable;
    25 import org.apache.hadoop.io.Text;
    26 import org.apache.hadoop.io.compress.CompressionCodec;
    27 import org.apache.hadoop.io.compress.CompressionCodecFactory;
    28 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
    29 import org.apache.hadoop.mapreduce.InputFormat;
    30 import org.apache.hadoop.mapreduce.InputSplit;
    31 import org.apache.hadoop.mapreduce.JobContext;
    32 import org.apache.hadoop.mapreduce.RecordReader;
    33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
    34 
    35 import com.google.common.base.Charsets;
    36 
    37 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
    38  * Either linefeed or carriage-return are used to signal end of line.  Keys are
    39  * the position in the file, and values are the line of text.. */
    40 @InterfaceAudience.Public
    41 @InterfaceStability.Stable
    42 public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
    43 
    44   @Override
    45   public RecordReader<LongWritable, Text> 
    46     createRecordReader(InputSplit split,
    47                        TaskAttemptContext context) {
    48     String delimiter = context.getConfiguration().get(
    49         "textinputformat.record.delimiter");
    50     byte[] recordDelimiterBytes = null;
    51     if (null != delimiter)
    52       recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    53     return new LineRecordReader(recordDelimiterBytes);
    54   }
    55 
    56   @Override
    57   protected boolean isSplitable(JobContext context, Path file) {
    58     final CompressionCodec codec =
    59       new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    60     if (null == codec) {
    61       return true;
    62     }
    63     return codec instanceof SplittableCompressionCodec;
    64   }
    65 
    66 }
    TextInputFormat
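    For context (a minimal driver sketch, not part of the original post): this is roughly how TextInputFormat is wired into a job, including the textinputformat.record.delimiter key that createRecordReader() reads above. The input path is a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class TextInputFormatDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("textinputformat.record.delimiter", "\n");   // optional custom record delimiter

        Job job = Job.getInstance(conf, "text-input-format-demo");
        job.setJarByClass(TextInputFormatDriver.class);

        // TextInputFormat hands the mapper <LongWritable offset, Text line> pairs.
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/data/in"));  // hypothetical input path

        // Mapper/Reducer wiring omitted; only the input side matters here.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }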

    Its parent class is shown next (TextInputFormat itself parses each line of input into a key/value pair).

      1 /**
      2  * Licensed to the Apache Software Foundation (ASF) under one
      3  * or more contributor license agreements.  See the NOTICE file
      4  * distributed with this work for additional information
      5  * regarding copyright ownership.  The ASF licenses this file
      6  * to you under the Apache License, Version 2.0 (the
      7  * "License"); you may not use this file except in compliance
      8  * with the License.  You may obtain a copy of the License at
      9  *
     10  *     http://www.apache.org/licenses/LICENSE-2.0
     11  *
     12  * Unless required by applicable law or agreed to in writing, software
     13  * distributed under the License is distributed on an "AS IS" BASIS,
     14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15  * See the License for the specific language governing permissions and
     16  * limitations under the License.
     17  */
     18 
     19 package org.apache.hadoop.mapreduce.lib.input;
     20 
     21 import java.io.IOException;
     22 import java.util.ArrayList;
     23 import java.util.List;
     24 
     25 import org.apache.commons.logging.Log;
     26 import org.apache.commons.logging.LogFactory;
     27 import org.apache.hadoop.classification.InterfaceAudience;
     28 import org.apache.hadoop.classification.InterfaceStability;
     29 import org.apache.hadoop.conf.Configuration;
     30 import org.apache.hadoop.fs.FileStatus;
     31 import org.apache.hadoop.fs.FileSystem;
     32 import org.apache.hadoop.fs.LocatedFileStatus;
     33 import org.apache.hadoop.fs.Path;
     34 import org.apache.hadoop.fs.PathFilter;
     35 import org.apache.hadoop.fs.BlockLocation;
     36 import org.apache.hadoop.fs.RemoteIterator;
     37 import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
     38 import org.apache.hadoop.mapred.SplitLocationInfo;
     39 import org.apache.hadoop.mapreduce.InputFormat;
     40 import org.apache.hadoop.mapreduce.InputSplit;
     41 import org.apache.hadoop.mapreduce.Job;
     42 import org.apache.hadoop.mapreduce.JobContext;
     43 import org.apache.hadoop.mapreduce.Mapper;
     44 import org.apache.hadoop.mapreduce.security.TokenCache;
     45 import org.apache.hadoop.util.ReflectionUtils;
     46 import org.apache.hadoop.util.StringUtils;
     47 
     48 import com.google.common.base.Stopwatch;
     49 import com.google.common.collect.Lists;
     50 
     51 /** 
     52  * A base class for file-based {@link InputFormat}s.
     53  * 
     54  * <p><code>FileInputFormat</code> is the base class for all file-based 
     55  * <code>InputFormat</code>s. This provides a generic implementation of
     56  * {@link #getSplits(JobContext)}.
     57  * Subclasses of <code>FileInputFormat</code> can also override the 
     58  * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
     59  * not split-up and are processed as a whole by {@link Mapper}s.
     60  */
     61 @InterfaceAudience.Public
     62 @InterfaceStability.Stable
     63 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     64   public static final String INPUT_DIR = 
     65     "mapreduce.input.fileinputformat.inputdir";
     66   public static final String SPLIT_MAXSIZE = 
     67     "mapreduce.input.fileinputformat.split.maxsize";
     68   public static final String SPLIT_MINSIZE = 
     69     "mapreduce.input.fileinputformat.split.minsize";
     70   public static final String PATHFILTER_CLASS = 
     71     "mapreduce.input.pathFilter.class";
     72   public static final String NUM_INPUT_FILES =
     73     "mapreduce.input.fileinputformat.numinputfiles";
     74   public static final String INPUT_DIR_RECURSIVE =
     75     "mapreduce.input.fileinputformat.input.dir.recursive";
     76   public static final String LIST_STATUS_NUM_THREADS =
     77       "mapreduce.input.fileinputformat.list-status.num-threads";
     78   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
     79 
     80   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
     81 
     82   private static final double SPLIT_SLOP = 1.1;   // 10% slop
     83   
     84   @Deprecated
     85   public static enum Counter { 
     86     BYTES_READ
     87   }
     88 
     89   private static final PathFilter hiddenFileFilter = new PathFilter(){
     90       public boolean accept(Path p){
     91         String name = p.getName(); 
     92         return !name.startsWith("_") && !name.startsWith("."); 
     93       }
     94     }; 
     95 
     96   /**
     97    * Proxy PathFilter that accepts a path only if all filters given in the
     98    * constructor do. Used by the listPaths() to apply the built-in
     99    * hiddenFileFilter together with a user provided one (if any).
    100    */
    101   private static class MultiPathFilter implements PathFilter {
    102     private List<PathFilter> filters;
    103 
    104     public MultiPathFilter(List<PathFilter> filters) {
    105       this.filters = filters;
    106     }
    107 
    108     public boolean accept(Path path) {
    109       for (PathFilter filter : filters) {
    110         if (!filter.accept(path)) {
    111           return false;
    112         }
    113       }
    114       return true;
    115     }
    116   }
    117   
    118   /**
    119    * @param job
    120    *          the job to modify
    121    * @param inputDirRecursive
    122    */
    123   public static void setInputDirRecursive(Job job,
    124       boolean inputDirRecursive) {
    125     job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
    126         inputDirRecursive);
    127   }
    128  
    129   /**
    130    * @param job
    131    *          the job to look at.
    132    * @return should the files to be read recursively?
    133    */
    134   public static boolean getInputDirRecursive(JobContext job) {
    135     return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
    136         false);
    137   }
    138 
    139   /**
    140    * Get the lower bound on split size imposed by the format.
    141    * @return the number of bytes of the minimal split for this format
    142    */
    143   protected long getFormatMinSplitSize() {
    144     return 1;
    145   }
    146 
    147   /**
    148    * Is the given filename splitable? Usually, true, but if the file is
    149    * stream compressed, it will not be.
    150    * 
    151    * <code>FileInputFormat</code> implementations can override this and return
    152    * <code>false</code> to ensure that individual input files are never split-up
    153    * so that {@link Mapper}s process entire files.
    154    * 
    155    * @param context the job context
    156    * @param filename the file name to check
    157    * @return is this file splitable?
    158    */
    159   protected boolean isSplitable(JobContext context, Path filename) {
    160     return true;
    161   }
    162 
    163   /**
    164    * Set a PathFilter to be applied to the input paths for the map-reduce job.
    165    * @param job the job to modify
    166    * @param filter the PathFilter class use for filtering the input paths.
    167    */
    168   public static void setInputPathFilter(Job job,
    169                                         Class<? extends PathFilter> filter) {
    170     job.getConfiguration().setClass(PATHFILTER_CLASS, filter, 
    171                                     PathFilter.class);
    172   }
    173 
    174   /**
    175    * Set the minimum input split size
    176    * @param job the job to modify
    177    * @param size the minimum size
    178    */
    179   public static void setMinInputSplitSize(Job job,
    180                                           long size) {
    181     job.getConfiguration().setLong(SPLIT_MINSIZE, size);
    182   }
    183 
    184   /**
    185    * Get the minimum split size
    186    * @param job the job
    187    * @return the minimum number of bytes that can be in a split
    188    */
    189   public static long getMinSplitSize(JobContext job) {
    190     return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
    191   }
    192 
    193   /**
    194    * Set the maximum split size
    195    * @param job the job to modify
    196    * @param size the maximum split size
    197    */
    198   public static void setMaxInputSplitSize(Job job,
    199                                           long size) {
    200     job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
    201   }
    202 
    203   /**
    204    * Get the maximum split size.
    205    * @param context the job to look at.
    206    * @return the maximum number of bytes a split can include
    207    */
    208   public static long getMaxSplitSize(JobContext context) {
    209     return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
    210                                               Long.MAX_VALUE);
    211   }
    212 
    213   /**
    214    * Get a PathFilter instance of the filter set for the input paths.
    215    *
    216    * @return the PathFilter instance set for the job, NULL if none has been set.
    217    */
    218   public static PathFilter getInputPathFilter(JobContext context) {
    219     Configuration conf = context.getConfiguration();
    220     Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
    221         PathFilter.class);
    222     return (filterClass != null) ?
    223         (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
    224   }
    225 
    226   /** List input directories.
    227    * Subclasses may override to, e.g., select only files matching a regular
    228    * expression. 
    229    * 
    230    * @param job the job to list input paths for
    231    * @return array of FileStatus objects
    232    * @throws IOException if zero items.
    233    */
    234   protected List<FileStatus> listStatus(JobContext job
    235                                         ) throws IOException {
    236     Path[] dirs = getInputPaths(job);
    237     if (dirs.length == 0) {
    238       throw new IOException("No input paths specified in job");
    239     }
    240     
    241     // get tokens for all the required FileSystems..
    242     TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
    243                                         job.getConfiguration());
    244 
    245     // Whether we need to recursive look into the directory structure
    246     boolean recursive = getInputDirRecursive(job);
    247 
    248     // creates a MultiPathFilter with the hiddenFileFilter and the
    249     // user provided one (if any).
    250     List<PathFilter> filters = new ArrayList<PathFilter>();
    251     filters.add(hiddenFileFilter);
    252     PathFilter jobFilter = getInputPathFilter(job);
    253     if (jobFilter != null) {
    254       filters.add(jobFilter);
    255     }
    256     PathFilter inputFilter = new MultiPathFilter(filters);
    257     
    258     List<FileStatus> result = null;
    259 
    260     int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
    261         DEFAULT_LIST_STATUS_NUM_THREADS);
    262     Stopwatch sw = new Stopwatch().start();
    263     if (numThreads == 1) {
    264       result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    265     } else {
    266       Iterable<FileStatus> locatedFiles = null;
    267       try {
    268         LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
    269             job.getConfiguration(), dirs, recursive, inputFilter, true);
    270         locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    271       } catch (InterruptedException e) {
    272         throw new IOException("Interrupted while getting file statuses");
    273       }
    274       result = Lists.newArrayList(locatedFiles);
    275     }
    276     
    277     sw.stop();
    278     if (LOG.isDebugEnabled()) {
    279       LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    280     }
    281     LOG.info("Total input paths to process : " + result.size()); 
    282     return result;
    283   }
    284 
    285   private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
    286       PathFilter inputFilter, boolean recursive) throws IOException {
    287     List<FileStatus> result = new ArrayList<FileStatus>();
    288     List<IOException> errors = new ArrayList<IOException>();
    289     for (int i=0; i < dirs.length; ++i) {
    290       Path p = dirs[i];
    291       FileSystem fs = p.getFileSystem(job.getConfiguration()); 
    292       FileStatus[] matches = fs.globStatus(p, inputFilter);
    293       if (matches == null) {
    294         errors.add(new IOException("Input path does not exist: " + p));
    295       } else if (matches.length == 0) {
    296         errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
    297       } else {
    298         for (FileStatus globStat: matches) {
    299           if (globStat.isDirectory()) {
    300             RemoteIterator<LocatedFileStatus> iter =
    301                 fs.listLocatedStatus(globStat.getPath());
    302             while (iter.hasNext()) {
    303               LocatedFileStatus stat = iter.next();
    304               if (inputFilter.accept(stat.getPath())) {
    305                 if (recursive && stat.isDirectory()) {
    306                   addInputPathRecursively(result, fs, stat.getPath(),
    307                       inputFilter);
    308                 } else {
    309                   result.add(stat);
    310                 }
    311               }
    312             }
    313           } else {
    314             result.add(globStat);
    315           }
    316         }
    317       }
    318     }
    319 
    320     if (!errors.isEmpty()) {
    321       throw new InvalidInputException(errors);
    322     }
    323     return result;
    324   }
    325   
    326   /**
    327    * Add files in the input path recursively into the results.
    328    * @param result
    329    *          The List to store all files.
    330    * @param fs
    331    *          The FileSystem.
    332    * @param path
    333    *          The input path.
    334    * @param inputFilter
    335    *          The input filter that can be used to filter files/dirs. 
    336    * @throws IOException
    337    */
    338   protected void addInputPathRecursively(List<FileStatus> result,
    339       FileSystem fs, Path path, PathFilter inputFilter) 
    340       throws IOException {
    341     RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    342     while (iter.hasNext()) {
    343       LocatedFileStatus stat = iter.next();
    344       if (inputFilter.accept(stat.getPath())) {
    345         if (stat.isDirectory()) {
    346           addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
    347         } else {
    348           result.add(stat);
    349         }
    350       }
    351     }
    352   }
    353   
    354   
    355   /**
    356    * A factory that makes the split for this class. It can be overridden
    357    * by sub-classes to make sub-types
    358    */
    359   protected FileSplit makeSplit(Path file, long start, long length, 
    360                                 String[] hosts) {
    361     return new FileSplit(file, start, length, hosts);
    362   }
    363   
    364   /**
    365    * A factory that makes the split for this class. It can be overridden
    366    * by sub-classes to make sub-types
    367    */
    368   protected FileSplit makeSplit(Path file, long start, long length, 
    369                                 String[] hosts, String[] inMemoryHosts) {
    370     return new FileSplit(file, start, length, hosts, inMemoryHosts);
    371   }
    372 
    373   /** 
    374    * Generate the list of files and make them into FileSplits.
    375    * @param job the job context
    376    * @throws IOException
    377    */
    378   public List<InputSplit> getSplits(JobContext job) throws IOException {
    379     Stopwatch sw = new Stopwatch().start();
    380     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    381     long maxSize = getMaxSplitSize(job);
    382 
    383     // generate splits
    384     List<InputSplit> splits = new ArrayList<InputSplit>();
    385     List<FileStatus> files = listStatus(job);
    386     for (FileStatus file: files) {
    387       Path path = file.getPath();
    388       long length = file.getLen();
    389       if (length != 0) {
    390         BlockLocation[] blkLocations;
    391         if (file instanceof LocatedFileStatus) {
    392           blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    393         } else {
    394           FileSystem fs = path.getFileSystem(job.getConfiguration());
    395           blkLocations = fs.getFileBlockLocations(file, 0, length);
    396         }
    397         if (isSplitable(job, path)) {
    398           long blockSize = file.getBlockSize();
    399           long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    400 
    401           long bytesRemaining = length;
    402           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    403             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    404             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
    405                         blkLocations[blkIndex].getHosts(),
    406                         blkLocations[blkIndex].getCachedHosts()));
    407             bytesRemaining -= splitSize;
    408           }
    409 
    410           if (bytesRemaining != 0) {
    411             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    412             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
    413                        blkLocations[blkIndex].getHosts(),
    414                        blkLocations[blkIndex].getCachedHosts()));
    415           }
    416         } else { // not splitable
    417           splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
    418                       blkLocations[0].getCachedHosts()));
    419         }
    420       } else { 
    421         //Create empty hosts array for zero length files
    422         splits.add(makeSplit(path, 0, length, new String[0]));
    423       }
    424     }
    425     // Save the number of input files for metrics/loadgen
    426     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    427     sw.stop();
    428     if (LOG.isDebugEnabled()) {
    429       LOG.debug("Total # of splits generated by getSplits: " + splits.size()
    430           + ", TimeTaken: " + sw.elapsedMillis());
    431     }
    432     return splits;
    433   }
    434 
    435   protected long computeSplitSize(long blockSize, long minSize,
    436                                   long maxSize) {
    437     return Math.max(minSize, Math.min(maxSize, blockSize));
    438   }
    439 
    440   protected int getBlockIndex(BlockLocation[] blkLocations, 
    441                               long offset) {
    442     for (int i = 0 ; i < blkLocations.length; i++) {
    443       // is the offset inside this block?
    444       if ((blkLocations[i].getOffset() <= offset) &&
    445           (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
    446         return i;
    447       }
    448     }
    449     BlockLocation last = blkLocations[blkLocations.length -1];
    450     long fileLength = last.getOffset() + last.getLength() -1;
    451     throw new IllegalArgumentException("Offset " + offset + 
    452                                        " is outside of file (0.." +
    453                                        fileLength + ")");
    454   }
    455 
    456   /**
    457    * Sets the given comma separated paths as the list of inputs 
    458    * for the map-reduce job.
    459    * 
    460    * @param job the job
    461    * @param commaSeparatedPaths Comma separated paths to be set as 
    462    *        the list of inputs for the map-reduce job.
    463    */
    464   public static void setInputPaths(Job job, 
    465                                    String commaSeparatedPaths
    466                                    ) throws IOException {
    467     setInputPaths(job, StringUtils.stringToPath(
    468                         getPathStrings(commaSeparatedPaths)));
    469   }
    470 
    471   /**
    472    * Add the given comma separated paths to the list of inputs for
    473    *  the map-reduce job.
    474    * 
    475    * @param job The job to modify
    476    * @param commaSeparatedPaths Comma separated paths to be added to
    477    *        the list of inputs for the map-reduce job.
    478    */
    479   public static void addInputPaths(Job job, 
    480                                    String commaSeparatedPaths
    481                                    ) throws IOException {
    482     for (String str : getPathStrings(commaSeparatedPaths)) {
    483       addInputPath(job, new Path(str));
    484     }
    485   }
    486 
    487   /**
    488    * Set the array of {@link Path}s as the list of inputs
    489    * for the map-reduce job.
    490    * 
    491    * @param job The job to modify 
    492    * @param inputPaths the {@link Path}s of the input directories/files 
    493    * for the map-reduce job.
    494    */ 
    495   public static void setInputPaths(Job job, 
    496                                    Path... inputPaths) throws IOException {
    497     Configuration conf = job.getConfiguration();
    498     Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    499     StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    500     for(int i = 1; i < inputPaths.length;i++) {
    501       str.append(StringUtils.COMMA_STR);
    502       path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
    503       str.append(StringUtils.escapeString(path.toString()));
    504     }
    505     conf.set(INPUT_DIR, str.toString());
    506   }
    507 
    508   /**
    509    * Add a {@link Path} to the list of inputs for the map-reduce job.
    510    * 
    511    * @param job The {@link Job} to modify
    512    * @param path {@link Path} to be added to the list of inputs for 
    513    *            the map-reduce job.
    514    */
    515   public static void addInputPath(Job job, 
    516                                   Path path) throws IOException {
    517     Configuration conf = job.getConfiguration();
    518     path = path.getFileSystem(conf).makeQualified(path);
    519     String dirStr = StringUtils.escapeString(path.toString());
    520     String dirs = conf.get(INPUT_DIR);
    521     conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
    522   }
    523   
    524   // This method escapes commas in the glob pattern of the given paths.
    525   private static String[] getPathStrings(String commaSeparatedPaths) {
    526     int length = commaSeparatedPaths.length();
    527     int curlyOpen = 0;
    528     int pathStart = 0;
    529     boolean globPattern = false;
    530     List<String> pathStrings = new ArrayList<String>();
    531     
    532     for (int i=0; i<length; i++) {
    533       char ch = commaSeparatedPaths.charAt(i);
    534       switch(ch) {
    535         case '{' : {
    536           curlyOpen++;
    537           if (!globPattern) {
    538             globPattern = true;
    539           }
    540           break;
    541         }
    542         case '}' : {
    543           curlyOpen--;
    544           if (curlyOpen == 0 && globPattern) {
    545             globPattern = false;
    546           }
    547           break;
    548         }
    549         case ',' : {
    550           if (!globPattern) {
    551             pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
    552             pathStart = i + 1 ;
    553           }
    554           break;
    555         }
    556         default:
    557           continue; // nothing special to do for this character
    558       }
    559     }
    560     pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
    561     
    562     return pathStrings.toArray(new String[0]);
    563   }
    564   
    565   /**
    566    * Get the list of input {@link Path}s for the map-reduce job.
    567    * 
    568    * @param context The job
    569    * @return the list of input {@link Path}s for the map-reduce job.
    570    */
    571   public static Path[] getInputPaths(JobContext context) {
    572     String dirs = context.getConfiguration().get(INPUT_DIR, "");
    573     String [] list = StringUtils.split(dirs);
    574     Path[] result = new Path[list.length];
    575     for (int i = 0; i < list.length; i++) {
    576       result[i] = new Path(StringUtils.unEscapeString(list[i]));
    577     }
    578     return result;
    579   }
    580 
    581 }
    FileInputFormat
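    The static helpers in the listing are the knobs a job driver normally touches. A small configuration sketch (the path and the filter class are made up for illustration):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class InputConfigSketch {
      // A user filter; listStatus() combines it with the built-in hiddenFileFilter.
      public static class LogFilter implements PathFilter {
        @Override
        public boolean accept(Path p) {
          return p.getName().endsWith(".log");
        }
      }

      static void configure(Job job) throws Exception {
        FileInputFormat.addInputPath(job, new Path("/data/in"));   // INPUT_DIR (hypothetical path)
        FileInputFormat.setInputDirRecursive(job, true);           // INPUT_DIR_RECURSIVE
        FileInputFormat.setInputPathFilter(job, LogFilter.class);  // PATHFILTER_CLASS
        FileInputFormat.setMinInputSplitSize(job, 1L);             // SPLIT_MINSIZE (default value)
        FileInputFormat.setMaxInputSplitSize(job, Long.MAX_VALUE); // SPLIT_MAXSIZE (default value)
      }
    }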

    The parent class (InputFormat):

      1 /**
      2  * Licensed to the Apache Software Foundation (ASF) under one
      3  * or more contributor license agreements.  See the NOTICE file
      4  * distributed with this work for additional information
      5  * regarding copyright ownership.  The ASF licenses this file
      6  * to you under the Apache License, Version 2.0 (the
      7  * "License"); you may not use this file except in compliance
      8  * with the License.  You may obtain a copy of the License at
      9  *
     10  *     http://www.apache.org/licenses/LICENSE-2.0
     11  *
     12  * Unless required by applicable law or agreed to in writing, software
     13  * distributed under the License is distributed on an "AS IS" BASIS,
     14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     15  * See the License for the specific language governing permissions and
     16  * limitations under the License.
     17  */
     18 
     19 package org.apache.hadoop.mapreduce;
     20 
     21 import java.io.IOException;
     22 import java.util.List;
     23 
     24 import org.apache.hadoop.classification.InterfaceAudience;
     25 import org.apache.hadoop.classification.InterfaceStability;
     26 import org.apache.hadoop.fs.FileSystem;
     27 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     28 
     29 /** 
     30  * <code>InputFormat</code> describes the input-specification for a 
     31  * Map-Reduce job. 
     32  * 
     33  * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the
     34  * job to:<p>
     35  * <ol>
     36  *   <li>
     37  *   Validate the input-specification of the job. 
     38  *   <li>
     39  *   Split-up the input file(s) into logical {@link InputSplit}s, each of 
     40  *   which is then assigned to an individual {@link Mapper}.
     41  *   </li>
     42  *   <li>
     43  *   Provide the {@link RecordReader} implementation to be used to glean
     44  *   input records from the logical <code>InputSplit</code> for processing by 
     45  *   the {@link Mapper}.
     46  *   </li>
     47  * </ol>
     48  * 
     49  * <p>The default behavior of file-based {@link InputFormat}s, typically 
     50  * sub-classes of {@link FileInputFormat}, is to split the 
     51  * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
     52  * bytes, of the input files. However, the {@link FileSystem} blocksize of  
     53  * the input files is treated as an upper bound for input splits. A lower bound 
     54  * on the split size can be set via 
     55  * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
     56  * mapreduce.input.fileinputformat.split.minsize</a>.</p>
     57  * 
     58  * <p>Clearly, logical splits based on input-size is insufficient for many 
     59  * applications since record boundaries are to respected. In such cases, the
     60  * application has to also implement a {@link RecordReader} on whom lies the
     61  * responsibility to respect record-boundaries and present a record-oriented
     62  * view of the logical <code>InputSplit</code> to the individual task.
     63  *
     64  * @see InputSplit
     65  * @see RecordReader
     66  * @see FileInputFormat
     67  */
     68 @InterfaceAudience.Public
     69 @InterfaceStability.Stable
     70 public abstract class InputFormat<K, V> {
     71 
     72   /** 
     73    * Logically split the set of input files for the job.  
     74    * 
     75    * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
     76    * for processing.</p>
     77    *
     78    * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
     79    * input files are not physically split into chunks. For e.g. a split could
     80    * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
     81    * also creates the {@link RecordReader} to read the {@link InputSplit}.
     82    * 
     83    * @param context job configuration.
     84    * @return an array of {@link InputSplit}s for the job.
     85    */
     86   public abstract 
     87     List<InputSplit> getSplits(JobContext context
     88                                ) throws IOException, InterruptedException;
     89   
     90   /**
     91    * Create a record reader for a given split. The framework will call
     92    * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
     93    * the split is used.
     94    * @param split the split to be read
     95    * @param context the information about the task
     96    * @return a new record reader
     97    * @throws IOException
     98    * @throws InterruptedException
     99    */
    100   public abstract 
    101     RecordReader<K,V> createRecordReader(InputSplit split,
    102                                          TaskAttemptContext context
    103                                         ) throws IOException, 
    104                                                  InterruptedException;
    105 
    106 }
    InputFormat source
       * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
       * for processing.</p>

    *
    * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
    * input files are not physically split into chunks. For e.g. a split could
    * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
    * also creates the {@link RecordReader} to read the {@link InputSplit}.
    *
    * @param context job configuration.
    * @return an array of {@link InputSplit}s for the job.
    */
    public abstract
      List<InputSplit> getSplits(JobContext context
                                 ) throws IOException, InterruptedException;

  In other words: each file is logically divided into a number of splits by the getSplits() method, and each split corresponds to one mapper task.

     /**
       * Create a record reader for a given split. The framework will call
       * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
       * the split is used.
       * @param split the split to be read
       * @param context the information about the task
       * @return a new record reader
       * @throws IOException
       * @throws InterruptedException
       */
      public abstract 
        RecordReader<K,V> createRecordReader(InputSplit split,
                                             TaskAttemptContext context
                                            ) throws IOException, 
                                                     InterruptedException;
    
    }
    

  In other words: a split is essentially a slice of a file's content, and a RecordReader turns that content into key/value pairs. Following the reference into RecordReader, this abstract class breaks the data of a split into key/value pairs whose purpose is to feed the Mapper.

    /**
    * The record reader breaks the data into key/value pairs for input to the
    * {@link Mapper}.
    * @param <KEYIN>
    * @param <VALUEIN>
    */

    public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    
      /**
       * Called once at initialization.
       * @param split the split that defines the range of records to read
       * @param context the information about the task
       * @throws IOException
       * @throws InterruptedException
       */
    

  To summarize the source analysis so far, the overall data flow is:

                  file --via--> getSplits() --decomposed into--> InputSplit --via--> RecordReader (created by createRecordReader()) --processed into--> map(k1, v1)

Part 1: Splitting the files

   Question 1: How is a file cut into splits? Look at the getSplits() implementation in the subclass FileInputFormat.

     1  /** 
     2    * Generate the list of files and make them into FileSplits.
     3    * @param job the job context
     4    * @throws IOException
     5    */
     6   public List<InputSplit> getSplits(JobContext job) throws IOException {
     7     Stopwatch sw = new Stopwatch().start();
     8     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     9     long maxSize = getMaxSplitSize(job);
    10 
    11     // generate splits
    12     List<InputSplit> splits = new ArrayList<InputSplit>();
    13     List<FileStatus> files = listStatus(job);
    14     for (FileStatus file: files) {
    15       Path path = file.getPath();
    16       long length = file.getLen();
    17       if (length != 0) {
    18         BlockLocation[] blkLocations;
    19         if (file instanceof LocatedFileStatus) {
    20           blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    21         } else {
    22           FileSystem fs = path.getFileSystem(job.getConfiguration());
    23           blkLocations = fs.getFileBlockLocations(file, 0, length);
    24         }
    25         if (isSplitable(job, path)) {
    26           long blockSize = file.getBlockSize();
    27           long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    28 
    29           long bytesRemaining = length;
    30           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    31             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    32             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
    33                         blkLocations[blkIndex].getHosts(),
    34                         blkLocations[blkIndex].getCachedHosts()));
    35             bytesRemaining -= splitSize;
    36           }
    37 
    38           if (bytesRemaining != 0) {
    39             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    40             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
    41                        blkLocations[blkIndex].getHosts(),
    42                        blkLocations[blkIndex].getCachedHosts()));
    43           }
    44         } else { // not splitable
    45           splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
    46                       blkLocations[0].getCachedHosts()));
    47         }
    48       } else { 
    49         //Create empty hosts array for zero length files
    50         splits.add(makeSplit(path, 0, length, new String[0]));
    51       }
    52     }
    53     // Save the number of input files for metrics/loadgen
    54     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    55     sw.stop();
    56     if (LOG.isDebugEnabled()) {
    57       LOG.debug("Total # of splits generated by getSplits: " + splits.size()
    58           + ", TimeTaken: " + sw.elapsedMillis());
    59     }
    60     return splits;
    61   }
    getSplits
      /** 
       * Generate the list of files and make them into FileSplits.
     (i.e., cut the input files into splits)
       * @param job the job context
       * @throws IOException
       */
    

    (1) Each split is appended to the result list via splits.add(...).

    (2) The object passed to add(...) is produced by the makeSplit() factory method, which performs the logical division.

    (3) makeSplit() internally constructs a FileSplit to describe the slice.

    (4) The FileSplit constructor parameters mean the following:

    hosts is the list of nodes that hold the underlying block. The split starts at offset start, covers length bytes, and records which block that data lives on; a split is therefore a logical division.

    Nothing is physically cut, so the program never reads raw disk blocks itself; it goes through the normal HDFS read path. (A small sketch that reads these fields back follows below.)
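    For reference, the fields of a FileSplit can be read back with its getters; nothing below touches the file's bytes, which is exactly why a split is only a logical description (a small sketch, not from the original post):

    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class FileSplitSketch {
      static void describe(FileSplit split) throws Exception {
        System.out.println("path   = " + split.getPath());     // which file the slice belongs to
        System.out.println("start  = " + split.getStart());    // byte offset where the slice begins
        System.out.println("length = " + split.getLength());   // how many bytes belong to the slice
        for (String host : split.getLocations()) {             // nodes holding the underlying block(s)
          System.out.println("host   = " + host);
        }
      }
    }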

 (5) How the code behaves when the file length is non-zero

    if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) { // if the file is splittable; not every file can be split (e.g. encrypted files) -- the file structure usually decides
              long blockSize = file.getBlockSize();
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);

              long bytesRemaining = length;
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize, // length-bytesRemaining is the number of bytes already assigned, i.e. this split's start offset
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
    1. Suppose the file size is 300, so length = 300 and bytesRemaining starts at 300, with splitSize = 128.
    2. First pass through the loop: makeSplit(start=0, length=128).
    3. bytesRemaining = 300 - 128 = 172.
    4. Second pass (172/128 > 1.1): makeSplit(start=300-172=128, length=128).
    5. bytesRemaining = 172 - 128 = 44; since 44/128 is below SPLIT_SLOP the loop ends.
    6. Because bytesRemaining != 0, a final tail split is added: makeSplit(start=300-44=256, length=44). (A runnable sketch of this loop follows below.)
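    The arithmetic can be checked with a tiny standalone sketch (plain Java, not Hadoop code) that mimics the loop above with the same SPLIT_SLOP of 1.1:

    public class SplitLoopSketch {
      private static final double SPLIT_SLOP = 1.1;   // same 10% slop as FileInputFormat

      // Prints the (start, length) pairs the getSplits() loop would generate for one file.
      static void simulate(long length, long splitSize) {
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          System.out.println("split(start=" + (length - bytesRemaining) + ", length=" + splitSize + ")");
          bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {                     // tail split for whatever is left
          System.out.println("split(start=" + (length - bytesRemaining) + ", length=" + bytesRemaining + ")");
        }
      }

      public static void main(String[] args) {
        simulate(300, 128);                            // prints (0,128), (128,128), (256,44)
      }
    }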

    If the file may not be split, the following branch runs instead:

    } else { // not splitable
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
    
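    If a format must never be split (beyond the compression check in TextInputFormat), the javadoc suggests overriding isSplitable(). A minimal sketch of such a subclass (the class name is made up for illustration):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    // Every input file becomes exactly one split, so a single mapper reads the whole file.
    public class WholeFileTextInputFormat extends TextInputFormat {
      @Override
      protected boolean isSplitable(JobContext context, Path file) {
        return false;
      }
    }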

  (6) Analyzing the split size

    380   long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  // works out to 1L by default
    381   long maxSize = getMaxSplitSize(job); // defaults to Long.MAX_VALUE

    Looking at the source behind the minSize variable:

       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    

  Clicking into getFormatMinSplitSize() shows that it returns 1L:

      /**
       * Get the lower bound on split size imposed by the format.
       * @return the number of bytes of the minimal split for this format
       */
      protected long getFormatMinSplitSize() {
        return 1;
      }
    

  Clicking into getMinSplitSize() shows that it returns the configured minimum split size, falling back to 1L when SPLIT_MINSIZE is not set in the configuration:

      /**
       * Get the minimum split size
       * @param job the job
       * @return the minimum number of bytes that can be in a split
       */
      public static long getMinSplitSize(JobContext job) {
        return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
      }
    

  So by default minSize evaluates to 1L.

The split size itself is computed at line 399; by default an InputSplit is the same size as an HDFS block (128 MB). In other words, each map task processes roughly one block of data.

    397        if (isSplitable(job, path)) {
    398          long blockSize = file.getBlockSize();
    399          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
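    What line 399 does is just Math.max(minSize, Math.min(maxSize, blockSize)). A short sketch of the common cases (the 128 MB block size is the usual default; setMinInputSplitSize()/setMaxInputSplitSize() from the listing above are what feed minSize and maxSize):

    public class SplitSizeSketch {
      // Same formula as FileInputFormat.computeSplitSize().
      static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }

      public static void main(String[] args) {
        long block = 128L * 1024 * 1024;
        System.out.println(computeSplitSize(block, 1L, Long.MAX_VALUE));                 // defaults -> 128 MB (one split per block)
        System.out.println(computeSplitSize(block, 256L * 1024 * 1024, Long.MAX_VALUE)); // raising minSize -> 256 MB splits
        System.out.println(computeSplitSize(block, 1L, 64L * 1024 * 1024));              // lowering maxSize -> 64 MB splits
      }
    }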

     When the InputSplit size differs from the block size, network transfer can occur: if a split is larger than a block, the data in one block is not enough for it, and another block must be fetched (possibly from another node).

If a split is smaller than a block, part of the block is left unprocessed by this map task and may be handled by another one, which can likewise cause network transfer and weakens data locality.

    Because the source iterates over the files with a for loop, every input file is split independently.

        

 (7) How many splits do two 50 MB files, one 200 MB file, and one empty file produce?

     Even the empty file produces a split; the two 50 MB files produce one split each, and the 200 MB file produces two splits (128 MB + 72 MB), so five map tasks are needed in total.

     **********************************************************

Part 2: Feeding the map task via createRecordReader()

        

 (1) Reading createRecordReader()

    /** An {@link InputFormat} for plain text files.  Files are broken into lines.
     * Either linefeed or carriage-return are used to signal end of line.  Keys are
     * the position in the file, and values are the line of text.. */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
    
      @Override
      public RecordReader<LongWritable, Text> 
        createRecordReader(InputSplit split,
                           TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
            "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); // the delimiter is encoded as UTF-8; parsing will go wrong if the input is not in this encoding
    return new LineRecordReader(recordDelimiterBytes);
  }

LineRecordReader is a subclass of RecordReader:

    /**
 * Treats keys as offset in file and value as line.   (the key is the byte offset, the value is the text of each line)
     */
    @InterfaceAudience.LimitedPrivate({"MapReduce", "Pig"})
    @InterfaceStability.Evolving
    public class LineRecordReader extends RecordReader<LongWritable, Text> {
      private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
      public static final String MAX_LINE_LENGTH = 
        "mapreduce.input.linerecordreader.line.maxlength";
    
      private long start;
      private long pos;
      private long end;

The RecordReader class:

     /**
       * Called once at initialization.
       * @param split the split that defines the range of records to read
       * @param context the information about the task
       * @throws IOException
       * @throws InterruptedException
       */
  public abstract void initialize(InputSplit split,    // initialization, called exactly once
                                      TaskAttemptContext context
                                      ) throws IOException, InterruptedException;
    
      /**
       * Read the next key, value pair.
       * @return true if a key/value pair was read
       * @throws IOException
       * @throws InterruptedException
       */
  public abstract     // read the next key/value pair; this pair is the map side's k1 and v1
      boolean nextKeyValue() throws IOException, InterruptedException;
    
      /**
   * Get the current key
       * @return the current key or null if there is no current key
       * @throws IOException
       * @throws InterruptedException
       */
      public abstract
      KEYIN getCurrentKey() throws IOException, InterruptedException;
      
      /**
       * Get the current value.
       * @return the object that was read
       * @throws IOException
       * @throws InterruptedException
       */
      public abstract 
      VALUEIN getCurrentValue() throws IOException, InterruptedException;
      
      /**
       * The current progress of the record reader through its data.
       * @return a number between 0.0 and 1.0 that is the fraction of the data read
       * @throws IOException
       * @throws InterruptedException
       */
      public abstract float getProgress() throws IOException, InterruptedException;
      
      /**
       * Close the record reader.
       */
      public abstract void close() throws IOException;
    }

    Note that the abstract class above never stores a key or value itself, yet it exposes getters for both. In a concrete reader, key and value live as fields of the class:

they are assigned inside the method bodies, and getCurrentKey()/getCurrentValue() simply return them.

    For example, the JDBC pattern: while (rs.next()) { rs.getLong(...); }

              Enumeration's hasMoreElements() works the same way; iterating a Hashtable via elements() ultimately boils down to an Enumeration as well.

    The reading loop can therefore be understood as:

          while (rr.nextKeyValue()) { key = rr.getCurrentKey(); value = rr.getCurrentValue(); map(key, value, context); }

   The source code confirms this guess. The key and value types are already fixed by the input format (LongWritable and Text), which is why <k1, v1> can be left implicit when writing a MapReduce program.

              

         Inside LineRecordReader, SplitLineReader is the underlying line reader.

    (2)

       

       ** split.getStart() is the byte offset at which this split's data begins; it is unrelated to line boundaries.

        

    The start position is assigned to pos (the current position). Now look up the nextKeyValue() method.

    In the LineRecordReader class:

    public boolean nextKeyValue() throws IOException {
        if (key == null) {
          key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
          value = new Text();
        }
        int newSize = 0;
        // We always read one extra line, which lies outside the upper
        // split limit i.e. (end - 1)
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
          if (pos == 0) {
            newSize = skipUtfByteOrderMark();
          } else {
            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
            pos += newSize;
          }
    
          if ((newSize == 0) || (newSize < maxLineLength)) {
            break;
          }
    
          // line too long. try again
          LOG.info("Skipped line of size " + newSize + " at pos " + 
                   (pos - newSize));
        }
        if (newSize == 0) {
          key = null;
          value = null;
          return false;
        } else {
          return true;
        }
      }

    For example, a file containing the two lines

          hello you
          hello me

     will be read as a single split. In that case:

      the first call to nextKeyValue() sees start=0, end=19, pos=0, key=0, value="hello you", newSize=10 (9 characters plus the newline);

      the second call sees key=10, value="hello me", newSize=9 (8 characters plus the newline).

     The readLine() method reads the text from the input stream; its return value is the number of bytes read.

      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); reads one line of data into value and returns the number of bytes consumed, including the trailing newline.

     In short, key and value get their values inside the nextKeyValue() method.
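    To watch these assignments outside a full job, a LineRecordReader can be driven by hand. This is only a test-style sketch: the file path is a placeholder, and the bare TaskAttemptID/TaskAttemptContextImpl construction is an assumption made for illustration, not part of the original post.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

    public class LineRecordReaderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("file:///tmp/demo.txt");   // hypothetical local file: "hello you\nhello me\n"
        FileSplit split = new FileSplit(file, 0, 19, new String[0]);

        LineRecordReader reader = new LineRecordReader();
        reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));

        // Same pattern as Mapper.run(): the key is the byte offset, the value is the line text.
        while (reader.nextKeyValue()) {
          System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
        }
        reader.close();
      }
    }

    With the two-line file above this prints the keys 0 and 10, matching the walk-through.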

Part 3: Once key and value have been assigned, how do they reach the map() function?

     *****************************************************************

Starting the analysis from the Mapper class:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.apache.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.mapreduce.task.MapContextImpl;
    
    /** 
     * Maps input key/value pairs to a set of intermediate key/value pairs.  
     * 
     * <p>Maps are the individual tasks which transform input records into a 
     * intermediate records. The transformed intermediate records need not be of 
     * the same type as the input records. A given input pair may map to zero or 
     * many output pairs.</p> 
     * 
     * <p>The Hadoop Map-Reduce framework spawns one map task for each 
     * {@link InputSplit} generated by the {@link InputFormat} for the job.
     * <code>Mapper</code> implementations can access the {@link Configuration} for 
     * the job via the {@link JobContext#getConfiguration()}.
     * 
     * <p>The framework first calls 
     * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
     * {@link #map(Object, Object, Context)} 
     * for each key/value pair in the <code>InputSplit</code>. Finally 
     * {@link #cleanup(Context)} is called.</p>
     * 
     * <p>All intermediate values associated with a given output key are 
     * subsequently grouped by the framework, and passed to a {@link Reducer} to  
     * determine the final output. Users can control the sorting and grouping by 
     * specifying two key {@link RawComparator} classes.</p>
     *
     * <p>The <code>Mapper</code> outputs are partitioned per 
     * <code>Reducer</code>. Users can control which keys (and hence records) go to 
     * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
     * 
     * <p>Users can optionally specify a <code>combiner</code>, via 
     * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
     * intermediate outputs, which helps to cut down the amount of data transferred 
     * from the <code>Mapper</code> to the <code>Reducer</code>.
     * 
     * <p>Applications can specify if and how the intermediate
     * outputs are to be compressed and which {@link CompressionCodec}s are to be
     * used via the <code>Configuration</code>.</p>
     *  
     * <p>If the job has zero
     * reduces then the output of the <code>Mapper</code> is directly written
     * to the {@link OutputFormat} without sorting by keys.</p>
     * 
     * <p>Example:</p>
     * <p><blockquote><pre>
     * public class TokenCounterMapper 
     *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{
     *    
     *   private final static IntWritable one = new IntWritable(1);
     *   private Text word = new Text();
     *   
     *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
     *     StringTokenizer itr = new StringTokenizer(value.toString());
     *     while (itr.hasMoreTokens()) {
     *       word.set(itr.nextToken());
     *       context.write(word, one);
     *     }
     *   }
     * }
     * </pre></blockquote></p>
     *
     * <p>Applications may override the {@link #run(Context)} method to exert 
     * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
     * etc.</p>
     * 
     * @see InputFormat
     * @see JobContext
     * @see Partitioner  
     * @see Reducer
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    
      /**
       * The <code>Context</code> passed on to the {@link Mapper} implementations.
       */
      public abstract class Context
        implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
      }
      
      /**
       * Called once at the beginning of the task.
       */
      protected void setup(Context context
                           ) throws IOException, InterruptedException {
        // NOTHING
      }
    
      /** (called once for every key/value pair in the input split)
       * Called once for each key/value pair in the input split. Most applications
       * should override this, but the default is the identity function.
       */
      @SuppressWarnings("unchecked")
      protected void map(KEYIN key, VALUEIN value, 
                         Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
      }
    
      /**
       * Called once at the end of the task.
       */
      protected void cleanup(Context context
                             ) throws IOException, InterruptedException {
        // NOTHING
      }
      
      /**
       * Expert users can override this method for more complete control over the
       * execution of the Mapper.
       * @param context
       * @throws IOException
       */
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);  // runs once
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context); // map() is invoked once for every key/value pair in the InputSplit
          }
        } finally {
          cleanup(context);  // runs once
        }
      }
    }
    Examining the nextKeyValue() method:

   Find the implementation of nextKeyValue() (Ctrl+T in the IDE also jumps to implementations).

        

    The MapContextImpl class provides the implementation.

    There, the reader field is declared as a RecordReader.

    Additionally, look at the Context source.

    Step into that class.

    Studying MapContext: the mapContext field is assigned in the constructor.

     

    Look at the nextKeyValue() method of the WrappedMapper class.

    Following its implementation leads to the implementing class.

    There the implementation of reader can be observed:

    reader is ultimately declared as a RecordReader.

    Summary:

    The parameterized constructor of MapContextImpl is called and the RecordReader is passed in (line 57 of that class), which is what makes its nextKeyValue() at line 80 callable; Context, the parent of MapContextImpl, then calls nextKeyValue().

 Overall summary: looking at the source code, how does the <k1, v1> handled by the map function get read from an HDFS file? Answer:

    1. Start from TextInputFormat, go up to its parent FileInputFormat, and then to InputFormat.
    InputFormat declares two methods: getSplits(...) and createRecordReader(...).
    The javadoc tells us that getSplits(...) parses the whole input file set into individual InputSplits, and each InputSplit corresponds to one mapper task.
    createRecordReader(...) creates an implementation of RecordReader, whose job is to parse an InputSplit into individual <k, v> pairs.
    2. The implementation of getSplits(...) is found in FileInputFormat.
    From that implementation we learn:
    (1) each split is by default the same size as a block, which is good for data locality;
    (2) every input file produces an InputSplit, even an empty file,
    and a very large file is cut into several InputSplits according to the split size.
    3. The implementation of createRecordReader(...) is found in TextInputFormat, and inside it is LineRecordReader.
    Next the LineRecordReader class was analyzed.
    In the RecordReader class, going through its methods shows that key and value exist as fields of the (concrete) class and how nextKeyValue() is meant to be used.
    In LineRecordReader the focus was nextKeyValue(...), and within it the call newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
    in in.readLine(...) the first argument receives the text of the line that was read, and the return value is the number of bytes read.
    That is how the contents of an InputSplit are turned into one <k, v> pair after another.
    4. Finally the Mapper class was analyzed, which defines setup(), cleanup(), map(), and run().
    In run(), a while loop calls context.nextKeyValue(...).
    Further analysis shows the Context interface involved is org.apache.hadoop.mapreduce.lib.map.WrappedMapper.MapContext; MapContext calls nextKeyValue(...), which finally leads to its implementation class org.apache.hadoop.mapreduce.task.MapContextImpl.
    The constructor of that class is handed the RecordReader implementation.

     

     

  • Original article: https://www.cnblogs.com/jackchen-Net/p/6405951.html