zoukankan      html  css  js  c++  java
  • Mapreduce数据清洗

    Result文件数据说明:

    Ip106.39.41.166,(城市)

    Date10/Nov/2016:00:01:02 +0800,(日期)

    Day10,(天数)

    Traffic: 54 ,(流量)

    Type: video,(类型:视频video或文章article

    Id: 8701(视频或者文章的id

    测试要求:

    1、 数据清洗:按照进行数据清洗,并将清洗后的数据导入hive数据库中

    两阶段数据清洗:

    1)第一阶段:把需要的信息从原始日志中提取出来

    ip:    199.30.25.88

    time:  10/Nov/2016:00:01:03 +0800

    traffic:  62

    文章: article/11325

    视频: video/3235

    2)第二阶段:根据提取出来的信息做精细化操作

    ip--->城市 cityIP

    date--> time:2016-11-10 00:01:03

    day: 10

    traffic:62

    type:article/video

    id:11325

    3hive数据库表结构: 

    create table data(  ip string, time string,day string,traffic bigint,type string, id   string ) 

    package test;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    
    
    
    
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class Filter {
        
        public static class Map extends Mapper<Object, Text, Text, NullWritable> {
            private static Text newKey = new Text();
    
            /*public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                System.out.println(line);
                String arr[] = line.split(" ");
                newKey.set(arr[1]);
                context.write(newKey, NullWritable.get());
                System.out.println(newKey);
            }
        }*/
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String S1 = value.toString    ();
             LogParser parser = new LogParser();
                final String[] array = parser.parse(S1);
            System.out.println(S1);
            /*System.out.format(
                    "解析结果:  ip=%s, time=%s,day=%s, traffic=%s, type=%s,id=%s",
                    array[0], array[1], array[2], array[3], array[4],array[5]);*/
            String a=array[0];
            String u=array[1];
            String c=array[2];
            String d=array[3];
            String e=array[4];
            String f=array[5];
            
            String str = a +" "+u +" "+c+" "+d+" "+e+" "+f;
            
            newKey.set(str);
            context.write(newKey, NullWritable.get());
            System.out.println(newKey);
        }
    }
    
        public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
            public void reduce(Text key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            System.out.println("start");
            
        
            Job job = new Job(conf, "filter");
            job.setJarByClass(Filter.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path in = new Path("hdfs://localhost:9000/user/hadoop/in/Result");
            Path out = new Path("hdfs://localhost:9000/user/hadoop/out");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
        static class MyMapper extends
        Mapper<LongWritable, Text, LongWritable, Text> {
    LogParser logParser = new LogParser();
    Text outputValue = new Text();
    
    protected void map(
            LongWritable key,
            Text value,
            org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws java.io.IOException, InterruptedException {
        final String[] parsed = logParser.parse(value.toString());
    
        // step1.过滤掉静态资源访问请求
        if (parsed[2].startsWith("GET /static/")
                || parsed[2].startsWith("GET /uc_server")) {
            return;
        }
        // step2.过滤掉开头的指定字符串
        if (parsed[2].startsWith("GET /")) {
            parsed[2] = parsed[2].substring("GET /".length());
        } else if (parsed[2].startsWith("POST /")) {
            parsed[2] = parsed[2].substring("POST /".length());
        }
        // step3.过滤掉结尾的特定字符串
        if (parsed[2].endsWith(" HTTP/1.1")) {
            parsed[2] = parsed[2].substring(0, parsed[2].length()
                    - " HTTP/1.1".length());
        }
        // step4.只写入前三个记录类型项
        outputValue.set(parsed[0] + "	" + parsed[1] + "	" + parsed[2]);
        context.write(key, outputValue);
    }
    }
    
    static class MyReducer extends
        Reducer<LongWritable, Text, Text, NullWritable> {
    protected void reduce(
            LongWritable k2,
            java.lang.Iterable<Text> v2s,
            org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
            throws java.io.IOException, InterruptedException {
        for (Text v2 : v2s) {
            context.write(v2, NullWritable.get());
        }
    };
    }
    
    /*
    * 日志解析类
    */
    static class  LogParser {
    public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
            "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
            "yyyy-MM-dd HH:mm:ss");
    
    
    
    /**
     * 解析英文时间字符串
     * 
     * @param string
     * @return
     * @throws ParseException
     */
    private Date parseDateFormat(String string) {
        Date parse = null;
        try {
            parse = FORMAT.parse(string);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return parse;
    }
    
    /**
     * 解析日志的行记录
     * 
     * @param line
     * @return 数组含有5个元素,分别是ip、时间、日期、状态、流量
     */
    public String[] parse(String line) {
        String ip = parseIP(line);
        String time = parseTime(line);
        String day = parseday(line);
        String traffic = parseTraffic(line);
        String  type = parsertype(line);
        String  id = parseid( line);
    
    
        return new String[] { ip, time, day,traffic , type, id };
    }
    private String parseIP(String line) {
        String ip = line.split(",")[0].trim();
        return ip;
    }
    
    private String parseTime(String line) {
        final int first = line.indexOf(",");
        final int last = line.indexOf(" +0800,");
        String time = line.substring(first + 1, last).trim();
        Date date = parseDateFormat(time);
        return dateformat1.format(date);
    }
    
    private String parseday(String line) {
        String riqi = line.split(",")[2].trim();
        return riqi;
    }
    private String parseTraffic(String line) {
        String riqi = line.split(",")[3].trim();
        return riqi;
    }
    //private String parseTraffic(String line) {
       // final String trim = line.substring(line.lastIndexOf(",") + 1)
          //      .trim();
        //String traffic = trim.split(" ")[0];
        //return traffic;
    //}
    
    //private String parsertype(String line) {
     //   final int first = line.indexOf(",");
       // final int last = line.lastIndexOf(",");
      //  String url = line.substring(first + 1, last);
      //  return url;
    //}
    private String parsertype(String line) {
        String riqi = line.split(",")[4].trim();
        return riqi;
    }
    
    private String parseid(String line) {
        final String trim = line.substring(line.lastIndexOf(",") + 1)
                .trim();
        String id = trim.split(" ")[0];
        return id;
    }
    
    
    
    
    
    
    }
    
        
        
    }

    load data inpath 'hdfs://localhost:9000/user/hadoop/out/part-r-00000' overwrite into table data;

     将清洗后的数据导入data的表里

    select * from data;

  • 相关阅读:
    MYSQL中replace into的用法
    Typora自定义样式
    Advanced Installer轻松带你入门
    H2数据库入门,看这篇就对了
    Linux之vim的使用
    Linux文件上传与下载
    @ConfigurationProperties 注解使用姿势,这一篇就够了
    Javadoc 使用详解
    MySQL学习提升
    JS前端获取用户的ip地址的方法
  • 原文地址:https://www.cnblogs.com/zlj843767688/p/11853916.html
Copyright © 2011-2022 走看看