zoukankan      html  css  js  c++  java
  • Hadoop学习之自定义二次排序

    一、概述
        MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往 往有要对reduce输出结果进行二次排序的需求。对于二次排序的实现,本文将通过一个实际的MapReduce二次排序例子讲述 二次排序的实现和其MapReduce的整个处理流程,并且通过结果和map、reduce端的日志来验证所描述的处理流程的正确性。
    二、需求描述
    1、输入数据:
    sort1    1
    sort2    3
    sort2    77
    sort2    54
    sort1    2
    sort6    22
    sort6    221
    sort6    20
    2、目标输出
    sort1 1,2
    sort2 3,54,77
    sort6 20,22,221

    三、解决思路
    1、首先,在思考解决问题思路时,我们先应该深刻的理解MapReduce处理数据的整个流程,这是最基础的,不然的话是不可能找到解决问题的思路的,我描述一下MapReduce处理数据的大概简单流程:首先,MapReduce框架通过getSplit方法实现对原始文件的切片之后,每一个切片对应着一个map task,inputSplit输入到Map函数进行处理,中间结果经过环形缓冲区的排序,然后分区、自定义二次排序(如果有的话)和合并,再通过 shuffle操作将数据传输到reduce task端,reduce端也存在着缓冲区,数据也会在缓冲区和磁盘中进行合并排序等操作,然后对数据按照Key值进行分组,然后没处理完一个分组之后就 会去调用一次reduce函数,最终输出结果。大概流程我画了一下,如下图:


    2、具体解决思路

    (1)Map端处理:

       根据上面的需求,我们有一个非常明确的目标就是要对第一列相同的记录合并,并且对合并后的数字进行排序。我们都知道MapReduce框架不管是默认排序或者是自定义排序都只是对Key值进行排序,现在的情况是这些数据不是key值,怎么办?其实我们可以将原始数据的Key值和其对应的数据组合成一个新的Key值,然后新的Key值对应的还是之前的数字。那么我们就可以将原始数据的map输出变成类似下面的数据结构:
    {[sort1,1],1}
    {[sort2,3],3}
    {[sort2,77],77}
    {[sort2,54],54}
    {[sort1,2],2}
    {[sort6,22],22}
    {[sort6,221],221}
    {[sort6,20],20}
    那么我们只需要对[]里面的新key值进行排序就ok了。然后我们需要自定义一个分区处理器,因为我的目标不是想将新key相同的传到同一个reduce中,而是想将新key中的第一个字段相同的才放到同一个reduce中进行分组合并,所以我们需要根据新key值中的第一个字段来自定义一个分区处理器。通过分区操作后,得到的数据流如下:
    Partition1:{[sort1,1],1}、{[sort1,2],2}
    Partition2:{[sort2,3],3}、{[sort2,77],77}、{[sort2,54],54}
    Partition3:{[sort6,22],22}、{[sort6,221],221}、{[sort6,20],20}
    分区操作完成之后,我调用自己的自定义排序器对新的Key值进行排序。
    {[sort1,1],1}
    {[sort1,2],2}
    {[sort2,3],3}
    {[sort2,54],54}
    {[sort2,77],77}
    {[sort6,20],20}
    {[sort6,22],22}
    {[sort6,221],221}
    (2)Reduce端处理:
        经过Shuffle处理之后,数据传输到Reducer端了。在Reducer端对按照组合键的第一个字段来进行分组,并且每处理完一次分组之后就会调用一次reduce函数来对这个分组进行处理输出。最终的各个分组的数据结构变成类似下面的数据结构:
    {sort1,[1,2]}
    {sort2,[3,54,77]}
    {sort6,[20,22,221]}

    四、具体实现
    1、自定义组合键

     1 package com.mr;
     2 import java.io.DataInput;
     3 import java.io.DataOutput;
     4 import java.io.IOException;
     5 import org.apache.Hadoop.io.IntWritable;
     6 import org.apache.hadoop.io.Text;
     7 import org.apache.hadoop.io.WritableComparable;
     8 import org.slf4j.Logger;
     9 import org.slf4j.LoggerFactory;
    10 /**
    11 * 自定义组合键
    12 * @author zenghzhaozheng
    13 */
    14 public class CombinationKey implements WritableComparable<CombinationKey>{
    15     private static final Logger logger = LoggerFactory.getLogger(CombinationKey.class);
    16     private Text firstKey;
    17     private IntWritable secondKey;
    18     public CombinationKey() {
    19         this.firstKey = new Text();
    20         this.secondKey = new IntWritable();
    21     }
    22     public Text getFirstKey() {
    23         return this.firstKey;
    24     }
    25     public void setFirstKey(Text firstKey) {
    26         this.firstKey = firstKey;
    27     }
    28     public IntWritable getSecondKey() {
    29         return this.secondKey;
    30     }
    31     public void setSecondKey(IntWritable secondKey) {
    32         this.secondKey = secondKey;
    33     }
    34     @Override
    35     public void readFields(DataInput dateInput) throws IOException {
    36         // TODO Auto-generated method stub
    37         this.firstKey.readFields(dateInput);
    38         this.secondKey.readFields(dateInput);
    39     }
    40     @Override
    41     public void write(DataOutput outPut) throws IOException {
    42         this.firstKey.write(outPut);
    43         this.secondKey.write(outPut);
    44     }
    45     /**
    46     * 自定义比较策略
    47     * 注意:该比较策略用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,
    48     * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整)
    49     */
    50     @Override
    51     public int compareTo(CombinationKey combinationKey) {
    52         logger.info("-------CombinationKey flag-------");
    53         return this.firstKey.compareTo(combinationKey.getFirstKey());
    54     }
    55 }

    说明:在自定义组合键的时候,我们需要特别注意,一定要实现WritableComparable接口,并且实现compareTo方法的比较策略。这个 用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,发生地点为环形缓冲区(可以通过io.sort.mb进行大小调 整),但是其对我们最终的二次排序结果是没有影响的。我们二次排序的最终结果是由我们的自定义比较器决定的。
    2、自定义分区器

     1 package com.mr;
     2 import org.apache.hadoop.io.IntWritable;
     3 import org.apache.hadoop.mapreduce.Partitioner;
     4 import org.slf4j.Logger;
     5 import org.slf4j.LoggerFactory;
     6 /**
     7 * 自定义分区
     8 * @author zengzhaozheng
     9 */
    10 public class DefinedPartition extends Partitioner<CombinationKey,IntWritable>{
    11     private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class);
    12     /**
    13     *  数据输入来源:map输出
    14     * @author zengzhaozheng
    15     * @param key map输出键值
    16     * @param value map输出value值
    17     * @param numPartitions 分区总数,即reduce task个数
    18     */
    19     @Override
    20     public int getPartition(CombinationKey key, IntWritable value,int numPartitions) {
    21         logger.info("--------enter DefinedPartition flag--------");
    22         /**
    23         * 注意:这里采用默认的hash分区实现方法
    24         * 根据组合键的第一个值作为分区
    25         * 这里需要说明一下,如果不自定义分区的话,mapreduce框架会根据默认的hash分区方法,
    26         * 将整个组合将相等的分到一个分区中,这样的话显然不是我们要的效果
    27         */
    28         logger.info("--------out DefinedPartition flag--------");
    29         return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    30     }
    31 }

    说明:具体说明看代码注释。

    3、自定义比较器

    package com.mr;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
    * 自定义二次排序策略
    * @author zengzhaoheng
    */
    public class DefinedComparator extends WritableComparator {
        private static final Logger logger = LoggerFactory.getLogger(DefinedComparator.class);
        public DefinedComparator() {
            super(CombinationKey.class,true);
        }
        @Override
        public int compare(WritableComparable combinationKeyOne,
                WritableComparable CombinationKeyOther) {
            logger.info("---------enter DefinedComparator flag---------");
                                                          
            CombinationKey c1 = (CombinationKey) combinationKeyOne;
            CombinationKey c2 = (CombinationKey) CombinationKeyOther;
                                                          
            /**
            * 确保进行排序的数据在同一个区内,如果不在同一个区则按照组合键中第一个键排序
            * 另外,这个判断是可以调整最终输出的组合键第一个值的排序
            * 下面这种比较对第一个字段的排序是升序的,如果想降序这将c1和c2���过来(假设1)
            */
            if(!c1.getFirstKey().equals(c2.getFirstKey())){
                logger.info("---------out DefinedComparator flag---------");
                return c1.getFirstKey().compareTo(c2.getFirstKey());
                }
            else{//按照组合键的第二个键的升序排序,将c1和c2倒过来则是按照数字的降序排序(假设2)
                logger.info("---------out DefinedComparator flag---------");
                return c1.getSecondKey().get()-c2.getSecondKey().get();//0,负数,正数
            }
            /**
            * (1)按照上面的这种实现最终的二次排序结果为:
            * sort1    1,2
            * sort2    3,54,77
            * sort6    20,22,221
            * (2)如果实现假设1,则最终的二次排序结果为:
            * sort6    20,22,221
            * sort2    3,54,77
            * sort1    1,2
            * (3)如果实现假设2,则最终的二次排序结果为:
            * sort1    2,1
            * sort2    77,54,3
            * sort6    221,22,20
            */
            }
    }

    说明:自定义比较器决定了我们二次排序的结果。自定义比较器需要继承WritableComparator类,并且重写compare方法实现自己的比较策略。具体的排序问题请看注释。

  • 相关阅读:
    20200226 Java IO流——廖雪峰
    20200225 Java 多线程(2)-廖雪峰
    20200225 Java 多线程(1)-廖雪峰
    20200224 尚硅谷ElasticSearch【归档】
    20200224 一 概述
    20200222 尚硅谷Dubbo【归档】
    20200222 四、dubbo原理
    Improved robustness of reinforcement learning policies upon conversion to spiking neuronal network platforms applied to Atari Breakout game
    Reinforcement learning in populations of spiking neurons
    Solving the Distal Reward Problem through Linkage of STDP and Dopamine Signaling
  • 原文地址:https://www.cnblogs.com/sunfie/p/4651690.html
Copyright © 2011-2022 走看看