需求:
Order_0000001,Pdt_01,222.8
Order_0000001,Pdt_05,25.8
Order_0000002,Pdt_05,325.8
Order_0000002,Pdt_03,522.8
Order_0000002,Pdt_04,122.4
Order_0000003,Pdt_01,222.8
按照订单的编号分组,计算出每组的商品价格最大值。
分析:
我们可以把订单编号当做key,然后按照在reduce端去找出每组的最大值。在这里,我想介绍另外一种方法,顺便介绍GroupingComparator。
我们可以自定义一个类型,然后通过GroupingComparator来让其被看成一组(到达reduce端),如果我们对类型进行从大到小的排序,根据MapReduce的规则,同一组的内容到达reduce端,是取第一个内容的key作为reduce的key的,我们不妨利用这个规则,写一个OrderBean的类型,只要让其orderid相同,就被分到同一组,这样一来,到达reduce时,相同id的所有bean已经被看成一组,且金额最大的那个一排在第一位,就是我们想要的结果。
代码:
OrderBean.java:
package com.darrenchan.mr.groupingcomparator; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean>{ private Text itemid; private DoubleWritable amount; public OrderBean() { } public OrderBean(Text itemid, DoubleWritable amount) { set(itemid, amount); } public void set(Text itemid, DoubleWritable amount) { this.itemid = itemid; this.amount = amount; } public Text getItemid() { return itemid; } public DoubleWritable getAmount() { return amount; } @Override public int compareTo(OrderBean o) { // int cmp = this.itemid.compareTo(o.getItemid()); // if (cmp == 0) { int cmp = -this.amount.compareTo(o.getAmount()); // } return cmp; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(itemid.toString()); out.writeDouble(amount.get()); } @Override public void readFields(DataInput in) throws IOException { String readUTF = in.readUTF(); double readDouble = in.readDouble(); this.itemid = new Text(readUTF); this.amount= new DoubleWritable(readDouble); } @Override public String toString() { return itemid.toString() + " " + amount.get(); } }
ItemidGroupingComparator.java:
package com.darrenchan.mr.groupingcomparator; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 利用reduce端的GroupingComparator来实现将一组bean看成相同的key * */ public class ItemidGroupingComparator extends WritableComparator { //传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象 protected ItemidGroupingComparator() { super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; //比较两个bean时,指定只比较bean中的orderid return abean.getItemid().compareTo(bbean.getItemid()); } }
ItemIdPartitioner.java:
package com.darrenchan.mr.groupingcomparator; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{ @Override public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) { //相同id的订单bean,会发往相同的partition //而且,产生的分区数,是会跟用户设置的reduce task数保持一致 return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
SecondarySort.java:
package com.darrenchan.mr.groupingcomparator; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.sun.xml.bind.v2.schemagen.xmlschema.List; /** * */ public class SecondarySort { static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{ OrderBean bean = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = StringUtils.split(line, ","); bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2]))); context.write(bean, NullWritable.get()); } } static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{ //到达reduce时,相同id的所有bean已经被看成一组,且金额最大的那个一排在第一位 @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SecondarySort.class); job.setMapperClass(SecondarySortMapper.class); job.setReducerClass(SecondarySortReducer.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("/grouping/srcdata")); FileOutputFormat.setOutputPath(job, new Path("/grouping/output")); //在此设置自定义的Groupingcomparator类 job.setGroupingComparatorClass(ItemidGroupingComparator.class); //在此设置自定义的partitioner类 job.setPartitionerClass(ItemIdPartitioner.class); job.setNumReduceTasks(3); job.waitForCompletion(true); } }
运行结果: