zoukankan      html  css  js  c++  java
  • 基于Hive的手机应用信息统计分析系统

    项目概要

    需求

    手机应用日志

    定期离线分析手机应用新增用户、活跃用户、沉默用户、启动次数、版本分布和留存用户等业务指标。

    工作流程

    1. 手机APP启动时,上报启动日志、错误日志、页面日志、事件日志、使用时长日志等信息到日志收集服务器。
    2. 日志收集服务器将收集到的日志信息发送给kafka。
    3. Flume分别消费kafka中的5种主题信息,并把数据存储到HDFS上。
    4. 通过crontab任务调度定时把HDFS中的信息拷贝到Hive数据仓库中。
    5. 核心业务操作采用Hive查询。

    一般先flume再kafka或者只有kafka
    一般数据开发就负责2到5或6(展示),即数据收集和处理或加上展示。

    具体实现

    公共模板、手机端日志生成模块、日志收集web模块、flume拦截器、hive自定义函数

    日志

    public class AppBaseLog implements Serializable 
    

    日志基类包含日志创建时间、应用ID、租户企业ID、设备ID(手机用户ID)、应用版本、应用下载渠道、操作系统及其版本、机型等信息。

    其他日志继承自基类,错误日志类、事件日志类、页面日志类、启动日志类、使用时长日志类、地理信息类。

    还有一个AppLogEntity类,里面包含了这些上述日志的数组,手机应用每次发送的日志信息就是使用这个类。

    手机客户端

    一个模拟日志生成的类。

    数据收集模板

    WebController类,将收集到的日志信息发送给Kafka。

    一个手机用户发送一份AppLogEntity对象数据,里面包含了各种日志类型的数据。将这些数据分别转化为Json格式,然后通过Kafka的ProducerRecord发送到Kafka集群。

    总体流程

    Tomcat运行web程序,generateData类不断向端口发送日志信息。web类的controller层将数据发送到kafka集群。搭建Flume管道,从Kafka集群中拉取数据到HDFS。

    编写Hive脚本,通过crondtab实现每隔一段时间从HDFS上导入数据到Hive。

    Hive UDF类

    根据输入的时间信息:

    • 返回当天的起始时间;

    • 返回本周的起始时间;

    • 返回本月的起始时间;

    根据输入的时间和时间格式化信息:返回按照格式化要求显示的信息。

    新建一个DateUtil类,里面有输入时间获取相应起始时间的函数。

    新建一个DayBegin类,继承UDF类,重写并重载各种evaluate方法。实现的方法包括无参返回当天的零点;偏移量参数返回当天加偏移量的那一天的零点;指定日期加偏移量返回该天的起始时间;根据String计算某天的起始时间;string时间 + offset;string时间 + 指定时间格式;string时间 + 指定时间格式 + 偏移。

    按DayBegin类的思路,再实现周、月的类。

    还有一个FormatTimeUDF类,增加根据long时间 + string指定时间格式 返回string时间;根据string时间 + string指定时间格式 返回string时间;根据long时间 +string指定时间格式 + 辅助参数 返回周内第一天的string时间。

    public class FormatTimeUDF extends UDF {
    
    // 根据输入的时间毫秒值(long类型)和格式化要求,返回String类型时间
    public String evaluate(long ms,String fmt) throws ParseException {
    
        SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;
        Date d = new Date();
        d.setTime(ms);
    
        return sdf.format(d) ;
    }
    
    // 根据输入的时间毫秒值(String类型)和格式化要求,返回String类型时间
    public String evaluate(String ms,String fmt) throws ParseException {
    
        SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;
        Date d = new Date();
        d.setTime(Long.parseLong(ms));
    
        return sdf.format(d) ;
    }
    
    // 根据输入的时间毫秒值(long类型)、格式化要求,和区分周的任意值,返回String类型时间
    public String evaluate(long ms ,String fmt , int week) throws ParseException {
    
        Date d = new Date();
        d.setTime(ms);
    
        //周内第一天
        Date firstDay = DateUtil.getWeekBeginTime(d) ;
        SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;
    
        return sdf.format(firstDay) ;
    }
    

    添加函数:将jar包放到hive/lib,修改hive-site,在hiveclient上注册函数(一个类一个函数)

    HIVE查询

    主要包括:新增用户、活跃用户、沉默用户、留存用户(上周没,这周有;这两周没,之前有;本周留存)、新鲜度

    新增用户

    createdatms为日志创建时间

    # 判断今天的新增用户
    select count(*)
    from (
        select min(createdatms) mintime
        from ext_startup_logs
        where appid = 'sdk34734'
        group by deviceid
        having mintime >= getdaybegin()
    )t;
    # 昨天就是 mintime >= getdaybegin(-1) and mintime < getdaybegin()
    

    活跃用户

    # 日活跃用户
    select count(distinct deviceid)
    from ext_startup_logs
    where appid = 'sdk34734' and createdatms >= getdaybegin() and createdatms < getdaybegin(1);
    # 优化,根据时间分区表去查询,避免全表扫描。ym和day在load数据时已经设定好的分区名
    where ym = formattime(getdaybegin(),'yyyyMM') and day = formattime(getdaybegin(),'dd') and  appid = 'sdk34734';
    
    # 一周内每日的日活跃数。formattime得到每日的开始。
    select formattime(createdatms,'yyyy/MM/dd') day ,count(distinct deviceid)
    from ext_startup_logs
    where concat(ym, day) >= formattime(getdaybegin(-6), "yyyyMMdd")
    and appid = 'sdk34734'
    group by formattime(createdatms,'yyyy/MM/dd');
    
    # 一次查询出过去的5周,每周的周活跃数。formattime的0参数是为了得出当日对应的周开始
    select formattime(createdatms,'yyyy/MM/dd',0) week ,count(distinct deviceid)
    from ext_startup_logs
    where appid = 'sdk34734'
    and createdatms >= getweekbegin(-6) and createdatms < getweekbegin(-1)
    group by formattime(createdatms,'yyyy/MM/dd',0);
    # 过去5周,包含本周的每周周活跃数
    where concat(ym,day) >= formattime(getweekbegin(-4),'yyyyMMdd') and appid ='sdk34734'
    
    # 一次查询出过去的三个月内的月活跃数
    select formattime(createdatms,'yyyy/MM',0) month ,count(distinct deviceid)
    from ext_startup_logs
    where appid = 'sdk34734'
    and createdatms >= getmonthbegin(-2) and createdatms < getmonthbegin(-1)
    group by formattime(createdatms,'yyyy/MM',0);
    # 包含本月
    where ym >= formattime(getmonthbegin(-2),'yyyyMM') and appid ='sdk34734'
    
    # 连续三周活跃用户
    select deviceid , count(distinct(formattime(createdatms,'yyyyMMdd',0))) c
    from ext_startup_logs
    where concat(ym,day) >= formattime(getweekbegin(-2),'yyyyMMdd') and appid = 'sdk34734'
    group by deviceid
    having c = 3;
    

    沉默用户

    条件:只有一条日志;安装2天内不算

    select count(*)
    from (
    	select deviceid, count(createdate) c
    	from ext_startup_logs
    	where concat(ym, day) < formattime(getdaybegin(-2), 'yyyyMMdd') and appid = 'sdk34734'
    	group by deviceid
    	having c = 1;
    );
    

    留存用户

    # 本周回流,即上周没启动过,本周启动了
    select distinct s.deviceid
    from ext_startup_logs s
    where appid = 'sdk34734' and concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd') and deviceid not in (
        select distinct t.deviceid
        from ext_startup_logs t
        where t.appid = 'sdk34734' 
        and concat(t.ym,t.day) >= formattime(getweekbegin(-1),'yyyyMMdd') 
        and concat(t.ym,t.day) < formattime(getweekbegin(),'yyyyMMdd') 
        );
    
    # 连续2周没有启动的用户,之前启动过
    select distinct s.deviceid
    from ext_startup_logs s
    where concat(ym,day) < formattime(getweekbegin(-1),'yyyyMMdd')
    and appid='sdk34734'
    and s.deviceid not in (
        select distinct(t.deviceid)
        from ext_startup_logs t
        where concat(t.ym,t.day) >= formattime(getweekbegin(-1),'yyyyMMdd')
        and t.appid='sdk34734'
        );
        
    # 留存用户。本周留存指上周新增,且本周活跃。
    select distinct s.deviceid
    from ext_startup_logs s
    where concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd')
    and s.appid = 'sdk34734'
    and s.deviceid in (
        select tt.deviceid , min(tt.createdatms) mintime
        from ext_startup_logs tt
        where tt.appid = 'sdk34734'
        group by tt.deviceid 
        having mintime >= getweekbegin(-2) 
        and mintime < getweekbegin(-1)
        );
    

    新鲜度分析

    # 新鲜度分析,某段时间的新增用户数/某段时间的活跃的用户数
    # 今天新增
    select count(*)
    from(
        select min(createdatms) mintime
        from ext_startup_logs
        where appid = 'sdk34734'
        group by deviceid
        having mintime >= getdaybegin() 
        )t;
    # 今天活跃
    select count(distinct deviceid)
    from ext_startup_logs
    where concat(ym, day) = formattime(getdaybegin, "yyyyMMdd");
    and appid = 'sdk34734'
    
  • 相关阅读:
    crontab
    待重写
    待重写
    多套开发资源使用情况
    待重写
    待重写
    待重写
    docker安装es
    docker run启动时目录映射研究
    rabbitmq第二篇:使用插件实现延迟功能
  • 原文地址:https://www.cnblogs.com/code2one/p/10187885.html
Copyright © 2011-2022 走看看