zoukankan      html  css  js  c++  java
  • Mapreduce的排序(全局排序、分区加排序、Combiner优化)

    一、MR排序的分类

      1.部分排序:MR会根据自己输出记录的KV对数据进行排序,保证输出到每一个文件内存都是经过排序的;

      2.全局排序;

      3.辅助排序:再第一次排序后经过分区再排序一次;

      4.二次排序:经过一次排序后又根据业务逻辑再次进行排序。

    二、MR排序的接口——WritableComparable

      该接口继承了Hadoop的Writable接口和Java的Comparable接口,实现该接口要重写write、readFields、compareTo三个方法。

    三、流量统计案例的排序与分区

    /**
     * @author: PrincessHug
     * @date: 2019/3/24, 15:36
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class FlowSortBean implements WritableComparable<FlowSortBean> {
        private long upFlow;
        private long dwFlow;
        private long flowSum;
    
        public FlowSortBean() {
        }
    
        public FlowSortBean(long upFlow, long dwFlow) {
            this.upFlow = upFlow;
            this.dwFlow = dwFlow;
            this.flowSum = upFlow + dwFlow;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDwFlow() {
            return dwFlow;
        }
    
        public void setDwFlow(long dwFlow) {
            this.dwFlow = dwFlow;
        }
    
        public long getFlowSum() {
            return flowSum;
        }
    
        public void setFlowSum(long flowSum) {
            this.flowSum = flowSum;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(dwFlow);
            out.writeLong(flowSum);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            dwFlow = in.readLong();
            flowSum = in.readLong();
        }
    
        @Override
        public String toString() {
            return upFlow + "	" + dwFlow + "	" + flowSum;
        }
    
        @Override
        public int compareTo(FlowSortBean o) {
            return this.flowSum > o.getFlowSum() ? -1:1;
        }
    }
    
    public class FlowSortMapper extends Mapper<LongWritable, Text,FlowSortBean,Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //获取数据
            String line = value.toString();
    
            //切分数据
            String[] fields = line.split("	");
    
            //封装数据
            long upFlow = Long.parseLong(fields[1]);
            long dwFlow = Long.parseLong(fields[2]);
    
            //传输数据
            context.write(new FlowSortBean(upFlow,dwFlow),new Text(fields[0]));
        }
    }
    
    public class FlowSortReducer extends Reducer<FlowSortBean,Text,Text,FlowSortBean> {
        @Override
        protected void reduce(FlowSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(values.iterator().next(),key);
        }
    }
    
    public class FlowSortPartitioner extends Partitioner<FlowSortBean, Text> {
        @Override
        public int getPartition(FlowSortBean key, Text value, int i) {
            String phoneNum = value.toString().substring(0, 3);
    
            int partition = 4;
            if ("135".equals(phoneNum)){
                return 0;
            }else if ("137".equals(phoneNum)){
                return 1;
            }else if ("138".equals(phoneNum)){
                return 2;
            }else if ("139".equals(phoneNum)){
                return 3;
            }
            return partition;
        }
    }
    
    public class FlowSortDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            //设置配置,初始化Job类
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            //设置执行类
            job.setJarByClass(FlowSortDriver.class);
    
            //设置Mapper、Reducer类
            job.setMapperClass(FlowSortMapper.class);
            job.setReducerClass(FlowSortReducer.class);
    
            //设置Mapper输出数据类型
            job.setMapOutputKeyClass(FlowSortBean.class);
            job.setMapOutputValueClass(Text.class);
    
            //设置Reducer输出数据类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowSortBean.class);
    
            //设置自定义分区
            job.setPartitionerClass(FlowSortPartitioner.class);
            job.setNumReduceTasks(5);
    
            //设置文件输入输出类型
            FileInputFormat.setInputPaths(job,new Path("G:\mapreduce\flow\flowsort\in"));
            FileOutputFormat.setOutputPath(job,new Path("G:\mapreduce\flow\flowsort\partitionout"));
    
            //提交任务
            if (job.waitForCompletion(true)){
                System.out.println("运行完成!");
            }else {
                System.out.println("运行失败!");
            }
    
        }
    }
    

      注意:再写Mapper类的时候,要注意KV对输出的数据类型,Key的类型一定要为FlowSortBean,因为在Mapper和Reducer之间进行的排序(只是排序)是通过Mapper输出的Key来进行排序的,而分区可以指定是通过Key或者Value。

    四、Combiner合并

      Combiner是在MR之外的一个组件,可以用来在maptask输出到环形缓冲区溢写之后,分区排序完成时进行局部的汇总,可以减少网络传输量,进而优化MR程序。

      Combiner是用在当数据量到达一定规模之后的,小的数据量并不是很明显。

      例如WordCount程序,当单词文件的大小到达一定程度,可以使用自定义Combiner进行优化:

    public class WordCountCombiner extends Reducer<Text,IntWritable,Text,IntWritable>{
    	protected void reduce(Text key,Iterable<IntWritable> values,Context context){
    		//计数
    		int count = 0;
    		
    		//累加求和
    		for(IntWritable v:values){
    			count += v.get();
    		}
    		//输出
    		context.write(key,new IntWritable(count));
    	}
    }
    

      然后再Driver类中设置使用Combiner类

    job.setCombinerClass(WordCountCombiner.class);
    

      如果仔细观察,WordCount的自定义Combiner类与Reducer类是完全相同的,因为他们的逻辑是相同的,即在maptask之后的分区内先进行一次累加求和,然后到reducer后再进行总的累加求和,所以在设置Combiner时也可以这样:

    job.setCombinerClass(WordCountReducer.class);
    

      注意:Combiner的应用一定要注意不能影响最终业务逻辑的情况下使用,比如在求平均值的时候:

      mapper输出两个分区:3,5,7  =>avg=5

                2,6    =>avg=4

      reducer合并输出:  5,4     =>avg=4.5  但是实际应该为4.6,错误!

      所以在使用Combiner时要注意其不会影响最中的结果!!!

  • 相关阅读:
    Aurora 数据库支持多达五个跨区域只读副本
    Amazon RDS 的 Oracle 只读副本
    Amazon EC2 密钥对
    DynamoDB 读取请求单位和写入请求单位
    使用 EBS 优化的实例或 10 Gb 网络实例
    启动 LAMP 堆栈 Web 应用程序
    AWS 中的错误重试和指数退避 Error Retries and Exponential Backoff in AWS
    使用 Amazon S3 阻止公有访问
    路由表 Router Table
    使用MySQLAdmin工具查看QPS
  • 原文地址:https://www.cnblogs.com/HelloBigTable/p/10591267.html
Copyright © 2011-2022 走看看