zoukankan      html  css  js  c++  java
  • Hadoop二次排序

    1、实现要求:对年份按照升序排列,对气温进行降序排列

    2、实现步骤

    1、定义组合key

    package com.cr.secondarySort;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class comKey implements WritableComparable<comKey>{
    
        private int year;
        private int temp;
    
        public int getYear() {
            return year;
        }
    
        public void setYear(int year) {
            this.year = year;
        }
    
        public int getTemp() {
            return temp;
        }
    
        public void setTemp(int temp) {
            this.temp = temp;
        }
    
        //对key进行比较实现
        public int compareTo(comKey o) {
            int y1 = o.getYear();
            int t1 = o.getTemp();
            //如果年份相同
            if(year == y1){
                //气温降序
                return -(temp - t1);
            }
            else {
                return year - y1;
            }
        }
    
        //串行化过程
        public void write(DataOutput out) throws IOException {
    
            out.writeInt(year);
            out.writeInt(temp);
    
        }
    
        //反串行化过程
        public void readFields(DataInput in) throws IOException {
    
            int year = in.readInt();
            int temp = in.readInt();
    
    
        }
    }
    

    2、mapper实现

    package com.cr.secondarySort;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * mapper:输出为组合key,输出value为空值
     */
    public class MaxTempMapper extends Mapper<LongWritable,Text,comKey,NullWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] arr = line.split(" ");
            comKey keyout = new comKey();
            keyout.setTemp(Integer.parseInt(arr[0]));
            keyout.setYear(Integer.parseInt(arr[1]));
            context.write(keyout,NullWritable.get());
        }
    }
    

    3、reducer类

    package com.cr.secondarySort;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class MaxTempReducer extends Reducer<comKey,NullWritable,IntWritable,IntWritable> {
        @Override
        protected void reduce(comKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            int year = key.getYear();
            int temp = key.getTemp();
            context.write(new IntWritable(year),new IntWritable(temp));
    
        }
    }
    

    4、对组合key进行排序

    package com.cr.secondarySort;
    
    import org.apache.hadoop.io.RawComparator;
    
    public class ComKeyComparator implements RawComparator<comKey> {
        public int compare(byte[] bytes, int i, int i1, byte[] bytes1, int i2, int i3) {
            return 0;
        }
    
        public int compare(comKey o1, comKey o2) {
            return o1.compareTo(o2);
        }
    }
    

    5、按照年份进行分组对比器实现

    package com.cr.secondarySort;
    
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * 按照年份进行分组对比器实现
     */
    public class YearGroupComparator extends WritableComparator{
        protected YearGroupComparator() {
            super(comKey.class,true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            comKey a1 = (comKey) a;
            comKey b1 = (comKey) b;
            return a1.compareTo(b1);
    
    
        }
    }

    6、设置分区数

    package com.cr.secondarySort;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class yearPartitioner extends Partitioner<comKey,NullWritable> {
        public int getPartition(comKey comKey, NullWritable nullWritable, int i) {
    
            int year = comKey.getYear();
            return year % i;
        }
    }
    

    7、运行

    运行报错,out文件夹为空
    • 由于文件重名,导入了另外包下面的类
    • 反串行化的时候没有拿到当前的year 和temp,直接初始化了
      //反串行化过程
        public void readFields(DataInput in) throws IOException {
    
    //        int year = in.readInt();
    //        int temp = in.readInt();
            year = in.readInt();
            temp = in.readInt();
    到这里,out文件夹是可以输出分区文件了,但是发现分区文件里面并没有对year进行排序,也没有进行reduce捏合
    那么问题出在了哪呢,
    • 首先就是reducer里面,需要迭代comkey-->nullWritable,取出所有的组合键里面的year和temperature写入context
    public class MaxTempReducer extends Reducer<ComKey,NullWritable,IntWritable,IntWritable> {
        @Override
        protected void reduce(ComKey key, Iterable<NullWritable> values, Context context) {
            values.forEach(nullWritable -> {
                int year = key.getYear();
                int temp = key.getTemp();
                try {
                    context.write(new IntWritable(year),new IntWritable(temp));
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    • 其次就是进入reduce之后,需要对key相同的进行组合,
     @Override
        public int compare(WritableComparable a, WritableComparable b) {
            System.out.println("comkeyComparator " + a + "," + b);
            comKey a1 = (comKey) a;
            comKey b1 = (comKey) b;
            return a1.compareTo(b1);这是之前的代码,这里调用的是comkey里面的重写方法compareTo(),但是这个方法实际上是mapper到reduce的shuffle过程
    那么我们需要对获取到的year进行一个排序
    /**
     * 按照年份进行分组对比器实现
     */
    public class YearGroupComparator extends WritableComparator {
        YearGroupComparator() {
            super(ComKey.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            int y1 = ((ComKey) a).getYear();
            int y2 = ((ComKey) b).getYear();
            return y1 - y2;
        }
    }
    下面贴上最后的一部分运行结果

    part-r-00000
    2001	39
    2001	38
    2001	38
    2001	38
    2001	38
    2001	37
    2001	36
    2001	36
    2001	36
    2001	35
    2001	35
    2001	34
    2001	34
    2001	34
    2001	33
    2001	33
    2001	33
    2001	32
    2001	31
    2001	31
    part-r-00001
    2038	22
    2038	22
    2038	22
    2038	21
    2038	21
    2038	21
    2038	21
    2038	21
    2038	20
    2038	20
    2038	20
    2038	20
    2038	19
    2038	19
    2038	19
    2038	19
    2038	19
    2038	19
    2038	19
    2038	18
    part-r-00002
    2000	39
    2000	39
    2000	38
    2000	38
    2000	38
    2000	38
    2000	38
    2000	37
    2000	37
    2000	37
    2000	37
    2000	37
    2000	36
    2000	35
    2000	35
    2000	35
    2000	35
    2000	35
    2000	34
    2000	34
    2000	34
    这个终于满足了我们的要求,气温按照不同的年份进行聚合。年份升序排列,气温降序


    欢迎关注我的公众号:小秋的博客 CSDN博客:https://blog.csdn.net/xiaoqiu_cr github:https://github.com/crr121 联系邮箱:rongchen633@gmail.com 有什么问题可以给我留言噢~
  • 相关阅读:
    Tutorial中代码的区别及不同效果
    Session
    代码解析&Filter用户授权例子
    web后台运作过程
    工厂纸杯生产流水线管理系统
    Webservice和EJB
    Week8——hashcode()和equals()方法
    Week7——JSON
    Week6——Lifecycle of JSF and Facelets
    Week5——Ajax
  • 原文地址:https://www.cnblogs.com/flyingcr/p/10326954.html
Copyright © 2011-2022 走看看