zoukankan      html  css  js  c++  java
  • Hadoop 系列(三)Top N

    一:流程分析

    Top N简介

            关系数据库中经常有Top n数据查询的大部分是以下四种需求

            1.直接min或者max就可以取得最大或者最小的数据  (top 1)

            2.升级一点就再加上一个groupby取一个分组内的最大值,最小值(分组内的top1)

            3.top 10需求,使用order函数取一个前10

            4.分组内的top 10需求,使用window 函数生成一个虚拟列,虚拟列取< 11的数据就可以

            相同的我们在mapreduce中也可能需要实现这种需求:

            1.key取相同的值,value取最大值,或者最小值就可以。(优化一点的就是在map阶段就聚合部分的数据,不然容易数据倾斜,其实就是Combiner,但是没有做过helloworld,自己先试试

            2.key取groupby的值,value取最大值,最小值。(优化方案:map阶段取出来组内的最大最小值)

            3.key取相同的值,value取一个前10

            4.key取groupby的值,value取一个前10

    我们可以把这四种全部都实现一下:有一点需要谨记:所有的map函数和reduce函数都不是只执行一次的

    代码

    数据:

    2020040112 1
    2020040113 3
    2020040114 4
    2020040115 5
    2020040116 6
    2020040117 7
    2020040118 8
    2020040119 9
    2020040312 1
    2020040313 3
    2020040314 4
    2020040315 5
    2020040316 6
    2020040317 7
    2020040318 8
    2020040319 9
    2020040412 1
    2020040413 3
    2020040414 4
    2020040415 5
    2020040416 6
    2020040417 7
    2020040418 8
    2020040419 9

    代码1 输出最高温度和最低温度:

    package org.example;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private int max = 0;
    private int min = 0;
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line[] = value.toString().split(" ");
    if(Integer.valueOf(line[1]) > max){
    max = Integer.valueOf(line[1]);
    }
    if(Integer.valueOf(line[1]) < min){
    min = Integer.valueOf(line[1]);
    }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
    context.write(new Text("min"),new IntWritable(min));
    context.write(new Text("max"),new IntWritable(max));
    }
    }
    class WordcountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private int max = 0;
    private int min = 0;
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    for (IntWritable value : values) {
    if(value.get() > max){
    max = value.get();
    }
    if(value.get() < min){
    min = value.get();
    }
    }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
    context.write(new Text("min"),new IntWritable(min));
    context.write(new Text("max"),new IntWritable(max));
    }
    }
    public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    FileSystem fs= FileSystem.get(conf);
    String outputPath = "/software/java/data/output/";
    if(fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath),true);

    Job job = Job.getInstance(conf);
    job.setJarByClass(WordcountDriver.class);
    job.setMapperClass(WordcountMapper.class);
    job.setReducerClass(WordcountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    //将job配置的参数,以及job所用的java类所在的jar包提交给yarn去运行
    //job.submit();
    boolean res = job.waitForCompletion(true);
    }

    }
    
    
    代码2 分组内输出最高温度和最低温度:
    package org.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    
    class WordcountMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Map<String,String> minmaxMap = new HashMap<String,String>();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line[] = value.toString().split("\ ");
            String date = line[0].substring(0,line[0].length()-2);
            int temperature = Integer.parseInt(line[1]);
            if(minmaxMap.containsKey(date)){
    
                int max = Integer.parseInt(minmaxMap.get(date).split("\:")[0]);
                int min = Integer.parseInt(minmaxMap.get(date).split("\:")[1]);
    
                if(temperature > max){
                    minmaxMap.put(date,temperature+":"+min);
                }
                if(temperature < min){
                    minmaxMap.put(date,max+":"+temperature);
                }
            }
            else{
                minmaxMap.put(date,temperature+":"+temperature);
            }
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Map.Entry<String, String> dateTemperature :minmaxMap.entrySet()) {
                System.out.println("map"+dateTemperature.getKey() + "|"+dateTemperature.getValue());
                context.write(new Text(dateTemperature.getKey()),new Text(dateTemperature.getValue()));
            }
        }
    }
    class WordcountReducer extends Reducer<Text,Text,Text,Text> {
        private Map<String,String> minmaxMap = new HashMap<String,String>();
    
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value:values ) {
                String date = key.toString();
                if(minmaxMap.containsKey(date)){
    
                    int existMax = Integer.parseInt(minmaxMap.get(date).split("\:")[0]);
                    int existMin = Integer.parseInt(minmaxMap.get(date).split("\:")[1]);
                    int max = Integer.parseInt(value.toString().split("\:")[0]);
                    int min = Integer.parseInt(value.toString().split("\:")[1]);
                    int finalMax = existMax > max ? existMax:max;
                    int finalMin = existMin < min ? existMin:min;
                    minmaxMap.put(date,finalMax+":"+finalMin);
                }
                else{
                    minmaxMap.put(date,value.toString());
                }
            }
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Map.Entry<String, String> dateTemperature :minmaxMap.entrySet()) {
                System.out.println("reduce"+dateTemperature.getKey() + "|"+dateTemperature.getValue());
                context.write(new Text(dateTemperature.getKey()),new Text(dateTemperature.getValue()));
            }
        }
    }
    public class WordcountDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            FileSystem fs= FileSystem.get(conf);
            String outputPath = "/software/java/data/output/";
            if(fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath),true);
    
            Job job = Job.getInstance(conf);
            job.setJarByClass(WordcountDriver.class);
            job.setMapperClass(WordcountMapper.class);
            job.setReducerClass(WordcountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
    
            FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
    
            //将job配置的参数,以及job所用的java类所在的jar包提交给yarn去运行
            //job.submit();
            boolean res = job.waitForCompletion(true);
        }
    
    }

           代码3和代码4就不写了,因为差不多。

  • 相关阅读:
    C#的访问修饰符
    C#的数据类型
    iOS--小结系列八(继续)
    ios--小结系列八 控制器管理
    ios--小结系列八 程序启动原理
    ios--小结系列七 通知机制
    ios--小结系列六继续-UITableViewCell
    ios--小结系列六
    ios--小结系列五
    ios--小结系列四
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/12733518.html
Copyright © 2011-2022 走看看