zoukankan      html  css  js  c++  java
  • 【Hadoop离线基础总结】MapReduce案例之自定义groupingComparator

    MapReduce案例之自定义groupingComparator


    求取Top 1的数据

    • 需求
      求出每一个订单中成交金额最大的一笔交易
    订单id			商品id	成交金额
    Order_0000005	Pdt_01	222.8
    Order_0000005	Pdt_05	25.8
    Order_0000002	Pdt_03	322.8
    Order_0000002	Pdt_04	522.4
    Order_0000002	Pdt_05	822.4
    Order_0000003	Pdt_01	222.8
    
    • 代码实现

    自定义一个javaBean,命名为OrderBean

    package cn.itcast.demo5;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * Composite MapReduce key made of an order id and the price of one transaction.
     *
     * <p>Natural ordering is order id ascending, then price descending, so within one
     * order the most expensive transaction sorts first. {@link #equals(Object)} and
     * {@link #hashCode()} are defined over the same two fields so they stay consistent
     * with {@link #compareTo(OrderBean)}.
     */
    public class OrderBean implements WritableComparable<OrderBean> {
        // Order identifier: primary sort field.
        private String orderId;
        // Transaction amount: secondary sort field (descending).
        private Double price;

        /**
         * Compares by {@code orderId} first; only when the ids are equal is the price
         * consulted, in descending order.
         *
         * @param o the bean to compare against
         * @return a negative, zero, or positive value per the {@link Comparable} contract
         */
        @Override
        public int compareTo(OrderBean o) {
            int byOrderId = this.orderId.compareTo(o.orderId);
            if (byOrderId != 0) {
                return byOrderId;
            }
            // Operands are swapped to sort prices high-to-low (maximum first);
            // use this.price.compareTo(o.price) instead to get the minimum first.
            return o.price.compareTo(this.price);
        }

        /**
         * Serializes the order id followed by the price.
         *
         * @param out sink to write the two fields to
         * @throws IOException if the underlying stream fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(orderId);
            out.writeDouble(price);
        }

        /**
         * Restores the fields in the same order {@link #write(DataOutput)} emitted them.
         *
         * @param in source to read the two fields from
         * @throws IOException if the underlying stream fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.orderId = in.readUTF();
            this.price = in.readDouble();
        }

        /** @return the order id */
        public String getOrderId() {
            return orderId;
        }

        /** @param orderId the order id to store */
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        /**
         * @return the price as a primitive
         * @throws NullPointerException if no price has been set yet (unboxing a null field)
         */
        public double getPrice() {
            return price;
        }

        /** @param price the transaction amount to store */
        public void setPrice(Double price) {
            this.price = price;
        }

        /**
         * Two beans are equal when both the order id and the price are equal, which is
         * exactly when {@link #compareTo(OrderBean)} returns zero.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof OrderBean)) {
                return false;
            }
            OrderBean other = (OrderBean) obj;
            return java.util.Objects.equals(orderId, other.orderId)
                    && java.util.Objects.equals(price, other.price);
        }

        /** Hash over the same fields used by {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return java.util.Objects.hash(orderId, price);
        }

        /** @return the order id and the price separated by a tab */
        @Override
        public String toString() {
            return orderId + "\t" + price;
        }
    }
    

    定义一个Mapper类

    package cn.itcast.demo5;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Mapper that converts each tab-separated input line into an {@link OrderBean} key
     * (order id from field 0, price from field 2) paired with a {@link NullWritable} value.
     */
    public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        /**
         * Parses one line and emits its order id and price as the map output key.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the map output
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into its tab-separated fields
            String[] fields = value.toString().split("\t");
            if (fields.length < 3) {
                // Blank or truncated line: there is no price field to read, so skip it
                // instead of failing the whole task with ArrayIndexOutOfBoundsException.
                return;
            }

            OrderBean orderBean = new OrderBean();
            orderBean.setOrderId(fields[0]);
            orderBean.setPrice(Double.valueOf(fields[2]));

            context.write(orderBean, NullWritable.get());
        }
    }
    

    自定义分区(Partition)规则

    package cn.itcast.demo5;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /**
     * Partitioner that routes records by order id only, so every record of one order
     * lands in the same partition regardless of its price.
     */
    public class GroupPartitioner extends Partitioner<OrderBean, NullWritable> {
        /**
         * Picks the partition from the hash of the order id.
         *
         * @param orderBean    map output key
         * @param nullWritable map output value (unused)
         * @param i            divisor applied to the hash (the partition count argument)
         * @return the masked order-id hash modulo {@code i}
         */
        @Override
        public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
            // Clear the sign bit so the remainder below can never be negative
            int nonNegativeHash = orderBean.getOrderId().hashCode() & Integer.MAX_VALUE;
            return nonNegativeHash % i;
        }
    }
    

    自定义分组(groupingComparator)规则

    package cn.itcast.demo5;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Grouping comparator that treats two {@link OrderBean} keys as the same group
     * whenever their order ids are equal, ignoring the price.
     */
    public class GroupComparator extends WritableComparator {

        /** Registers {@link OrderBean} as the key class handled by this comparator. */
        public GroupComparator() {
            super(OrderBean.class, true);
        }

        /**
         * Compares two keys by order id alone.
         *
         * @param a first key, expected to be an {@link OrderBean}
         * @param b second key, expected to be an {@link OrderBean}
         * @return zero when both keys carry the same order id, otherwise the id ordering
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            String leftOrderId = ((OrderBean) a).getOrderId();
            String rightOrderId = ((OrderBean) b).getOrderId();
            return leftOrderId.compareTo(rightOrderId);
        }
    }
    

    定义一个Reducer类

    package cn.itcast.demo5;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Reducer that emits exactly one record per group: the group's key together with
     * the first value of the group.
     */
    public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        /**
         * Writes the incoming key and the first value straight through as the output pair.
         *
         * @param key     key of the current group
         * @param values  values of the current group; only the first one is read
         * @param context sink for the reduce output
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            NullWritable firstValue = values.iterator().next();
            context.write(key, firstValue);
        }
    }
    

    程序main函数入口

    package cn.itcast.demo5;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Driver for the Top 1 job: wires together the mapper, partitioner, grouping
     * comparator and reducer, then runs the job and reports success as the exit code.
     *
     * <p>Usage: {@code GroupMain [<inputPath> <outputPath>]}. When fewer than two
     * arguments are given, the built-in default paths are used.
     */
    public class GroupMain extends Configured implements Tool {
        // Fallback locations used when no paths are supplied on the command line.
        private static final String DEFAULT_INPUT_PATH = "file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义groupingComparator/input/orders.txt";
        private static final String DEFAULT_OUTPUT_PATH = "file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义groupingComparator/output_top1";

        /**
         * Configures and runs the job.
         *
         * @param args optional {@code [inputPath, outputPath]}; defaults apply otherwise
         * @return 0 when the job completes successfully, 1 otherwise
         * @throws Exception if job setup or execution fails
         */
        @Override
        public int run(String[] args) throws Exception {
            boolean pathsGiven = args != null && args.length >= 2;
            String inputPath = pathsGiven ? args[0] : DEFAULT_INPUT_PATH;
            String outputPath = pathsGiven ? args[1] : DEFAULT_OUTPUT_PATH;

            // Obtain the Job object
            Job job = Job.getInstance(super.getConf(), "myGroupComparator");
            // Lets the framework locate the jar containing the job classes when run on a cluster
            job.setJarByClass(GroupMain.class);

            // Input format and input path
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.setInputPaths(job, new Path(inputPath));

            // Map logic and its output (k2, v2) types
            job.setMapperClass(GroupMapper.class);
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);

            // Custom partitioning
            job.setPartitionerClass(GroupPartitioner.class);

            // Custom grouping
            job.setGroupingComparatorClass(GroupComparator.class);

            // Reduce logic and its output (k3, v3) types
            job.setReducerClass(GroupReducer.class);
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);

            // Output format and output path
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(outputPath));

            // Submit the job and wait for it to finish
            boolean succeeded = job.waitForCompletion(true);
            return succeeded ? 0 : 1;
        }

        /**
         * Program entry point; delegates to {@link ToolRunner} and exits with the job's status.
         *
         * @param args command-line arguments forwarded to {@link #run(String[])}
         * @throws Exception if the job fails to run
         */
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new GroupMain(), args);
            System.exit(exitCode);
        }
    }
    
    • 运行结果
    Order_0000002	822.4
    Order_0000003	222.8
    Order_0000005	222.8
    

    求取TopN的数据

    • 需求
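      求取Top1时,先让key按orderId升序、price降序排序,再通过自定义的GroupingComparator把orderId相同的记录分到同一组;每组的key就是该组排在最前面的那条记录(即金额最大的一条),因此Reducer不需要再做其他操作,直接输出key即可
      如果要获取TopN的数据,就需要在Reduce逻辑中循环遍历values并只输出前N条(下面的示例取N=2);同时把所有作为value的NullWritable换成DoubleWritable(携带price),其他都不变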

    • 代码实现

    自定义一个javaBean,命名为OrderBean

    package cn.itcast.demo6;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * Composite MapReduce key made of an order id and the price of one transaction.
     *
     * <p>Natural ordering is order id ascending, then price descending, so within one
     * order the most expensive transaction sorts first. {@link #equals(Object)} and
     * {@link #hashCode()} are defined over the same two fields so they stay consistent
     * with {@link #compareTo(OrderBean)}.
     */
    public class OrderBean implements WritableComparable<OrderBean> {
        // Order identifier: primary sort field.
        private String orderId;
        // Transaction amount: secondary sort field (descending).
        private Double price;

        /**
         * Compares by {@code orderId} first; only when the ids are equal is the price
         * consulted, in descending order.
         *
         * @param o the bean to compare against
         * @return a negative, zero, or positive value per the {@link Comparable} contract
         */
        @Override
        public int compareTo(OrderBean o) {
            int byOrderId = this.orderId.compareTo(o.orderId);
            if (byOrderId != 0) {
                return byOrderId;
            }
            // Operands are swapped to sort prices high-to-low (maximum first);
            // use this.price.compareTo(o.price) instead to get the minimum first.
            return o.price.compareTo(this.price);
        }

        /**
         * Serializes the order id followed by the price.
         *
         * @param out sink to write the two fields to
         * @throws IOException if the underlying stream fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(orderId);
            out.writeDouble(price);
        }

        /**
         * Restores the fields in the same order {@link #write(DataOutput)} emitted them.
         *
         * @param in source to read the two fields from
         * @throws IOException if the underlying stream fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.orderId = in.readUTF();
            this.price = in.readDouble();
        }

        /** @return the order id */
        public String getOrderId() {
            return orderId;
        }

        /** @param orderId the order id to store */
        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        /**
         * @return the price as a primitive
         * @throws NullPointerException if no price has been set yet (unboxing a null field)
         */
        public double getPrice() {
            return price;
        }

        /** @param price the transaction amount to store */
        public void setPrice(Double price) {
            this.price = price;
        }

        /**
         * Two beans are equal when both the order id and the price are equal, which is
         * exactly when {@link #compareTo(OrderBean)} returns zero.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof OrderBean)) {
                return false;
            }
            OrderBean other = (OrderBean) obj;
            return java.util.Objects.equals(orderId, other.orderId)
                    && java.util.Objects.equals(price, other.price);
        }

        /** Hash over the same fields used by {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return java.util.Objects.hash(orderId, price);
        }

        /** @return the order id and the price separated by a tab */
        @Override
        public String toString() {
            return orderId + "\t" + price;
        }
    }
    

    定义一个Mapper类

    package cn.itcast.demo6;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Mapper that converts each tab-separated input line into an {@link OrderBean} key
     * (order id from field 0, price from field 2) and also emits the price as a
     * {@link DoubleWritable} value.
     */
    public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {
        /**
         * Parses one line and emits its order id and price as key, and its price as value.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the map output
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into its tab-separated fields
            String[] fields = value.toString().split("\t");
            if (fields.length < 3) {
                // Blank or truncated line: there is no price field to read, so skip it
                // instead of failing the whole task with ArrayIndexOutOfBoundsException.
                return;
            }

            // Parse the price once and reuse it for both the key and the value
            Double price = Double.valueOf(fields[2]);

            OrderBean orderBean = new OrderBean();
            orderBean.setOrderId(fields[0]);
            orderBean.setPrice(price);

            context.write(orderBean, new DoubleWritable(price));
        }
    }
    

    自定义分区(Partition)规则

    package cn.itcast.demo6;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /**
     * Partitioner that routes records by order id only, so every record of one order
     * lands in the same partition regardless of its price.
     */
    public class GroupPartitioner extends Partitioner<OrderBean, DoubleWritable> {
        /**
         * Picks the partition from the hash of the order id.
         *
         * @param orderBean      map output key
         * @param doubleWritable map output value (unused)
         * @param i              divisor applied to the hash (the partition count argument)
         * @return the masked order-id hash modulo {@code i}
         */
        @Override
        public int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int i) {
            // Clear the sign bit so the remainder below can never be negative
            int nonNegativeHash = orderBean.getOrderId().hashCode() & Integer.MAX_VALUE;
            return nonNegativeHash % i;
        }
    }
    

    自定义分组(groupingComparator)规则

    package cn.itcast.demo6;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Grouping comparator that treats two {@link OrderBean} keys as the same group
     * whenever their order ids are equal, ignoring the price.
     */
    public class GroupComparator extends WritableComparator {

        /** Registers {@link OrderBean} as the key class handled by this comparator. */
        public GroupComparator() {
            super(OrderBean.class, true);
        }

        /**
         * Compares two keys by order id alone.
         *
         * @param a first key, expected to be an {@link OrderBean}
         * @param b second key, expected to be an {@link OrderBean}
         * @return zero when both keys carry the same order id, otherwise the id ordering
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            String leftOrderId = ((OrderBean) a).getOrderId();
            String rightOrderId = ((OrderBean) b).getOrderId();
            return leftOrderId.compareTo(rightOrderId);
        }
    }
    

    定义一个Reducer类

    package cn.itcast.demo6;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Reducer that emits at most N records per group, taking them in the order the
     * values arrive. N defaults to {@link #DEFAULT_TOP_N} and can be overridden through
     * the job configuration key {@link #TOP_N_KEY}.
     */
    public class GroupReducer extends Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable> {
        /** Job configuration key holding the number of records to emit per group. */
        public static final String TOP_N_KEY = "group.reducer.topn";
        /** Number of records emitted per group when {@link #TOP_N_KEY} is not set. */
        public static final int DEFAULT_TOP_N = 2;

        // Per-task limit, resolved once in setup().
        private int topN = DEFAULT_TOP_N;

        /**
         * Reads the per-group limit from the job configuration.
         *
         * @param context task context giving access to the configuration
         * @throws IOException          declared by the overridden method
         * @throws InterruptedException declared by the overridden method
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            topN = context.getConfiguration().getInt(TOP_N_KEY, DEFAULT_TOP_N);
        }

        /**
         * Writes the first {@code topN} (key, value) pairs of the group and stops.
         *
         * @param key     key of the current group
         * @param values  values of the current group
         * @param context sink for the reduce output
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            int emitted = 0;
            for (DoubleWritable value : values) {
                if (emitted >= topN) {
                    // Limit reached; the remaining values of this group are not needed
                    break;
                }
                // NOTE(review): the key's price presumably tracks the current value while
                // iterating (the sample run output shows matching prices) - confirm.
                context.write(key, value);
                emitted++;
            }
        }
    }
    

    程序main函数入口

    package cn.itcast.demo6;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Driver for the Top N job: wires together the mapper, partitioner, grouping
     * comparator and reducer, then runs the job and reports success as the exit code.
     *
     * <p>Usage: {@code GroupMain [<inputPath> <outputPath>]}. When fewer than two
     * arguments are given, the built-in default paths are used.
     */
    public class GroupMain extends Configured implements Tool {
        // Fallback locations used when no paths are supplied on the command line.
        private static final String DEFAULT_INPUT_PATH = "file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义groupingComparator/input/orders.txt";
        private static final String DEFAULT_OUTPUT_PATH = "file:////Volumes/赵壮备份/大数据离线课程资料/5.大数据离线第五天/自定义groupingComparator/output_top2";

        /**
         * Configures and runs the job.
         *
         * @param args optional {@code [inputPath, outputPath]}; defaults apply otherwise
         * @return 0 when the job completes successfully, 1 otherwise
         * @throws Exception if job setup or execution fails
         */
        @Override
        public int run(String[] args) throws Exception {
            boolean pathsGiven = args != null && args.length >= 2;
            String inputPath = pathsGiven ? args[0] : DEFAULT_INPUT_PATH;
            String outputPath = pathsGiven ? args[1] : DEFAULT_OUTPUT_PATH;

            // Obtain the Job object
            Job job = Job.getInstance(super.getConf(), "myGroupComparator");
            // Lets the framework locate the jar containing the job classes when run on a cluster
            job.setJarByClass(GroupMain.class);

            // Input format and input path
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.setInputPaths(job, new Path(inputPath));

            // Map logic and its output (k2, v2) types
            job.setMapperClass(GroupMapper.class);
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            // Custom partitioning
            job.setPartitionerClass(GroupPartitioner.class);

            // Custom grouping
            job.setGroupingComparatorClass(GroupComparator.class);

            // Reduce logic and its output (k3, v3) types
            job.setReducerClass(GroupReducer.class);
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(DoubleWritable.class);

            // Output format and output path
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(outputPath));

            // Submit the job and wait for it to finish
            boolean succeeded = job.waitForCompletion(true);
            return succeeded ? 0 : 1;
        }

        /**
         * Program entry point; delegates to {@link ToolRunner} and exits with the job's status.
         *
         * @param args command-line arguments forwarded to {@link #run(String[])}
         * @throws Exception if the job fails to run
         */
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new GroupMain(), args);
            System.exit(exitCode);
        }
    }
    
    • 运行结果
    Order_0000002	822.4	822.4
    Order_0000002	522.4	522.4
    Order_0000003	222.8	222.8
    Order_0000005	222.8	222.8
    Order_0000005	25.8	25.8
    
  • 相关阅读:
    linux of函数实例
    Linux libenv 编译移植
    OpenTracing简单了解
    Byte Buddy简单学习
    JavaAgent简单学习
    TB2安装ubuntu16.04+kinetic的ROS包
    常用工具传送门
    ROS传送门
    结对第二次—文献摘要热词统计及进阶需求
    结对第一次—原型设计(文献摘要热词统计)
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772486.html
Copyright © 2011-2022 走看看