zoukankan      html  css  js  c++  java
  • MapReduce排序

    需求介绍

    现有如下数据:

    1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
    2	13846544121	192.196.100.2			264	0	200
    3 	13956435636	192.196.100.3			132	1512	200
    4 	13966251146	192.168.100.1			240	0	404
    5 	18271575951	192.168.100.2	www.atguigu.com	1527	2106	200
    6 	84188413	192.168.100.3	www.atguigu.com	4116	1432	200
    7 	13590439668	192.168.100.4			1116	954	200
    8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
    9 	13729199489	192.168.100.6			240	0	200
    10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	200
    11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
    12 	15959002129	192.168.100.9	www.atguigu.com	1938	180	500
    13 	13560439638	192.168.100.10			918	4938	200
    14 	13470253144	192.168.100.11			180	180	200
    15 	13682846555	192.168.100.12	www.qq.com	1938	2910	200
    16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
    17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
    18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
    19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
    20 	13768778790	192.168.100.17			120	120	200
    21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
    22 	13568436656	192.168.100.19			1116	954	200
    

    现在要对数据进行统计,统计格式为:手机号 上行流量 下行流量 总流量

    排序时要按照总流量的大小进行倒序排序

    分析:

    在我们运行MR程序时,shuffle过程会对数据进行排序,不过排序针对的是key。既然要按照总流量进行排序,那么我们便要在map阶段把封装了总流量的对象(FlowBean)作为key输出,由shuffle按该key完成排序,在reduce阶段再照常输出即可。

    实现

    共同方法:

    FlowMapper:

    package com.neve.writeComparable;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * Mapper that converts one tab-separated traffic record into a
     * (FlowBean, phone number) pair.
     *
     * <p>The FlowBean is emitted as the map output KEY so that the framework's
     * key sort orders records by traffic; the phone number travels as the value.
     */
    public class FlowMapper extends Mapper<LongWritable,Text,FlowBean,Text> {

        /**
         * Fewest columns a usable record can have: the phone number is read from
         * index 1 and the up-flow from index (length - 3), so at least five
         * columns are needed for those two positions not to overlap.
         */
        private static final int MIN_FIELDS = 5;

        // Reused across map() calls to avoid allocating a new key/value per record.
        private final FlowBean outk = new FlowBean();
        private final Text outv = new Text();

        /**
         * Parses one input line and emits its traffic figures keyed by FlowBean.
         *
         * @param key     input key supplied by the framework; not used here
         * @param value   one line of the input file, columns separated by tabs
         * @param context sink for the (FlowBean, phone) output pair
         * @throws NumberFormatException if a flow column is not a valid long
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // "\t" escape instead of a raw tab character, which editors and
            // copy-paste can silently turn into spaces.
            String[] fields = value.toString().split("\t");

            // Blank or truncated lines (e.g. a trailing empty line) would otherwise
            // fail the task with ArrayIndexOutOfBoundsException; skip and count them.
            if (fields.length < MIN_FIELDS) {
                context.getCounter("FlowMapper", "MALFORMED_LINES").increment(1);
                return;
            }

            outv.set(fields[1]);
            // Flow columns are addressed from the END of the record; the sample data
            // has an optional domain column in the middle — presumably that is why.
            outk.setUpFlow(Long.parseLong(fields[fields.length - 3]));
            outk.setDownFlow(Long.parseLong(fields[fields.length - 2]));
            outk.setSumFlow();
            context.write(outk,outv);
        }
    }
    
    

    FlowReducer:

    package com.neve.writeComparable;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * Reducer that swaps key and value back: for every phone number received
     * under a FlowBean key it writes (phone number, FlowBean).
     */
    public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        /**
         * Writes one output record per incoming value.
         *
         * @param key     the traffic bean the values were grouped under
         * @param values  phone numbers associated with {@code key}
         * @param context sink for the (phone, FlowBean) output pairs
         */
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            java.util.Iterator<Text> phones = values.iterator();
            while (phones.hasNext()) {
                Text phone = phones.next();
                context.write(phone, key);
            }
        }
    }
    
    

    方法一

    FlowBean:

    package com.neve.writeComparable;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * Traffic figures for one record: upstream, downstream and their sum.
     *
     * <p>Serialized as three consecutive longs in the order up, down, sum.
     * In this variant {@link #compareTo(FlowBean)} deliberately reports every
     * pair as equal; the ordering is supplied by a separate comparator
     * (FlowWritableComparable) registered on the job.
     */
    public class FlowBean implements WritableComparable<FlowBean> {

        // Primitive backing fields: they default to 0 instead of null, so write()
        // and setSumFlow() cannot hit a NullPointerException on a bean whose
        // setters were not all called, and serialization does no boxing.
        private long upFlow;
        private long downFlow;
        private long sumFlow;

        /** Creates a bean with all three figures set to zero. */
        public FlowBean() {
        }

        /** @return upstream traffic */
        public Long getUpFlow() {
            return upFlow;
        }

        /**
         * @param upFlow upstream traffic; must not be null
         *               (a null argument throws NullPointerException here)
         */
        public void setUpFlow(Long upFlow) {
            this.upFlow = upFlow;
        }

        /** @return downstream traffic */
        public Long getDownFlow() {
            return downFlow;
        }

        /**
         * @param downFlow downstream traffic; must not be null
         *                 (a null argument throws NullPointerException here)
         */
        public void setDownFlow(Long downFlow) {
            this.downFlow = downFlow;
        }

        /** @return total traffic as last stored by one of the setSumFlow overloads */
        public Long getSumFlow() {
            return sumFlow;
        }

        /**
         * Stores an explicit total.
         *
         * @param sumFlow total traffic; must not be null
         *                (a null argument throws NullPointerException here)
         */
        public void setSumFlow(Long sumFlow) {
            this.sumFlow = sumFlow;
        }

        /** Writes up, down and sum, in that order, as three longs. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        /** Reads the three longs back in the same order write() produced them. */
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }

        /** Recomputes the total as upFlow + downFlow. */
        public void setSumFlow() {
            this.sumFlow = this.upFlow + this.downFlow;
        }

        @Override
        public String toString() {
            return "FlowBean{" +
                    "upFlow=" + upFlow +
                    ", downFlow=" + downFlow +
                    ", sumFlow=" + sumFlow +
                    '}';
        }

        /**
         * Always returns 0: this variant does not define its own ordering.
         * Sorting relies on the comparator registered with
         * job.setSortComparatorClass(...) in the driver.
         */
        @Override
        public int compareTo(FlowBean o) {
            return 0;
        }
    }
    
    

    此时需要继承WritableComparator类并实现其排序方法:

    package com.neve.writeComparable;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * Sort comparator for FlowBean keys: larger total traffic sorts first.
     */
    public class FlowWritableComparable extends WritableComparator {

        /**
         * Binds the comparator to FlowBean; the {@code true} flag asks the base
         * class to create key instances it can deserialize into before comparing.
         */
        public FlowWritableComparable(){
            super(FlowBean.class,true);
        }

        /**
         * Compares two FlowBean keys by total traffic, descending.
         *
         * @param a first key; must be a FlowBean
         * @param b second key; must be a FlowBean
         * @return negative if {@code a} has the larger total, positive if
         *         {@code b} has, zero if the totals are equal
         */
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            Long first = ((FlowBean) a).getSumFlow();
            Long second = ((FlowBean) b).getSumFlow();
            // Operands swapped (second vs. first) to obtain descending order.
            return second.compareTo(first);
        }
    }
    
    

    因此需要在Driver类中注册该排序器:

    FlowDriver:

    package com.neve.writeComparable;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FlowDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //1.创建配置
            Configuration configuration = new Configuration();
            //2.创建job
            Job job = Job.getInstance(configuration);
            //3.关联驱动类
            job.setJarByClass(com.neve.phone.FlowDriver.class);
            //4.关联mapper和reducer类
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            //5.设置mapper的输出值和value
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);
            //6.设置最终的输出值和value
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            //7.设置输入输出路径
            FileInputFormat.setInputPaths(job,new Path("F:\Workplace\IDEA_Workplace\hadoopstudy\flowinput"));
            FileOutputFormat.setOutputPath(job,new Path("F:\Workplace\IDEA_Workplace\hadoopstudy\flowoutput"));
            //设置排序器
            job.setSortComparatorClass(FlowWritableComparable.class);
            //8.提交job
            job.waitForCompletion(true);
        }
    
    }
    
    

    方法二(常用)

    直接在FlowBean中重写比较方法:

    package com.neve.writeComparable;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * Traffic figures for one record: upstream, downstream and their sum.
     *
     * <p>Serialized as three consecutive longs in the order up, down, sum.
     * The natural ordering defined by {@link #compareTo(FlowBean)} is by total
     * traffic, largest first, so no separate sort comparator is needed.
     */
    public class FlowBean implements WritableComparable<FlowBean> {

        // Primitive backing fields: they default to 0 instead of null, so write(),
        // setSumFlow() and compareTo() cannot hit a NullPointerException on a bean
        // whose setters were not all called, and serialization does no boxing.
        private long upFlow;
        private long downFlow;
        private long sumFlow;

        /** Creates a bean with all three figures set to zero. */
        public FlowBean() {
        }

        /** @return upstream traffic */
        public Long getUpFlow() {
            return upFlow;
        }

        /**
         * @param upFlow upstream traffic; must not be null
         *               (a null argument throws NullPointerException here)
         */
        public void setUpFlow(Long upFlow) {
            this.upFlow = upFlow;
        }

        /** @return downstream traffic */
        public Long getDownFlow() {
            return downFlow;
        }

        /**
         * @param downFlow downstream traffic; must not be null
         *                 (a null argument throws NullPointerException here)
         */
        public void setDownFlow(Long downFlow) {
            this.downFlow = downFlow;
        }

        /** @return total traffic as last stored by one of the setSumFlow overloads */
        public Long getSumFlow() {
            return sumFlow;
        }

        /**
         * Stores an explicit total.
         *
         * @param sumFlow total traffic; must not be null
         *                (a null argument throws NullPointerException here)
         */
        public void setSumFlow(Long sumFlow) {
            this.sumFlow = sumFlow;
        }

        /** Writes up, down and sum, in that order, as three longs. */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        /** Reads the three longs back in the same order write() produced them. */
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }

        /** Recomputes the total as upFlow + downFlow. */
        public void setSumFlow() {
            this.sumFlow = this.upFlow + this.downFlow;
        }

        @Override
        public String toString() {
            return "FlowBean{" +
                    "upFlow=" + upFlow +
                    ", downFlow=" + downFlow +
                    ", sumFlow=" + sumFlow +
                    '}';
        }

        /**
         * Orders beans by total traffic, descending.
         *
         * @param o the bean to compare against
         * @return negative if this bean has the larger total, positive if
         *         {@code o} has, zero if the totals are equal
         */
        @Override
        public int compareTo(FlowBean o) {
            // Arguments swapped (other first) to get descending order without
            // negating a comparison result.
            return Long.compare(o.sumFlow, this.sumFlow);
        }
    }
    
    

    Driver类中删去注册排序器的步骤即可,即删除

    job.setSortComparatorClass(FlowWritableComparable.class);
    

    结果

    13509468723	FlowBean{upFlow=7335, downFlow=110349, sumFlow=117684}
    13975057813	FlowBean{upFlow=11058, downFlow=48243, sumFlow=59301}
    13736230513	FlowBean{upFlow=2481, downFlow=24681, sumFlow=27162}
    13568436656	FlowBean{upFlow=2481, downFlow=24681, sumFlow=27162}
    18390173782	FlowBean{upFlow=9531, downFlow=2412, sumFlow=11943}
    13630577991	FlowBean{upFlow=6960, downFlow=690, sumFlow=7650}
    15043685818	FlowBean{upFlow=3659, downFlow=3538, sumFlow=7197}
    13992314666	FlowBean{upFlow=3008, downFlow=3720, sumFlow=6728}
    15910133277	FlowBean{upFlow=3156, downFlow=2936, sumFlow=6092}
    13560439638	FlowBean{upFlow=918, downFlow=4938, sumFlow=5856}
    84188413	FlowBean{upFlow=4116, downFlow=1432, sumFlow=5548}
    13682846555	FlowBean{upFlow=1938, downFlow=2910, sumFlow=4848}
    18271575951	FlowBean{upFlow=1527, downFlow=2106, sumFlow=3633}
    15959002129	FlowBean{upFlow=1938, downFlow=180, sumFlow=2118}
    13590439668	FlowBean{upFlow=1116, downFlow=954, sumFlow=2070}
    13568436656	FlowBean{upFlow=1116, downFlow=954, sumFlow=2070}
    13956435636	FlowBean{upFlow=132, downFlow=1512, sumFlow=1644}
    13470253144	FlowBean{upFlow=180, downFlow=180, sumFlow=360}
    13846544121	FlowBean{upFlow=264, downFlow=0, sumFlow=264}
    13768778790	FlowBean{upFlow=120, downFlow=120, sumFlow=240}
    13729199489	FlowBean{upFlow=240, downFlow=0, sumFlow=240}
    13966251146	FlowBean{upFlow=240, downFlow=0, sumFlow=240}
    
    

    当然如果使用此方法的话,对于同一个手机号的不同数据就不能计算到一起了,不过后期我们会有方法处理。

  • 相关阅读:
    解决Warning: mysql_connect(): Headers and client library minor version mismatch. 警告
    读取微博feed伪代码
    [待续]不为人知的PHP-SPL标准库
    封装pyMysql
    捉“客”记
    实现小程序插件自定义导航栏
    圆形与矩形的碰撞检测--Mr.Ember
    mpvue原理分析
    webpack学习--Mr.Ember
    原型链、继承--Mr.Ember
  • 原文地址:https://www.cnblogs.com/wuren-best/p/13746893.html
Copyright © 2011-2022 走看看