zoukankan      html  css  js  c++  java
  • mapreduce数据处理——统计排序

    接上篇https://www.cnblogs.com/sengzhao666/p/11850849.html

    2、数据处理:

    ·统计最受欢迎的视频/文章的Top10访问次数(按 id 字段)

    ·按照地市统计最受欢迎的Top10课程(按 ip 字段)

    ·按照流量统计最受欢迎的Top10课程(按 traffic 字段)

    分两步:

    统计;排序

    初始文件部分样例:

    1.192.25.84    2016-11-10-00:01:14    10    54    video    5551    
    1.194.144.222    2016-11-10-00:01:20    10    54    video    3589    
    1.194.187.2    2016-11-10-00:01:05    10    54    video    2212    
    1.203.177.243    2016-11-10-00:01:18    10    6050    video    7361    
    1.203.177.243    2016-11-10-00:01:19    10    72    video    7361    
    1.203.177.243    2016-11-10-00:01:22    10    6050    video    7361    
    1.30.162.63    2016-11-10-00:01:46    10    54    video    3639    
    1.84.205.195    2016-11-10-00:01:12    10    54    video    1412    

    统计:

    package priv.tzk.mapreduce.dataProcess.visits;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class DataVisits {
        public static String INPUT_PATH="/home/hadoop/out";  
        public static String OUTPUT_PATH="hdfs://localhost:9000/mapReduce/mymapreduce1/out";    
    
        public static class Map extends Mapper<Object,Text,Text,IntWritable>{    //将输入输出作为string类型,对应Text类型
                private static Text newKey=new Text();    //每一行作为一个数据  
                public void map(Object key, Text value, Context context) throws IOException, InterruptedException{   
                    String line=value.toString();//转为字符串类型
                    //System.out.println(line);
                    if(!("".equals(line)))//增加控制语句,使得line为”“时能够停止。否则不符合reduce接受的数据不会执行reduce
                    {
                        String arr[]=line.split("	");//splite是按照输入的值拆分成数组
                        newKey.set(arr[5]);
                        int click=1;
                        context.write(newKey,new IntWritable(click)); 
                        //System.out.println(newKey+"  "+new IntWritable(click));
                    }
                 } 
             }   
             
        public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{   
            public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{      
                     int count=0;
                     for(IntWritable val:values) {
                         //Iterable迭代器
                         count++;
                     }         
                     context.write(key,new IntWritable(count));
                     //System.out.println("reduceStart");
                 }   
            }    
            
            public static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException{              
                Configuration conf=new Configuration();   
                System.out.println("start");
                Job job=Job.getInstance(conf); 
                job.setJobName("MyAverage");
                //Job job =new Job(conf,"MyAverage");
                job.setJarByClass(DataVisits.class);
                job.setMapperClass(Map.class);  
                job.setReducerClass(Reduce.class);
                job.setOutputKeyClass(Text.class);  
                job.setOutputValueClass(IntWritable.class);//设置map的输出格式
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                Path outputpath=new Path(OUTPUT_PATH); 
                Path inputpath=new Path(INPUT_PATH); 
                FileInputFormat.addInputPath(job,inputpath );  
                FileOutputFormat.setOutputPath(job,outputpath);  
                boolean flag = job.waitForCompletion(true);
                System.out.println(flag);
                System.exit(flag? 0 : 1);
             }
            
    }

    统计部分结果样例:

    10061    1
    10077    1
    10198    1
    10290    1
    10314    1
    10324    1
    1034    1
    10400    1
    10421    1
    10427    1
    10450    1
    10505    1
    10506    7
    10511    1

    针对统计结果排序:

    package priv.tzk.mapreduce.dataProcess.visits;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class visitsSort {
        /** Output of the counting job (id TAB count per line). */
        public static String INPUT_PATH = "/home/hadoop/visits_out";
        /** HDFS directory for the descending Top-10 result. */
        public static String OUTPUT_PATH = "hdfs://localhost:9000/mapReduce/mymapreduce1/out1";

        /**
         * Shuffle comparator that sorts the IntWritable keys in descending order,
         * so the highest visit counts reach the reducer first.
         */
        public static class Sort extends WritableComparator {
            public Sort() {
                // Register the key type emitted by the mapper.
                super(IntWritable.class, true);
            }

            @Override
            public int compare(WritableComparable a, WritableComparable b) {
                // Reverse by swapping operands. Negating a.compareTo(b) is wrong in
                // general: -Integer.MIN_VALUE overflows and stays negative.
                return b.compareTo(a);
            }
        }

        /**
         * Mapper: parses "id TAB count" lines and emits (count, id) so the
         * framework's key sort (with Sort above) orders records by count.
         */
        public static class Map extends Mapper<Object, Text, IntWritable, Text> {
            private Text mid = new Text();
            private IntWritable num = new IntWritable();

            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                // Skip blank lines so parseInt never sees an empty field.
                if (!"".equals(line)) {
                    String arr[] = line.split("\t");
                    mid.set(arr[0]);                       // id
                    num.set(Integer.parseInt(arr[1]));     // visit count becomes the sort key
                    context.write(num, mid);
                }
            }
        }

        /**
         * Reducer: passes records through until 10 have been written.
         * The counter spans reduce() calls (keys), so it must live on the
         * instance; this only yields a global Top-10 with a single reducer,
         * which main() enforces via setNumReduceTasks(1).
         */
        public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text> {
            private int emitted = 0; // instance (not static) so a reused JVM starts fresh

            @Override
            public void reduce(IntWritable key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                for (Text val : values) {
                    if (emitted < 10) {
                        emitted++;
                        context.write(key, val);
                    }
                }
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            System.out.println("start");
            Job job = Job.getInstance(conf);
            job.setJobName("visitsSort");
            job.setJarByClass(visitsSort.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setSortComparatorClass(Sort.class);
            // A single reducer is required for a globally correct Top-10.
            job.setNumReduceTasks(1);
            // Map output types (also the final output types here).
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path outputpath = new Path(OUTPUT_PATH);
            Path inputpath = new Path(INPUT_PATH);
            FileInputFormat.addInputPath(job, inputpath);
            FileOutputFormat.setOutputPath(job, outputpath);
            boolean flag = job.waitForCompletion(true);
            System.out.println(flag);
            System.exit(flag ? 0 : 1);
        }

    }

    排序结果:

    31    2402
    19    1309
    18    3078
    18    2801
    16    5683
    16    3369
    16    1336
    16    4018
    15    11239
    15    13098
  • 相关阅读:
    一次性解决window系统下,git日志乱码的问题
    多线程之线程状态,状态切换种类及代码实例
    mybatis 第一个demo,并记一次解决问题:Mapped Statements collection does not contain value for
    有100盏灯,分别写上编号1~100,同样地 有100个开关,写上编号1~100。当我按1号开关,写上1的倍数的灯会开/关(如果灯开着就关,相反地,关着就会开),当我按2号开关,写上2的倍数的灯会开/关,如此类推
    阿里云云服务器 centos 7.4 安装mysql 过程记录
    java实现树形输出
    MATLAB入门笔记
    经测试稳定可用的蓝牙链接通信Demo,记录过程中遇到的问题的思考和解决办法,并整理后给出一个Utils类可以简单调用来实现蓝牙功能
    View的相关原理(读书笔记)
    JAVA设计方法思考之如何实现一个方法执行完毕后自动执行下一个方法
  • 原文地址:https://www.cnblogs.com/sengzhao666/p/11862763.html
Copyright © 2011-2022 走看看