  • 19. Clean the collected data with MapReduce

    Create a Maven project in Eclipse.

    Set the project JDK to the locally installed version 1.8.

    Edit the pom.xml file:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>com.it19gong</groupId>
      <artifactId>clickLog</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>

      <name>clickLog</name>
      <url>http://maven.apache.org</url>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>

        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.6.0</version>
        </dependency>

        <!-- tools.jar is not in any public repository, so it is referenced from the local JDK -->
        <dependency>
          <groupId>jdk.tools</groupId>
          <artifactId>jdk.tools</artifactId>
          <version>1.8</version>
          <scope>system</scope>
          <systemPath>E:/software/jdk1.8/lib/tools.jar</systemPath>
        </dependency>

        <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-jdbc</artifactId>
          <version>2.1.0</version>
        </dependency>

        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.33</version>
        </dependency>
      </dependencies>

    </project>

     

    If loading the dependencies fails because jdk.tools 1.8 cannot be found in the repository, change the systemPath in the jdk.tools dependency to the absolute path of your locally installed JDK, then reload the Maven dependencies. Here it is set to E:/software/jdk1.8/lib/tools.jar.
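    If you would rather not hard-code a machine-specific path, a commonly used alternative is the sketch below; it assumes a standard JDK 8 layout, where the java.home property resolves to the JDK's jre directory so that ../lib/tools.jar reaches the JDK's tools.jar:

    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${java.home}/../lib/tools.jar</systemPath>
    </dependency>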

    Create an AccessLogPreProcessMapper class in the project:

     

    package com.it19gong.clickLog;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class AccessLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Nginx access logs are space-delimited; drop malformed lines
            // that have fewer than the 11 fields we rely on.
            String[] itr = value.toString().split(" ");
            if (itr.length < 11) {
                return;
            }
            String ip = itr[0];                                          // client IP
            String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]); // "[dd/Mon/yyyy:HH:mm:ss" -> "yyyy/MM/dd"
            String url = itr[6];                                         // requested path
            String upFlow = itr[9];                                      // response size in bytes

            // Emit one cleaned CSV record per valid line.
            text.set(ip + "," + date + "," + url + "," + upFlow);
            context.write(text, NullWritable.get());
        }
    }
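    To make the field positions concrete, here is a minimal sketch; the class name and log line are made up, following the common Nginx combined format the mapper expects:

    package com.it19gong.clickLog;

    public class MapperSplitDemo {
        public static void main(String[] args) {
            // Hypothetical sample line in Nginx combined format (12 space-separated fields)
            String line = "192.168.33.10 - - [20/Jun/2019:09:13:07 +0800] "
                    + "\"GET /index.html HTTP/1.1\" 200 5326 \"-\" \"Mozilla/5.0\"";
            String[] itr = line.split(" ");
            System.out.println(itr[0]); // 192.168.33.10          (ip)
            System.out.println(itr[3]); // [20/Jun/2019:09:13:07  (raw timestamp)
            System.out.println(itr[6]); // /index.html            (url)
            System.out.println(itr[9]); // 5326                   (upFlow)
            // The mapper would emit: 192.168.33.10,2019/06/20,/index.html,5326
        }
    }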

    Create the AnalysisNginxTool class:

    package com.it19gong.clickLog;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class AnalysisNginxTool
    {
        private static Logger logger = LoggerFactory.getLogger(AnalysisNginxTool.class);

        /** Converts an Nginx timestamp field such as "[20/Jun/2019:09:13:07" to "2019/06/20". */
        public static String nginxDateStmpToDate(String date)
        {
            String res = "";
            try
            {
                SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
                String datetmp = date.split(" ")[0].toUpperCase();
                // Replace the English month abbreviation (e.g. JUN) with its number (06).
                String mtmp = datetmp.split("/")[1];
                DateToNUM.initMap();
                datetmp = datetmp.replaceAll(mtmp, DateToNUM.map.get(mtmp));
                logger.debug(datetmp);
                Date d = df.parse(datetmp);
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
                res = sdf.format(d);
            }
            catch (ParseException e)
            {
                logger.error("error:" + date, e);
            }
            return res;
        }

        /** Same conversion, but returns the timestamp in milliseconds since the epoch. */
        public static long nginxDateStmpToDateTime(String date)
        {
            long l = 0;
            try
            {
                SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
                String datetmp = date.split(" ")[0].toUpperCase();
                String mtmp = datetmp.split("/")[1];
                DateToNUM.initMap();
                datetmp = datetmp.replaceAll(mtmp, DateToNUM.map.get(mtmp));

                Date d = df.parse(datetmp);
                l = d.getTime();
            }
            catch (ParseException e)
            {
                logger.error("error:" + date, e);
            }
            return l;
        }
    }
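    A quick sanity check of the conversion (a sketch; the class name and sample timestamp are illustrative):

    package com.it19gong.clickLog;

    public class DateToolDemo {
        public static void main(String[] args) {
            // Field itr[3] of a split log line looks like "[20/Jun/2019:09:13:07"
            System.out.println(AnalysisNginxTool.nginxDateStmpToDate("[20/Jun/2019:09:13:07"));     // 2019/06/20
            System.out.println(AnalysisNginxTool.nginxDateStmpToDateTime("[20/Jun/2019:09:13:07")); // epoch millis
        }
    }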

    Create the DateToNUM class:

    package com.it19gong.clickLog;

    import java.util.HashMap;

    public class DateToNUM
    {
        public static HashMap<String, String> map = new HashMap<String, String>();

        public static void initMap()
        {
            map.put("JAN", "01");
            map.put("FEB", "02");
            map.put("MAR", "03");
            map.put("APR", "04");
            map.put("MAY", "05");
            map.put("JUN", "06");
            map.put("JUL", "07");
            map.put("AUG", "08");
            // Nginx abbreviates September as "Sep", so the key must be SEP, not SEPT.
            map.put("SEP", "09");
            map.put("OCT", "10");
            map.put("NOV", "11");
            map.put("DEC", "12");
        }
    }

    Create the AccessLogDriver class:

    package com.it19gong.clickLog;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class AccessLogDriver {

        public static void main(String[] args) throws Exception {
            DateToNUM.initMap();
            Configuration conf = new Configuration();
            // Fall back to default HDFS paths when none are given on the command line
            if (args.length != 2) {
                args = new String[2];
                args[0] = "hdfs://node1/data/clickLog/20190620/";
                args[1] = "hdfs://node1/uvout/hive";
            }

            Job job = Job.getInstance(conf);
            job.setJarByClass(AccessLogDriver.class);
            job.setMapperClass(AccessLogPreProcessMapper.class);     // Mapper for this job
            job.setNumReduceTasks(0);                                // map-only job: no Reducer, output files are part-m-*
            job.setMapOutputKeyClass(Text.class);                    // key class of the map output
            job.setMapOutputValueClass(NullWritable.class);          // value class of the map output
            FileInputFormat.addInputPath(job, new Path(args[0]));    // input path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path (must not exist yet)
            System.exit(job.waitForCompletion(true) ? 0 : 1);        // submit the job and wait
        }

    }

    Package the project into a jar (the Eclipse Export wizard works; a command-line sketch follows).
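    If you prefer the command line to Eclipse's Export wizard, a Maven build also produces the jar. This is a sketch: the copy step is an assumption, made so the jar name matches the mrclick.jar used in the run command below.

    mvn clean package
    # Maven writes target/clickLog-0.0.1-SNAPSHOT.jar; copy it under the name used later
    cp target/clickLog-0.0.1-SNAPSHOT.jar mrclick.jar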


    Upload the jar to the cluster.

    Before running it on the cluster, first check that the cluster's daemons are up.
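    A minimal check (a sketch; PIDs and the exact daemon list depend on your setup):

    jps
    # expect the HDFS and YARN daemons, e.g.:
    #   NameNode, DataNode             (HDFS)
    #   ResourceManager, NodeManager   (YARN)

    Then submit the job: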

     hadoop jar mrclick.jar com.it19gong.clickLog.AccessLogDriver

     

    Once the job finishes, the output directory appears on HDFS. View the cleaned data:
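    From the shell, something like the following works (a sketch; the path is the driver's default output, and the part file name may differ):

    hdfs dfs -ls /uvout/hive
    hdfs dfs -cat /uvout/hive/part-m-00000 | head
    # each record is ip,date,url,upFlow, e.g. 192.168.33.10,2019/06/20,/index.html,5326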
