zoukankan      html  css  js  c++  java
  • MapReduce实战(七)GroupingComparator

    需求:

    Order_0000001,Pdt_01,222.8
    Order_0000001,Pdt_05,25.8
    Order_0000002,Pdt_05,325.8
    Order_0000002,Pdt_03,522.8
    Order_0000002,Pdt_04,122.4
    Order_0000003,Pdt_01,222.8

    按照订单的编号分组,计算出每组的商品价格最大值。

    分析:

    我们可以把订单编号当做key,然后在reduce端遍历每组的数据,找出其中的最大值。在这里,我想介绍另外一种方法,顺便介绍GroupingComparator。

    我们可以自定义一个类型作为key,然后通过GroupingComparator让订单id相同的key在到达reduce端时被看成同一组。如果我们对该类型按金额进行从大到小的排序,根据MapReduce的规则,同一组的内容到达reduce端时,是取该组第一条记录的key作为reduce的key的。我们不妨利用这个规则,写一个OrderBean的类型,只要其订单id(即代码中的itemid字段)相同,就被分到同一组。这样一来,到达reduce时,相同id的所有bean已经被看成一组,且金额最大的那个排在第一位,就是我们想要的结果。

    代码:

    OrderBean.java:

    package com.darrenchan.mr.groupingcomparator;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * Composite map-output key holding an order id ({@code itemid}) and the
     * price of one line item ({@code amount}).
     *
     * <p>Natural ordering is by {@code itemid} ascending and, for equal ids, by
     * {@code amount} descending. Sorting by id first keeps every key of one order
     * adjacent in the sorted stream; sorting the amount in descending order puts
     * the most expensive item first among them.
     *
     * <p>Both fields are {@code null} after the no-arg constructor until
     * {@link #set} or {@link #readFields} has been called.
     */
    public class OrderBean implements WritableComparable<OrderBean>{

        private Text itemid;
        private DoubleWritable amount;

        /** No-arg constructor; leaves both fields unset. */
        public OrderBean() {
        }

        /**
         * Creates a key for the given order id and amount.
         *
         * @param itemid order id
         * @param amount price of the item
         */
        public OrderBean(Text itemid, DoubleWritable amount) {
            set(itemid, amount);
        }

        /**
         * Replaces both fields. The arguments are stored by reference, not copied.
         *
         * @param itemid order id
         * @param amount price of the item
         */
        public void set(Text itemid, DoubleWritable amount) {
            this.itemid = itemid;
            this.amount = amount;
        }

        /** @return the order id */
        public Text getItemid() {
            return itemid;
        }

        /** @return the item price */
        public DoubleWritable getAmount() {
            return amount;
        }

        /**
         * Orders by {@code itemid} ascending, then by {@code amount} descending.
         *
         * <p>Comparing the id first is required: if keys were ordered by amount
         * alone, keys of different orders could interleave and keys of the same
         * order would not be guaranteed to be adjacent.
         *
         * @param o the key to compare against
         * @return negative, zero or positive as this key sorts before, equal to
         *         or after {@code o}
         */
        @Override
        public int compareTo(OrderBean o) {
            int cmp = this.itemid.compareTo(o.getItemid());
            if (cmp == 0) {
                // Operands swapped (instead of negating the result) to get descending order.
                cmp = o.getAmount().compareTo(this.amount);
            }
            return cmp;
        }

        /**
         * Serializes the id as a UTF string followed by the amount as a double.
         *
         * @param out sink to write to
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(itemid.toString());
            out.writeDouble(amount.get());
        }

        /**
         * Reads the fields back in the order {@link #write} emitted them and
         * stores them in freshly allocated wrapper objects.
         *
         * @param in source to read from
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            String readUTF = in.readUTF();
            double readDouble = in.readDouble();

            this.itemid = new Text(readUTF);
            this.amount = new DoubleWritable(readDouble);
        }

        /** @return the id and the amount separated by a tab character */
        @Override
        public String toString() {
            return itemid.toString() + "\t" + amount.get();
        }

    }

    ItemidGroupingComparator.java:

    package com.darrenchan.mr.groupingcomparator;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Reduce-side grouping comparator: two {@link OrderBean} keys are treated as
     * the same key whenever their {@code itemid} values compare equal, whatever
     * their amounts are.
     */
    public class ItemidGroupingComparator extends WritableComparator {

        /**
         * Passes the key class to the parent together with {@code true}, which
         * asks the framework to create key instances reflectively.
         */
        protected ItemidGroupingComparator() {
            super(OrderBean.class, true);
        }

        /**
         * Compares two keys by their {@code itemid} only.
         *
         * @param a first key; must be an {@link OrderBean}
         * @param b second key; must be an {@link OrderBean}
         * @return result of comparing the two ids
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            final OrderBean left = (OrderBean) a;
            final OrderBean right = (OrderBean) b;

            // The amount is deliberately ignored so that grouping depends on the id alone.
            return left.getItemid().compareTo(right.getItemid());
        }

    }

    ItemIdPartitioner.java:

    package com.darrenchan.mr.groupingcomparator;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    
    /**
     * Partitions {@link OrderBean} keys by the hash of their {@code itemid}, so
     * beans with the same id are sent to the same partition.
     */
    public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{

        /**
         * Derives the partition index from the id's hash code.
         *
         * @param bean           key whose {@code itemid} selects the partition
         * @param value          unused
         * @param numReduceTasks number of partitions; the result is always below this value
         * @return partition index in the range {@code [0, numReduceTasks)}
         */
        @Override
        public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
            // Masking with Integer.MAX_VALUE clears the sign bit so the remainder is never negative.
            final int nonNegativeHash = bean.getItemid().hashCode() & Integer.MAX_VALUE;
            return nonNegativeHash % numReduceTasks;
        }

    }

    SecondarySort.java:

    package com.darrenchan.mr.groupingcomparator;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import com.sun.xml.bind.v2.schemagen.xmlschema.List;
    
    /**
     * Job driver that emits, for every order id, the record with the largest
     * amount. Mapper and reducer carry the whole record in the {@link OrderBean}
     * key; the value is always {@link NullWritable}.
     */
    public class SecondarySort {

        /**
         * Parses comma-separated lines of the form {@code orderId,productId,price}
         * into {@link OrderBean} keys built from the first and third fields.
         */
        static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{

            // Reused across map() calls to avoid allocating a new key per record.
            OrderBean bean = new OrderBean();

            /**
             * Emits one key per well-formed input line. Lines with fewer than
             * three fields (for example blank lines) are skipped and counted
             * instead of failing the task. A non-numeric price still raises
             * {@link NumberFormatException}.
             *
             * @param key     offset of the line (unused)
             * @param value   one line of input text
             * @param context task context used to emit output and update counters
             */
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

                String line = value.toString();
                String[] fields = StringUtils.split(line, ",");

                if (fields == null || fields.length < 3) {
                    // Without this guard fields[2] throws ArrayIndexOutOfBoundsException.
                    context.getCounter("SecondarySort", "MalformedLines").increment(1);
                    return;
                }

                bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));

                context.write(bean, NullWritable.get());

            }

        }

        /**
         * Writes the key it is handed once per group and ignores the values.
         */
        static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{

            /**
             * Emits the group's key without iterating the values. This relies on
             * the job configuration: beans with the same id arrive as one group,
             * and the bean with the largest amount is expected to be first.
             *
             * @param key     key handed in for the group
             * @param values  values of the group (not read)
             * @param context task context used to emit output
             */
            @Override
            protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }


        /**
         * Configures and runs the job, then exits with status 0 on success and
         * 1 on failure so that callers can detect a failed job.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if job setup or execution fails
         */
        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(SecondarySort.class);

            job.setMapperClass(SecondarySortMapper.class);
            job.setReducerClass(SecondarySortReducer.class);

            // Declared explicitly; these match the job output classes set below.
            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.setInputPaths(job, new Path("/grouping/srcdata"));
            FileOutputFormat.setOutputPath(job, new Path("/grouping/output"));

            // Register the custom grouping comparator.
            job.setGroupingComparatorClass(ItemidGroupingComparator.class);
            // Register the custom partitioner.
            job.setPartitionerClass(ItemIdPartitioner.class);

            job.setNumReduceTasks(3);

            // Propagate the job result instead of always exiting with status 0.
            boolean succeeded = job.waitForCompletion(true);
            System.exit(succeeded ? 0 : 1);

        }

    }

    运行结果:

  • 相关阅读:
    Haskell Interactive Development in Emacs
    Access Java API in Groovy Script
    手工设置Eclipse文本编辑器的配色
    Color Theme of Emacs
    Gnucash的投资记录
    Special Forms and Syntax Sugars in Clojure
    Use w3m as Web Browser
    SSE指令集加速之 I420转BGR24
    【图像处理】 增加程序速度的方法
    TBB 入门笔记
  • 原文地址:https://www.cnblogs.com/DarrenChan/p/6773277.html
Copyright © 2011-2022 走看看