zoukankan      html  css  js  c++  java
  • mapreduce数据处理——统计排序

    接上篇https://www.cnblogs.com/sengzhao666/p/11850849.html

    2、数据处理:

    ·统计最受欢迎的视频/文章的Top10访问次数 (id)

    ·按照地市统计最受欢迎的Top10课程 (ip)

    ·按照流量统计最受欢迎的Top10课程 (traffic)

    分两步:

    统计;排序

    初始文件部分样例:

    1.192.25.84    2016-11-10-00:01:14    10    54    video    5551    
    1.194.144.222    2016-11-10-00:01:20    10    54    video    3589    
    1.194.187.2    2016-11-10-00:01:05    10    54    video    2212    
    1.203.177.243    2016-11-10-00:01:18    10    6050    video    7361    
    1.203.177.243    2016-11-10-00:01:19    10    72    video    7361    
    1.203.177.243    2016-11-10-00:01:22    10    6050    video    7361    
    1.30.162.63    2016-11-10-00:01:46    10    54    video    3639    
    1.84.205.195    2016-11-10-00:01:12    10    54    video    1412    

    统计:

    package priv.tzk.mapreduce.dataProcess.visits;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class DataVisits {
        public static String INPUT_PATH="/home/hadoop/out";  
        public static String OUTPUT_PATH="hdfs://localhost:9000/mapReduce/mymapreduce1/out";    
    
        public static class Map extends Mapper<Object,Text,Text,IntWritable>{    //将输入输出作为string类型,对应Text类型
                private static Text newKey=new Text();    //每一行作为一个数据  
                public void map(Object key, Text value, Context context) throws IOException, InterruptedException{   
                    String line=value.toString();//转为字符串类型
                    //System.out.println(line);
                    if(!("".equals(line)))//增加控制语句,使得line为”“时能够停止。否则不符合reduce接受的数据不会执行reduce
                    {
                        String arr[]=line.split("	");//splite是按照输入的值拆分成数组
                        newKey.set(arr[5]);
                        int click=1;
                        context.write(newKey,new IntWritable(click)); 
                        //System.out.println(newKey+"  "+new IntWritable(click));
                    }
                 } 
             }   
             
        public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{   
            public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{      
                     int count=0;
                     for(IntWritable val:values) {
                         //Iterable迭代器
                         count++;
                     }         
                     context.write(key,new IntWritable(count));
                     //System.out.println("reduceStart");
                 }   
            }    
            
            public static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException{              
                Configuration conf=new Configuration();   
                System.out.println("start");
                Job job=Job.getInstance(conf); 
                job.setJobName("MyAverage");
                //Job job =new Job(conf,"MyAverage");
                job.setJarByClass(DataVisits.class);
                job.setMapperClass(Map.class);  
                job.setReducerClass(Reduce.class);
                job.setOutputKeyClass(Text.class);  
                job.setOutputValueClass(IntWritable.class);//设置map的输出格式
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                Path outputpath=new Path(OUTPUT_PATH); 
                Path inputpath=new Path(INPUT_PATH); 
                FileInputFormat.addInputPath(job,inputpath );  
                FileOutputFormat.setOutputPath(job,outputpath);  
                boolean flag = job.waitForCompletion(true);
                System.out.println(flag);
                System.exit(flag? 0 : 1);
             }
            
    }

    统计部分结果样例:

    10061    1
    10077    1
    10198    1
    10290    1
    10314    1
    10324    1
    1034    1
    10400    1
    10421    1
    10427    1
    10450    1
    10505    1
    10506    7
    10511    1

    针对统计结果排序:

    package priv.tzk.mapreduce.dataProcess.visits;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class visitsSort {
        public static String INPUT_PATH="/home/hadoop/visits_out";  
        public static String OUTPUT_PATH="hdfs://localhost:9000/mapReduce/mymapreduce1/out1";    
            
        public static class Sort extends WritableComparator {
            public Sort(){
            //这里就是看你map中填的输出key是什么数据类型,就给什么类型
            super(IntWritable.class,true);
            }
            @Override
            public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b);//加个负号就是倒序,把负号去掉就是正序。
            }
        }
        
        public static class Map extends Mapper<Object,Text,IntWritable,Text>{    //将输入输出作为string类型,对应Text类型
                private static Text mid=new Text(); 
                private static IntWritable num=new IntWritable();
                public void map(Object key, Text value, Context context) throws IOException, InterruptedException{   
                    String line=value.toString();//转为字符串类型
                    if(!("".equals(line)))//增加控制语句,使得line为”“时能够停止。否则不符合reduce接受的数据不会执行reduce
                    {
                        String arr[]=line.split("	");//splite是按照输入的值拆分成数组
                        mid.set(arr[0]);
                        num.set(Integer.parseInt(arr[1]));
                        context.write(num,mid); 
                    }
                 } 
             }   
             //MapReduce框架默认排序规则。它是按照key值进行排序的
        public static class Reduce extends Reducer<IntWritable,Text,IntWritable,Text>{ 
            private static int i=0;
            public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException,InterruptedException{      
    
                     for(Text val:values) {
                         //Iterable迭代器
                         if(i<10) {
                             i++;
                             context.write(key, val);
                         }
                     }
                     //System.out.println("reduceStart");
                 }   
            }    
            
            public static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException{              
                Configuration conf=new Configuration();   
                System.out.println("start");
                Job job=Job.getInstance(conf); 
                //Job job =new Job(conf,"");
                job.setJarByClass(visitsSort.class);
                job.setMapperClass(Map.class);  
                job.setReducerClass(Reduce.class);
                job.setSortComparatorClass(Sort.class);
                //设置map的输出格式
                job.setOutputKeyClass(IntWritable.class);  
                job.setOutputValueClass(Text.class);
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                Path outputpath=new Path(OUTPUT_PATH); 
                Path inputpath=new Path(INPUT_PATH); 
                FileInputFormat.addInputPath(job,inputpath );  
                FileOutputFormat.setOutputPath(job,outputpath);  
                boolean flag = job.waitForCompletion(true);
                System.out.println(flag);
                System.exit(flag? 0 : 1);
             }
            
    }

    排序结果:

    31    2402
    19    1309
    18    3078
    18    2801
    16    5683
    16    3369
    16    1336
    16    4018
    15    11239
    15    13098
  • 相关阅读:
    【译文】纯HTML5捕获音频流和视频流
    Vue中scoped属性浅析
    jmeter 在liunx 环境下的测试问题汇总
    windows Jmeter 安装环境配置
    基于ghz 对grpc 服务进行压测
    使用powershell脚本自动发布
    SqlServer 重建索引脚本
    配置nginx代理本地多个站点到制定域名
    Xposed 插件开发入门教程(一)
    python 字典
  • 原文地址:https://www.cnblogs.com/sengzhao666/p/11862763.html
Copyright © 2011-2022 走看看