zoukankan      html  css  js  c++  java
  • 2,MapReduce原理及源码解读

    MapReduce原理及源码解读

    一、分片

    灵魂拷问:为什么要分片?

    • 分而治之:MapReduce(MR)的核心思想就是分而治之;何时分,如何分就要从原理和源码来入手。做为码农大家都知道,不管一个程序多么复杂,在写代码和学习代码之前最重要的就是搞懂输入和输出,而MR的输入其实就是一个目录。而所谓的分而治之其实也是在把大文件分成小文件,然后一个机器处理一个小文件,最后再合并。所以MR的第一步就是对输入的文件进行分片。

    1.1 对谁分片

    • 对每个文件分片:分片是对输入目录中的每一个文件进行分片。后面的分片都是针对单个文件分片。

    • 源码解读(对谁分片):

    // 分片的源码位置
    package org.apache.hadoop.mapreduce.lib.input;
    abstract class FileInputFormat.java;
    
    // 下面代码所在方法
    method getSplits();
    
    // InputStatus表示一个切片类
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // 得到所有输入文件
    List<FileStatus> files = listStatus(job);
    // 遍历每个文件。 根据每个文件来切片,而不是整个文件夹
    for (FileStatus file : files) {
          // 分片1
    }
    

    1.2 长度是否为0

    • 文件长度:当文件长度不为0时才会进行下面的分片操作;如果文件长度为0,则会向分片列表中添加一个空的hosts文件数组和空长度的文件。也就是说,空文件也会创建一个空的分片。
    • 源码解读(长度是否为0):
    for (FileStatus file : files) {
           Path path = file.getPath();
           // 获取文件大小
           long length = file.getLen();
           if (length != 0) {
                  // 分片2
            } else {// 如果文大小为空,默认就创建一个空的hosts文件数组和空长度的文件
               //Create empty hosts array for zero length files
                  splits.add(makeSplit(path, 0, length, new String[0]));
            }
    }
    

    1.3 是否可以分片

    • 压缩格式:并不是所有的文件都可以分片,有一些压缩格式的文件是不可以分片的。因此只会对可以分片的文件进行分片,而不可以分片的文件即使再大也会作为一个整体来处理,相当于一个片。
    • 源码解读(是否可以分片):
    // 如果可以分片
    if (isSplitable(job, path)) {
        // 分片3
    } else { // not splitable
            splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
            blkLocations[0].getCachedHosts()));
    }
    
    // 判断一个文件是否可以切片
    // FileInputFormat抽象类中默认返回true,子类TextInputFormat中实现如下
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
         final CompressionCodec codec =
               new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
         if (null == codec) {// 如果一个文件的压缩编码为null,那么表示可以切片
               return true;
         }// 如果一个文件的压缩编码是SplittableCompressionCodec的子类,那么表示当前文件也可以切片
         return codec instanceof SplittableCompressionCodec;
    }
    

    1.4 分片的大小

    • 分片大小:分片太大就失去了分片的意义;如果分片很小,则管理和构建map任务的时间就会增多,效率变低。并且如果分片跨越两个数据块,那么分片的部分数据需要通过网络传输到map任务运行的节点上,效率会更低。所以分片的最佳大小应该和HDFS的分块大小一致。Hadoop2默认128M。
    • 源码解读(分片大小):
    // FormatMinSplitSize是 1, MinSplitSize如果没配置默认是 1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // 如果没配置,则默认是 Long类型的最大值
    long maxSize = getMaxSplitSize(job);
    // 块大小,Hadoop2是128M,本地模式为32M
    long blockSize = file.getBlockSize();
    // 分片大小计算公式。默认就是blockSize的大小
    long splitSize=Math.max(minSize, Math.min(maxSize, blockSize));
    
    
    • 自定义分片大小:由上面的公式可知,默认的分片大小就是blockSize的大小。如果要自定义大于blockSize,比如改为200M,就把minSize改为200;小于blockSize,比如20M,就把maxSize改为20
    • 1.1倍:最常见的问题就是:一个大小为130M的文件,在分片大小为128M的集群上会分成几片?答案是1片;因为 128*1.1>130,准确来说应该是130 / 128 < 1.1 (源码的公式)。也就是说,如果剩下的文件大小在分片大小的1.1倍以内,就不会再分片了。要这个1.1倍,是为了优化性能;试想如果不这样,当还剩下130M大小的时候,就会分成一块128M,一块2M,后面还要为这个2M的块单独开一个map任务,不划算。至于为什么是1.1,这个1.1是专家们通过反复试验得出来的结果。
    • 源码解读(1.1倍):
    // 当剩余文件的大小,大于分片大小的1.1倍时,才会分片
    private static final double SPLIT_SLOP = 1.1;   // 10% slop
    // bytesRemaining为文件剩余大小,splitSize为上面计算出的分片大小
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            // 分片4
    }
    

    1.5 开始分片

    • 终于分片了:经过上面的层层条件,下面就是// 分片4中的分片代码。与HDFS的物理分块不同的是,MapReduce的分片只是逻辑上的分片,即按照偏移量分片。
     // 封装一个分片信息(包含文件的路径,分片的起始偏移量,要处理的大小,分片包含的块的信息,分片中包含的块存在哪儿些机器上)
    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
    // makeSplit进行切片操作,返回值是一个切片,并且加入到切片列表中
    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                   blkLocations[blkIndex].getHosts(),
                   blkLocations[blkIndex].getCachedHosts()));
    // 剩余文件大小
    bytesRemaining -= splitSize;
    

    1.6 分片后读取会不会断行

    • 不会:由于分片时是按照长度进行分片的,那就有很大可能会把一行数据分在两个片里面,所以分片的时候确实会断行。如果读取并处理断行的数据,就会导致结果不正确,那是肯定不行的。所以LineRecordReader类就充当了读取记录的角色,保证读取不断行;其中nextKeyValue()方法里是真正给Mapper中的key赋值的地方,并且调用了父类LineReader类中的readLine()方法来给value赋值。
    • 源码解读(读取时不断行):
    public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
        @Override
        public RecordReader<LongWritable, Text>
        createRecordReader(InputSplit split,
                           TaskAttemptContext context) {
            String delimiter = context.getConfiguration().get(
                    "textinputformat.record.delimiter");
            // 行分隔符
            byte[] recordDelimiterBytes = null;
            if (null != delimiter)
                recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
             // 返回LineRecordReader对象
            return new LineRecordReader(recordDelimiterBytes);
        }
    }
    
    // 行记录读取类,提供读取片中数据的功能,并且保证不断行
    public class LineRecordReader extends RecordReader<LongWritable, Text> {
        // ......其他代码
        
        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            // ......
            
            // 如果不是第一个分片,则开始位置退到下一行记录的开始位置
            // 因为为了保证读取时不断行,每个块都会向后多读一行(最后一个除外)
            if (start != 0) {
                start += in.readLine(new Text(), 0, maxBytesToConsume(start));
            }
        }
        
        public boolean nextKeyValue() throws IOException {
            // 给Mapper中输入的key赋值
            key.set(pos);
            // 实例化Mapper中输入的value
            if (value == null) {
                value = new Text();
            }
            // 注意是<=end,在等于end时还会执行一次,多读了一行,所以不会断行
            while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
                if (pos == 0) {
                    newSize = skipUtfByteOrderMark();
                } else {
                    // 给Mapper中输入的value赋值。
                    // readLine方法会根据是否自定义行分隔符来调用不同的方法。
                    newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
                    pos += newSize;
                }
            }
        }
    }
    

    二、Map阶段

    2.1 实例化Mapper

    • 各种实例化:上面费了很大的劲来编写分片TextInputFormat,和读取类LineRecordReader;而这一切都是为了把输入数据很好的传给map()方法来运算,所以首先就要实例化我们自定义的Mapper类。

    • 源码解读(各种实例化):

    package org.apache.hadoop.mapred;
    class MapTask.java;
    
    method runNewMapper();
    
    // 通过反射来获取Mapper。在Job中设置的Mapper,也就是自己定义的继承自Mapper的类
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // 通过反射来得到 InputFormat。默认是TextInputFormat
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // 获得当前MapTask要处理的split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);
    // 根据InputFormat对象创建RecordReader对象。默认是LineRecordReader
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    // 初始化。用来打开文件,并且调整文件的头指针
    input.initialize(split, mapperContext);
    // MapTask中调用Mapper的run()方法
    mapper.run(mapperContext);
    

    2.2 调用map()方法

    • 每行数据调用一次:从上面的代码中我们知道,MapTask中会调用Mapper类的run()方法;而run()方法会在while循环中调用map()方法,由退出条件可知,是每一行数据调用一次map()方法。
    • 源码解读(怎么调用map()方法):
    public void run(Context context) throws IOException, InterruptedException {
        // 在所有map执行之前初始化,也可以根据业务需要来重写此方法
        setup(context); 
        try {
            // context.nextKeyValue()其实就是LineRecordReader中的nextKeyValue()方法;
            // 在run方法中遍历所有的key,每行数据都执行一次自定义map方法;
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            // 父类Mapper中的setup()和cleanup()方法中什么都没做;
            // 只执行一次,可以根据业务需要来重写此方法;
            cleanup(context);
        }
    }
    

    三、Shuffle阶段

    灵魂拷问:哪来的Shuffle?

    • 理论与实现:看过源码的都知道,其实源码中根本就没有什么shuffle;shuffle只是一个过程,确切的来说是连贯Map阶段和reduce阶段的一个理论过程,而它的实现主要在MapTask和ReduceTask类中。shuffle阶段可以说是MapReduce中最核心的一个阶段。

    3.1 shuffle的概念

    • 作用:shuffle这个单词的本意是洗牌、打乱的意思,而在这里则是:将map端的无规则输出按照指定的规则“打乱”成具有一定规则的数据,以便reduce端接收和处理。
    • 流程:shuffle的范围是map输出后到reduce输入前。它的流程主要包括Map端shuffle和reduce端shuffle。
    • MapReduce大致流程:

    3.2 Map端Shuffle

    • 作用:Map端的shuffle过程是对Map的结果进行分区、排序、溢写、合并分区,最后写入磁盘;最终会得到一个分区有序的文件,即先按分区排序,再按key排序。
    • Map端shuffle大致流程:

    3.2.1 分区(partition)

    • 概念:对于map的每一个输出的键值对,都会根据key来生成partition再一起写入环形缓冲区。每一个reduceTask会处理一个partition(第0个reduceTask处理partition为0的分区,以此类推)。
    • 如何分区:默认情况下,分区号是key的hash值对numReduceTask数量取模的结果。也可以自定义分区。
    • 源码解读(如何分区):
    // 当设置的reduceTask数大于实际分区数时,可以正常执行,多出的分区为空文件;
    // 当设置的reduceTask数小于实际分区数时,会报错。
    job.setNumReduceTasks(4);
    // 如果设置的 numReduceTasks大于 1,而又没有设置自定义的 PartitionerClass
    // 则会调用系统默认的 HashPartitioner实现类来计算分区。
    job.setPartitionerClass(WordCountPartitioner.class);
    
    // 自定义分区
    public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
        private static HashMap<String, Integer> map = new HashMap<>();
        static {
            map.put("0734", 0);
            map.put("0561", 1);
            map.put("0428", 2);
        }
    
        // 当 Mapper的输出要写入环形缓冲区时,会调用此方法来计算当前<K,V>的分区号
        @Override
        public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
            String strText = text.toString();
            return map.getOrDefault(strText.substring(0, 4), 3);
        }
    }
    
    // MapTask.java$NewOutputCollector
    public void write(K key, V value) throws IOException, InterruptedException {
          // 把 K,V以及分区号写入环形缓冲区
          collector.collect(key, value,
                            partitioner.getPartition(key, value, partitions)); 
    }
    

    3.2.2 写入环形缓冲区

    • 概念:环形缓冲区是在内存中的一个字节数组kvbuffer。kvbuffer不仅存放map输出的<k, v>,还在另一端存放了<k, v>的索引(元数据) kvmeta,每个kvmeta包括value的起始位置、key的起始位置、partition值、value的长度,占用4个int长度。上图中的bufindex和kvindex分别表示kvbuffer和kvmeta的指针。环形缓冲区的默认大小是100M,当写入数据大小超过80%(80M)就会触发Spill,溢写到磁盘。
    • 源码解读(Spill):
    // SpillThread线程在MapTask$MapOutputBuffer类中初始化,在init()方法中启动。
    // 它会一直监视环形缓冲区,当大小超过80%的时候,就会调用sortAndSpill()方法。
    protected class SpillThread extends Thread {
        @Override
          public void run() {
                // ....
                // run方法中调用排序并溢写方法
              while (true) {
                  // ....
                  sortAndSpill();
              }
                //.... 
          }
    }
    

    3.2.3 排序并溢写(sortAndSpill):

    • 排序:触发溢写后,会先排序,再溢写。排序是根据partition和key的升序排序,移动的只是索引数据,排序的结果是将kvmeta中到的数据按照partition聚合在一起,同一个partition内再根据key排序。

    • 溢写:Spill线程根据排序后的kvmeta文件,将一个个partition输出到文件,在这次溢写过程中,会将环形缓冲区中已计算的数据(80M)写入到一个文件spill.out,所以引入了索引文件spill.index,它记录了partition在spill.out中的位置。

    3.2.4 合并(merge):

    • 概念:如果Map的数据很大,那么就会触发多次Spill,spill.out和spill.index文件也会很多。所以最后就要把这些文件合并,方便Reduce读取。
    • 合并过程:合并过程中,首先会根据spill.index文件,将spill.out文件中的partition使用归并排序分别写入到相应的segment中,然后再把所有的segment写入到一个file.out文件中,并用file.out.index来记录partition的索引。由于合并时可能有相同的key,所以如果设置了combine,那么在写入文件之前还会调用自定义的combine方法。

    3.3 Reduce端Shuffle

    3.3.1 拉取(Copy)

    • 前期工作:Reduce任务会通过HTTP向各个Map任务拉取它所需的partition数据。当Map任务成功完成之后会通知 TaskTracker状态已跟新,TaskTracker进而通知JobTracker(都是通过心跳机制实现),所以JobTracker中记录了Map输出和TaskTracker的映射关系。
    • 何时拉取:Reduce会定期向JobTracker获取Map的输出位置,一旦拿到输出位置,Reduce任务就会立即从此输出对应的TaskTracker上复制相应的partition数据到本地,而不是等到所有Map任务结束。

    3.3.2 排序合并(Merge Sort)

    • 合并:copy过来的数据会先放入内存缓冲区中(大小是 JVM的heap size的70%),如果缓冲区放得下就直接把数据写入内存,即内存到内存merge。如果缓冲区中的Map数据达到一定大小(缓冲区的66%)的时候,就会开启内存merge,并将merge后的数据写入磁盘,即内存到磁盘merge。当属于该Reduce任务的map输出全部拉取完成,则会在reduce任务的磁盘上生成多个文件(如果所有map输出的大小没有超过缓冲区大小,则数据只存在于内存中),这时开始最后的合并操作,即磁盘到磁盘merge。如果设置了combine,合并时也会执行。
    • 排序:由于map输出的数据已经是有序的,所以reduce在合并时的排序是归并排序,并且reduce端的copy和sort是同时进行的,最终会得到一个整体有序的数据。

    3.3.3 归并分组(reduce)

    • 归并分组(reduce):当reduce任务执行完拉取和排序合并后,就会对相同的key进行分组。默认情况下是根据key对象中重写的compareTo()方法来分组,如果设置了GroupingComparator,则会调用它的compare()方法来分组。reduce会把compareTo(或compare)方法计算返回为 0 的key分为一组,最终会得到一个组<key, Iterable<value,>>,其中组的key是这一组的第一个数据的key,Iterable<value,>则是相同key的value迭代器。最后再对每一个组调用Reducer的reduce()方法。

    • 源码解读(分组):

    // org.apache.hadoop.mapreduce.Reducer中的run()方法
    while (context.nextKey()) {
         // 调用自定义 reduce方法
         reduce(context.getCurrentKey(), context.getValues(), context);
         // .....
    }
    
    // org.apache.hadoop.mapreduce.task.ReduceContextImpl中的方法
    public boolean nextKey() throws IOException,InterruptedException {
        // 如果当前key与下一个key相同,则继续往下走;
        // 这一步就是把相同的key放到一组, 他们的value放到一个迭代器中;当下一个key不同时再调用reduce方法
        while (hasMore && nextKeyIsSame) {
          nextKeyValue();
        }
        if (hasMore) {
          if (inputKeyCounter != null) {
            // 计数器
            inputKeyCounter.increment(1);
          }
          // 当nextKeyIsSame为false时,会再调用一次nextKeyValue(),而它的返回值必为true;
          return nextKeyValue();
        } else {
          return false;
        }
    }
    
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (hasMore) {
          nextKey = input.getKey();
          // 在执行reduce方法之前调用ReduceContext中定义的GroupComparator
          // 如果key的compareTo方法返回0则 nextKeyIsSame为true,也就会分到一组
          nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                         currentRawKey.getLength(),
                                         nextKey.getData(),
                                         nextKey.getPosition(),
                                         nextKey.getLength() - nextKey.getPosition()
                                             ) == 0;
        } else {
          nextKeyIsSame = false;
        }
        inputValueCounter.increment(1);
        return true;
    }
    

    四、Reduce阶段

    4.1 执行reduce()方法

    • 归并:上面的Shuffle阶段已经将数据分组成了<key, Iteralble<value,>>格式的数据,所以对于相同的key只会调用一次reduce()方法。
    • 注意事项:在reduce()方法中,一定要重新创建key对象,不要直接使用参数中的key。

    4.2 输出最终结果

    • 完结:整个MapReduce的输出和输入有点类似。输出是实例化TextOutputFormat和LineRecordWrite对象。并由LineRecordWrite判断是不是NullWriteable,最后输出到文件
    参考文章:
  • 相关阅读:
    Python操作Excel
    unittest单元测试生成HTML测试报告
    pycharm安装 package报错:module 'pip' has no attribute 'main'
    Jenkins关闭、重启,Jenkins服务的启动、停止方法。
    selenium如何获取已定位元素的属性值?
    本周学习小结(23/03
    本周学习小结(16/03
    本周学习小结(09/03
    本周学习小结(02/03
    本周学习小结(24/02
  • 原文地址:https://www.cnblogs.com/shendeng23/p/12614506.html
Copyright © 2011-2022 走看看