  MapReduce Partitioning and Sorting

    I. Sorting

    Sorting:

    Requirement: rank users by the amount of network traffic they use each month, ordered by how much traffic was used.

    Interface --> WritableComparable

        Sorting is a default behavior in Hadoop; by default, keys are sorted in dictionary (lexicographic) order.
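        As a tiny hedged illustration (the class and values below are made up, not from the original post): if flow totals were emitted as Text keys, dictionary order would not match numeric order, which is why the job below sorts on a custom WritableComparable key instead.

        import org.apache.hadoop.io.Text;

        // Hypothetical demo of lexicographic (byte-wise) order vs. numeric order.
        public class LexVsNumericDemo {
            public static void main(String[] args) {
                Text a = new Text("1230");
                Text b = new Text("915");
                // Negative: as Text, "1230" sorts before "915" because '1' < '9'
                System.out.println(a.compareTo(b));
                // Positive: numerically, 1230 is larger than 915
                System.out.println(Long.compare(1230L, 915L));
            }
        }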
        
    Types of sorting:

        1) Partial sort

        2) Total (global) sort

        3) Grouping (auxiliary) sort (see the hedged GroupingComparator sketch after the FlowBean class in section II)

        4) Secondary sort
        
    Combiner (local aggregation)

        A Combiner extends the Reducer parent class.
        It performs local aggregation on the map side, reducing the volume of data shipped over the network and thereby speeding up the job.

        Caution: what about computing an average?

        Values: 3  5  7  2  6

        mapper:  (3 + 5 + 7) / 3 = 5
                 (2 + 6) / 2 = 4

        reducer: (5 + 4) / 2 = 4.5, but the true average is (3 + 5 + 7 + 2 + 6) / 5 = 4.6, so a Combiner would change the result here.

        A Combiner may only be used when it does not affect the final business logic.
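        To make the contrast concrete, here is a minimal hedged sketch of a Combiner that is safe (the class and its names are assumptions for a WordCount-style job whose map output is (Text, IntWritable); it is not part of the flow-sort job below):

        import java.io.IOException;

        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Reducer;

        // Hypothetical sum-style Combiner: pre-summing on the map side is safe because
        // addition is associative and commutative, so the final totals are unchanged.
        // (Pre-averaging, as shown above, is NOT safe.)
        public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable v : values) {
                    sum += v.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }

        It would be registered in the driver with job.setCombinerClass(WordCountCombiner.class).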

    II. Partitioning and Sorting Example

    1. Mapper class

    package com.css.flowsort;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1. Read one line of input
            String line = value.toString();
            // 2. Split on the tab delimiter
            String[] fields = line.split("\t");
            // 3. Extract the key fields (fields[0] = phone number, fields[1] = upstream flow, fields[2] = downstream flow)
            long upFlow = Long.parseLong(fields[1]);
            long dfFlow = Long.parseLong(fields[2]);
            // 4. Emit (FlowBean, phone number) so the shuffle sorts on FlowBean
            context.write(new FlowBean(upFlow, dfFlow), new Text(fields[0]));
        }
    }

    2. Reducer class

    package com.css.flowsort;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
    
        @Override
        protected void reduce(FlowBean key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            // Swap key and value so the output is (phone number, FlowBean);
            // iterate in case several phone numbers ever share the same key
            for (Text phone : value) {
                context.write(phone, key);
            }
        }
    }

    3. Wrapper class (the custom key)

    package com.css.flowsort;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    // Custom key bean; implementing WritableComparable lets the framework sort on it directly
    public class FlowBean implements WritableComparable<FlowBean> {
    
        // Fields
        private long upFlow;
        private long dfFlow;
        private long flowSum;
        
        // No-arg constructor (required by the framework for deserialization)
        public FlowBean() {        
        }
        
        // Constructor with arguments; also computes the total flow
        public FlowBean(long upFlow,long dfFlow){
            this.upFlow = upFlow;
            this.dfFlow = dfFlow;
            this.flowSum = upFlow + dfFlow;
        }
        
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDfFlow() {
            return dfFlow;
        }
    
        public void setDfFlow(long dfFlow) {
            this.dfFlow = dfFlow;
        }
    
        public long getFlowSum() {
            return flowSum;
        }
    
        public void setFlowSum(long flowSum) {
            this.flowSum = flowSum;
        }
        
        // Deserialization: fields must be read in the same order they are written
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            dfFlow = in.readLong();
            flowSum = in.readLong();
        }
    
        // Serialization
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(dfFlow);
            out.writeLong(flowSum);
        }
        
        @Override
        public String toString() {
            return upFlow + "\t" + dfFlow + "\t" + flowSum;
        }
    
        // Sorting
        @Override
        public int compareTo(FlowBean o) {
            // Descending order by total flow (never returns 0, so tied keys are not grouped together)
            return this.flowSum > o.getFlowSum() ? -1 : 1;
        }
    }
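    The "grouping (auxiliary) sort" mentioned in section I is not used by this job, but a minimal hedged sketch built on the FlowBean above might look like this (the class name and the grouping rule are assumptions, not part of the original post):

    package com.css.flowsort;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    // Hypothetical grouping comparator: keys that compare equal here are handed to the
    // same reduce() call, even though FlowBean.compareTo() above never returns 0.
    public class FlowGroupingComparator extends WritableComparator {

        protected FlowGroupingComparator() {
            super(FlowBean.class, true); // true: create FlowBean instances for comparison
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            FlowBean x = (FlowBean) a;
            FlowBean y = (FlowBean) b;
            // Group records purely by total flow, descending, to match the sort order
            return Long.compare(y.getFlowSum(), x.getFlowSum());
        }
    }

    It would be enabled in the driver with job.setGroupingComparatorClass(FlowGroupingComparator.class).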

    4. Custom partitioner class

    package com.css.flowsort;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class FlowSortPartitioner extends Partitioner<FlowBean, Text>{
    
        // Partition by the first three digits of the phone number (returns 0-4, so the driver sets 5 reduce tasks)
        @Override
        public int getPartition(FlowBean key, Text value, int numPartitions) {
            // Get the first three digits of the phone number
            String phoneNum = value.toString().substring(0, 3);
            // Map each known prefix to its own partition; everything else goes to partition 4
            if ("135".equals(phoneNum)) {
                return 0;
            } else if ("137".equals(phoneNum)) {
                return 1;
            } else if ("138".equals(phoneNum)) {
                return 2;
            } else if ("139".equals(phoneNum)) {
                return 3;
            }
            return 4;
        }
    }

    5. Driver class

    package com.css.flowsort;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FlowSortDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 1. Get the job configuration and instance
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            // 2. Set the jar via the driver class
            job.setJarByClass(FlowSortDriver.class);
            
            // 3. Register the custom mapper and reducer classes
            job.setMapperClass(FlowSortMapper.class);
            job.setReducerClass(FlowSortReducer.class);
            
            // 4. Set the map output key/value types
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);
            
            // 5. Set the reduce (final) output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            
            // Register the custom partitioner and a matching number of reduce tasks (the lines referred to in steps 7 and 8)
            job.setPartitionerClass(FlowSortPartitioner.class);
            job.setNumReduceTasks(5);
            
            // 6. Set the input path and the output path for the results
            FileInputFormat.setInputPaths(job, new Path("c:/flow1024/in"));
            FileOutputFormat.setOutputPath(job, new Path("c:/flow1024/out1"));
            
            // 7. Submit the job and wait for completion
            boolean rs = job.waitForCompletion(true);
            System.out.println(rs ? 0 : 1);
        }
    }

    6. Input file part-r-00000

    13480253104    120    1320    1440
    13502468823    735    11349    12084
    13510439658    1116    954    2070
    13560436326    1136    94    1230
    13560436666    1136    94    1230
    13560439658    918    4938    5856
    13602846565    198    910    1108
    13660577991    660    690    1350
    13719199419    240    0    240
    13726130503    299    681    980
    13726238888    2481    24681    27162
    13760778710    120    120    240
    13822544101    264    0    264
    13884138413    4116    1432    5548
    13922314466    3008    3720    6728
    13925057413    11058    4243    15301
    13926251106    240    0    240
    13926435656    132    1512    1644
    15013685858    369    338    707
    15889002119    938    380    1318
    15920133257    316    296    612
    18212575961    1527    2106    3633
    18320173382    9531    212    9743

    7. If the custom-partition lines in the step-5 Driver class (job.setPartitionerClass and job.setNumReduceTasks) are removed, the job produces a single globally sorted output file part-r-00000:

    13726238888    2481    24681    27162
    13925057413    11058    4243    15301
    13502468823    735    11349    12084
    18320173382    9531    212    9743
    13922314466    3008    3720    6728
    13560439658    918    4938    5856
    13884138413    4116    1432    5548
    18212575961    1527    2106    3633
    13510439658    1116    954    2070
    13926435656    132    1512    1644
    13480253104    120    1320    1440
    13660577991    660    690    1350
    15889002119    938    380    1318
    13560436326    1136    94    1230
    13560436666    1136    94    1230
    13602846565    198    910    1108
    13726130503    299    681    980
    15013685858    369    338    707
    15920133257    316    296    612
    13822544101    264    0    264
    13760778710    120    120    240
    13719199419    240    0    240
    13926251106    240    0    240

    8. If those custom-partition lines are kept, the job produces the following partitioned and sorted output files:

    (1)part-r-00000
    13502468823    735    11349    12084
    13560439658    918    4938    5856
    13510439658    1116    954    2070
    13560436666    1136    94    1230
    13560436326    1136    94    1230

    (2)part-r-00001
    13726238888    2481    24681    27162
    13726130503    299    681    980
    13760778710    120    120    240
    13719199419    240    0    240

    (3)part-r-00002
    13884138413    4116    1432    5548
    13822544101    264    0    264

    (4)part-r-00003
    13925057413    11058    4243    15301
    13922314466    3008    3720    6728
    13926435656    132    1512    1644
    13926251106    240    0    240

    (5)part-r-00004
    18320173382    9531    212    9743
    18212575961    1527    2106    3633
    13480253104    120    1320    1440
    13660577991    660    690    1350
    15889002119    938    380    1318
    13602846565    198    910    1108
    15013685858    369    338    707
    15920133257    316    296    612