zoukankan      html  css  js  c++  java
  • 深入理解hadoop之排序

      MapReduce的排序是默认按照Key排序的,也就是说输出的时候,key会按照大小或字典顺序来输出,比如一个简单的wordcount,出现的结果也会是左侧的字母按照字典顺序排列。下面我们主要聊聊面试中比较常见的全排序和二次排序


    一、全排序

      全排序的方法一般有以下几种:

        1.使用一个分区。 但是该方法在处理大型文件的时候效率极低,因为一台机器必须处理所有的输出文件,从而丧失了mapreduce提供的并行架构的优势。这个比较简单,只要在APP中设置分区数量为1就可以了。

        2.自定义分区函数,自行设置分解区间。这个方法最关键的地方在于如何划分各分区,如果数据分布不均匀,分区函数设置不恰当,最后会产生数据倾斜。这个地方请看下面统计历年最高气温的例子。

        气温数据:

    2004 49
    1981 -22
    1981 -31
    1965 -47
    2027 -2
    1964 6
    2030 38
    2016 -33
    1963 13
    2000 21
    2019 0
    2049 43
    2039 8
    1989 -18
    2017 49
    1952 -47
    2016 -28
    1991 20
    1967 -39
    2022 -47
    2041 41
    2039 -38
    2021 33
    1969 38
    1981 0
    1960 -26
    2023 -12
    1969 12
    1996 -31
    1954 -36
    2026 34
    2013 -4
    1969 37
    1990 -22
    2007 -31
    1987 -8
    1972 -30
    2019 -17
    2042 -22
    2011 21
    2033 -25
    2013 10
    2047 30
    2008 -2
    2047 -5
    1994 14
    1960 7
    2037 44
    1990 -41
    2047 32
    2048 -22
    1977 -27
    2049 35
    2023 2
    1952 -44
    1979 -5
    1996 47
    2033 8
    2006 3
    2030 32
    1967 43
    1980 -6
    2001 39
    2049 -31
    2028 -16
    2029 31
    1962 -21
    2043 -7
    2040 34
    2001 9
    1977 -21
    2047 1
    2022 30
    2002 12
    1956 38
    2009 7
    2049 11
    1981 18
    2014 -29
    1967 -15
    2019 2
    1975 25
    1965 21
    2013 -36
    2024 -44
    1959 10
    1992 4
    1997 15
    2042 17
    2013 -14
    1993 -21
    2027 19
    2016 -44
    1989 -47
    1999 -6
    1993 -35
    1953 -21
    1952 12
    1969 -45
    2036 10
    1950 29
    2022 8
    1985 -45
    2044 -48
    1981 -12
    2033 -42
    1973 -49
    2011 27
    1958 -26
    2028 35
    2037 41
    1955 -36
    2001 -11
    1965 23
    1970 -14
    2015 -2
    1969 -19
    1997 3
    2016 -38
    2045 9
    1974 6
    1956 -39
    2012 1
    2022 -28
    1991 -31
    1974 -40
    1998 43
    2007 12
    2049 9
    2034 -18
    1956 48
    1974 40
    2009 -24
    2030 -44
    1957 27
    1979 -23
    2034 29
    2024 -34
    2034 -10
    2007 42
    2000 33
    1990 -44
    2048 -48
    1967 -30
    1969 12
    2030 26
    2023 -36
    2029 22
    2044 -2
    2043 -47
    2040 -18
    1990 -3
    1996 -16
    1974 -20
    2023 -11
    1990 -16
    1980 13
    2013 -8
    2001 41
    2015 -30
    1974 28
    2031 13
    1991 -33
    1985 -6
    1979 -34
    2041 12
    1957 -46
    2014 25
    1969 18
    1958 -39
    1955 -46
    2031 39
    2032 11
    1991 38
    2035 -43
    2005 -1
    2000 2
    2027 -28
    1984 -8
    1985 -47
    2045 -6
    1987 -21
    2004 35
    1968 -47
    1968 -19
    1995 -47
    1990 46
    1987 18
    2012 29
    1987 -12
    2048 -8
    1987 26
    2010 18
    1959 -20
    1978 8
    1997 38
    1963 24
    1991 8
    2005 -34
    2019 -4
    2042 43
    1951 6
    1956 -32
    1952 18
    2003 -15
    1979 29
    2026 35
    2032 -26
    2044 -25
    2039 -36
    2021 49
    2037 6
    2000 -22
    2027 34
    2024 38
    2019 15
    1954 -27
    2016 49
    2018 -43
    2048 23
    1978 9
    1977 5
    2047 -30
    2028 -12
    1991 -25
    2022 -36
    1974 -2
    2038 25
    2014 10
    2000 -7
    2033 16
    2020 5
    1985 7
    1951 -1
    1958 -8
    1963 -3
    1972 10
    1986 9
    1961 3
    1972 -20
    1979 -39
    1958 44
    2027 -48
    2007 -50
    2025 33
    1970 22
    2044 27
    2043 -48
    1950 1
    2023 31
    2041 -39
    2040 43
    2025 21
    2038 39
    1998 16
    1987 -50
    1967 -40
    2021 -27
    1961 6
    1981 22
    1990 7
    1993 -49
    2001 -5
    2003 21
    1990 47
    1986 -19
    2031 37
    1987 -14
    2019 16
    2008 45
    2044 1
    1977 5
    1952 10
    2047 5
    2044 21
    2002 29
    1992 28
    1980 -2
    1952 -47
    2008 15
    2017 17
    1970 1
    2045 -37
    2016 5
    1951 -28
    1978 5
    1954 9
    1966 18
    1957 45
    1998 -26
    1989 0
    1964 10
    2036 -44
    2037 -22
    1965 12
    2035 40
    1994 7
    2024 7
    1961 4
    2007 34
    1980 -36
    1950 -39
    1987 24
    1983 -4
    2007 46
    2009 -5
    1974 43
    2026 26
    1966 -21
    2006 -21
    1977 -3
    1979 -31
    2021 33
    2040 39
    2020 47
    1953 -42
    1955 2
    2017 0
    1973 31
    1955 4
    1973 -7
    2027 28
    1968 -17
    2029 -3
    2021 13
    1991 9
    2030 19
    1952 -35
    1987 14
    1954 -18
    2027 -23
    1989 12
    1983 13
    1966 -45
    2039 33
    2014 34
    2012 -30
    1953 -7
    2020 -21
    1987 22
    2041 45
    2046 0
    2017 26
    1951 9
    2000 -4
    1973 27
    1972 -3
    2036 -14
    1974 32
    1987 -8
    1993 3
    1969 17
    2011 -11
    2038 -50
    2040 -8
    1950 -22
    2036 13
    2025 29
    1986 27
    2038 41
    1971 37
    1970 45
    2045 -21
    2036 41
    1956 1
    2042 -48
    1955 -28
    1967 -34
    1999 -42
    1952 -9
    1962 -15
    1974 -19
    1959 19
    1965 -42
    1962 41
    2003 -12
    2029 14
    1969 26
    1992 -4
    1959 8
    1962 -18
    2000 8
    2025 -20
    2048 -15
    1996 25
    2017 -23
    1992 -10
    2001 30
    1960 45
    2034 33
    1983 -47
    2046 19
    2041 -4
    1978 -6
    1967 -49
    1993 8
    1987 -11
    2009 3
    1990 40
    1972 -6
    2029 -47
    1990 3
    2036 4
    1981 22
    2019 37
    1980 -47
    2003 -42
    1965 -6
    2007 45
    2040 -45
    1984 24
    2048 -15
    1984 -16
    1992 -39
    2040 -33
    1984 -24
    2046 28
    2023 -3
    1956 46
    1969 0
    1983 -4
    2030 -50
    2004 -36
    1958 16
    2025 -22
    1957 -6
    2001 -24
    2014 -49
    1965 16
    2043 42
    1966 -10
    1971 -13
    1996 48
    1976 11
    2026 -43
    1982 2
    1965 -50
    2038 40
    2024 -32
    1988 3
    2004 -45
    2039 8
    2029 -30
    1974 -11
    2033 29
    1968 -2
    2040 -8
    1989 -11
    1999 7
    2001 37
    2001 -44
    1979 -30
    2048 7
    1998 -21
    2005 49
    1975 44
    2031 31
    1982 12
    1987 35
    2004 -33
    2000 27
    2008 34
    1970 -26
    2047 0
    1974 35
    1977 -45
    1976 19
    1956 48
    2025 -37
    1991 0
    2041 -40
    1976 38
    2016 36
    2024 6
    2021 14
    2005 27
    1951 -38
    2046 16
    1976 26
    2044 -44
    1989 -47
    2025 26
    2045 43
    2045 -23
    2004 30
    2044 46
    1962 -20
    1954 7
    1975 -39
    1967 18
    2038 4
    1956 15
    2010 -14
    2032 -6
    1999 19
    2024 7
    1993 -23
    1961 -43
    2007 23
    1998 9
    2027 -29
    1950 29
    2010 -47
    1953 43
    2033 -19
    1977 28
    2013 -36
    2001 43
    2008 46
    2004 19
    1985 6
    2043 3
    2014 -21
    1992 7
    1990 8
    2020 44
    1957 -40
    2030 5
    1996 16
    2018 -5
    1989 -14
    2016 -11
    1988 -18
    2012 -3
    1998 -12
    1979 -41
    2043 1
    1978 -12
    1959 -29
    2048 -26
    1989 -31
    2026 33
    1960 32
    1978 14
    2003 36
    2012 15
    2036 34
    2040 -49
    1986 7
    1982 19
    1959 42
    2041 23
    2037 20
    2020 -24
    1977 -27
    2039 18
    2046 2
    2017 -23
    2012 30
    1962 28
    1985 42
    2023 15
    2030 -30
    1983 28
    1967 26
    1990 -11
    1968 -50
    2038 -11
    1995 34
    2005 -43
    2011 5
    1978 9
    1952 -48
    1955 27
    1958 -21
    2020 -36
    1985 -23
    1991 10
    1982 -17
    1999 3
    1999 -25
    2005 -11
    2048 -14
    1985 -18
    2006 -5
    1970 -21
    2026 -26
    1956 -20
    2043 -50
    1982 -24
    1998 8
    2034 28
    1966 -10
    2045 5
    1968 -49
    2001 48
    2026 -9
    2005 49
    2036 39
    2027 -45
    1972 -24
    2009 -49
    1961 38
    1991 36
    1975 37
    1978 12
    2003 -45
    2021 -46
    1962 -8
    1972 -8
    1961 39
    2009 23
    1995 30
    1996 -19
    1983 45
    1952 19
    1974 -24
    1992 33
    1981 -1
    1981 -32
    1984 0
    2049 -41
    2030 13
    1993 -27
    1980 -45
    1964 -10
    2013 39
    1975 24
    1972 43
    1977 -33
    1962 -44
    2016 -22
    2029 47
    1999 41
    2030 -17
    2023 36
    2018 32
    2025 20
    1966 14
    1986 29
    2036 -20
    2022 -36
    2027 -46
    1994 -8
    1992 34
    2017 1
    2021 32
    1966 28
    1987 -22
    1996 26
    1991 48
    1993 4
    1973 -28
    1981 -16
    2011 45
    1963 -14
    1986 -50
    1984 -26
    1980 30
    2024 42
    1979 31
    2030 3
    2035 17
    2036 30
    2017 -43
    1997 9
    2004 -25
    1999 40
    1993 16
    1965 -42
    2043 24
    2017 29
    2034 -39
    1952 -49
    2023 26
    1999 -31
    1986 23
    1962 -10
    1960 22
    2036 -30
    2044 38
    2014 -50
    1986 0
    2024 -40
    1962 -15
    1950 11
    2019 30
    1980 -16
    1992 -18
    1994 -40
    1989 33
    1999 23
    1999 -38
    2021 -38
    2033 17
    1995 -2
    2034 -9
    2017 -36
    1956 -41
    1961 1
    2020 46
    1991 -17
    2026 2
    2004 9
    1976 -7
    1956 -4
    1981 41
    2014 0
    1975 -41
    2005 47
    1966 -47
    1968 -27
    1953 48
    2028 32
    1963 40
    1982 34
    2031 27
    2008 1
    2037 10
    2000 -1
    2038 -4
    2044 -12
    1960 -4
    2014 10
    2038 -42
    1964 -48
    1994 -47
    1953 -30
    1987 -24
    2038 5
    2027 43
    1991 7
    2015 21
    2038 -2
    1999 28
    2026 -50
    1986 25
    2041 -24
    2029 -1
    2008 18
    1952 -41
    1969 -50
    1973 6
    1956 -20
    1966 -21
    1967 44
    1967 39
    2035 16
    1973 -45
    2035 38
    1958 22
    2000 -6
    2004 16
    2004 16
    2037 -38
    2028 -47
    1957 -41
    1985 41
    2028 -3
    2014 -32
    1980 -14
    1960 13
    2012 10
    1960 -27
    1983 -6
    1953 8
    1954 -42
    1979 43
    1992 -48
    1976 19
    1964 -11
    1970 -14
    2042 -10
    1990 -36
    1987 -8
    2023 31
    1959 -12
    2008 -40
    2033 7
    2012 46
    2002 -3
    1992 -35
    2044 17
    2010 14
    2018 -35
    1961 26
    2004 -24
    2045 33
    1965 -9
    1970 -16
    1977 40
    2030 -42
    2046 -30
    1963 36
    2019 -47
    2020 -12
    2026 -27
    1994 21
    1951 27
    1999 -10
    1990 36
    2003 -8
    1984 31
    2015 -26
    2015 14
    1981 -20
    1971 -47
    2033 -4
    1976 -29
    2037 25
    2013 33
    2011 1
    2000 -27
    2037 31
    1960 8
    2048 -26
    2037 -8
    2039 42
    1986 -38
    2038 13
    1984 -44
    2049 -43
    2012 3
    1962 -39
    1959 3
    1979 -3
    1996 -1
    1983 27
    1950 -43
    1957 36
    1951 -28
    2010 44
    2045 -22
    2023 0
    2038 37
    2011 -30
    2009 4
    1952 47
    1965 -35
    2005 -35
    1954 -9
    2040 14
    1987 -24
    1978 -15
    2009 22
    1964 48
    2003 -38
    1969 -20
    1983 -47
    2030 13
    1990 -45
    2013 42
    1988 -26
    2017 9
    2041 -43
    1964 -20
    2005 30
    2024 25
    2043 26
    1993 27
    2018 -41
    2008 -14
    2013 16
    2028 44
    1967 29
    1973 -5
    2027 -38
    1954 -12
    1963 -21
    2008 -3
    2049 -14
    2022 -34
    1976 -39
    1976 13
    2007 30
    2032 -15
    2007 -7
    2028 -37
    2012 29
    2029 -7
    2002 19
    2046 -1
    1979 0
    2008 -17
    1980 42
    1986 28
    1957 -5
    1966 48
    1994 43
    2047 23
    2024 -37
    1974 -36
    2022 -29
    2040 -21
    2004 12
    1978 40
    1982 -22
    1984 -8
    2030 6
    1968 -3
    1965 32
    1998 -15
    2039 10
    2033 36
    1977 36
    2045 43
    2045 -17
    2021 38
    1969 -43
    2021 -7
    2018 10
    2008 40
    2012 31
    2011 28
    1999 -36
    1985 -18
    2008 4
    2040 -46
    1954 33
    2035 -28
    1980 -3
    2038 20
    1959 29
    1979 13
    2006 8
    2029 22
    1962 -44
    1978 37
    1993 -3
    1988 23
    1991 39
    2013 8
    1955 43
    1973 0
    1976 -3
    1963 3
    2031 -15
    2003 31
    2002 16
    1981 -44
    1959 19
    2023 -34
    2039 4
    1994 -21
    1951 36
    1997 11
    2013 13
    1950 32
    2020 -12
    2016 -22
    2009 -38
    2031 13
    1986 -43
    1959 28
    2049 10
    1954 -45
    2018 -1
    2008 48
    2034 -41
    1982 -2
    1972 -11
    2045 -34
    1958 10
    1997 31
    2013 -13
    2025 -19
    2038 -32
    2041 -21
    2013 0
    2034 3
    2036 -23
    2008 -22
    2034 3
    2042 41
    2002 1
    2043 -2
    1950 19
    2041 21
    2005 -16
    2030 -36
    2001 45
    1964 33
    2027 -25
    2046 -5
    2044 -42
    1965 -37
    2004 22
    2029 46
    1966 7
    2008 -48
    2016 -22
    2033 -28
    1999 -33
    1987 11
    1995 18
    1969 -13
    2023 9
    2018 1
    2015 39
    2017 31
    1975 44
    1991 32
    2045 10
    2046 -35
    1952 40
    1950 -38
    1996 -39
    2031 14
    2037 -48
    2002 41

        

        Map端

    package com.heima.hdfs.mr3;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Splits each line of the temperature text file into a (year, temperature) pair.
     *
     * <p>The input key is the offset supplied by the input format and is not used;
     * the input value is one line of text. The output key is the year and the
     * output value is the temperature.
     */
    public class MaxTempMapper extends Mapper<LongWritable ,Text,IntWritable,IntWritable>{
        /**
         * Parses one {@code "<year> <temperature>"} line and emits it.
         *
         * @param key     offset of the line (unused)
         * @param value   one line of the input file
         * @param context task context used to emit the (year, temperature) pair
         * @throws IOException          if the pair cannot be written
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString().trim();
            // A blank line (for example a trailing newline at the end of the file)
            // carries no record; skip it instead of failing the whole task.
            if (line.isEmpty()) {
                return;
            }
            // Split on any run of whitespace so tabs or repeated spaces between the
            // two columns are tolerated. Non-blank malformed lines still fail loudly.
            String[] fields = line.split("\\s+");
            int year = Integer.parseInt(fields[0]);
            int temp = Integer.parseInt(fields[1]);
            context.write(new IntWritable(year), new IntWritable(temp));
        }
    }

        Reduce端

    package com.heima.hdfs.mr3;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Emits the highest temperature recorded for each year.
     *
     * <p>Note: records with the same key go to the same partition, and all data of
     * one partition is processed by the same reduce task.
     */
    public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        /**
         * Scans every temperature of one year and writes the largest one.
         *
         * @param key     the year
         * @param values  all temperatures emitted for that year
         * @param context task context used to emit (year, max temperature)
         */
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int highest = Integer.MIN_VALUE;
            for (IntWritable reading : values) {
                highest = Math.max(highest, reading.get());
            }
            context.write(key, new IntWritable(highest));
        }
    }

      App端

    package com.heima.hdfs.mr3;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * Driver for the max-temperature job that spreads years over three reducers
     * with a custom partitioner.
     *
     * <p>Usage: {@code MaxTempApp <input path> <output path>}
     */
    public class MaxTempApp {
        /**
         * Configures the job, runs it and exits with status 0 on success or 1 on failure.
         *
         * @param args {@code args[0]} is the input path, {@code args[1]} the output path
         * @throws Exception if the job cannot be configured or submitted
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Use the local file system as the default file system.
            conf.set("fs.defaultFS","file:///");
            Job job = Job.getInstance(conf);
            job.setJobName("MaxTempApp");
            job.setJarByClass(MaxTempApp.class);

            FileInputFormat.addInputPath(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));

            job.setNumReduceTasks(3);
            // NOTE(review): the YearPartitioner used here must accept
            // (IntWritable, IntWritable) pairs; its definition is not part of this
            // listing, so confirm it matches the mapper's output types.
            job.setPartitionerClass(YearPartitioner.class);

            job.setMapperClass(MaxTempMapper.class);
            job.setReducerClass(MaxTempReducer.class);

            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(IntWritable.class);
            // Declare the reducer's value type. The original called
            // setMapOutputValueClass a second time here, leaving it undeclared.
            job.setOutputValueClass(IntWritable.class);

            // Propagate the job result to the caller instead of always exiting with 0.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

        3.通过hadoop采样机制,对键空间进行采样,较为均匀的划分数据集,采样的核心思想是只查看一小部分键,获得键的近似分布,由此构建分区,在hadoop中已经自带了采样器,不需要开发人员自己编写

        Map端

    package com.heima.hdfs.allsort;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Identity mapper for the total-order sort job: the input already arrives as
     * (year, temperature) pairs, so every record is forwarded unchanged.
     */
    public class MaxTempMapper
            extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {

        /**
         * Forwards the incoming pair as-is.
         *
         * @param key     the year
         * @param value   the temperature
         * @param context task context the pair is written to
         */
        @Override
        protected void map(IntWritable key, IntWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

      Reduce端

    package com.heima.hdfs.allsort;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Reducer of the total-order sort job: writes the maximum temperature of each year.
     */
    public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        /**
         * Finds the largest temperature among {@code values} and emits it with the year.
         *
         * @param key     the year
         * @param values  temperatures recorded for that year
         * @param context task context used to emit (year, max temperature)
         */
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int best = Integer.MIN_VALUE;
            for (IntWritable candidate : values) {
                int current = candidate.get();
                if (current > best) {
                    best = current;
                }
            }
            context.write(key, new IntWritable(best));
        }
    }

    App端

    package com.heima.hdfs.allsort;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    
    import java.io.IOException;
    
    /**
     * Driver for the total-order sort job: samples the input keys, writes a
     * partition file and lets {@link TotalOrderPartitioner} route key ranges to
     * three reducers.
     *
     * <p>Usage: {@code MaxTempApp <input path> <output path> [partition file]}
     */
    public class MaxTempApp {
        /** Partition-file location used when no third argument is given. */
        private static final String DEFAULT_PARTITION_FILE = "e:/mr/tmp/par.lst";

        /**
         * Configures the job, runs it and exits with status 0 on success or 1 on failure.
         *
         * @param args {@code args[0]} input path (sequence files), {@code args[1]}
         *             output path, optional {@code args[2]} partition file path
         * @throws Exception if the job cannot be configured, sampled or submitted
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Use the local file system as the default file system.
            conf.set("fs.defaultFS","file:///");
            Job job = Job.getInstance(conf);
            job.setJobName("MaxTempApp");
            job.setJarByClass(MaxTempApp.class);

            // The reducer count is set before sampling so that it is already known
            // when the partition file is written.
            job.setNumReduceTasks(3);
            job.setInputFormatClass(SequenceFileInputFormat.class);
            FileInputFormat.addInputPath(job,new Path(args[0]));
            FileOutputFormat.setOutputPath(job,new Path(args[1]));

            job.setMapperClass(MaxTempMapper.class);
            job.setReducerClass(MaxTempReducer.class);
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);

            // Use the total-order partitioner.
            job.setPartitionerClass(TotalOrderPartitioner.class);
            // Create the sampler: frequency 1 means every key is picked, capped at
            // 100000 samples taken from at most 3 input splits.
            InputSampler.Sampler<IntWritable,IntWritable> sampler =new InputSampler.RandomSampler<IntWritable,IntWritable>(1,100000,3);
            String partitionFile = (args.length > 2) ? args[2] : DEFAULT_PARTITION_FILE;
            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path(partitionFile));
            InputSampler.writePartitionFile(job,sampler);

            // Propagate the job result to the caller instead of always exiting with 0.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    Map起始阶段

    
    

        在Map阶段,使用job.setInputFormatClass()定义的InputFormat,将输入的数据集分割成小数据块split,同时InputFormat提供一个RecordReader的实现。在这里我们使用的是TextInputFormat,它提供的RecordReader会将每一行在文件中的字节偏移量作为Key,这一行的文本作为Value。这就是自定义Mapper的输入是<LongWritable,Text>的原因。然后调用自定义Mapper的map方法,将一个个<LongWritable,Text>键值对输入给Mapper的map方法

    
    

      Map最后阶段

    
    

        在Map阶段的最后,会先调用job.setPartitionerClass()对这个Mapper的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用job.setSortComparatorClass()设置的Key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass()设置 Key比较函数类,则使用Key实现的compareTo()方法

    
    

      Reduce阶段

    
    

        在Reduce阶段,reduce()方法接受所有映射到这个Reduce的map输出后,也会调用job.setSortComparatorClass()方法设置的Key比较函数类,对所有数据进行排序。然后开始构造一个Key对应的Value迭代器。这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个Key相同,它们就属于同一组,它们的 Value放在一个Value迭代器,而这个迭代器的Key使用属于同一个组的所有Key的第一个Key。最后就是进入Reducer的 reduce()方法,reduce()方法的输入是所有的Key和它的Value迭代器,同样注意输入与输出的类型必须与自定义的Reducer中声明的一致

     

     二、二次排序

      我们都知道map端的输出结果经过partition()分区函数之后会对key值进行排序,经过shuffle阶段之后,相同的key值会进入到同一个分组中去,也就是说key是有序的。但是有时候在对Key排序的同时还需要对Value进行排序,比如上面求每年最高气温的案例,这时候就要用到二次排序了。按照本人的理解,二次排序可以大致分为以下几个阶段。

      

    Map起始阶段

        在Map阶段,使用job.setInputFormatClass()定义的InputFormat,将输入的数据集分割成小数据块split,同时InputFormat提供一个RecordReader的实现。在这里我们使用的是TextInputFormat,它提供的RecordReader会将每一行在文件中的字节偏移量作为Key,这一行的文本作为Value。这就是自定义Mapper的输入是<LongWritable,Text>的原因。然后调用自定义Mapper的map方法,将一个个<LongWritable,Text>键值对输入给Mapper的map方法

      Map最后阶段

        在Map阶段的最后,会先调用job.setPartitionerClass()对这个Mapper的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用job.setSortComparatorClass()设置的Key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass()设置 Key比较函数类,则使用Key实现的compareTo()方法

      Reduce阶段

        在Reduce阶段,reduce()方法接受所有映射到这个Reduce的map输出后,也会调用job.setSortComparatorClass()方法设置的Key比较函数类,对所有数据进行排序。然后开始构造一个Key对应的Value迭代器。这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个Key相同,它们就属于同一组,它们的 Value放在一个Value迭代器,而这个迭代器的Key使用属于同一个组的所有Key的第一个Key。最后就是进入Reducer的 reduce()方法,reduce()方法的输入是所有的Key和它的Value迭代器,同样注意输入与输出的类型必须与自定义的Reducer中声明的一致

      排序的案例仍然为上述求取每年最高气温的案例

    二次排序的具体流程

      在本例中要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类key,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。

      1、自定义 key

        所有自定义的组合key都应该实现WritableComparable接口。WritableComparable接口继承自Writable和Comparable这两个接口,因此实现它的key既可以序列化,又可以比较。组合key按照年份升序、气温降序排序,实现的代码如下

    /**
     * Composite map-output key made of a year and a temperature.
     *
     * <p>Keys are ordered by year ascending and, within the same year, by
     * temperature descending, so the first key of every year carries that year's
     * highest temperature.
     */
    public class Combokey implements WritableComparable<Combokey> {
        private int year;
        private int temp;

        public int getYear() {
            return year;
        }

        public void setYear(int year) {
            this.year = year;
        }

        public int getTemp() {
            return temp;
        }

        public void setTemp(int temp) {
            this.temp = temp;
        }

        /**
         * Compares two keys: year ascending, then temperature descending.
         *
         * @param o the key to compare with
         * @return a negative, zero or positive value as this key sorts before,
         *         equal to or after {@code o}
         */
        @Override
        public int compareTo(Combokey o) {
            System.out.println("Combokey.compareTo()"+o.toString());
            int y0 = o.getYear();
            int t0 = o.getTemp();
            // Integer.compare avoids the overflow that plain subtraction can produce
            // for operands of opposite sign and large magnitude.
            if (year == y0) {
                // Same year: reversed operands give descending temperature order.
                return Integer.compare(t0, temp);
            }
            // Different years: ascending order.
            return Integer.compare(year, y0);
        }

        /** Serializes the key as two ints: year, then temperature. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(year);
            out.writeInt(temp);
        }

        /** Deserializes the key, reading the fields in the order {@link #write} wrote them. */
        @Override
        public void readFields(DataInput in) throws IOException {
            year = in.readInt();
            temp = in.readInt();
        }

        /** Two keys are equal when both year and temperature match (consistent with compareTo). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Combokey)) {
                return false;
            }
            Combokey other = (Combokey) obj;
            return year == other.year && temp == other.temp;
        }

        @Override
        public int hashCode() {
            return 31 * year + temp;
        }

        /** Returns the key as {@code "<year>:<temp>"}. */
        @Override
        public String toString() {
            return year + ":" + temp;
        }
    }

     2.自定义分区

        自定义分区函数类YearPartitioner,是key的第一次比较:它只根据年份决定每个key进入哪个分区(分区本身并不排序),相同的年份会进入到同一个分区中去。

    /**
     * Routes composite keys to reducers by year, so every record of one year ends
     * up in the same partition.
     */
    public class YearPartitioner extends Partitioner<Combokey,NullWritable>{
        /**
         * Picks the partition for a key from its year.
         *
         * @param key           composite key whose year selects the partition
         * @param nullWritable  unused placeholder value
         * @param numPartitions number of partitions (reduce tasks)
         * @return a partition index in {@code [0, numPartitions)}
         */
        @Override
        public int getPartition(Combokey key, NullWritable nullWritable, int numPartitions) {
            System.out.println("YearPartitioner.getPartition"+key);
            int year = key.getYear();
            // Clear the sign bit before taking the remainder: a plain '%' yields a
            // negative (illegal) partition index for a negative year. Results for
            // non-negative years are unchanged.
            return (year & Integer.MAX_VALUE) % numPartitions;
        }
    }

    3、Key的比较类CombokeyComparator 

        这是Key的第二次比较,这个类继承自WritableComparator这个类,对所有的Key进行排序,即同时完成Combokey中year和temp的排序。

    public class CombokeyComparator extends WritableComparator{
        protected CombokeyComparator(){
            super(Combokey.class,true);
        }
        public int compare(WritableComparable a,WritableComparable b){
            System.out.println("CombokeyComparator"+a+","+b);
            Combokey k1 = (Combokey)a;
            Combokey k2 = (Combokey)b;
            return k1.compareTo(k2);
        }
    }

    4、定义分组类函数YearGroupComparator 

        在Reduce阶段,构造一个与 Key 相对应的 Value 迭代器的时候,只要year相同就属于同一个组,放在一个Value迭代器,不同的year按照年份升序进行排序。

    public class YearGroupComparator extends WritableComparator{
        protected YearGroupComparator(){
            super(Combokey.class,true);
        }
        public int compare(WritableComparable a,WritableComparable b){
            System.out.println("YearGroupComparator"+a+","+b);
            Combokey key1 = (Combokey)a;
            Combokey key2 = (Combokey)b;
            return  key1.getYear()-key2.getYear();
        }
    }

    5.Map端,输入的(key,value)是偏移量和文本行,输出的key是组合key,value值是空值

    /**
     * Mapper of the secondary-sort job: turns each {@code "<year> <temperature>"}
     * line into a {@link Combokey} and emits it with an empty value.
     */
    public class MaxTempMapper extends Mapper<LongWritable,Text,Combokey,NullWritable>{
        /**
         * Parses one input line into a composite key.
         *
         * @param key     offset of the line (unused)
         * @param value   one line of the input file
         * @param context task context used to emit (composite key, null)
         * @throws IOException          if the record cannot be written
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("MaxTempMapper.map");
            String line = value.toString().trim();
            // A blank line (for example a trailing newline at the end of the file)
            // carries no record; skip it instead of failing the whole task.
            if (line.isEmpty()) {
                return;
            }
            // Split on any run of whitespace so tabs or repeated spaces between the
            // two columns are tolerated. Non-blank malformed lines still fail loudly.
            String[] arr = line.split("\\s+");
            Combokey keyout = new Combokey();
            keyout.setYear(Integer.parseInt(arr[0]));
            keyout.setTemp(Integer.parseInt(arr[1]));
            context.write(keyout,NullWritable.get());
        }
    }

    6.Reduce端,将组合key切割成key为year,value为气温的一个列表

    /**
     * Reducer of the secondary-sort job: unpacks the composite key it is handed
     * into a (year, temperature) output record.
     *
     * <p>The values are never iterated, so exactly one record is written per
     * reduce call, taken from the key passed in.
     */
    public class MaxTempReducer extends Reducer<Combokey, NullWritable, IntWritable, IntWritable> {

        /**
         * Writes the year and temperature carried by {@code key}.
         *
         * @param key     composite key of the group
         * @param values  placeholder values (not read)
         * @param context task context used to emit (year, temperature)
         */
        @Override
        protected void reduce(Combokey key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            final int groupYear = key.getYear();
            final int groupTemp = key.getTemp();
            System.out.println("MaxTempReducer.reduce"+groupYear+","+groupTemp);

            IntWritable outKey = new IntWritable(groupYear);
            IntWritable outValue = new IntWritable(groupTemp);
            context.write(outKey, outValue);
        }
    }

    7.APP端

    /**
     * Driver for the secondary-sort max-temperature job.
     *
     * <p>Usage: {@code MaxTempApp [input path] [output path]}; when an argument
     * is missing the original hard-coded location is used.
     */
    public class MaxTempApp {
        /** Input file used when no first argument is given. */
        private static final String DEFAULT_INPUT = "e:/mr/tmp/1.txt";
        /** Output directory used when no second argument is given. */
        private static final String DEFAULT_OUTPUT = "e:/mr/tmp/out";

        /**
         * Configures the job, runs it and exits with status 0 on success or 1 on failure.
         *
         * @param args optional {@code args[0]} input path and {@code args[1]} output path
         * @throws Exception if the job cannot be configured or submitted
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Use the local file system as the default file system.
            conf.set("fs.defaultFS","file:///");
            Job job = Job.getInstance(conf);
            job.setJobName("MaxTempApp");
            FileInputFormat.addInputPath(job,new Path(argOrDefault(args, 0, DEFAULT_INPUT)));
            FileOutputFormat.setOutputPath(job,new Path(argOrDefault(args, 1, DEFAULT_OUTPUT)));
            job.setJarByClass(MaxTempApp.class);
            // Mapper class
            job.setMapperClass(MaxTempMapper.class);
            // Reducer class
            job.setReducerClass(MaxTempReducer.class);
            // Map output types
            job.setMapOutputKeyClass(Combokey.class);
            job.setMapOutputValueClass(NullWritable.class);
            // Reduce output types
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            // Partitioner: routes keys to reducers by year
            job.setPartitionerClass(YearPartitioner.class);
            // Grouping comparator: keys with the same year form one reduce group
            job.setGroupingComparatorClass(YearGroupComparator.class);
            // Sort comparator: full composite-key ordering
            job.setSortComparatorClass(CombokeyComparator.class);
            job.setNumReduceTasks(3);
            // Propagate the job result to the caller instead of always exiting with 0.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        /** Returns {@code args[index]} when present, otherwise {@code fallback}. */
        private static String argOrDefault(String[] args, int index, String fallback) {
            return (args != null && args.length > index) ? args[index] : fallback;
        }
    }
  • 相关阅读:
    css3基础篇二
    css3基础篇一
    react基础篇六
    react基础篇五
    react基础篇四
    react基础篇三
    react基础篇二
    react基础篇一
    矩阵
    POJ 3071 Football
  • 原文地址:https://www.cnblogs.com/bigdata-stone/p/9311370.html
Copyright © 2011-2022 走看看