  • Hadoop in Practice: Website Log KPI Analysis

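      This project analyzes the logs produced by an Apache server and computes metrics such as PV, the number of unique IPs, and the bounce rate. Third-party analytics systems can already report these metrics; in production, a pipeline like this is more often used to analyze core data such as user transactions. Here it only serves to demonstrate the workflow for analyzing log data.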

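    I. Requirements Analysis

      We can write a shell script that uploads the logs Apache produces each day to HDFS, cleans the data, analyzes it with Hive, and finally exports the results from HDFS to MySQL. A scheduled task then runs the whole analysis automatically every day.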

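    1. Metric definitions

       ▶ PV (Page View): the number of page views. Every visit by a user to any page of the site is recorded once, and repeated visits to the same page are counted cumulatively. Requests for site resources are usually included in the PV count as well;

       ▶ Unique IPs: the number of distinct client IP addresses that accessed the site during the day;

       ▶ Bounce rate: the number of visits that left after viewing only one page, as a percentage of the total number of page views;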
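
As a minimal sketch of how these metrics relate, the Java snippet below computes PV, the unique IP count, and a bounce rate from a few in-memory records. The class name `KpiSketch`, the `Kpi` holder, and the sample records are hypothetical. A visitor is approximated by its IP address, which is the same simplification the Hive queries later in this article make.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KpiSketch {

    /** Simple holder for the three metrics (hypothetical, for illustration only). */
    static final class Kpi {
        final long pv;
        final long uniqueIps;
        final long jumpers;

        Kpi(long pv, long uniqueIps, long jumpers) {
            this.pv = pv;
            this.uniqueIps = uniqueIps;
            this.jumpers = jumpers;
        }

        /** Single-page visitors as a percentage of total page views. */
        double bounceRatePercent() {
            return pv == 0 ? 0.0 : 100.0 * jumpers / pv;
        }
    }

    /** Each record is assumed to be one cleaned log line: ip, time, url separated by tabs. */
    static Kpi compute(List<String> records) {
        Map<String, Integer> hitsPerIp = new HashMap<>();
        long pv = 0;
        for (String record : records) {
            String ip = record.split("\t", 2)[0];
            hitsPerIp.merge(ip, 1, Integer::sum); // count page views per IP
            pv++;                                 // every record is one page view
        }
        // A "jumper" is an IP that produced exactly one record.
        long jumpers = hitsPerIp.values().stream().filter(n -> n == 1).count();
        return new Kpi(pv, hitsPerIp.size(), jumpers);
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "10.0.0.1\t20150527175248\tGET /index.html",
            "10.0.0.1\t20150527175301\tGET /about.html",
            "10.0.0.2\t20150527175310\tGET /index.html",
            "10.0.0.3\t20150527175322\tGET /faq.html");
        Kpi kpi = compute(sample);
        System.out.println("pv=" + kpi.pv + " uniqueIps=" + kpi.uniqueIps
            + " jumpers=" + kpi.jumpers + " bounceRate=" + kpi.bounceRatePercent() + "%");
    }
}
```

Counting visitors by IP is only an approximation: several users behind one NAT gateway share an address, and one user may change addresses during the day.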

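    2. Implementation steps

       1) Upload the log data to HDFS. If the data is small, it can be uploaded with hdfs commands in a shell script; if the volume is large, the logs can be uploaded from another machine via NFS. If there are many servers, Flume can be used to collect the logs;

       2) Clean the data in HDFS with MapReduce. The raw log data may not be in the required format, so a MapReduce job cleans and filters the data in HDFS and converts it into the format Hive needs for its statistics;

       3) Analyze the cleaned data with Hive. Hive makes it easy to compute statistics over log data. Create an external partitioned table in advance that points to the cleaned HDFS directory, add the current day's partition after each daily cleaning run, and then execute HiveQL to compute the statistics and save the results;

       4) Use Sqoop to export the Hive results to a relational database such as MySQL;

       5) Present the results in a front end. Use a web page or some other presentation layer to show the result data in MySQL to users in an intuitive way;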

    II. Detailed Implementation

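       This experiment analyzes a sample of the access log produced by an Apache server. Each record looks like “221.194.31.149 - - [27/May/2015:17:52:48 +0800] "GET /static/image/common/star_level1.gif HTTP/1.1" 200 547”. The fields are, in order, the client IP, the access time, the request (method, path, and protocol), the status code returned by the server, and the response size in bytes.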
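
To make the record layout concrete, here is a minimal sketch that splits one such line into its fields with a regular expression. The class name `AccessLogLineSketch` and the pattern are illustrative assumptions; this is not the parser used in this article's MapReduce job, which relies on plain string searches.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AccessLogLineSketch {
    // Groups: ip, identity, user, [time], "request", status, bytes
    private static final Pattern LINE = Pattern.compile(
        "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+)$");

    /** Returns {ip, time, request, status, bytes}, or null if the line does not match. */
    static String[] parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            return null;
        }
        return new String[] {m.group(1), m.group(4), m.group(5), m.group(6), m.group(7)};
    }

    public static void main(String[] args) {
        String sample = "221.194.31.149 - - [27/May/2015:17:52:48 +0800] "
            + "\"GET /static/image/common/star_level1.gif HTTP/1.1\" 200 547";
        String[] fields = parse(sample);
        System.out.println(String.join(" | ", fields));
    }
}
```

Returning null for lines that do not match lets the caller skip blank or malformed records instead of failing on them.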

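       The workflow of this experiment is shown in the figure below:

    Figure 1. Workflow of the Apache server log analysis

    1. Write a MapReduce job that cleans and filters the raw data, keeping three columns: IP, access time, and requested URL. Reference code: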

    package com.hicoor.hadoop.logproj;
    
    import java.net.URI;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class DataCleaner {
        
        static class LogMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
            LogParser logParser = new LogParser();
            Text v2 = new Text();
            @Override
            protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException, InterruptedException {
                try {
                    String[] parse = logParser.parse(v1.toString());
                    // skip blank lines
                    if(parse == null) {
                        return;
                    }
                    // strip the trailing protocol suffix from the request
                    if(parse[2].endsWith(" HTTP/1.1")){
                        parse[2] = parse[2].substring(0, parse[2].length()-" HTTP/1.1".length());
                    }
                    // output columns are tab-separated: ip, time, url
                    v2.set(parse[0]+'\t'+parse[1]+'\t'+parse[2]);
                } catch (Exception e) {
                    // skip malformed lines instead of re-emitting the previous record
                    System.out.println("Failed to process line: "+v1.toString());
                    return;
                }
                context.write(v2, new LongWritable(1));
            }
        }
        
        // The reducer writes each distinct record once, so exact duplicate lines are collapsed
        static class LogReduce extends Reducer<Text, LongWritable, Text, NullWritable>{
            @Override
            protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws java.io.IOException, InterruptedException {
                context.write(k2, NullWritable.get());
            }
        }
        
        public static void main(String[] args) throws Exception {
            System.setProperty("hadoop.home.dir", "D:/desktop/hadoop-2.6.0");
            Configuration conf = new Configuration();
            conf.setStrings("dfs.nameservices", "cluster1");
            conf.setStrings("dfs.ha.namenodes.cluster1", "hadoop0,hadoop1");
            conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop0", "hadoop0:9000");
            conf.setStrings("dfs.namenode.rpc-address.cluster1.hadoop1", "hadoop1:9000");
            // required: lets the client find the NameNode that is currently active
            conf.setStrings("dfs.client.failover.proxy.provider.cluster1", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
            
            // parse generic options before creating the Job, because the Job works on a copy of the configuration
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
              System.err.println("Usage: DataCleaner <in> [<in>...] <out>");
              System.exit(2);
            }
            
            Job job = Job.getInstance(conf, "LogDataCleaner");
            
            // delete the output directory if it already exists
            String FILE_OUT_PATH = otherArgs[otherArgs.length - 1];
            //String FILE_OUT_PATH = "hdfs://cluster1/hmbbs_cleaned/2013_05_30";
             FileSystem fileSystem = FileSystem.get(new URI(FILE_OUT_PATH), conf);
             if (fileSystem.exists(new Path(FILE_OUT_PATH))) {
                 fileSystem.delete(new Path(FILE_OUT_PATH), true);
             }
            
            job.setJarByClass(DataCleaner.class);
            // 1.1 input format (controls how the input is split)
            job.setInputFormatClass(TextInputFormat.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
              FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            //FileInputFormat.addInputPath(job, new Path("hdfs://cluster1/hmbbs_logs/access_2013_05_30.log"));
            // 1.2 mapper
            job.setMapperClass(LogMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 1.3 partitioner
            job.setPartitionerClass(HashPartitioner.class);
            //job.setNumReduceTasks(3);
            // 1.4 sorting and grouping (defaults)
            // 1.5 combiner (not used)
            
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(FILE_OUT_PATH));
            
            // 2.2 reducer
            job.setReducerClass(LogReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            
            // propagate success or failure to the calling shell script
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
        
        static class LogParser {
    
            public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
            public static final SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
    //        public static void main(String[] args) throws ParseException {
    //            final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
    //            LogParser parser = new LogParser();
    //            final String[] array = parser.parse(S1);
    //            System.out.println("Sample data: "+S1);
    //            System.out.format("Parsed result:  ip=%s, time=%s, url=%s, status=%s, traffic=%s", array[0], array[1], array[2], array[3], array[4]);
    //        }
            /**
             * Parses a timestamp string with English month names.
             * @param string timestamp such as 27/May/2015:17:52:48
             * @return the parsed date, or null if the string cannot be parsed
             */
            private Date parseDateFormat(String string){
                Date parse = null;
                try {
                    parse = FORMAT.parse(string);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                return parse;
            }
            /**
             * Parses one line of the log.
             * @param line raw log line
             * @return an array of 5 elements: ip, time, url, status, and traffic; null for a blank line
             */
            public String[] parse(String line){
                // compare string content; == would only compare references
                if(line.trim().isEmpty()) {
                    return null;
                }
                String ip = parseIP(line);
                String time = parseTime(line);
                String url = parseURL(line);
                String status = parseStatus(line);
                String traffic = parseTraffic(line);
                
                return new String[]{ip, time ,url, status, traffic};
            }
            
            private String parseTraffic(String line) {
                // the text after the last double quote is "<status> <bytes>"
                final String trim = line.substring(line.lastIndexOf("\"")+1).trim();
                String traffic = trim.split(" ")[1];
                return traffic;
            }
            private String parseStatus(String line) {
                final String trim = line.substring(line.lastIndexOf("\"")+1).trim();
                String status = trim.split(" ")[0];
                return status;
            }
            private String parseURL(String line) {
                // the request is the text between the first and last double quote
                final int first = line.indexOf("\"");
                final int last = line.lastIndexOf("\"");
                String url = line.substring(first+1, last);
                return url;
            }
            private String parseTime(String line) {
                final int first = line.indexOf("[");
                final int last = line.indexOf("+0800]");
                String time = line.substring(first+1,last).trim();
                Date date = parseDateFormat(time);
                return dateformat1.format(date);
            }
            private String parseIP(String line) {
                String ip = line.split("- -")[0].trim();
                return ip;
            }
            
        }
    }

    2. In MySQL, create a table named web_logs_stat with the columns vtime, pv, ip_n, and jumper_n.

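    3. In the Hive shell, create an external partitioned table that points to the cleaned HDFS directory. The field delimiter is a tab, matching the output of the cleaning job:

       hive> CREATE EXTERNAL TABLE ex_logs(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/web_cleaned';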

    4. Create the shell script logs_daily_process.sh for the daily automated processing, with the following content:

    #!/bin/sh
    
    #get yesterday format string
    yesterday=`date --date='1 days ago' +%Y_%m_%d`
    #yesterday=$1
    
    #upload logs to hdfs
    hadoop fs -put /apache_logs/access_${yesterday}.log  /web_logs
    
    #cleaning data
    hadoop jar /apache_logs/cleaned.jar  /web_logs/access_${yesterday}.log  /web_cleaned/${yesterday}  1>/dev/null
    
    
    #alter hive table and then add partition to existed table
    hive -e "ALTER TABLE ex_logs ADD PARTITION(logdate='${yesterday}') LOCATION '/web_cleaned/${yesterday}';"
    
    #create hive table everyday
    hive -e "CREATE TABLE web_pv_${yesterday} AS SELECT COUNT(1) AS PV FROM ex_logs WHERE logdate='${yesterday}';"
    hive -e "CREATE TABLE web_ip_${yesterday} AS SELECT COUNT(DISTINCT ip) AS IP FROM ex_logs WHERE logdate='${yesterday}';"
    hive -e "CREATE TABLE web_jumper_${yesterday} AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM ex_logs WHERE logdate='${yesterday}' GROUP BY ip HAVING times=1) e;"
    hive -e "CREATE TABLE web_${yesterday} AS SELECT '${yesterday}', a.pv, b.ip, c.jumper FROM web_pv_${yesterday} a JOIN web_ip_${yesterday} b ON 1=1 JOIN web_jumper_${yesterday} c ON 1=1;"
    
    #delete hive tables
    hive -e "drop table web_pv_${yesterday};"
    hive -e "drop table web_ip_${yesterday};"
    hive -e "drop table web_jumper_${yesterday};"
    
    
    #sqoop export to mysql
    sqoop export --connect jdbc:mysql://hadoop0:3306/ex_logs --username root --password 123456 --table web_logs_stat --fields-terminated-by '\001' --export-dir "/user/hive/warehouse/web_${yesterday}"
    
    #delete hive tables
    hive -e "drop table web_${yesterday};"

    5. Schedule the job with cron: run crontab -e and add an entry such as 0 1 * * * logs_daily_process.sh, which runs the script once a day at 01:00.

  • Original article: https://www.cnblogs.com/hanganglin/p/4533992.html