zoukankan      html  css  js  c++  java
  • MapReduce从输入文件到Mapper处理之间的过程

    1、MapReduce代码入口

    FileInputFormat.setInputPaths(job, new Path(input)); //设置MapReduce输入格式
    job.waitForCompletion(true);

    2、InputFormat分析

    public abstract class InputFormat<K, V> {
        //获取输入文件的分片,仅是逻辑分片,并没有物理分片
        public abstract  List<InputSplit> getSplits(JobContext context);
        
        //创建RecordReader,从InputSplit中读取数据
        public abstract  RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) ;
    }

    不同的InputFormat会各自实现不同的文件读取方式以及分片方式,每个输入分片(InputSplit)会被单独的map task作为数据源

    3、InputSplit

    Mapper的输入是一个一个的输入分片(InputSplit)

    public abstract class InputSplit {
      public abstract long getLength();
      public abstract String[] getLocations();
    }
    
    public class FileSplit extends InputSplit implements Writable{
        private Path file; //文件路径
        private long start; //分片起始位置
        private long length;  //分片长度
        private String[] hosts; //存储分片的hosts
        
        public FileSplit(Path file, long start, long length, String[] hosts) {
            this.file = file;
            this.start = start;
            this.length = length;
            this.hosts = hosts;
        }
    }

    一个FileSplit对应Mapper的一个输入文件,不管这个文件有多么的小,也是作为一个单独的InputSplit来处理;
    在输入文件是由大量小文件组成的场景下,就会有大量的InputSplit,从而需要大量的Mapper的处理;
    大量的Mapper Task创建和销毁开销将是巨大的;可以采用CombineFileSplit将多个小文件进行合并再交由Mapper Task处理;

    4、FileInputFormat

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        /**
         * getFormatMinSplitSize() = 1
         * job.getConfiguration().getLong(SPLIT_MINSIZE, 1L)
         * SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize"
         * mapred-default.xml中参数为0
         */
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //计算分片的最小值: max(1,0) = 1
        
        /**
         * SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize"
         * mapred-default.xml中参数为空
         */
        long maxSize = getMaxSplitSize(job); //计算分片的最大值:Long.MAX_VALUE
       
        //存储输入文件的分片结果
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
            Path path = file.getPath();
            long length = file.getLen();
            if (length != 0) {
                ...
                if (isSplitable(job, path)) { //能分片
                    long blockSize = file.getBlockSize();
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);{
                        //max(1, min(Long.MAX_VALUE, 64M)) = 64M 默认情况下splitSize=blockSize
                        return Math.max(minSize, Math.min(maxSize, blockSize)); 
                    }
    
                    //循环分片,当剩余数据与分片大小比值大于Split_Slop时,继续分片,小于等于时,停止分片
                    long bytesRemaining = length;
                    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP = 1.1
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
                        bytesRemaining -= splitSize;
                    }
    
                    //处理余下的数据
                    if (bytesRemaining != 0) {
                        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
                    }
                } else { // 不可分片,整块返回(有些压缩后是不能分片处理的)
                    splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
                }
            } else { 
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); // 设置输入文件数量
        LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    5、PathFilter

    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        ......
        List<PathFilter> filters = new ArrayList<PathFilter>();
        filters.add(hiddenFileFilter);
        PathFilter jobFilter = getInputPathFilter(job);
        if (jobFilter != null) {
          filters.add(jobFilter);
        }
        PathFilter inputFilter = new MultiPathFilter(filters);
        ......
    }

    PathFilter文件筛选器接口,使用它我们可以控制哪些文件要作为输入,哪些不作为输入;
    PathFilter有一个accept(Path)方法,当接收的Path要被包含进来,就返回true,否则返回false;

    public interface PathFilter {
        boolean accept(Path path);
    }
    
    //过滤掉文件名以_或者.开头的文件
    private static final PathFilter hiddenFileFilter = new PathFilter(){
        public boolean accept(Path p){
            String name = p.getName(); 
            return !name.startsWith("_") && !name.startsWith("."); 
        }
    }; 

    6、RecordReader

    RecordReader将InputSplit拆分成KEY-VALUE对

    public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
        //InputSplit初始化
        public abstract void initialize(InputSplit split,TaskAttemptContext context) ;
        
        //读取分片下一个<key, value>对
        public abstract boolean nextKeyValue() throws IOException, InterruptedException;
        
        //获得当前读取到的KEY
        public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
        
        //获得当前读取到的VALUE
         public abstract  VALUEIN getCurrentValue() throws IOException, InterruptedException;
        
        //跟踪读取分片的进度
        public abstract float getProgress() throws IOException, InterruptedException;
        
        //关闭RecordReader
        public abstract void close() throws IOException;
    }

    7、Mapper

    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
        }
      
        //预处理,仅在map task启动时运行一次
        protected void setup(Context context) throws IOException, InterruptedException {
        }
    
        //对于InputSplit中的每一对<key, value>都会运行一次
        protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    
        //扫尾工作,比如关闭流等
        protected void cleanup(Context context) throws IOException, InterruptedException {
        }
      
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    模板模式的应用:run方法:
    1)setup
    2)循环从InputSplit中获得到的KV对调用map函数进行处理
    3)cleanup

    至此完成了MapReduce的输入文件是如何被过滤分片读取读出“K-V对”,然后交给Mapper类来处理

     

  • 相关阅读:
    springboot配置redis缓存
    【spark】local模式运行
    mybatis从入门到精通(二) 增删查改
    学习设计模式
    学习设计模式
    mybatis从入门到精通(一) 入门
    学习NIO 之 使用方法
    学习 NIO 之 零拷贝
    Java并发
    学习设计模式
  • 原文地址:https://www.cnblogs.com/luogankun/p/4129625.html
Copyright © 2011-2022 走看看