zoukankan      html  css  js  c++  java
  • MapReduce的InputFormat过程的学习

    转自:http://blog.csdn.net/androidlushangderen/article/details/41114259

    昨天经过几个小时的学习,把MapReduce的第一个阶段的过程学习了一下,也就是最最开始的时候从文件中的Data到key-value的映射,也就是InputFormat的过程。虽说过程不是很难,但是也存在很多细节的。也很少会有人对此做比较细腻的研究,学习。今天,就让我来为大家剖析一下这段代码的原理。我还为此花了一点时间做了几张结构图,便于大家理解。在这里先声明一下,我研究的MapReduce主要研究的是旧版的API,也就是mapred包下的。

              InputFormat最最原始的形式就是一个接口。后面出现的各种Format都是他的衍生类。结构如下,只包含最重要的2个方法:

    1. public interface InputFormat<K, V> {  
    2.   
    3.   /**  
    4.    * Logically split the set of input files for the job.   
    5.    *  
    6.    * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper} 
    7.    * for processing.</p> 
    8.    * 
    9.    * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the 
    10.    * input files are not physically split into chunks. For e.g. a split could 
    11.    * be <i><input-file-path, start, offset></i> tuple. 
    12.    *  
    13.    * @param job job configuration. 
    14.    * @param numSplits the desired number of splits, a hint. 
    15.    * @return an array of {@link InputSplit}s for the job. 
    16.    */  
    17.   InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;  
    18.   
    19.   /**  
    20.    * Get the {@link RecordReader} for the given {@link InputSplit}. 
    21.    * 
    22.    * <p>It is the responsibility of the <code>RecordReader</code> to respect 
    23.    * record boundaries while processing the logical split to present a  
    24.    * record-oriented view to the individual task.</p> 
    25.    *  
    26.    * @param split the {@link InputSplit} 
    27.    * @param job the job that this split belongs to 
    28.    * @return a {@link RecordReader} 
    29.    */  
    30.   RecordReader<K, V> getRecordReader(InputSplit split,  
    31.                                      JobConf job,   
    32.                                      Reporter reporter) throws IOException;  
    33. }  

    所以后面讲解,我也只是会围绕这2个方法进行分析。当然我们用的最多的是从文件中获得输入数据,也就是FileInputFormat这个类。继承关系如下:

    1. public abstract class FileInputFormat<K, V> implements InputFormat<K, V>  

    我们看里面的1个主要方法:

    1. public InputSplit[] getSplits(JobConf job, int numSplits)  

    返回的类型是一个InputSpilt对象,这是一个抽象的输入Spilt分片概念。结构如下:

    1. public interface InputSplit extends Writable {  
    2.   
    3.   /** 
    4.    * Get the total number of bytes in the data of the <code>InputSplit</code>. 
    5.    *  
    6.    * @return the number of bytes in the input split. 
    7.    * @throws IOException 
    8.    */  
    9.   long getLength() throws IOException;  
    10.     
    11.   /** 
    12.    * Get the list of hostnames where the input split is located. 
    13.    *  
    14.    * @return list of hostnames where data of the <code>InputSplit</code> is 
    15.    *         located as an array of <code>String</code>s. 
    16.    * @throws IOException 
    17.    */  
    18.   String[] getLocations() throws IOException;  
    19. }  

    提供了与数据相关的2个方法。后面这个返回的值会被用来传递给RecordReader里面去的。在想理解getSplits方法之前还有一个类需要理解,FileStatus,里面包装了一系列的文件基本信息方法:

    1. public class FileStatus implements Writable, Comparable {  
    2.   
    3.   private Path path;  
    4.   private long length;  
    5.   private boolean isdir;  
    6.   private short block_replication;  
    7.   private long blocksize;  
    8.   private long modification_time;  
    9.   private long access_time;  
    10.   private FsPermission permission;  
    11.   private String owner;  
    12.   private String group;  

    .....

    看到这里你估计会有点晕了,下面是我做的一张小小类图关系:

    可以看到,FileSpilt为了兼容新老版本,继承了新的抽象类InputSpilt,同时附上旧的接口形式的InputSpilt。下面我们看看里面的getspilt核心过程:

    1. /** Splits files returned by {@link #listStatus(JobConf)} when 
    2.    * they're too big.*/   
    3.   @SuppressWarnings("deprecation")  
    4.   public InputSplit[] getSplits(JobConf job, int numSplits)  
    5.     throws IOException {  
    6.     //获取所有的状态文件  
    7.     FileStatus[] files = listStatus(job);  
    8.       
    9.     // Save the number of input files in the job-conf  
    10.     //在job-cof中保存文件的数量  
    11.     job.setLong(NUM_INPUT_FILES, files.length);  
    12.     long totalSize = 0;                             
    13.     // compute total size,计算文件总的大小  
    14.     for (FileStatus file: files) {                // check we have valid files  
    15.       if (file.isDir()) {  
    16.           //如果是目录不是纯文件的直接抛异常  
    17.         throw new IOException("Not a file: "+ file.getPath());  
    18.       }  
    19.       totalSize += file.getLen();  
    20.     }  
    21.   
    22.     //用户期待的划分大小,总大小除以spilt划分数目  
    23.     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);  
    24.     //获取系统的划分最小值  
    25.     long minSize = Math.max(job.getLong("mapred.min.split.size", 1),  
    26.                             minSplitSize);  
    27.   
    28.     // generate splits  
    29.     //创建numSplits个FileSpilt文件划分量  
    30.     ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);  
    31.     NetworkTopology clusterMap = new NetworkTopology();  
    32.     for (FileStatus file: files) {  
    33.       Path path = file.getPath();  
    34.       FileSystem fs = path.getFileSystem(job);  
    35.       long length = file.getLen();  
    36.       //获取此文件的block的位置列表  
    37.       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);  
    38.       //如果文件系统可划分  
    39.       if ((length != 0) && isSplitable(fs, path)) {  
    40.         //计算此文件的总的block块的大小  
    41.         long blockSize = file.getBlockSize();  
    42.         //根据期待大小,最小大小,得出最终的split分片大小  
    43.         long splitSize = computeSplitSize(goalSize, minSize, blockSize);  
    44.   
    45.         long bytesRemaining = length;  
    46.         //如果剩余待划分字节倍数为划分大小超过1.1的划分比例,则进行拆分  
    47.         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
    48.           //获取提供数据的splitHost位置  
    49.           String[] splitHosts = getSplitHosts(blkLocations,   
    50.               length-bytesRemaining, splitSize, clusterMap);  
    51.           //添加FileSplit  
    52.           splits.add(new FileSplit(path, length-bytesRemaining, splitSize,   
    53.               splitHosts));  
    54.           //数量减少splitSize大小  
    55.           bytesRemaining -= splitSize;  
    56.         }  
    57.           
    58.         if (bytesRemaining != 0) {  
    59.           //添加刚刚剩下的没划分完的部分,此时bytesRemaining已经小于splitSize的1.1倍了  
    60.           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,   
    61.                      blkLocations[blkLocations.length-1].getHosts()));  
    62.         }  
    63.       } else if (length != 0) {  
    64.         //不划分,直接添加Spilt  
    65.         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);  
    66.         splits.add(new FileSplit(path, 0, length, splitHosts));  
    67.       } else {   
    68.         //Create empty hosts array for zero length files  
    69.         splits.add(new FileSplit(path, 0, length, new String[0]));  
    70.       }  
    71.     }  
    72.       
    73.     //最后返回FileSplit数组  
    74.     LOG.debug("Total # of splits: " + splits.size());  
    75.     return splits.toArray(new FileSplit[splits.size()]);  
    76.   }  

    里面有个computerSpiltSize方法很特殊,考虑了很多情况,总之最小值不能小于系统设定的最小值。要与期待值,块大小,系统允许最小值:

    1. protected long computeSplitSize(long goalSize, long minSize,  
    2.                                        long blockSize) {  
    3.     return Math.max(minSize, Math.min(goalSize, blockSize));  
    4.   }  

    上述过程的相应流程图如下:

    3种情况3中年执行流程。

          处理完getSpilt方法然后,也就是说已经把数据从文件中转划到InputSpilt中了,接下来就是给RecordRead去取出里面的一条条的记录了。当然这在FileInputFormat是抽象方法,必须由子类实现的,我在这里挑出了2个典型的子类SequenceFileInputFormat,和TextInputFormat。他们的实现RecordRead方法如下:

    1. public RecordReader<K, V> getRecordReader(InputSplit split,  
    2.                                       JobConf job, Reporter reporter)  
    3.     throws IOException {  
    4.   
    5.     reporter.setStatus(split.toString());  
    6.   
    7.     return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);  
    8.   }  
    1. public RecordReader<LongWritable, Text> getRecordReader(  
    2.                                           InputSplit genericSplit, JobConf job,  
    3.                                           Reporter reporter)  
    4.     throws IOException {  
    5.       
    6.     reporter.setStatus(genericSplit.toString());  
    7.     return new LineRecordReader(job, (FileSplit) genericSplit);  
    8.   }  


    可以看到里面的区别就在于LineRecordReader和SequenceFileRecordReader的不同了,这也就表明2种方式对应于数据的读取方式可能会不一样,继续往里深入看:

    1. /** An {@link RecordReader} for {@link SequenceFile}s. */  
    2. public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {  
    3.     
    4.   private SequenceFile.Reader in;  
    5.   private long start;  
    6.   private long end;  
    7.   private boolean more = true;  
    8.   protected Configuration conf;  
    9.   
    10.   public SequenceFileRecordReader(Configuration conf, FileSplit split)  
    11.     throws IOException {  
    12.     Path path = split.getPath();  
    13.     FileSystem fs = path.getFileSystem(conf);  
    14.     //从文件系统中读取数据输入流  
    15.     this.in = new SequenceFile.Reader(fs, path, conf);  
    16.     this.end = split.getStart() + split.getLength();  
    17.     this.conf = conf;  
    18.   
    19.     if (split.getStart() > in.getPosition())  
    20.       in.sync(split.getStart());                  // sync to start  
    21.   
    22.     this.start = in.getPosition();  
    23.     more = start < end;  
    24.   }  
    25.   
    26.   ......  
    27.     
    28.   /** 
    29.    * 获取下一个键值对 
    30.    */  
    31.   public synchronized boolean next(K key, V value) throws IOException {  
    32.     //判断还有无下一条记录  
    33.     if (!more) return false;  
    34.     long pos = in.getPosition();  
    35.     boolean remaining = (in.next(key) != null);  
    36.     if (remaining) {  
    37.       getCurrentValue(value);  
    38.     }  
    39.     if (pos >= end && in.syncSeen()) {  
    40.       more = false;  
    41.     } else {  
    42.       more = remaining;  
    43.     }  
    44.     return more;  
    45.   }  

    我们可以看到SequenceFileRecordReader是从输入流in中一个键值,一个键值的读取,另外一个的实现方式如下:

    1. /** 
    2.  * Treats keys as offset in file and value as line.  
    3.  */  
    4. public class LineRecordReader implements RecordReader<LongWritable, Text> {  
    5.   private static final Log LOG  
    6.     = LogFactory.getLog(LineRecordReader.class.getName());  
    7.   
    8.   private CompressionCodecFactory compressionCodecs = null;  
    9.   private long start;  
    10.   private long pos;  
    11.   private long end;  
    12.   private LineReader in;  
    13.   int maxLineLength;  
    14.   
    15.   ....  
    16.     
    17.   /** Read a line. */  
    18.   public synchronized boolean next(LongWritable key, Text value)  
    19.     throws IOException {  
    20.   
    21.     while (pos < end) {  
    22.       //设置key   
    23.       key.set(pos);  
    24.   
    25.       //根据位置一行一行读取,设置value  
    26.       int newSize = in.readLine(value, maxLineLength,  
    27.                                 Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),  
    28.                                          maxLineLength));  
    29.       if (newSize == 0) {  
    30.         return false;  
    31.       }  
    32.       pos += newSize;  
    33.       if (newSize < maxLineLength) {  
    34.         return true;  
    35.       }  
    36.   
    37.       // line too long. try again  
    38.       LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));  
    39.     }  
    40.   
    41.     return false;  
    42.   }  

    实现的方式为通过读的位置,从输入流中逐行读取key-value。通过这2种方法,就能得到新的key-value,就会用于后面的map操作。

    InputFormat的整个流程其实我忽略了很多细节。大体流程如上述所说。

  • 相关阅读:
    ZENCART 在文本格式郵件中轉換貨币符号
    ZENCART contact us 收不到邮件的问题
    zencart 如何修改在线人数和订单编号
    19.Oracle的动态监听和静态监听
    16.查看ORACLE的SAG和PGA的使用率
    1.Rman备份的基本命令
    4.ASM常用命令汇总
    2.Rman 备份、检查、维护、恢复
    2.oracle的备份和恢复之expdp和impdp命令02
    17.sqlnet.ora文件
  • 原文地址:https://www.cnblogs.com/cxzdy/p/5043991.html
Copyright © 2011-2022 走看看