zoukankan      html  css  js  c++  java
  • Mapreduce的排序(全局排序、分区加排序、Combiner优化)

    一、MR排序的分类

      1.部分排序:MR会根据自己输出记录的KV对数据进行排序,保证输出到每一个文件内存都是经过排序的;

      2.全局排序;

      3.辅助排序:再第一次排序后经过分区再排序一次;

      4.二次排序:经过一次排序后又根据业务逻辑再次进行排序。

    二、MR排序的接口——WritableComparable

      该接口继承了Hadoop的Writable接口和Java的Comparable接口,实现该接口要重写write、readFields、compareTo三个方法。

    三、流量统计案例的排序与分区

    /**
     * @author: PrincessHug
     * @date: 2019/3/24, 15:36
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class FlowSortBean implements WritableComparable<FlowSortBean> {
        private long upFlow;
        private long dwFlow;
        private long flowSum;
    
        public FlowSortBean() {
        }
    
        public FlowSortBean(long upFlow, long dwFlow) {
            this.upFlow = upFlow;
            this.dwFlow = dwFlow;
            this.flowSum = upFlow + dwFlow;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDwFlow() {
            return dwFlow;
        }
    
        public void setDwFlow(long dwFlow) {
            this.dwFlow = dwFlow;
        }
    
        public long getFlowSum() {
            return flowSum;
        }
    
        public void setFlowSum(long flowSum) {
            this.flowSum = flowSum;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(dwFlow);
            out.writeLong(flowSum);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            dwFlow = in.readLong();
            flowSum = in.readLong();
        }
    
        @Override
        public String toString() {
            return upFlow + "	" + dwFlow + "	" + flowSum;
        }
    
        @Override
        public int compareTo(FlowSortBean o) {
            return this.flowSum > o.getFlowSum() ? -1:1;
        }
    }
    
    public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //获取数据
            String line = value.toString();
    
            //切分数据
            String[] fields = line.split("	");
    
            //封装数据
            long upFlow = Long.parseLong(fields[1]);
            long dwFlow = Long.parseLong(fields[2]);
    
            //传输数据
            context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));
        }
    }
    
    public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {
        @Override
        protected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(values.iterator().next(),key);
        }
    }
    
    public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {
        @Override
        public int getPartition(FlowSortBean key, Text value, int i) {
            String phoneNum = value.toString().substring(0, 3);
    
            int partition = 4;
            if ("135".equals(phoneNum)){
                return 0;
            }else if ("137".equals(phoneNum)){
                return 1;
            }else if ("138".equals(phoneNum)){
                return 2;
            }else if ("139".equals(phoneNum)){
                return 3;
            }
            return partition;
        }
    }
    
    public class FlowSortDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //设置配置,初始化Job类
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            //设置执行类
            job.setJarByClass(FlowSortDriver.class);
    
            //设置Mapper、Reducer类
            job.setMapperClass(FlowSortMapper.class);
            job.setReducerClass(FlowSortReducer.class);
    
            //设置Mapper输出数据类型
            job.setMapOutputKeyClass(FlowSortBean.class);
            job.setMapOutputValueClass(Text.class);
    
            //设置Reducer输出数据类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowSortBean.class);
    
            //设置自定义分区
            job.setPartitionerClass(FlowSortPartitioner.class);
            job.setNumReduceTasks(5);
    
            //设置文件输入输出类型
            FileInputFormat.setInputPaths(job,new Path("G:\mapreduce\flow\flowsort\in"));
            FileOutputFormat.setOutputPath(job,new Path("G:\mapreduce\flow\flowsort\partitionout"));
    
            //提交任务
            if (job.waitForCompletion(true)){
                System.out.println("运行完成!");
            }else {
                System.out.println("运行失败!");
            }
    
        }
    }
    

      注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。

    四、Combiner合并

      Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。

      Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。

      例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:

    public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
    	protected void reduce(Text key,Iterable<IntWritable> values,Context context){
    		//计数
    		int count = 0;
    		
    		//累加求和
    		for(IntWritable v:values){
    			count += v.get();
    		}
    		//输出
    		context.write(key,new IntWritable(count));
    	}
    }
    

      然后再Driver类中设置使用Combiner类

    job.setCombinerClass(WordCountCombiner.class);
    

      如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:

    job.setCombinerClass(WordCountReducer.class);
    

      注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:

      mapper输出两个分区:3,5,7  =>avg=5

                2,6    =>avg=4

      reducer合并输出:  5,4     =>avg=4.5  但是实际应该为4.6,错误!

      所以在使用Combiner时要注意其不会影响最中的结果!!!

  • 相关阅读:
    Idea导入tomcat源码
    SpringBoot学习 (一) Eclipse中创建新的SpringBoot项目
    Zookeeper客户端Curator使用详解
    解决老是提示找不到Mapper文件无法执行定义的方法问题!
    人工智能、机器学习、深度学习三者之间的关系
    java5增加对https的支持
    Spring静态注入的三种方式
    Thrift入门及Java实例演示【转】
    activeMQ 学习
    python中 import 和from ... import 的区别
  • 原文地址:https://www.cnblogs.com/HelloBigTable/p/10591267.html
Copyright © 2011-2022 走看看