  • MapReduce data cleaning (revised)

    package test;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class Filter {
        
        public static class Map extends Mapper<Object, Text, Text, NullWritable> {
            private static Text newKey = new Text();
    
            @Override
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                LogParser parser = new LogParser();
                // Parse the raw line into ip, time, day, traffic, type, id.
                final String[] array = parser.parse(line);
                // Re-join the cleaned fields with commas so Hive can split on ','.
                String str = array[0] + "," + array[1] + "," + array[2] + ","
                        + array[3] + "," + array[4] + "," + array[5];
                newKey.set(str);
                context.write(newKey, NullWritable.get());
            }
        }
    
        public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
            // Identical cleaned lines share a key, so emitting each key once deduplicates them.
            @Override
            public void reduce(Text key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            System.out.println("start");
            
        
            Job job = new Job(conf, "filter");
            job.setJarByClass(Filter.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path in = new Path("hdfs://localhost:9000/user/hadoop/in/Result");
            Path out = new Path("hdfs://localhost:9000/user/hadoop/out");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
        // Note: MyMapper and MyReducer below are not wired into the job in main();
        // they show an alternative cleaning pass over raw access-log request fields.
        static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
            LogParser logParser = new LogParser();
            Text outputValue = new Text();
    
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                final String[] parsed = logParser.parse(value.toString());
    
                // Step 1: drop requests for static resources.
                if (parsed[2].startsWith("GET /static/")
                        || parsed[2].startsWith("GET /uc_server")) {
                    return;
                }
                // Step 2: strip the leading "GET /" or "POST /" prefix.
                if (parsed[2].startsWith("GET /")) {
                    parsed[2] = parsed[2].substring("GET /".length());
                } else if (parsed[2].startsWith("POST /")) {
                    parsed[2] = parsed[2].substring("POST /".length());
                }
                // Step 3: strip the trailing " HTTP/1.1" suffix.
                if (parsed[2].endsWith(" HTTP/1.1")) {
                    parsed[2] = parsed[2].substring(0,
                            parsed[2].length() - " HTTP/1.1".length());
                }
                // Step 4: emit only the first three fields, tab-separated.
                outputValue.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
                context.write(key, outputValue);
            }
        }
    
        static class MyReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
            @Override
            protected void reduce(LongWritable k2, Iterable<Text> v2s, Context context)
                    throws IOException, InterruptedException {
                for (Text v2 : v2s) {
                    context.write(v2, NullWritable.get());
                }
            }
        }
    
        /*
         * Log line parser.
         */
        static class LogParser {
            public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
                    "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
            public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
                    "yyyy-MM-dd HH:mm:ss");
    
            /**
             * Parse the English-locale time string.
             *
             * @param string time in "d/MMM/yyyy:HH:mm:ss" format
             * @return the parsed Date, or null when parsing fails
             */
            private Date parseDateFormat(String string) {
                Date parse = null;
                try {
                    parse = FORMAT.parse(string);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                return parse;
            }
    
            /**
             * Parse one log record.
             *
             * @param line a comma-separated record
             * @return an array of six elements: ip, time, day, traffic, type, id
             */
            public String[] parse(String line) {
                String ip = parseIP(line);
                String time = parseTime(line);
                String day = parseDay(line);
                String traffic = parseTraffic(line);
                String type = parseType(line);
                String id = parseId(line);
                return new String[] { ip, time, day, traffic, type, id };
            }
    
            private String parseIP(String line) {
                return line.split(",")[0].trim();
            }
    
            private String parseTime(String line) {
                // The time sits between the first comma and the " +0800" zone suffix.
                final int first = line.indexOf(",");
                final int last = line.indexOf(" +0800,");
                String time = line.substring(first + 1, last).trim();
                Date date = parseDateFormat(time);
                return dateformat1.format(date);
            }
    
            private String parseDay(String line) {
                return line.split(",")[2].trim();
            }
    
            private String parseTraffic(String line) {
                return line.split(",")[3].trim();
            }
    
            private String parseType(String line) {
                return line.split(",")[4].trim();
            }
    
            private String parseId(String line) {
                // The id is the first token after the last comma.
                final String trim = line.substring(line.lastIndexOf(",") + 1).trim();
                return trim.split(" ")[0];
            }
        }
    }
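
    To sanity-check LogParser outside of Hadoop, a minimal local driver like the sketch below can help. The sample line is hypothetical, assuming the comma-separated record format implied by parse(): ip, a time carrying a " +0800" zone suffix, day, traffic, type, id.

    package test;
    
    public class LogParserTest {
        public static void main(String[] args) {
            // Hypothetical sample line in the format LogParser expects.
            String line = "10.211.55.1,10/Nov/2016:00:01:02 +0800,2016-11-10,54,video,8701";
            String[] fields = new Filter.LogParser().parse(line);
            // Prints ip, normalized time, day, traffic, type, id on separate lines.
            for (String field : fields) {
                System.out.println(field);
            }
        }
    }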

    Change the delimiter of the cleaned output to "," so that the Hive table below can split its fields on commas.

    Table creation statement:

    create table if not exists hive.data(ip string, `time` string, day string, traffic bigint, type string, id string) row format delimited fields terminated by ',';

    Load the cleaned output into the Hive table:

    load data inpath 'hdfs://localhost:9000/user/hadoop/out/part-r-00000' overwrite into table data;
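
    As a quick check that the load succeeded (assuming the table definition above; this query is illustrative), run:

    select ip, `time`, day, traffic, type, id from data limit 10;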
