  • Offline Log Collection and Processing, Part 2: Data Cleaning and Computation

    Continuing from the previous article, this one uses Hive-related techniques to carry out data cleaning and computation.

    I. Storing the raw log data

    Hive tables can store and read data via a regular expression, as follows. (Note: do not create this as an external table, so that the data can be truncated after cleaning, making room for the next batch.)

    CREATE TABLE nginxlog (
      ip STRING,
      time STRING,
      request STRING,
      status STRING,
      size STRING,
      referer STRING,
      agent STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
      "input.regex" = "([^ ]*) (\[[^\]]*\]) ("[^"]*") (-|[0-9]*) (-|[0-9]*) ("[^ ]*") ("[^\"]*")"
    )
    STORED AS TEXTFILE LOCATION '/test' ;
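
    With the table backed by LOCATION '/test', getting raw logs in is just a matter of placing access.log files under that directory. A minimal sketch; the staging path /tmp/access.log is illustrative, not from the article:

    ```sql
    -- Move a raw access.log from an HDFS staging path into the table's
    -- backing directory ('/test'); note LOAD DATA INPATH moves the file,
    -- it does not copy it.
    LOAD DATA INPATH '/tmp/access.log' INTO TABLE nginxlog;
    ```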

    Our access.log data looks like this:

    192.168.111.1 [29/Jul/2019:19:58:55 +0800] "GET /big.png?url=http://127.0.0.1/a.html&urlname=a.html&scr=1366x768&ce=1&cnv=0.6735760053703803&ref=http://127.0.0.1/b.html&stat_uv=67256303183188720208&stat_ss=6553789412_7_1564401535833 HTTP/1.0" 200 37700 "http://127.0.0.1/a.html" "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"

    So the regex in the CREATE TABLE statement,

    ([^ ]*) (\[[^\]]*\]) ("[^"]*") (-|[0-9]*) (-|[0-9]*) ("[^ ]*") ("[^\"]*")

    must match lines like the one above. First, understand what each group in the regex matches:
      ([^ ]*)       //matches a run of characters other than a space; inside a bracket expression, ^ negates the character set
      (-|[0-9]*)    //matches either a literal - or a sequence of digits
      ("[^\"]*")    //the " characters match literal double quotes; the backslash before the inner quote is Java string escaping, not part of the regex itself
      (\[[^\]]*\])  //\[ and \] match the literal square brackets surrounding the timestamp

    In addition, before putting this regex to work in Hive, it is best to verify it in a Java unit test:

    @Test
    public void testLog(){

        String regex = "([^ ]*) (\\[[^\\]]*\\]) (\"[^\"]*\") (-|[0-9]*) (-|[0-9]*) (\"[^ ]*\") (\"[^\"]*\")";
        Pattern pattern = Pattern.compile(regex);
        String data = "192.168.111.1 [21/Jul/2019:15:53:07 +0800] \"GET /favicon.ico HTTP/1.1\" 404 555 \"http://192.168.111.123/\" \"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36\"";
        Matcher matcher = pattern.matcher(data);

        // Note: matches() requires the whole string to match; find() succeeds on any matching substring
        if (matcher.matches()) {
            for (int i = 0; i < matcher.groupCount(); i++) {
                System.out.println(matcher.group(i + 1));
            }
        } else {
            System.out.println("No match found.");
        }
    }
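
    The matches()-versus-find() distinction noted in that comment is worth a standalone illustration, since a regex that works with find() can still fail with matches():

    ```java
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class MatchVsFind {
        public static void main(String[] args) {
            Pattern p = Pattern.compile("[0-9]+");
            // matches() anchors the pattern to the entire input string
            System.out.println(p.matcher("status 404 here").matches());  // false
            // find() succeeds if the pattern occurs anywhere in the input
            Matcher m = p.matcher("status 404 here");
            System.out.println(m.find());                                // true
            System.out.println(m.group());                               // 404
        }
    }
    ```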
    
    
    
    

    II. Processing the raw logs to produce intermediate data

       1. Process the raw nginxlog data into well-structured, page-view (PV) level access records, laying the groundwork for the KPI calculations that follow. First create a PV table:

    create table page_view(
            time string,
            pv_id string,
            uv_id string,
            ip string,
            url string,
            resource string,
            access_time string,
            status int,
            size int,
            referer_server string,
            referer_client string,
            cookie_enabled binary,
            screen string,
            session_id string,
            session_times string,
            session_timelen string)
    PARTITIONED BY (day string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE;

    2. To move data from the nginxlog table into page_view, the best approach is to write two Hive functions (UDTFs) that split the time and request columns of nginxlog, respectively.

    First, splitting the time column. The goal is to turn a value like [29/Jul/2019:19:58:55 +0800] into two columns, the date 2019-07-29 and the hour 19. The code:

      

    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.Locale;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class DatetimeOfLog extends  GenericUDTF {
    
    
        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs)
                throws UDFArgumentException {
    
            if(argOIs.length != 1){
                throw new UDFArgumentException("DatetimeOfLog takes exactly one argument.");
            }
            if(argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                    || ((PrimitiveObjectInspector)argOIs[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){
                throw new UDFArgumentTypeException(0, "DatetimeOfLog takes a string as a parameter.");
            }
            }
    
            ArrayList<String> fieldNames = new ArrayList<String>();
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
            fieldNames.add("date");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("time");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    
        @Override
        public void process(Object[] objects) throws HiveException {
            String date = "" ;
            String hour = "" ;
            if(objects.length == 1){
                String timestamp = objects[0].toString().substring(1,objects[0].toString().length()-1);  // the incoming argument is the time column of nginxlog; strip its surrounding brackets
                SimpleDateFormat format = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss z", Locale.ENGLISH);
                SimpleDateFormat df_date = new SimpleDateFormat("yyyy-MM-dd");
                SimpleDateFormat df_hour = new SimpleDateFormat("HH");
                Date d = null ;
                try {
                    d = format.parse(timestamp);
                }catch (Exception e){
                    return;   // skip rows whose timestamp cannot be parsed, rather than NPE below
                }
                date = df_date.format(d) ;
                hour = df_hour.format(d) ;
    
                super.forward(new String[]{date,hour});
            }
        }
    
        @Override
        public void close() throws HiveException {
    
        }
    }
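
    The date-splitting logic can be checked outside Hive. A minimal sketch using the same SimpleDateFormat patterns; the output formatters are pinned to GMT+8 here so the result does not depend on the JVM's default time zone (the UDTF above implicitly renders in the default zone):

    ```java
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    import java.util.TimeZone;

    public class DatetimeSketch {
        public static void main(String[] args) throws Exception {
            String raw = "[29/Jul/2019:19:58:55 +0800]";
            String timestamp = raw.substring(1, raw.length() - 1);  // strip the brackets

            SimpleDateFormat in = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss z", Locale.ENGLISH);
            Date d = in.parse(timestamp);

            SimpleDateFormat dfDate = new SimpleDateFormat("yyyy-MM-dd");
            SimpleDateFormat dfHour = new SimpleDateFormat("HH");
            dfDate.setTimeZone(TimeZone.getTimeZone("GMT+8"));  // render in the log's own zone
            dfHour.setTimeZone(TimeZone.getTimeZone("GMT+8"));

            System.out.println(dfDate.format(d));  // 2019-07-29
            System.out.println(dfHour.format(d));  // 19
        }
    }
    ```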

    Next, the Hive function that splits the request column.

     The helper class:

    public class LogSplit {
    
        //private String[] names ;
    
        private String[]  values ;
    
    
    
        public LogSplit(){
    
        }
    
        public void  process(String data , String resource){
            int index = data.indexOf(resource + "?") ;
            if(index < 0){
                this.values = new String[0] ;   // the tracking resource is absent from this request
                return ;
            }
            String[] items = data.substring(index).split(" ")[0].split("&") ;
            int len = items.length;
            this.values = new String[len+2] ;  // the last item carries session info that is split into three further columns
             for(int i=0; i<len ;i++){
                this.values[i] = split(items[i]) ;
            }
            String[] sessionValues = this.values[len-1].split("_") ;
             this.values[len-1] = sessionValues[0] ;
             this.values[len]  = sessionValues[1] ;
            this.values[len+1]  = sessionValues[2] ;
        }
    
        private String split(String data){
            String[] pail = data.split("=") ;
            if(pail.length > 1){
                return pail[1];
            }else{
                return  "";
            }
        }
    
    
    
    
        public String[] values(){
            return  values;
        }
    
    
    }

     The UDTF class:

    import java.util.ArrayList;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class LogSplitUDF extends  GenericUDTF {
    
    
        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs)
                throws UDFArgumentException {
    
            if(argOIs.length != 2){
                throw new UDFArgumentException("LogSplitUDF takes exactly two arguments.");
            }
            if(argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                    || ((PrimitiveObjectInspector)argOIs[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){
                throw new UDFArgumentTypeException(0, "LogSplitUDF takes a string as its first parameter.");
            }
    
            ArrayList<String> fieldNames = new ArrayList<String>();
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
            for(int i=1 ; i<11 ; i++){
                fieldNames.add("c" + i);
                fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            }
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    
        @Override
        public void process(Object[] objects) throws HiveException {
    
            if(objects.length == 2){
    
                LogSplit log = new LogSplit() ;
                String logdata = objects[0].toString() ;
                String resName = objects[1].toString() ;
                log.process(logdata , resName);
                super.forward(log.values());
            }
    
    
        }
    
    
    
        @Override
        public void close() throws HiveException {
    
        }
    }

    Once the code above is written, it can be unit tested like this:

    @Test
        public void test1(){
            String logdata = ""GET /big.png?url=http://127.0.0.1/a.html&urlname=a.html&scr=1366x768&ce=1&cnv=0.3810127868986546&ref=&stat_uv=60800284059261604254&stat_ss=9338821966_0_1563866457340 HTTP/1.0"" ;
    
            LogSplit split = new LogSplit() ;
            split.process(logdata , "big.png");
            String[] values = split.values();
            System.out.println(values.length);
            for(String value : values){
                System.out.println(value);
            }
        }

    Package the project with Maven:

        mvn clean install -DskipTests=true

    In the Hive CLI, run:

        add jar {path to the jar};

        create temporary function fun_request_split as 'xxx.yyy.LogSplitUDF';

        create temporary function fun_time_split as 'xxx.yyy.DatetimeOfLog';

    Testing the functions:

        select fun_request_split(request,'big.png') from nginxlog;   //big.png is the name of the tracking-pixel image; since the request field carries the tracking parameters, the function uses this name to locate and split them accurately

        select fun_time_split(time) from nginxlog;
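
    What remains is the query that actually moves data from nginxlog into page_view. A sketch of the idea, using LATERAL VIEW to apply both UDTFs in a single statement; the mapping of the generated columns c1..c10 onto the page_view schema is an assumption here and must be matched to the real tracking-parameter layout:

    ```sql
    -- Illustrative only: the c1..c10 -> page_view column mapping and the
    -- partition value are assumptions, not taken from the article.
    INSERT OVERWRITE TABLE page_view PARTITION (day='2019-07-29')
    SELECT t.d, r.c1, r.c7, n.ip, r.c1, 'big.png', t.h,
           cast(n.status as int), cast(n.size as int),
           n.referer, r.c6, cast(r.c4 as binary), r.c3, r.c8, r.c9, r.c10
    FROM nginxlog n
    LATERAL VIEW fun_time_split(n.time) t AS d, h
    LATERAL VIEW fun_request_split(n.request, 'big.png') r AS c1, c2, c3, c4, c5, c6, c7, c8, c9, c10
    WHERE n.request LIKE '%big.png%';
    ```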

         

    III. Producing the final KPI statistics from the intermediate data

  • Original article: https://www.cnblogs.com/hzhuxin/p/11266385.html