
Hadoop-MR: Implementing Log Cleaning (Part 3)

5. Cleaning and Parsing the Forum Request Logs

Cleaning the request logs mainly means filtering out data that is irrelevant to the later statistics, including crawler traffic, requests for static resources, and unused data columns. If needed, the cleaning step can also convert some fields, such as dates, to simplify the downstream data processing and statistical analysis.
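As a concrete (hypothetical) illustration of what the cleaning produces, assume a raw access-log line in the layout the parsing code below expects (IP, two "-" placeholders, bracketed timestamp, quoted request, status code, bytes sent):

27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /thread-123-1-1.html HTTP/1.1" 200 1127

After cleaning, only three fields survive: the IP, the timestamp reformatted to 20130530173820, and the trimmed URL /thread-123-1-1.html; a request for /static/... or /uc_server/... would be dropped altogether.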

Logically, the cleaning job is again split into map, reduce, and run (main) functions. Because extracting and filtering the log fields from the input is relatively complex, the line-parsing methods are usually written as a separate parser class whose methods are called from map.

5.1 Parsing the Log Fields

The parser class, written separately and called by the map function:

package com.leeyk99.hadoop;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/**
 * Parses each field of a log record. The fields of a log line are roughly:
 * IP, "-", "-", TIME, URL, STATUS, STREAM (bytes), and so on.
 * @author LIN
 */
public class FieldParser {

    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    public static final SimpleDateFormat FORMAT = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);

    /**
     * Parses one log record.
     * @return an array of five elements: IP, TIME, URL, STATUS, traffic (bytes)
     */
    public String[] parseLog(String line) {
        String ip = parseIP(line);
        String time = parseTime(line);
        String url = parseURL(line);
        String status = parseStatus(line);
        String stream = parseStream(line);
        String[] fields = new String[5];
        fields[0] = ip;
        fields[1] = time;
        fields[2] = url;
        fields[3] = status;
        fields[4] = stream;
        //String[] fields = new String[]{ip, time, url, status, stream};
        return fields;
    }

    private String parseStream(String line) {
        try {
            final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
            String stream = trim.split(" ")[1];
            return stream;
        } catch (ArrayIndexOutOfBoundsException e) {
            e.printStackTrace();
            System.out.println(line);
            return null; //return null for malformed lines (returning from finally would swallow the normal return value)
        }
    }

    private String parseStatus(String line) {
        final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
        String status = trim.split(" ")[0];
        return status;
    }

    private String parseURL(String line) {
        final String trim = line.split("\"")[1].trim();
        String url = trim;
        return url;
    }

    private String parseTime(String line) {
        final String trim = line.split("\"")[0].trim();
        String time = trim.split(" ")[3].substring(1);
        Date date = parseDateFormat(time); //parse the raw string into a Date so it can be reformatted
        time = dateFormat.format(date);    //reformat as 20180903101923
        return time;
    }

    private String parseIP(String line) {
        final String trim = line.split(" ")[0].trim();
        String ip = trim;
        return ip;
    }

    /**
     * Converts a log timestamp such as 18/Sep/2013:16:16:16.
     * @author LIN
     * @param time the raw timestamp, e.g. 18/Sep/2013:16:16:16
     */
    private Date parseDateFormat(String time) {
        Date formatTime = new Date();
        try {
            formatTime = FORMAT.parse(time); //FORMAT.parse turns a String into a Date; FORMAT.format turns a Date back into a String
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return formatTime;
    }
}
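To sanity-check the parser outside of Hadoop, a minimal sketch such as the following can be used. It is not part of the original post: the class name and the sample line are made up for illustration.

package com.leeyk99.hadoop;

import java.util.Arrays;

//Hypothetical quick local check of FieldParser (illustration only).
public class FieldParserTest {
    public static void main(String[] args) {
        String sample = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] "
                + "\"GET /thread-123-1-1.html HTTP/1.1\" 200 1127";
        String[] fields = new FieldParser().parseLog(sample);
        //Expected: [27.19.74.143, 20130530173820, GET /thread-123-1-1.html HTTP/1.1, 200, 1127]
        System.out.println(Arrays.toString(fields));
    }
}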

5.2 Writing the map Function

This also demonstrates how to pass multiple fields along as a single output value.

package com.leeyk99.hadoop.mapreduce;

import com.leeyk99.hadoop.FieldParser;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //super.map(key, value, context);
        String line = value.toString();
        FieldParser fieldParser = new FieldParser();
        String[] record = fieldParser.parseLog(line);

        /* Data pre-processing */
        //1. Filter out records whose request field starts with the given prefixes
        if (record[2].startsWith("GET /uc_server") || record[2].startsWith("GET /static")) { //drop these records
            return;
        }
        //2. Transform the field; here, string truncation
        if (record[2].startsWith("GET /")) {
            record[2] = record[2].substring("GET /".length() - 1); //i.e. substring(4), keeping the leading "/"
        } else if (record[2].startsWith("POST /")) {
            record[2] = record[2].substring("POST /".length() - 1);
        }
        if (record[2].endsWith(" HTTP/1.1")) {
            //System.out.println("1" + record[2]);
            record[2] = record[2].substring(0, record[2].length() - " HTTP/1.1".length());
            //System.out.println("2" + record[2]);
        }
        //3. Column pruning: keep only the selected columns
        Text outPutValue = new Text();
        outPutValue.set(record[0] + "\001" + record[1] + "\001" + record[2]); //"\001" is used as the field delimiter
        /* map output: the key is a LongWritable (still the byte offset of the line) rather than the usual Text
           dimension key, and the value is a Text rather than the usual IntWritable/DoubleWritable, because the
           reducer does not perform a GROUP BY-style aggregation here. */
        context.write(key, outPutValue);
    }
}
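To make the field transformation in step 2 concrete, the following standalone sketch (hypothetical class name, not from the original post) shows what happens to a typical request field:

package com.leeyk99.hadoop;

//Hypothetical illustration of the URL trimming performed in LogMapper (illustration only).
public class UrlTrimDemo {
    public static void main(String[] args) {
        String url = "GET /thread-123-1-1.html HTTP/1.1";
        if (url.startsWith("GET /")) {
            url = url.substring("GET /".length() - 1); //keep the leading "/"
        } else if (url.startsWith("POST /")) {
            url = url.substring("POST /".length() - 1);
        }
        if (url.endsWith(" HTTP/1.1")) {
            url = url.substring(0, url.length() - " HTTP/1.1".length());
        }
        System.out.println(url); //prints "/thread-123-1-1.html"
    }
}

For the sample line shown earlier, the mapper would then emit the IP, the reformatted time, and this trimmed URL joined by the delimiter, keyed by the line's byte offset.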

5.3 Writing the reducer Function

package com.leeyk99.hadoop.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //super.reduce(key, values, context);
        for (Text value : values) {
            context.write(value, NullWritable.get());
        }
    }
}
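The reducer is essentially an identity pass: it discards the LongWritable offset key and writes each cleaned record as a Text line with a NullWritable value, so the job's output is simply the filtered, reformatted log lines.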

5.4 Writing the Entry Point (the main and run Functions)

package com.leeyk99.hadoop.mapreduce;

//import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//import java.io.File;
import java.net.URI;

public class LogParser extends Configured implements Tool {

    //Note: the annotation cannot be written as lowercase "override".
    //@Override: annotating this interface implementation as an override kept being flagged with a red squiggle
    //even though the program ran fine; it took a long time to trace the warning back to this spot.
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        //getClass(), getConf()
        //Approach 1: the style used in "Hadoop: The Definitive Guide"
        /*Job job = new Job(getConf(), "Log parser");
        job.setJarByClass(getClass());*/

        //Approach 2: plain main-style, the simplest form
        Job job = new Job();
        job.setJarByClass(getClass()); //getClass() returns LogParser.class
        job.setJobName("Log parser");

        //Approach 3: the Configuration style commonly seen online
        /*Configuration conf = new Configuration();
        //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "Job_001"); //create a Job object and give it a job name
        job.setJarByClass(LogParser.class); //specify the driver class
        //FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //input path
        //FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //output path*/

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(LongWritable.class); //differs from the reducer's output key class, so it must be set explicitly
        job.setMapOutputValueClass(Text.class);
        /* With a combiner enabled, the map stage stalled at 22%: the map output is <LongWritable, Text>, but the
           combiner (LogReducer) emits <Text, NullWritable>, which cannot be fed into the reducer because its
           input must be <LongWritable, Text>. */
        //job.setCombinerClass(LogReducer.class);
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //Delete the HDFS output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(args[0]), getConf());
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LogParser(), args);
        System.exit(exitCode);
    }

    //When running locally, how should the output directory be deleted? (see the sketch after this class)
    /*private static void delDir(String path) {
        File f = new File(path);
        if (f.exists()) {
            if (f.isDirectory()) {
                String[] items = f.list();
                for (String item : items) {
                    File f2 = new File(path + "/" + item);
                    if (f2.isDirectory()) {
                        delDir(path + "/" + item);
                    } else {
                        f2.delete();
                    }
                }
            }
            f.delete(); //delete the file, or the now-empty directory
        } else {
            System.out.println("Output directory does not exist.");
        }
    }*/
}
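Regarding the commented-out question about deleting a local output directory: one possible approach, sketched here under the assumption that the local filesystem is accessed through Hadoop's FileSystem abstraction (the class name and path are made up), is to reuse the same recursive delete call via FileSystem.getLocal instead of writing delDir by hand:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

//Hypothetical sketch: deleting a local output directory with the same FileSystem API used for HDFS.
public class LocalOutputCleaner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem localFs = FileSystem.getLocal(conf); //LocalFileSystem backed by file://
        Path outPath = new Path("output");              //hypothetical local output directory
        if (localFs.exists(outPath)) {
            localFs.delete(outPath, true);              //recursive delete, same call as on HDFS
        }
    }
}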
Original article: https://www.cnblogs.com/leeyuki/p/9584217.html