  • MapReduce: finding the maximum within each group

    The three files below implement the classic secondary-sort pattern: a composite key bean, a Partitioner that routes all records with the same order id to the same reduce task, and a reduce-side GroupingComparator that makes those records arrive as a single group, sorted so the largest amount comes first. A sketch of the OrderBean key class, which this post does not show, follows the first file.

    package cn.itcastcat.bigdata.secondarysort;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * A reduce-side GroupingComparator: makes the framework treat all beans
     * that share the same itemid as one key, so they arrive together in a
     * single reduce() call.
     */
    public class ItemidGroupingComparator extends WritableComparator {
    
        //Pass in the class of the bean used as the key; "true" tells the
        //framework to create key instances via reflection for comparison
        protected ItemidGroupingComparator() {
            super(OrderBean.class, true);
        }
        
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean abean = (OrderBean) a;
            OrderBean bbean = (OrderBean) b;
            
            //When comparing two beans, compare only the itemid (the order id)
            return abean.getItemid().compareTo(bbean.getItemid());
            
        }
    
    }
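
    The OrderBean key class is used throughout but never shown in this post. Below is a minimal sketch inferred from how it is used (bean.set(Text, DoubleWritable), getItemid(), and the reducer comment that the largest amount sorts first); the field names, serialization format, and toString() layout are assumptions, not the original author's code.

    package cn.itcastcat.bigdata.secondarysort;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    /**
     * Hypothetical reconstruction of the composite key bean used above.
     */
    public class OrderBean implements WritableComparable<OrderBean> {

        private Text itemid;
        private DoubleWritable amount;

        //the GroupingComparator's super(OrderBean.class, true) requires a no-arg constructor
        public OrderBean() {
        }

        public void set(Text itemid, DoubleWritable amount) {
            this.itemid = itemid;
            this.amount = amount;
        }

        public Text getItemid() {
            return itemid;
        }

        public DoubleWritable getAmount() {
            return amount;
        }

        //sort by itemid ascending, then amount descending, so the
        //largest amount in each group is the first key the reducer sees
        @Override
        public int compareTo(OrderBean o) {
            int cmp = this.itemid.compareTo(o.getItemid());
            if (cmp == 0) {
                cmp = o.getAmount().compareTo(this.amount);
            }
            return cmp;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(itemid.toString());
            out.writeDouble(amount.get());
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.itemid = new Text(in.readUTF());
            this.amount = new DoubleWritable(in.readDouble());
        }

        //controls what the job writes out for each group's maximum bean
        @Override
        public String toString() {
            return itemid.toString() + "\t" + amount.get();
        }
    }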
    package cn.itcastcat.bigdata.secondarysort;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    
    public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{
    
        @Override
        public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
            //Order beans with the same id are sent to the same partition;
            //the number of partitions matches the number of reduce tasks set by the user
            return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            
        }
    
    }
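
    A note on the bit-mask above: hashCode() can be negative, and a negative value modulo numReduceTasks would yield a negative (invalid) partition number. ANDing with Integer.MAX_VALUE clears the sign bit, guaranteeing a partition in the range [0, numReduceTasks).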
    package cn.itcastcat.bigdata.secondarysort;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Driver plus map/reduce classes: parses order lines into OrderBeans and
     * relies on sorting and grouping to emit the maximum-amount bean per order id.
     */
    public class SecondarySort {
        
        static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
            
            OrderBean bean = new OrderBean();
            
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String line = value.toString();
                //assumed CSV layout (not stated in the post): fields[0] = order id, fields[2] = amount
                String[] fields = StringUtils.split(line, ",");
                
                bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
                
                context.write(bean, NullWritable.get());
                
            }
            
        }
        
        static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
            
            
            //By the time a group reaches reduce, all beans with the same id have been
            //grouped together and the bean with the largest amount sorts first,
            //so writing just the key emits the per-group maximum
            @Override
            protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }
        
        
        public static void main(String[] args) throws Exception {
            
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(SecondarySort.class);
            
            job.setMapperClass(SecondarySortMapper.class);
            job.setReducerClass(SecondarySortReducer.class);
            
            
            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);
            
            //use forward slashes (or escaped backslashes) in Windows paths: "D:\srcdata" is not a valid Java string literal
            FileInputFormat.setInputPaths(job, new Path("D:/srcdata/d10/orders.txt"));
            FileOutputFormat.setOutputPath(job, new Path("D:/srcdata/d10/gpoutput"));
            
            //register the custom GroupingComparator
            job.setGroupingComparatorClass(ItemidGroupingComparator.class);
            //register the custom Partitioner
            job.setPartitionerClass(ItemIdPartitioner.class);
            
            job.setNumReduceTasks(2);
            
            job.waitForCompletion(true);
            
        }
    
    }
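
    For concreteness, here is a hypothetical run; the data values, and the assumption that each line is laid out as orderid,productid,amount, are illustrative, not from the original post. Given an orders.txt like:

    Order_0000001,Pdt_01,222.8
    Order_0000001,Pdt_05,25.8
    Order_0000002,Pdt_03,522.8
    Order_0000002,Pdt_04,122.4

    each order id forms one reduce group whose first bean carries the largest amount, so the job would emit (spread across two part-r-* files, since two reduce tasks are configured, and assuming OrderBean.toString() prints itemid and amount as in the sketch above):

    Order_0000001	222.8
    Order_0000002	522.8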
  • Original post: https://www.cnblogs.com/huodaihao/p/8998731.html