  • Hadoop_26_MapReduce: using a GroupingComparator on the reduce side to find the largest-amount transaction in each order

    1. Custom GroupingComparator

    1.1. Requirement: given a set of order records (the sample input file is shown at the end of this post),

    we need to find the transaction with the largest amount within each order.

    1.2. Analysis:

      1. Use a bean holding "order id + amount" as the map output key; all order records read in the map
    phase are then partitioned by order id and sorted by amount (descending) before being sent to reduce.

      2. On the reduce side, use a GroupingComparator to group the kv pairs with the same order id
    together; the first record of each group is then the maximum, as illustrated below.
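
    For instance, the three Order_0000002 records from the sample input reach the reduce side already
    sorted by amount in descending order, and the grouping comparator treats them as one group:

        <Order_0000002, 522.8>   <-- reduce() is called once for this group; the key it
        <Order_0000002, 325.8>       sees is the first (largest) record of the group
        <Order_0000002, 122.4>

    So writing the key once per group yields exactly the largest transaction per order.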

    Define the order-info bean and implement compareTo() for sorting:

    package cn.bigdata.hdfs.secondarySort;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * Order-info bean; implements Hadoop's serialization mechanism
     */
    public class OrderBean implements WritableComparable<OrderBean>{
    
        private Text itemid;
        private DoubleWritable amount;
    
        public OrderBean() {
        }
    
        public OrderBean(Text itemid, DoubleWritable amount) {
            set(itemid, amount);
    
        }
    
        public void set(Text itemid, DoubleWritable amount) {
    
            this.itemid = itemid;
            this.amount = amount;
    
        }
    
        public Text getItemid() {
            return itemid;
        }
    
        public DoubleWritable getAmount() {
            return amount;
        }
        // 1. The model class must implement the Comparable<T> interface.
        // 2. Collections.sort(list) then calls compareTo automatically; without implementing it,
        //    the list cannot be sorted (for arrays, Arrays.sort(a) plays the same role).
        // 3. compareTo is the method WritableComparable requires; the framework uses it to sort keys.
        @Override
        public int compareTo(OrderBean o) {
            // sort by order id first
            int cmp = this.itemid.compareTo(o.getItemid());
            // if the ids are equal, sort by amount in descending order (hence the negation)
            if (cmp == 0) {
                cmp = -this.amount.compareTo(o.getAmount());
            }
            }
            return cmp;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(itemid.toString());
            out.writeDouble(amount.get());
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            String readUTF = in.readUTF();
            double readDouble = in.readDouble();
            
            this.itemid = new Text(readUTF);
            this.amount= new DoubleWritable(readDouble);
        }
    
        @Override
        public String toString() {
            return itemid.toString() + "\t" + amount.get();
        }
    }
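    To sanity-check the ordering outside MapReduce, here is a minimal sketch (the class name and the
    sample values are made up for illustration) that sorts a few beans with Collections.sort, relying
    only on the compareTo defined above:

    package cn.bigdata.hdfs.secondarySort;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;

    public class OrderBeanSortDemo {
        public static void main(String[] args) {
            List<OrderBean> beans = new ArrayList<OrderBean>();
            beans.add(new OrderBean(new Text("Order_0000002"), new DoubleWritable(325.8)));
            beans.add(new OrderBean(new Text("Order_0000001"), new DoubleWritable(25.8)));
            beans.add(new OrderBean(new Text("Order_0000002"), new DoubleWritable(522.8)));
            beans.add(new OrderBean(new Text("Order_0000001"), new DoubleWritable(222.8)));

            // compareTo sorts by id ascending, then amount descending --
            // exactly the order in which the reducer will see the keys
            Collections.sort(beans);
            for (OrderBean b : beans) {
                System.out.println(b);   // Order_0000001 222.8, Order_0000001 25.8, ...
            }
        }
    }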

     Define a custom Partitioner so that beans are partitioned by order id:

    package cn.bigdata.hdfs.secondarySort;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{
        @Override
        public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
            // beans with the same order id are sent to the same partition;
            // the number of partitions produced matches the number of reduce tasks set by the user
            return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }
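
    One detail worth spelling out: String.hashCode() can be negative, and a negative partition number
    would make the framework fail. ANDing with Integer.MAX_VALUE clears the sign bit, so the result of
    the modulo is always in [0, numReduceTasks). A standalone sketch (class name and key are invented):

    public class PartitionMaskDemo {
        public static void main(String[] args) {
            int hash = "Order_0000001".hashCode();  // hashCode may be negative for some strings
            int numReduceTasks = 2;

            // clearing the sign bit keeps the dividend non-negative,
            // so the partition index is always 0 or 1 here
            int partition = (hash & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(partition);
        }
    }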

    Define a custom GroupingComparator to control how kv pairs are grouped when reduce() is called:

    package cn.bigdata.hdfs.secondarySort;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Controls how the reduce side groups kv pairs during the shuffle:
     * the GroupingComparator makes all beans with the same order id look like one key
     */
    public class ItemidGroupingComparator extends WritableComparator {
    
        // pass in the class of the key bean, and tell the framework (the boolean true)
        // to create key instances via reflection for deserialization
        protected ItemidGroupingComparator() {
            super(OrderBean.class, true);
        }
        
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean abean = (OrderBean) a;
            OrderBean bbean = (OrderBean) b;
            // beans with the same item_id are treated as equal and are therefore grouped together:
            // when comparing two beans, only the order id is compared
            return abean.getItemid().compareTo(bbean.getItemid());
        }
    }
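
    To make the grouping behavior concrete, here is a minimal sketch (not framework code; the class
    name and values are invented) that walks a pre-sorted key sequence the way the reduce side does,
    starting a new group whenever the comparator reports a difference:

    package cn.bigdata.hdfs.secondarySort;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;

    public class GroupingDemo {
        public static void main(String[] args) {
            // already sorted the way the framework delivers keys: id ascending, amount descending
            List<OrderBean> sorted = Arrays.asList(
                    new OrderBean(new Text("Order_0000001"), new DoubleWritable(222.8)),
                    new OrderBean(new Text("Order_0000001"), new DoubleWritable(25.8)),
                    new OrderBean(new Text("Order_0000002"), new DoubleWritable(522.8)),
                    new OrderBean(new Text("Order_0000002"), new DoubleWritable(325.8)));

            ItemidGroupingComparator comparator = new ItemidGroupingComparator();

            OrderBean groupStart = null;
            for (OrderBean bean : sorted) {
                // a nonzero comparison means a new group begins; its first (largest)
                // record is what the real reducer writes out
                if (groupStart == null || comparator.compare(groupStart, bean) != 0) {
                    System.out.println("max of group: " + bean);
                    groupStart = bean;
                }
            }
        }
    }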

    Write the MapReduce job:

    package cn.bigdata.hdfs.secondarySort;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Sample input (order id, product id, amount):
     *    Order_0000001,Pdt_01,222.8
     *    Order_0000001,Pdt_05,25.8
     *    Order_0000002,Pdt_05,325.8
     *    Order_0000002,Pdt_03,522.8
     *    Order_0000002,Pdt_04,122.4
     *    Order_0000003,Pdt_01,222.8    
     */
    public class SecondarySort {
        
        static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
            
            OrderBean bean = new OrderBean();
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String line = value.toString();
            String[] fields = line.split(",");
                
                bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
                
                context.write(bean, NullWritable.get());
            }
        }
        
        static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
            
        // by the time reduce runs, all beans with the same id have been grouped together,
        // and the one with the largest amount comes first
        // with the grouping comparator set, the incoming kv data looks like:
        //   <1001 87.6>,null  <1001 76.5>,null  ...
        // the key parameter of reduce() is the key of the first kv pair in the group: <1001 87.6>
        // so to output the largest-amount transaction of each order, just write out this key
            @Override
            protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }
        
        
        public static void main(String[] args) throws Exception {
            
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(SecondarySort.class);
            
            job.setMapperClass(SecondarySortMapper.class);
            job.setReducerClass(SecondarySortReducer.class);
            
        // map output and final output use the same types,
        // so setOutputKeyClass/setOutputValueClass cover both
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
            
            FileInputFormat.setInputPaths(job, new Path("F:/secondary"));
            FileOutputFormat.setOutputPath(job, new Path("F:/secondaryOut"));
            
        // register the custom GroupingComparator
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // register the custom Partitioner
        job.setPartitionerClass(ItemIdPartitioner.class);
        // set the number of reduce tasks
            job.setNumReduceTasks(2);
            job.waitForCompletion(true);
        }
    }

     Input file:

    Order_0000001,Pdt_01,222.8
    Order_0000001,Pdt_05,25.8
    Order_0000002,Pdt_05,325.8
    Order_0000002,Pdt_03,522.8
    Order_0000002,Pdt_04,122.4
    Order_0000003,Pdt_01,222.8
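
    With job.setNumReduceTasks(2), the results are split across two output files (part-r-00000 and
    part-r-00001; which order lands in which file depends on the hash of its id). Concatenated, the
    expected output is:

    Order_0000001	222.8
    Order_0000002	522.8
    Order_0000003	222.8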