  • Big Data Mini-Project: TV Ratings Enterprise Project (12)

    Because the environment was limited, Flume was not used to collect the logs; instead, log files that had already been obtained from another source were used.

    Log file screenshot:

    Write the following script to upload the collected log files to HDFS:

    #!/bin/bash
    
    #set java env
    export JAVA_HOME=/soft/jdk1.8.0_65/
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
    export PATH=${JAVA_HOME}/bin:$PATH
    
    #set hadoop env
    export HADOOP_HOME=/soft/hadoop-2.7.3/
    export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH
    
    #directory where the raw log files are written
    log_src_dir=/home/wang/logs/log/
    
    #staging directory for files waiting to be uploaded
    log_toupload_dir=/home/wang/logs/toupload/
    
    #root path on HDFS that the log files are uploaded to
    hdfs_root_dir=/data/clickLog/20180811/
    
    #print the environment for debugging
    echo "envs: hadoop_home: $HADOOP_HOME"
    
    #scan the log directory and check whether there are files to upload
    echo "log_src_dir:"$log_src_dir
    ls $log_src_dir | while read fileName
    do
        if [[ "$fileName" == access.log.* ]]; then
        # if [ "access.log" = "$fileName" ];then
            date=`date +%Y_%m_%d_%H_%M_%S`
            #move the file to the staging directory and rename it
            echo "moving $log_src_dir$fileName to $log_toupload_dir"xxxxx_click_log_$fileName"$date"
            mv $log_src_dir$fileName $log_toupload_dir"xxxxx_click_log_$fileName"$date
            #append the path of the staged file to a list file named willDoing
            echo $log_toupload_dir"xxxxx_click_log_$fileName"$date >> $log_toupload_dir"willDoing."$date
        fi
        
    done
    #find the willDoing list files that have not been processed yet
    ls $log_toupload_dir | grep will | grep -v "_COPY_" | grep -v "_DONE_" | while read line
    do
        echo "toupload is in file:"$line
        #mark the list file as in progress by renaming it to willDoing..._COPY_
        mv $log_toupload_dir$line $log_toupload_dir$line"_COPY_"
        #read the _COPY_ list file line by line; each line ("path") is the path of one file to upload
        cat $log_toupload_dir$line"_COPY_" | while read path
        do
            echo "putting...$path to hdfs path.....$hdfs_root_dir"
            hadoop fs -put $path $hdfs_root_dir
        done
        #mark the list file as done
        mv $log_toupload_dir$line"_COPY_" $log_toupload_dir$line"_DONE_"
    done
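
    The script can be run by hand, or scheduled so that newly arrived log files are picked up periodically. A minimal sketch, assuming the script is saved as /home/wang/uploadFile2Hdfs.sh (a hypothetical path) and should run once a minute via cron:

    * * * * * /bin/bash /home/wang/uploadFile2Hdfs.sh >> /home/wang/logs/upload.log 2>&1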

    Cleaning the data with MapReduce

    Create the project and add the dependency libraries:

    <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.3</version>
            </dependency>
    
    
            <dependency>
                <groupId>jdk.tools</groupId>
                <artifactId>jdk.tools</artifactId>
                <version>1.8</version>
                <scope>system</scope>
                <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
            </dependency>
            
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>2.1.0</version>
            </dependency>
            
         
            
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.33</version>
            </dependency>
    </dependencies>
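
    Note: hadoop-common by itself does not contain the MapReduce client classes (Mapper, Job, and so on) used below; in a typical Hadoop 2.7.3 setup the hadoop-mapreduce-client-core artifact of the same version also needs to be added to the POM.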
     

    The Mapper class

    package com.it18wang;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class AccessLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
       
        Text text = new Text();
        
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            
            // one raw access log line
            String value1 = value.toString();
            String[] itr = value1.split(" ");
            // skip malformed lines that do not have enough fields
            if (itr.length < 11)
            {
                return;
            }
            String ip = itr[0];                                           // client IP
            String date = AnalysisNginxTool.nginxDateStmpToDate(itr[3]);  // [dd/Mon/yyyy:HH:mm:ss -> yyyy/MM/dd
            String url = itr[6];                                          // requested URL
            String upFlow = itr[9];                                       // bytes sent (upFlow)
            
            text.set(ip + "," + date + "," + url + "," + upFlow);
            context.write(text, NullWritable.get());
            
        }
    }
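
    For reference, the field indices above assume an access log line shaped roughly like the following (a made-up sample; the exact layout depends on the nginx log_format in use), where fields 0, 3, 6 and 9 are the client IP, the timestamp, the requested URL and the byte count:

    192.168.56.1 - - [11/Aug/2018:10:23:45 +0800] "GET /index.html HTTP/1.1" 200 2345 "-" "Mozilla/5.0"

    This line would be emitted by the Mapper as 192.168.56.1,2018/08/11,/index.html,2345.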

    The Driver class

    package com.it18wang;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class AccessLogDriver {
        
        public static void main(String[] args) throws Exception {
            DateToNUM.initMap();
            Configuration conf = new Configuration();
            // fall back to default input/output paths when none are given on the command line
            if (args.length != 2) {
                args = new String[] { "hdfs://wang201/data/access.log", "hdfs://wang201/uvout/hive" };
            }
    
            Job job = Job.getInstance(conf);                          // create the job
            job.setJarByClass(AccessLogDriver.class);
            job.setMapperClass(AccessLogPreProcessMapper.class);      // set the Mapper class
            job.setNumReduceTasks(0);                                 // map-only job, no Reducer
            job.setMapOutputKeyClass(Text.class);                     // map output key type
            job.setMapOutputValueClass(NullWritable.class);           // map output value type
            FileInputFormat.addInputPath(job, new Path(args[0]));     // input path
            FileOutputFormat.setOutputPath(job, new Path(args[1]));   // output path
            System.exit(job.waitForCompletion(true) ? 0 : 1);         // submit the job and wait
        }
        }
    
    }
    
    The AnalysisNginxTool utility class
    
    package com.it18wang;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class AnalysisNginxTool
    {
        private static Logger logger = LoggerFactory.getLogger(AnalysisNginxTool.class);
    
        public static String nginxDateStmpToDate(String date)
        {
            String res = "";
            try
            {
                SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
                String datetmp = date.split(" ")[0].toUpperCase();
                String mtmp = datetmp.split("/")[1];
                DateToNUM.initMap();
                datetmp = datetmp.replaceAll(mtmp, (String) DateToNUM.map.get(mtmp));
                System.out.println(datetmp);
                Date d = df.parse(datetmp);
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
                res = sdf.format(d);
            }
            catch (ParseException e)
            {
                logger.error("error:" + date, e);
            }
            return res;
        }
    
        public static long nginxDateStmpToDateTime(String date)
        {
            long l = 0;
            try
            {
                SimpleDateFormat df = new SimpleDateFormat("[dd/MM/yyyy:HH:mm:ss");
                String datetmp = date.split(" ")[0].toUpperCase();
                String mtmp = datetmp.split("/")[1];
                datetmp = datetmp.replaceAll(mtmp, (String) DateToNUM.map.get(mtmp));
    
                Date d = df.parse(datetmp);
                l = d.getTime();
            }
            catch (ParseException e)
            {
                logger.error("error:" + date, e);
            }
            return l;
        }
    }
    
    The DateToNUM utility class
    
    package com.it18wang;
    
    import java.util.HashMap;
    public class DateToNUM
    {
        public static HashMap<String, String> map = new HashMap<String, String>();
    
        public static void initMap()
        {
            map.put("JAN", "01");
            map.put("FEB", "02");
            map.put("MAR", "03");
            map.put("APR", "04");
            map.put("MAY", "05");
            map.put("JUN", "06");
            map.put("JUL", "07");
            map.put("AUG", "08");
            map.put("SEPT", "09");
            map.put("OCT", "10");
            map.put("NOV", "11");
            map.put("DEC", "12");
        }
    }

    Build the jar

    Upload the jar to the virtual machine and run the following command:

    hadoop jar /home/wang/mrclick.jar com.it18wang.AccessLogDriver /data/clickLog/20180812 /vout/hive
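
    After the job finishes, the cleaned records can be inspected directly on HDFS (a quick check using the output path from the command above; the job is map-only, so the output files are named part-m-*):

    hadoop fs -ls /vout/hive
    hadoop fs -cat /vout/hive/part-m-00000 | head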

    The cleaned data (screenshot):

    Importing the data into Hive tables

    create database mydb;

    use mydb;

    create external table access(ip string,day string,url string,upflow string) row format delimited fields terminated by ',';

    create external table pvnum(id int,sum int) row format delimited fields terminated by ',';
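
    The statements above only define the tables; the cleaned MR output still has to be loaded. A minimal sketch, assuming the output directory used when the job was run (/vout/hive):

    hive -e "use mydb; load data inpath '/vout/hive' into table access;"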

    Exporting the data to MySQL:

    Connect to the VM's MySQL instance from local Navicat.

    In the userdb database (the one referenced by the Sqoop connection string below), create a table named upflow with two columns (a sketch follows below).
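
    A possible way to create that table from the command line instead (the column names and types here are only an assumption; they must match the records being exported from Hive):

    mysql -u sqoop -p -e "create table if not exists userdb.upflow (ip varchar(64), sum bigint);"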

    Use Sqoop to export the data from the Hive warehouse into the MySQL upflow table:

    sqoop export --connect jdbc:mysql://wang201:3306/userdb \
      --username sqoop --password sqoop \
      --table upflow \
      --export-dir /user/hive/warehouse/db2.db/upflow \
      --input-fields-terminated-by ','
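
    Once the export finishes, the rows can be spot-checked on the MySQL side, for example:

    mysql -u sqoop -p -e "select * from userdb.upflow limit 10;"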

  • Original post: https://www.cnblogs.com/wakerwang/p/9479677.html