zoukankan      html  css  js  c++  java
  • MapReduce TopN(自主复习)

    1.MyTopN  主程序

    package com.littlepage.topn;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    import java.io.IOException;
    
    /**
     * Driver for the "TopN" MapReduce job.
     *
     * <p>Wires together {@link TopNMapper}, {@link TopNPartitioner},
     * {@link TopNSortComparator}, {@link TopNGroupingComparator} and
     * {@link TopNReducer}, then submits the job and waits for it to finish.
     *
     * <p>Usage: {@code MyTopN [generic options] <input path> <output path>}
     */
    public class MyTopN {
        /**
         * Configures and runs the job.
         *
         * @param args generic Hadoop options followed by the input path and the output path
         * @throws IOException            if the file system or job submission fails
         * @throws ClassNotFoundException if a job class cannot be loaded
         * @throws InterruptedException   if the wait for job completion is interrupted
         */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration(true);
            String[] other = new GenericOptionsParser(conf, args).getRemainingArgs();
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException below.
            if (other.length < 2) {
                System.err.println("Usage: MyTopN [generic options] <input path> <output path>");
                System.exit(2);
            }

            // Run with the local job runner rather than on a cluster.
            // These are set after option parsing, so they override any generic options.
            conf.set("mapreduce.framework.name", "local");
            // Allow submission across heterogeneous platforms.
            conf.set("mapreduce.app-submission.cross-platform", "true");

            Job job = Job.getInstance(conf);
            job.setJarByClass(MyTopN.class);
            job.setJobName("TopN");

            // Input and output locations; a pre-existing output directory is removed
            // so that the job can be re-run.
            TextInputFormat.addInputPath(job, new Path(other[0]));
            Path outPath = new Path(other[1]);
            if (outPath.getFileSystem(conf).exists(outPath)) {
                outPath.getFileSystem(conf).delete(outPath, true);
            }

            // Map side.
            job.setMapperClass(TopNMapper.class);
            job.setMapOutputKeyClass(TopNKey.class);
            job.setMapOutputValueClass(IntWritable.class);
            // The partitioner only has to send equal grouping keys to the same partition.
            job.setPartitionerClass(TopNPartitioner.class);
            job.setSortComparatorClass(TopNSortComparator.class);
            // No combiner is configured.

            // Reduce side.
            job.setReducerClass(TopNReducer.class);
            job.setGroupingComparatorClass(TopNGroupingComparator.class);
            TextOutputFormat.setOutputPath(job, outPath);

            // Report job failure through the process exit status.
            if (!job.waitForCompletion(true)) {
                System.exit(1);
            }
        }
    }

    2.TopNKey

    package com.littlepage.topn;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * Composite map-output key for the TopN job: a calendar date plus a reading.
     *
     * <p>A custom key type has to provide serialization/deserialization
     * ({@link #write}/{@link #readFields}) and an ordering ({@link #compareTo}).
     * The {@code template} field carries the value that the job's sort comparator
     * treats as the temperature; the name is kept as-is for compatibility.
     */
    public class TopNKey implements WritableComparable<TopNKey> {

        private int year;
        private int month;
        private int day;
        private int template;

        /** @return the year component */
        public int getYear() {
            return this.year;
        }

        /** @param year the year component */
        public void setYear(int year) {
            this.year = year;
        }

        /** @return the month component */
        public int getMonth() {
            return this.month;
        }

        /** @param month the month component */
        public void setMonth(int month) {
            this.month = month;
        }

        /** @return the day-of-month component */
        public int getDay() {
            return this.day;
        }

        /** @param day the day-of-month component */
        public void setDay(int day) {
            this.day = day;
        }

        /** @return the reading stored alongside the date */
        public int getTemplate() {
            return this.template;
        }

        /** @param template the reading stored alongside the date */
        public void setTemplate(int template) {
            this.template = template;
        }

        /**
         * Serializes the four fields as ints in the order year, month, day, template.
         *
         * @param out sink to write to
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(this.year);
            out.writeInt(this.month);
            out.writeInt(this.day);
            out.writeInt(this.template);
        }

        /**
         * Restores the four fields in the same order {@link #write} emitted them.
         *
         * @param in source to read from
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            year = in.readInt();
            month = in.readInt();
            day = in.readInt();
            template = in.readInt();
        }

        /**
         * Orders keys by year, then month, then day (all ascending).
         * The {@code template} field does not take part in this ordering.
         *
         * @param that the key to compare against
         * @return negative, zero or positive as this key sorts before, with or after {@code that}
         */
        @Override
        public int compareTo(TopNKey that) {
            int byYear = Integer.compare(year, that.getYear());
            if (byYear != 0) {
                return byYear;
            }
            int byMonth = Integer.compare(month, that.getMonth());
            if (byMonth != 0) {
                return byMonth;
            }
            return Integer.compare(day, that.getDay());
        }
    }

    3.TopNMapper

    package com.littlepage.topn;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.util.StringUtils;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    
    /**
     * Parses one tab-separated input line into a {@link TopNKey} and its reading.
     *
     * <p>Expected line shape (fields separated by tabs):
     * {@code 2019-6-1 22:22:22 <tab> 1 <tab> 31}. The first field supplies the date,
     * the third field the integer reading; the second field is not used.
     * Lines that cannot be parsed are skipped and counted instead of failing the task.
     */
    public class TopNMapper extends Mapper<LongWritable, Text,TopNKey, IntWritable> {
        // Output objects are reused across map() calls to avoid per-record allocation.
        TopNKey topNKey = new TopNKey();
        IntWritable intWritable = new IntWritable();

        // Created once per mapper instance instead of once per record.
        private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        private final Calendar calendar = Calendar.getInstance();

        /**
         * Emits {@code (year, month, day, reading) -> reading} for a well-formed line.
         *
         * @param key     the input key supplied by the input format (not used)
         * @param value   one line of input text
         * @param context sink for the output pair and the malformed-record counter
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if writing the output is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = StringUtils.split(value.toString(), '\t');
            // The date is field 0 and the reading is field 2; anything shorter is malformed.
            if (strs.length < 3) {
                context.getCounter("TopN", "MALFORMED_RECORDS").increment(1);
                return;
            }

            Date date;
            int template;
            try {
                date = dateFormat.parse(strs[0]);
                template = Integer.parseInt(strs[2].trim());
            } catch (ParseException | NumberFormatException e) {
                // Bad date or non-numeric reading: skip the record, but keep it visible.
                context.getCounter("TopN", "MALFORMED_RECORDS").increment(1);
                return;
            }

            calendar.setTime(date);
            topNKey.setYear(calendar.get(Calendar.YEAR));
            // Calendar.MONTH is zero-based; store the conventional 1..12 value.
            topNKey.setMonth(calendar.get(Calendar.MONTH) + 1);
            topNKey.setDay(calendar.get(Calendar.DAY_OF_MONTH));
            topNKey.setTemplate(template);
            intWritable.set(template);
            context.write(topNKey, intWritable);
        }
    }

    4.TopNReducer

    package com.littlepage.topn;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    /**
     * Emits, for every group, the first record and the first later record that
     * falls on a different day.
     *
     * <p>With the job's comparators (grouping by year and month, sorting by reading
     * in descending order) this yields the two highest readings of each month that
     * come from two distinct days.
     */
    public class TopNReducer extends Reducer<TopNKey, IntWritable, Text,IntWritable> {
        // Output objects are reused across writes.
        Text rkey=new Text();
        IntWritable rval=new IntWritable();

        /**
         * Processes one group.
         *
         * <p>The per-group state is local to this call. Keeping it in instance fields
         * leaked the state of one group into the next, so only the first group handled
         * by a reducer instance was processed correctly.
         *
         * @param key     the current key of the group; the code relies on the framework
         *                updating this object as the value iterator advances
         * @param values  the values of the group (only used to advance the iteration)
         * @param context sink for the output pairs
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if writing the output is interrupted
         */
        @Override
        protected void reduce(TopNKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            boolean firstEmitted = false;
            int firstDay = 0;
            for (IntWritable ignored : values) {
                if (!firstEmitted) {
                    // First record of the group.
                    emit(key, context);
                    firstEmitted = true;
                    firstDay = key.getDay();
                } else if (firstDay != key.getDay()) {
                    // First later record from a different day; nothing more is needed.
                    emit(key, context);
                    break;
                }
            }
        }

        /** Writes the key's date as {@code year-month-day} together with its reading. */
        private void emit(TopNKey key, Context context) throws IOException, InterruptedException {
            rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
            rval.set(key.getTemplate());
            context.write(rkey, rval);
        }
    }

    5.TopNPartitioner 分区器,决定Map输出的每条记录分到哪个分区(即由哪个reduce task处理)

    package com.littlepage.topn;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /**
     * Routes map output to a partition based on the key's year only.
     *
     * <p>All keys with the same year land in the same partition.
     */
    public class TopNPartitioner extends Partitioner<TopNKey,IntWritable> {
        /**
         * Computes the partition number for a record.
         *
         * <p>Guidelines for this method: keep it cheap, and partition on a coarser
         * dimension than the grouping key.
         *
         * @param key           the map output key
         * @param value         the map output value (not used)
         * @param numPartitions the number of partitions
         * @return a partition number in {@code [0, numPartitions)}
         */
        @Override
        public int getPartition(TopNKey key, IntWritable value, int numPartitions) {
            // Clear the sign bit so that a negative year cannot produce a negative
            // partition number; non-negative years give the same result as a plain '%'.
            // Partitioning by year alone may produce data skew.
            return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
        }

    }

    6.TopNSortComparator 排序比较器,在Map中精确到月,按温度递减

    package com.littlepage.topn;
    
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Sort order for {@link TopNKey}: year ascending, month ascending, then the
     * reading ({@code template}) descending.
     */
    public class TopNSortComparator extends WritableComparator {
        /** Registers {@link TopNKey} as the key class and requests key instances for comparison. */
        public TopNSortComparator() {
            super(TopNKey.class, true);
        }

        /**
         * Compares two keys by year, month and reading; the day is ignored.
         *
         * @param a the first key, expected to be a {@link TopNKey}
         * @param b the second key, expected to be a {@link TopNKey}
         * @return negative, zero or positive as {@code a} sorts before, with or after {@code b}
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TopNKey left = (TopNKey) a;
            TopNKey right = (TopNKey) b;

            int yearOrder = Integer.compare(left.getYear(), right.getYear());
            if (yearOrder != 0) {
                return yearOrder;
            }
            int monthOrder = Integer.compare(left.getMonth(), right.getMonth());
            if (monthOrder != 0) {
                return monthOrder;
            }
            // Operands are swapped to obtain descending order on the reading.
            return Integer.compare(right.getTemplate(), left.getTemplate());
        }
    }

    7.TopNGroupingComparator 分组比较器,用于reduce的分组,每一个组是年月,进行reduce操作

    package com.littlepage.topn;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Grouping order for {@link TopNKey}: keys with the same year and month
     * compare as equal, so they are handled together on the reduce side.
     */
    public class TopNGroupingComparator extends WritableComparator {
        /** Registers {@link TopNKey} as the key class and requests key instances for comparison. */
        public TopNGroupingComparator() {
            super(TopNKey.class, true);
        }

        /**
         * Compares two keys by year and then by month; day and reading are ignored.
         *
         * @param a the first key, expected to be a {@link TopNKey}
         * @param b the second key, expected to be a {@link TopNKey}
         * @return negative, zero or positive as {@code a} sorts before, with or after {@code b}
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TopNKey left = (TopNKey) a;
            TopNKey right = (TopNKey) b;

            int yearOrder = Integer.compare(left.getYear(), right.getYear());
            if (yearOrder != 0) {
                return yearOrder;
            }
            return Integer.compare(left.getMonth(), right.getMonth());
        }
    }

    TopN案例是MapReduce的典型案例,需牢记

  • 相关阅读:
    js实现base64转换
    使用maven命令终端构建一个web项目及发布该项目
    使用eclipse构建Maven项目及发布一个Maven项目
    Maven在Windows上的安装与配置
    centos7下安装配置redis3.0.4
    Centos7下完美安装并配置mysql5.6
    linux常用命令总结
    VMware下centos桥接模式静态ip配置
    解决centos7下tomcat启动正常,无法访问项目的问题
    centos7系统下安装配置jdk、tomcat教程
  • 原文地址:https://www.cnblogs.com/littlepage/p/11343474.html
Copyright © 2011-2022 走看看