项目概要
需求
手机应用日志
定期离线分析手机应用新增用户、活跃用户、沉默用户、启动次数、版本分布和留存用户等业务指标。
工作流程
- 手机APP启动时,上报启动日志、错误日志、页面日志、事件日志、使用时长日志等信息到日志收集服务器。
- 日志收集服务器将收集到的日志信息发送给kafka。
- Flume分别消费kafka中的5种主题信息,并把数据存储到HDFS上。
- 通过crontab任务调度定时把HDFS中的信息拷贝到Hive数据仓库中。
- 核心业务操作采用Hive查询。
一般先flume再kafka或者只有kafka
一般数据开发就负责2到5或6(展示),即数据收集和处理或加上展示。
具体实现
公共模板、手机端日志生成模块、日志收集web模块、flume拦截器、hive自定义函数
日志
public class AppBaseLog implements Serializable
日志基类包含日志创建时间、应用ID、租户企业ID、设备ID(手机用户ID)、应用版本、应用下载渠道、操作系统及其版本、机型等信息。
其他日志继承自基类,错误日志类、事件日志类、页面日志类、启动日志类、使用时长日志类、地理信息类。
还有一个AppLogEntity类,里面包含了这些上述日志的数组,手机应用每次发送的日志信息就是使用这个类。
手机客户端
一个模拟日志生成的类。
数据收集模板
WebController类,将收集到的日志信息发送给Kafka。
一个手机用户发送一份AppLogEntity对象数据,里面包含了各种日志类型的数据。将这些数据分别转化为Json格式,然后通过Kafka的ProducerRecord发送到Kafka集群。
总体流程
Tomcat运行web程序,generateData类不断向端口发送日志信息。web类的controller层将数据发送到kafka集群。搭建Flume管道,从Kafka集群中拉取数据到HDFS。
编写Hive脚本,通过crondtab实现每隔一段时间从HDFS上导入数据到Hive。
Hive UDF类
根据输入的时间信息:
-
返回当天的起始时间;
-
返回本周的起始时间;
-
返回本月的起始时间;
根据输入的时间和时间格式化信息:返回按照格式化要求显示的信息。
新建一个DateUtil类,里面有输入时间获取相应起始时间的函数。
新建一个DayBegin类,继承UDF类,重写并重载各种evaluate方法。实现的方法包括无参返回当天的零点;偏移量参数返回当天加偏移量的那一天的零点;指定日期加偏移量返回该天的起始时间;根据String计算某天的起始时间;string时间 + offset;string时间 + 指定时间格式;string时间 + 指定时间格式 + 偏移。
按DayBegin类的思路,再实现周、月的类。
还有一个FormatTimeUDF类,增加根据long时间 + string指定时间格式 返回string时间;根据string时间 + string指定时间格式 返回string时间;根据long时间 +string指定时间格式 + 辅助参数 返回周内第一天的string时间。
public class FormatTimeUDF extends UDF {
// 根据输入的时间毫秒值(long类型)和格式化要求,返回String类型时间
public String evaluate(long ms,String fmt) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;
Date d = new Date();
d.setTime(ms);
return sdf.format(d) ;
}
// 根据输入的时间毫秒值(String类型)和格式化要求,返回String类型时间
public String evaluate(String ms,String fmt) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;
Date d = new Date();
d.setTime(Long.parseLong(ms));
return sdf.format(d) ;
}
// 根据输入的时间毫秒值(long类型)、格式化要求,和区分周的任意值,返回String类型时间
public String evaluate(long ms ,String fmt , int week) throws ParseException {
Date d = new Date();
d.setTime(ms);
//周内第一天
Date firstDay = DateUtil.getWeekBeginTime(d) ;
SimpleDateFormat sdf = new SimpleDateFormat(fmt) ;
return sdf.format(firstDay) ;
}
添加函数:将jar包放到hive/lib,修改hive-site,在hiveclient上注册函数(一个类一个函数)
HIVE查询
主要包括:新增用户、活跃用户、沉默用户、留存用户(上周没,这周有;这两周没,之前有;本周留存)、新鲜度
新增用户
createdatms为日志创建时间
# 判断今天的新增用户
select count(*)
from (
select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin()
)t;
# 昨天就是 mintime >= getdaybegin(-1) and mintime < getdaybegin()
活跃用户
# 日活跃用户
select count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734' and createdatms >= getdaybegin() and createdatms < getdaybegin(1);
# 优化,根据时间分区表去查询,避免全表扫描。ym和day在load数据时已经设定好的分区名
where ym = formattime(getdaybegin(),'yyyyMM') and day = formattime(getdaybegin(),'dd') and appid = 'sdk34734';
# 一周内每日的日活跃数。formattime得到每日的开始。
select formattime(createdatms,'yyyy/MM/dd') day ,count(distinct deviceid)
from ext_startup_logs
where concat(ym, day) >= formattime(getdaybegin(-6), "yyyyMMdd")
and appid = 'sdk34734'
group by formattime(createdatms,'yyyy/MM/dd');
# 一次查询出过去的5周,每周的周活跃数。formattime的0参数是为了得出当日对应的周开始
select formattime(createdatms,'yyyy/MM/dd',0) week ,count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getweekbegin(-6) and createdatms < getweekbegin(-1)
group by formattime(createdatms,'yyyy/MM/dd',0);
# 过去5周,包含本周的每周周活跃数
where concat(ym,day) >= formattime(getweekbegin(-4),'yyyyMMdd') and appid ='sdk34734'
# 一次查询出过去的三个月内的月活跃数
select formattime(createdatms,'yyyy/MM',0) month ,count(distinct deviceid)
from ext_startup_logs
where appid = 'sdk34734'
and createdatms >= getmonthbegin(-2) and createdatms < getmonthbegin(-1)
group by formattime(createdatms,'yyyy/MM',0);
# 包含本月
where ym >= formattime(getmonthbegin(-2),'yyyyMM') and appid ='sdk34734'
# 连续三周活跃用户
select deviceid , count(distinct(formattime(createdatms,'yyyyMMdd',0))) c
from ext_startup_logs
where concat(ym,day) >= formattime(getweekbegin(-2),'yyyyMMdd') and appid = 'sdk34734'
group by deviceid
having c = 3;
沉默用户
条件:只有一条日志;安装2天内不算
select count(*)
from (
select deviceid, count(createdate) c
from ext_startup_logs
where concat(ym, day) < formattime(getdaybegin(-2), 'yyyyMMdd') and appid = 'sdk34734'
group by deviceid
having c = 1;
);
留存用户
# 本周回流,即上周没启动过,本周启动了
select distinct s.deviceid
from ext_startup_logs s
where appid = 'sdk34734' and concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd') and deviceid not in (
select distinct t.deviceid
from ext_startup_logs t
where t.appid = 'sdk34734'
and concat(t.ym,t.day) >= formattime(getweekbegin(-1),'yyyyMMdd')
and concat(t.ym,t.day) < formattime(getweekbegin(),'yyyyMMdd')
);
# 连续2周没有启动的用户,之前启动过
select distinct s.deviceid
from ext_startup_logs s
where concat(ym,day) < formattime(getweekbegin(-1),'yyyyMMdd')
and appid='sdk34734'
and s.deviceid not in (
select distinct(t.deviceid)
from ext_startup_logs t
where concat(t.ym,t.day) >= formattime(getweekbegin(-1),'yyyyMMdd')
and t.appid='sdk34734'
);
# 留存用户。本周留存指上周新增,且本周活跃。
select distinct s.deviceid
from ext_startup_logs s
where concat(ym,day) >= formattime(getweekbegin(),'yyyyMMdd')
and s.appid = 'sdk34734'
and s.deviceid in (
select tt.deviceid , min(tt.createdatms) mintime
from ext_startup_logs tt
where tt.appid = 'sdk34734'
group by tt.deviceid
having mintime >= getweekbegin(-2)
and mintime < getweekbegin(-1)
);
新鲜度分析
# 新鲜度分析,某段时间的新增用户数/某段时间的活跃的用户数
# 今天新增
select count(*)
from(
select min(createdatms) mintime
from ext_startup_logs
where appid = 'sdk34734'
group by deviceid
having mintime >= getdaybegin()
)t;
# 今天活跃
select count(distinct deviceid)
from ext_startup_logs
where concat(ym, day) = formattime(getdaybegin, "yyyyMMdd");
and appid = 'sdk34734'