zoukankan      html  css  js  c++  java
  • hadoop old API CombineFileInputFormat

    来自:http://f.dataguru.cn/thread-271645-1-1.html

    简介

    本文主要介绍下面4个方面

    1.为什么要使用CombineFileInputFormat

    2.CombineFileInputFormat实现原理

    3.怎样使用CombineFileInputFormat

    4.现存的问题

    使用CombineFileInputFormat的目的

    在开发MR的程序时,mapper的主要作用是对数据的收集。一般情况下,为了能让mapper更快的运行,我们会对文件进行split,以便多个mapper同时运行。在这种情况下,为了让程序更好更快的运行,我们需要控制mapper的个数。Mapper的个数主要由文件的大小及我们所设置的mapred.min.split.size以及blockSize所决定(详细参考:http://ai-longyu.iteye.com/blog/1566633

    上面所说的在我们使用TextInputFormat和分析单个文件时是没有问题的,基本上mapper的个数能够控制在我们所预期的范围内。但是当我们使用多个文件作为input的时候,mapper的个数就不再是我们所期望的那样了,因为TextInputFormat继承的是FileInputFormat,而FileInputFormatsplit操作是只针对单个文件,对于多个文件,是将每个文件进行split,而不能做一些合并的操作(尤其是大量的小文件)。

    你会想为什么不能进行合并呢,有没有实现合并的split呢?在这个时候,CombineFileInputFormat就闪亮登场了。这里所说的CombineFileInputFormat是由官方提供的,只要我们搞清楚了官方是怎么实现的,就能够自己也实现一个了。接下来将逐步分析CombineFileInputFormat的实现了。



    CombineFileInputFormat实现步骤

    这里插一句,官方的CombineFileInputFormat并不是线程安全的。

    先申明一下,这里分析所采用的源码是apache的1.0.3,分析的在org.apache.hadoop.mapred.lib.CombineFileInputFormat而不是org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat,这里分析的旧API,而没有分析新的API

    生成split的信息是由

    • public InputSplit[] getSplits(JobConf job, int numSplits)  

    Job参数:job的配置信息

    numSplits参数:期望的mapper数目,在这里根本就没有使用

    • //每个DN的最小split大小
    • long minSizeNode = 0;
    • //同机架的最小split大小
    • long minSizeRack = 0;
    • //最大的split大小
    • long maxSize = 0; 



    这几个变量都可以从job的配置信息中获取

    接下来就是获取input的路径列表,判断每个路径时候被Filter所允许,然后对允许的路径列表生成split信息列表,进入该类的核心方法

    • /**
    •   * Return all the splits in the specified set of paths
    •   *  
    • * @param job Job的配置信息
    • * @param paths 输入源的路径列表
    • * @param maxSize 最大的split大小
    • * @param minSizeNode 每个DN最小的split大小
    • * @param minSizeRack 每个rack最小的split大小
    • * @param splits split信息列表
    • * @throws IOException
    • */
    • private void getMoreSplits(JobConf job, Path[] paths,  
    •                             long maxSize, long minSizeNode, long minSizeRack,
    •                             List<CombineFileSplit> splits) 

    生成每个文件的OneFileInfo对象

    • // populate all the blocks for all files
    •     long totLength = 0;
    •     for (int i = 0; i < paths.length; i++) {
    •         //构建每个input文件的信息,并将文件中的每个
    •         //block信息收集到rackToBlocks、blockToNodes、nodeToBlocks中
    •       files = new OneFileInfo(paths, job,  
    •                                  rackToBlocks, blockToNodes, nodeToBlocks);
    •       //增加所有文件的大小
    •       totLength += files.getLength();
    •     } 



    在下面就开始真正的生成Split信息了



    第一次:将同DN上的所有block生成Split,生成方式:



    1.循环nodeToBlocks,获得每个DN上有哪些block

    2.循环这些block列表

    3.将block从blockToNodes中移除,避免同一个block被包含在多个split中

    4.将该block添加到一个有效block的列表中,这个列表主要是保留哪些block已经从blockToNodes中被移除了,方便后面恢复到blockToNodes中

    5.向临时变量curSplitSize增加block的大小

    6.判断curSplitSize是否已经超过了设置的maxSize

    a) 如果超过,执行并添加split信息,并重置curSplitSize和validBlocks

    b) 没有超过,继续循环block列表,跳到第2步

    7.当前DN上的block列表循环完成,判断剩余的block是否允许被split(剩下的block大小之和是否大于每个DN的最小split大小)

    a) 如果允许,执行并添加split信息

    b) 如果不被允许,将这些剩余的block归还blockToNodes

    8.重置

    9.跳到步骤1



    • // process all nodes and create splits that are local
    •     // to a node.  
    •     //创建同一个DN上的split
    •     for (Iterator<Map.Entry<String,  
    •          List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();  
    •          iter.hasNext() {
    •       Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    •       nodes.add(one.getKey());
    •       List<OneBlockInfo> blocksInNode = one.getValue();
    •       // for each block, copy it into validBlocks. Delete it from  
    •       // blockToNodes so that the same block does not appear in  
    •       // two different splits.
    •       for (OneBlockInfo oneblock : blocksInNode) {
    •         if (blockToNodes.containsKey(oneblock)) {
    •           validBlocks.add(oneblock);
    •           blockToNodes.remove(oneblock);
    •           curSplitSize += oneblock.length;
    •           // if the accumulated split size exceeds the maximum, then  
    •           // create this split.
    •           if (maxSize != 0 && curSplitSize >= maxSize) {
    •             // create an input split and add it to the splits array
    •             //创建这些block合并后的split,并将其split添加到split列表中
    •             addCreatedSplit(job, splits, nodes, validBlocks);
    •             //重置
    •             curSplitSize = 0;
    •             validBlocks.clear();
    •           }
    •         }
    •       }
    •       // if there were any blocks left over and their combined size is
    •       // larger than minSplitNode, then combine them into one split.
    •       // Otherwise add them back to the unprocessed pool. It is likely  
    •       // that they will be combined with other blocks from the same rack later on.
    •       //其实这里的注释已经说的很清楚,我再按照我的理解说一下
    •       /**
    •        * 这里有几种情况:
    •        * 1、在这个DN上还有没有被split的block,
    •        * 而且这些block的大小大于了在一个DN上的split最小值(没有达到最大值),
    •        * 将把这些block合并成一个split
    •        * 2、剩余的block的大小还是没有达到,将剩余的这些block
    •        * 归还给blockToNodes,等以后统一处理
    •        */
    •       if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    •         // create an input split and add it to the splits array
    •         addCreatedSplit(job, splits, nodes, validBlocks);
    •       } else {
    •         for (OneBlockInfo oneblock : validBlocks) {
    •           blockToNodes.put(oneblock, oneblock.hosts);
    •         }
    •       }
    •       validBlocks.clear();
    •       nodes.clear();
    •       curSplitSize = 0;
    •     } 


    第二次:对不再同一个DN上但是在同一个Rack上的block进行合并(只是之前还剩下的block)



    • // if blocks in a rack are below the specified minimum size, then keep them
    •     // in 'overflow'. After the processing of all racks is complete, these overflow
    •     // blocks will be combined into splits.
    •     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    •     ArrayList<String> racks = new ArrayList<String>();
    •     // Process all racks over and over again until there is no more work to do.
    •     //这里处理的就不再是同一个DN上的block
    •     //同一个DN上的已经被处理过了(上面的代码),这里是一些
    •     //还没有被处理的block
    •     while (blockToNodes.size() > 0) {
    •       // Create one split for this rack before moving over to the next rack.  
    •       // Come back to this rack after creating a single split for each of the  
    •       // remaining racks.
    •       // Process one rack location at a time, Combine all possible blocks that
    •       // reside on this rack as one split. (constrained by minimum and maximum
    •       // split size).
    •       // iterate over all racks  
    •     //创建同机架的split
    •       for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =  
    •            rackToBlocks.entrySet().iterator(); iter.hasNext() {
    •         Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    •         racks.add(one.getKey());
    •         List<OneBlockInfo> blocks = one.getValue();
    •         // for each block, copy it into validBlocks. Delete it from  
    •         // blockToNodes so that the same block does not appear in  
    •         // two different splits.
    •         boolean createdSplit = false;
    •         for (OneBlockInfo oneblock : blocks) {
    •             //这里很重要,现在的blockToNodes说明的是还有哪些block没有被split
    •           if (blockToNodes.containsKey(oneblock)) {
    •             validBlocks.add(oneblock);
    •             blockToNodes.remove(oneblock);
    •             curSplitSize += oneblock.length;
    •       
    •             // if the accumulated split size exceeds the maximum, then  
    •             // create this split.
    •             if (maxSize != 0 && curSplitSize >= maxSize) {
    •               // create an input split and add it to the splits array
    •               addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    •               createdSplit = true;
    •               break;
    •             }
    •           }
    •         }
    •         // if we created a split, then just go to the next rack
    •         if (createdSplit) {
    •           curSplitSize = 0;
    •           validBlocks.clear();
    •           racks.clear();
    •           continue;
    •         }
    •         //还有没有被split的block
    •         //如果这些block的大小大于了同机架的最小split,
    •         //则创建split
    •         //否则,将这些block留到后面处理
    •         if (!validBlocks.isEmpty()) {
    •           if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
    •             // if there is a mimimum size specified, then create a single split
    •             // otherwise, store these blocks into overflow data structure
    •             addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    •           } else {
    •             // There were a few blocks in this rack that remained to be processed.
    •             // Keep them in 'overflow' block list. These will be combined later.
    •             overflowBlocks.addAll(validBlocks);
    •           }
    •         }
    •         curSplitSize = 0;
    •         validBlocks.clear();
    •         racks.clear();
    •       }
    •     } 


    最后,对于既不在同DN也不在同rack的block进行合并(经过前两步还剩下的block),这里源码就没有什么了,就不再贴了


    源码总结:

    合并,经过了3个步骤。同DN----》同rack不同DN-----》不同rack

    将可以合并的block写到同一个split中



    使用自定义的CombineFileInputFormat

    MultiFileCombineInputFormat


    • package org.rollinkin.hadoop;
    • import java.io.IOException;
    • import org.apache.hadoop.io.LongWritable;
    • import org.apache.hadoop.io.Text;
    • import org.apache.hadoop.mapred.InputSplit;
    • import org.apache.hadoop.mapred.JobConf;
    • import org.apache.hadoop.mapred.RecordReader;
    • import org.apache.hadoop.mapred.Reporter;
    • import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    • import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    • import org.apache.hadoop.mapred.lib.CombineFileSplit;
    • /**
    • * 多文件合并split的输入format
    • *  
    • * @author rollinkin
    • * @date 2012-10-29
    • * @version 1.0
    • * @since 1.0
    • */
    • public class MultiFileCombineInputFormat extends
    •         CombineFileInputFormat<LongWritable, Text> {
    •     @Override   
    •     public RecordReader<LongWritable, Text> getRecordReader(   
    •             InputSplit split, JobConf job, Reporter reporter)   
    •             throws IOException {   
    •          @SuppressWarnings({ "rawtypes", "unchecked" })
    •         Class<RecordReader<LongWritable, Text>> rrClass = (Class)CombineLineRecordReader.class;
    •         return new CombineFileRecordReader<LongWritable, Text>(job,(CombineFileSplit) split, reporter,rrClass);   
    •            
    •     }  
    • } 

    CombineLineRecordReader,这个其实没有什么内容,就是包装了一个Reader


    • package org.rollinkin.hadoop;
    • import java.io.IOException;
    • import org.apache.hadoop.conf.Configuration;
    • import org.apache.hadoop.io.LongWritable;
    • import org.apache.hadoop.io.Text;
    • import org.apache.hadoop.mapred.FileSplit;
    • import org.apache.hadoop.mapred.LineRecordReader;
    • import org.apache.hadoop.mapred.RecordReader;
    • import org.apache.hadoop.mapred.Reporter;
    • import org.apache.hadoop.mapred.lib.CombineFileSplit;
    • public class CombineLineRecordReader implements
    •         RecordReader<LongWritable, Text> {
    •     private LineRecordReader delegate;
    •     public CombineLineRecordReader(CombineFileSplit split, Configuration conf,
    •             Reporter reporter, Integer idx) throws IOException {
    •         FileSplit fileSplit = new FileSplit(split.getPath(idx),
    •                 split.getOffset(idx), split.getLength(idx),
    •                 split.getLocations());
    •         delegate = new LineRecordReader(conf, fileSplit);
    •     }
    •     @Override
    •     public boolean next(LongWritable key, Text value) throws IOException {
    •         return delegate.next(key, value);
    •     }
    •     @Override
    •     public LongWritable createKey() {
    •         return delegate.createKey();
    •     }
    •     @Override
    •     public Text createValue() {
    •         return delegate.createValue();
    •     }
    •     @Override
    •     public long getPos() throws IOException {
    •         return delegate.getPos();
    •     }
    •     @Override
    •     public void close() throws IOException {
    •         delegate.close();
    •     }
    •     @Override
    •     public float getProgress() throws IOException {
    •         return delegate.getProgress();
    •     }
    • } 


    具体的使用我就不再留了,其实很简单,就是把你的InputFormat设置成MultiFileCombineInputFormat 就可以了(在2012-11-09之前提供了一个reader实际上是不可用,他存在跨块读取的问题,

    这里就不在提供了。如果使用了,请更新一下。哎,又传播错误的消息了)



    现存问题

      • 合并后会造成mapper不能本地化,带来mapper的额外开销,需要权衡
      • 这里只实现了简单的Text的方式的合并,对于可压缩的、二进制等文件没有提供
      • 这里提供的自定义的实现,只是简单的按行读取
  • 相关阅读:
    Flink入门(一)——Apache Flink介绍
    ES入门宝典(详细截图版)
    什么是全文检索
    Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)
    Hbase入门(四)——表结构设计-RowKey
    Hbase入门(三)——数据模型
    Hbase入门(二)——安装与配置
    Hbase入门(一)——初识Hbase
    Flink1.9整合Kafka
    ShutdownHook
  • 原文地址:https://www.cnblogs.com/sunxucool/p/3966315.html
Copyright © 2011-2022 走看看