  • Hadoop total sort

    1. Without a custom partitioner: sort by key

    1. Mapper: both output key and value are IntWritable

    package com.cr.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class MaxTempMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line holds "year temperature", separated by a single space
            String[] arr = value.toString().split(" ");
            // emit (year, temperature) as an IntWritable pair
            context.write(new IntWritable(Integer.parseInt(arr[0])), new IntWritable(Integer.parseInt(arr[1])));
        }
    }
    
    
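    For reference, a minimal input file for this mapper would look like the lines below (assumed format: one space-separated `year temperature` pair per line; the sample values are taken from the output shown in step 4):

    1995 15
    2000 13
    2015 45
    2018 34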

    2. Reducer: input and output <k,v> are both IntWritable

    package com.cr.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class MaxTempReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // find the maximum temperature seen for this year
            int max = Integer.MIN_VALUE;
            for (IntWritable iw : values) {
                max = Math.max(max, iw.get());
            }
            context.write(key, new IntWritable(max));
        }
    }
    
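    For example, if the year 1995 appeared in the input with temperatures 15, 7, and 12, the reducer would receive (1995, [15, 7, 12]) and emit (1995, 15).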

    3. Driver class: MaxTempApp

    package com.cr.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class MaxTempApp {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // create the job (running against the local file system)
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            Job job = Job.getInstance(conf);
    
            // set the job's properties
            job.setJobName("MaxTempApp");                    // job name
            job.setJarByClass(MaxTempApp.class);             // class used to locate the jar
            job.setInputFormatClass(TextInputFormat.class);
    
            // input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // output path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setMapperClass(MaxTempMapper.class);         // mapper class
            job.setReducerClass(MaxTempReducer.class);       // reducer class
            job.setNumReduceTasks(1);                        // one reducer: all keys land in a single sorted file
    
            job.setMapOutputKeyClass(IntWritable.class);     // map output key
            job.setMapOutputValueClass(IntWritable.class);   // map output value
            job.setOutputKeyClass(IntWritable.class);        // job output key
            job.setOutputValueClass(IntWritable.class);      // job output value
            job.waitForCompletion(true);
        }
    }
    
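    One way to submit the job (the jar name and paths here are hypothetical; the program takes the input path and the output path as its two arguments):

    hadoop jar maxtemp.jar com.cr.wordcount.MaxTempApp /path/to/input /path/to/output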

    4. Output sorted by key in ascending order

    Because there is only one reducer, every key is shuffled into the same partition and sorted by the framework on the way, so the single output file is totally ordered by year:

    1995	15
    2000	13
    2015	45
    2018	34
    

    2. Custom partitioning

    1. Define the partition function: keys are assigned to partitions by year range, so the partitions themselves are ordered

    package com.cr.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class MyPartitoner extends Partitioner<IntWritable, IntWritable> {
    
        @Override
        public int getPartition(IntWritable year, IntWritable temp, int parts) {
            // range-partition by year, offset relative to 1995
            int tmp = year.get() - 1995;
            if (tmp < 8) {
                // years up to 2002 (and anything before 1995)
                return 0;
            } else if (tmp >= 8 && tmp < 16) {
                // years 2003-2010
                return 1;
            } else {
                // years 2011 and later
                return 2;
            }
        }
    }
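
    To sanity-check the ranges, a minimal standalone snippet (a hypothetical helper class, not part of the job) can call the partitioner directly:

    package com.cr.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    
    public class MyPartitonerCheck {
        public static void main(String[] args) {
            MyPartitoner p = new MyPartitoner();
            for (int year : new int[]{1995, 2002, 2003, 2010, 2011, 2033}) {
                // the temperature value and the partition count do not affect the result
                System.out.println(year + " -> partition "
                        + p.getPartition(new IntWritable(year), new IntWritable(0), 3));
            }
        }
    }

    This prints 1995 and 2002 in partition 0, 2003 and 2010 in partition 1, and 2011 and 2033 in partition 2, matching the part files shown in step 3 below.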

    2. Register the custom partitioner and increase the number of reducers to match

    package com.cr.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class MaxTempApp {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // create the job (running against the local file system)
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            Job job = Job.getInstance(conf);
    
            // set the job's properties
            job.setJobName("MaxTempApp");                    // job name
            job.setJarByClass(MaxTempApp.class);             // class used to locate the jar
            job.setInputFormatClass(TextInputFormat.class);
    
            // input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // output path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setPartitionerClass(MyPartitoner.class);     // custom partitioner
            job.setMapperClass(MaxTempMapper.class);         // mapper class
            job.setReducerClass(MaxTempReducer.class);       // reducer class
            job.setNumReduceTasks(3);                        // must cover every index the partitioner can return
    
            job.setMapOutputKeyClass(IntWritable.class);     // map output key
            job.setMapOutputValueClass(IntWritable.class);   // map output value
            job.setOutputKeyClass(IntWritable.class);        // job output key
            job.setOutputValueClass(IntWritable.class);      // job output value
            job.waitForCompletion(true);
        }
    }
    

    3. Results fall into different partitions according to the custom ranges

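    After a successful run the output directory contains one file per reducer (plus an empty _SUCCESS marker). Each part file is internally sorted by key, and the year ranges are ordered across files, so concatenating part-r-00000 through part-r-00002 yields a globally sorted result: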

    part-r-00000

    1995	15
    1996	23
    1997	234
    1998	43
    1999	32
    2000	13
    2001	45
    2002	32

    part-r-00001

    2003	23
    2004	12
    2005	23
    2006	45
    

    part-r-00002

    2011	38
    2015	45
    2018	34
    2021	45
    2024	12
    2033	345
    
