一、数仓搭建 - DWD 层
- 1)对用户行为数据解析
- 2)对核心数据进行判空过滤
- 3)对业务数据采用维度模型重新建模,即维度退化
1.1 DWD 层(用户行为启动表数据解析)
1.1.1 创建启动表 1)建表语句
drop table if exists dwd_start_log; CREATE EXTERNAL TABLE dwd_start_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `entry` string, `open_ad_type` string, `action` string, `loading_time` string, `detail` string, `extend1` string ) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_start_log/' TBLPROPERTIES('parquet.compression'='lzo');
说明:数据采用 parquet 存储方式,是可以支持切片的,不需要再对数据创建索引
1.1.2 get_json_object 函数使用
1)输入数据 xjson
Xjson=[{"name":" 大 郎 ","sex":" 男 ","age":"25"},{"name":" 西 门 庆 ","sex":" 男","age":"47"}]
2)取出第一个 json 对象
SELECT get_json_object(xjson,"$.[0]") FROM person;
结果是:{“name”:“大郎”,“sex”:“男”,“age”:“25”}
3)取出第一个 json 的 age 字段的值
SELECT get_json_object(xjson,"$.[0].age") FROM person;
结果是:25
1.1.3 向启动表导入数据
insert overwrite table dwd_start_log PARTITION (dt='2020-03-10') select get_json_object(line,'$.mid') mid_id, get_json_object(line,'$.uid') user_id, get_json_object(line,'$.vc') version_code, get_json_object(line,'$.vn') version_name, get_json_object(line,'$.l') lang, get_json_object(line,'$.sr') source, get_json_object(line,'$.os') os, get_json_object(line,'$.ar') area, get_json_object(line,'$.md') model, get_json_object(line,'$.ba') brand, get_json_object(line,'$.sv') sdk_version, get_json_object(line,'$.g') gmail, get_json_object(line,'$.hw') height_width, get_json_object(line,'$.t') app_time, get_json_object(line,'$.nw') network, get_json_object(line,'$.ln') lng, get_json_object(line,'$.la') lat, get_json_object(line,'$.entry') entry, get_json_object(line,'$.open_ad_type') open_ad_type, get_json_object(line,'$.action') action, get_json_object(line,'$.loading_time') loading_time, get_json_object(line,'$.detail') detail, get_json_object(line,'$.extend1') extend1 from ods_start_log where dt='2020-03-10';
3)测试
select * from dwd_start_log where dt='2020-03-10' limit 2;
1.1.4 DWD 层启动表加载数据脚本
1)vim ods_to_dwd_log.sh
在脚本中编写如下内容
#!/bin/bash # 定义变量方便修改 APP=gmall hive=/opt/modules/hive/bin/hive # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天 if [ -n "$1" ] ;then do_date=$1 else do_date=`date -d "-1 day" +%F` fi sql=" set hive.exec.dynamic.partition.mode=nonstrict; insert overwrite table "$APP".dwd_start_log PARTITION (dt='$do_date') select get_json_object(line,'$.mid') mid_id, get_json_object(line,'$.uid') user_id, get_json_object(line,'$.vc') version_code, get_json_object(line,'$.vn') version_name, get_json_object(line,'$.l') lang, get_json_object(line,'$.sr') source, get_json_object(line,'$.os') os, get_json_object(line,'$.ar') area, get_json_object(line,'$.md') model, get_json_object(line,'$.ba') brand, get_json_object(line,'$.sv') sdk_version, get_json_object(line,'$.g') gmail, get_json_object(line,'$.hw') height_width, get_json_object(line,'$.t') app_time, get_json_object(line,'$.nw') network, get_json_object(line,'$.ln') lng, get_json_object(line,'$.la') lat, get_json_object(line,'$.entry') entry, get_json_object(line,'$.open_ad_type') open_ad_type, get_json_object(line,'$.action') action, get_json_object(line,'$.loading_time') loading_time, get_json_object(line,'$.detail') detail, get_json_object(line,'$.extend1') extend1 from "$APP".ods_start_log where dt='$do_date'; " $hive -e "$sql"
2)增加脚本执行权限
chmod 770 ods_to_dwd_log.sh 3)脚本使用 ods_to_dwd_log.sh 2020-03-11 4)查询导入结果 select * from dwd_start_log where dt='2020-03-11' limit 2;
1.2 DWD 层(用户行为事件表数据解析)
1.2.1 创建基础明细表
明细表用于存储 ODS 层原始表转换过来的明细数据
1)创建事件日志基础明细表
drop table if exists dwd_base_event_log; CREATE EXTERNAL TABLE dwd_base_event_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `event_name` string, `event_json` string, `server_time` string) PARTITIONED BY (`dt` string) stored as parquet location '/warehouse/gmall/dwd/dwd_base_event_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)说明:其中 event_name 和 event_json 用来对应事件名和整个事件。这个地方将原始日志1 对多的形式拆分出来了。操作的时候我们需要将原始日志展平,需要用到 UDF 和 UDTF
1.2.2 自定义 UDF 函数(解析公共字段)
UDF 函数特点:一行进一行出。简称,一进一出
1)创建一个 maven 工程:hivefunction 2)创建包名:com.zsy.udf
3)在 pom.xml 文件中添加如下内容
<properties> <hive.version>2.3.0</hive.version> </properties> <repositories> <repository> <id>spring-plugin</id> <url>https://repo.spring.io/plugins-release/</url> </repository> </repositories> <dependencies> <!--添加 hive 依赖--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
注意 1:如果 hive 的 jar 包下载失败,可以将如下参数配置添加到 idea 中
-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true
-Dmaven.wagon.http.ssl.ignore.validity.dates=true
详情请点击博客👉:maven下载依赖时候忽略SSL证书校验
注意 2:打包时如果出现如下错误,说明 idea 内存溢出
Exception in thread "main" java.lang.StackOverflowError
解决办法:把 -Xss4m 添加到下图位置
4)UDF 用于解析公共字段
package com.zsy.udf; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.UDF; import org.json.JSONObject; public class BaseFieldUDF extends UDF { public String evaluate(String line,String key){ // 1.切分数据 String[] log = line.split("\|"); String result = ""; // 2.校验 if(log.length != 2 || StringUtils.isBlank(log[1])){ return result; } // 3.解析数据获取json对象 JSONObject json = new JSONObject(log[1].trim()); // 4.根据传入的key获取对应的值 if("st".equals(key)){ result = log[0].trim(); }else if("et".equals(key)){ if(json.has("et")){ result = json.getString("et"); } }else{ JSONObject cm = json.getJSONObject("cm"); if(cm.has(key)){ result = cm.getString(key); } } return result; } /** * 测试 */ // public static void main(String[] args) { // String line = "1583776132686|{"cm":{"ln":"-42.8","sv":"V2.3.9","os":"8.1.7","g":"X470IP70@gmail.com","mid":"0","nw":"4G","l":"en","vc":"13","hw":"1080*1920","ar":"MX","uid":"0","t":"1583758268106","la":"-31.3","md":"sumsung-18","vn":"1.1.1","ba":"Sumsung","sr":"M"},"ap":"app","et":[{"ett":"1583685512624","en":"display","kv":{"goodsid":"0","action":"2","extend1":"2","place":"1","category":"17"}},{"ett":"1583769686402","en":"newsdetail","kv":{"entry":"3","goodsid":"1","news_staytime":"16","loading_time":"0","action":"4","showtype":"5","category":"97","type1":""}},{"ett":"1583709065211","en":"ad","kv":{"activityId":"1","displayMills":"58537","entry":"1","action":"3","contentType":"0"}},{"ett":"1583693966746","en":"active_background","kv":{"active_source":"3"}},{"ett":"1583734521683","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1583755388633","en":"praise","kv":{"target_id":0,"id":1,"type":3,"add_time":"1583713812739","userid":4}}]}"; // String result = new BaseFieldUDF().evaluate(line, "st"); // System.out.println(result); // } }
1.2.3 自定义 UDTF 函数(解析事件字段)
UDTF 函数特点:多行进多行出。 简称,多进多出。
1)创建包名:com.zsy.udtf
2)在 com.zsy.udtf 包下创建类名:EventJsonUDTF
3)用于展开业务字段
package com.zsy.udtf; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.json.JSONArray; import org.json.JSONException; import java.util.ArrayList; import java.util.List; public class EventJsonUDTF extends GenericUDTF { @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { // 定义UDTF返回值类型和名称 List<String> fieldName = new ArrayList<>(); List<ObjectInspector> fieldType = new ArrayList<>(); fieldName.add("event_name"); fieldName.add("event_json"); fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldType); } @Override public void process(Object[] objects) throws HiveException { // 1.获取传入的数据,传入的是Json array =》 UDF传入et String input = objects[0].toString(); // 2.校验 if (StringUtils.isBlank(input)) { return; } else { JSONArray ja = new JSONArray(input); if (ja == null) { return; } // 循环遍历array当中的每一个元素,封装成 事件名称和事件内容 for (int i = 0; i < ja.length(); i++) { String[] result = new String[2]; try { result[0] = ja.getJSONObject(i).getString("en"); result[1] = ja.getString(i); } catch (JSONException ex) { continue; } // 写出数据 forward(result); } } } @Override public void close() throws HiveException { } }
4)打包,上传到HDFS的 /user/hive/jars
hdfs dfs -mkdir /user/hive/jars
hdfs dfs -put ./hivefunction-1.0-SNAPSHOT.jar /user/hive/jars
注意:如果修改了自定义函数重新生成 jar 包怎么处理?只需要替换 HDFS 路径上的旧
jar 包,然后重启 Hive 客户端即可
1.2.4 解析事件日志基础明细表
1)解析事件日志基础明细表
insert overwrite table dwd_base_event_log partition(dt='2020-03-10') select base_analizer(line,'mid') as mid_id, base_analizer(line,'uid') as user_id, base_analizer(line,'vc') as version_code, base_analizer(line,'vn') as version_name, base_analizer(line,'l') as lang, base_analizer(line,'sr') as source, base_analizer(line,'os') as os, base_analizer(line,'ar') as area, base_analizer(line,'md') as model, base_analizer(line,'ba') as brand, base_analizer(line,'sv') as sdk_version, base_analizer(line,'g') as gmail, base_analizer(line,'hw') as height_width, base_analizer(line,'t') as app_time, base_analizer(line,'nw') as network, base_analizer(line,'ln') as lng, base_analizer(line,'la') as lat, event_name, event_json, base_analizer(line,'st') as server_time from ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name,event_json where dt='2020-03-10' and base_analizer(line,'et')<>'';
2)测试
select * from dwd_base_event_log where dt='2020-03-10' limit 2;
1.2.5 DWD 层数据解析脚本
1)vim ods_to_dwd_base_log.sh
在脚本中编写如下内容
#!/bin/bash # 定义变量方便修改 APP=gmall hive=/opt/modules/hive/bin/hive # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天 if [ -n "$1" ] ;then do_date=$1 else do_date=`date -d "-1 day" +%F` fi sql=" use gmall; insert overwrite table "$APP".dwd_base_event_log partition(dt='$do_date') select base_analizer(line,'mid') as mid_id, base_analizer(line,'uid') as user_id, base_analizer(line,'vc') as version_code, base_analizer(line,'vn') as version_name, base_analizer(line,'l') as lang, base_analizer(line,'sr') as source, base_analizer(line,'os') as os, base_analizer(line,'ar') as area, base_analizer(line,'md') as model, base_analizer(line,'ba') as brand, base_analizer(line,'sv') as sdk_version, base_analizer(line,'g') as gmail, base_analizer(line,'hw') as height_width, base_analizer(line,'t') as app_time, base_analizer(line,'nw') as network, base_analizer(line,'ln') as lng, base_analizer(line,'la') as lat, event_name, event_json, base_analizer(line,'st') as server_time from "$APP".ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tem_flat as event_name,event_json where dt='$do_date' and base_analizer(line,'et')<>''; " $hive -e "$sql"
注意:使用自定义函数时,需要在执行脚本前,增加上要使用的库。例如:use gmall;
2)增加脚本执行权限
chmod 770 ods_to_dwd_base_log.sh
3)脚本使用
ods_to_dwd_base_log.sh 2020-03-11
4)查询导入结果
select * from dwd_base_event_log where dt='2020-03-11' limit 2;
1.3 DWD 层(用户行为事件表获取)
<ignore_js_op style="overflow-wrap: break-word; color: rgb(68, 68, 68); font-family: "Microsoft Yahei", tahoma, arial, "Hiragino Sans GB", 宋体, sans-serif;">
1.3.1 商品点击表
<ignore_js_op style="overflow-wrap: break-word; color: rgb(68, 68, 68); font-family: "Microsoft Yahei", tahoma, arial, "Hiragino Sans GB", 宋体, sans-serif;">
1)建表语句
drop table if exists dwd_display_log; CREATE EXTERNAL TABLE dwd_display_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `action` string, `goodsid` string, `place` string, `extend1` string, `category` string, `server_time` string ) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_display_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_display_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.goodsid') goodsid, get_json_object(event_json,'$.kv.place') place, get_json_object(event_json,'$.kv.extend1') extend1, get_json_object(event_json,'$.kv.category') category, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='display';
3)测试
select * from dwd_display_log where dt='2020-03-10' limit 2;
1.3.2 商品详情页表
1)建表语句
drop table if exists dwd_newsdetail_log; CREATE EXTERNAL TABLE dwd_newsdetail_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `entry` string, `action` string, `goodsid` string, `showtype` string, `news_staytime` string, `loading_time` string, `type1` string, `category` string, `server_time` string) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_newsdetail_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_newsdetail_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.entry') entry, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.goodsid') goodsid, get_json_object(event_json,'$.kv.showtype') showtype, get_json_object(event_json,'$.kv.news_staytime') news_staytime, get_json_object(event_json,'$.kv.loading_time') loading_time, get_json_object(event_json,'$.kv.type1') type1, get_json_object(event_json,'$.kv.category') category, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='newsdetail';
3)测试
select * from dwd_newsdetail_log where dt='2020-03-10' limit 2;
1.3.3 商品列表页表
1)建表语句
drop table if exists dwd_loading_log; CREATE EXTERNAL TABLE dwd_loading_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `action` string, `loading_time` string, `loading_way` string, `extend1` string, `extend2` string, `type` string, `type1` string, `server_time` string) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_loading_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_loading_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.loading_time') loading_time, get_json_object(event_json,'$.kv.loading_way') loading_way, get_json_object(event_json,'$.kv.extend1') extend1, get_json_object(event_json,'$.kv.extend2') extend2, get_json_object(event_json,'$.kv.type') type, get_json_object(event_json,'$.kv.type1') type1, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='loading';
3)测试
hive (gmall)> select * from dwd_loading_log where dt='2020-03-10' limit 2;
1.3.4 广告表
1)建表语句
drop table if exists dwd_ad_log; CREATE EXTERNAL TABLE dwd_ad_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `entry` string, `action` string, `contentType` string, `displayMills` string, `itemId` string, `activityId` string, `server_time` string) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_ad_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_ad_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.entry') entry, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.contentType') contentType, get_json_object(event_json,'$.kv.displayMills') displayMills, get_json_object(event_json,'$.kv.itemId') itemId, get_json_object(event_json,'$.kv.activityId') activityId, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='ad';
3)测试
select * from dwd_ad_log where dt='2020-03-10' limit 2;
1.3.5 消息通知表
1)建表语句
drop table if exists dwd_notification_log; CREATE EXTERNAL TABLE dwd_notification_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `action` string, `noti_type` string, `ap_time` string, `content` string, `server_time` string ) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_notification_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_notification_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.noti_type') noti_type, get_json_object(event_json,'$.kv.ap_time') ap_time, get_json_object(event_json,'$.kv.content') content, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='notification';
3)测试
select * from dwd_notification_log where dt='2020-03-10' limit 2;
1.3.6 用户后台活跃表
1)建表语句
drop table if exists dwd_active_background_log; CREATE EXTERNAL TABLE dwd_active_background_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `active_source` string, `server_time` string ) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_background_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_active_background_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.active_source') active_source, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='active_background';
3)测试
select * from dwd_active_background_log where dt='2020-03-10' limit 2;
1.3.7 评论表
1)建表语句
drop table if exists dwd_comment_log; CREATE EXTERNAL TABLE dwd_comment_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `comment_id` int, `userid` int, `p_comment_id` int, `content` string, `addtime` string, `other_id` int, `praise_count` int, `reply_count` int, `server_time` string ) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_comment_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_comment_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.comment_id') comment_id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.p_comment_id') p_comment_id, get_json_object(event_json,'$.kv.content') content, get_json_object(event_json,'$.kv.addtime') addtime, get_json_object(event_json,'$.kv.other_id') other_id, get_json_object(event_json,'$.kv.praise_count') praise_count, get_json_object(event_json,'$.kv.reply_count') reply_count, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='comment';
3)测试
select * from dwd_comment_log where dt='2020-03-10' limit 2;
1.3.8 收藏表
1)建表语句
drop table if exists dwd_favorites_log; CREATE EXTERNAL TABLE dwd_favorites_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `id` int, `course_id` int, `userid` int, `add_time` string, `server_time` string ) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_favorites_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_favorites_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.id') id, get_json_object(event_json,'$.kv.course_id') course_id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.add_time') add_time, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='favorites';
3)测试
select * from dwd_favorites_log where dt='2020-03-10' limit 2;
1.3.9 点赞表
1)建表语句
drop table if exists dwd_praise_log; CREATE EXTERNAL TABLE dwd_praise_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `id` string, `userid` string, `target_id` string, `type` string, `add_time` string, `server_time` string ) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_praise_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_praise_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.id') id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.target_id') target_id, get_json_object(event_json,'$.kv.type') type, get_json_object(event_json,'$.kv.add_time') add_time, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='praise';
3)测试
select * from dwd_praise_log where dt='2020-03-10' limit 2;
1.3.10 错误日志表
1)建表语句
drop table if exists dwd_error_log; CREATE EXTERNAL TABLE dwd_error_log( `mid_id` string, `user_id` string, `version_code` string, `version_name` string, `lang` string, `source` string, `os` string, `area` string, `model` string, `brand` string, `sdk_version` string, `gmail` string, `height_width` string, `app_time` string, `network` string, `lng` string, `lat` string, `errorBrief` string, `errorDetail` string, `server_time` string) PARTITIONED BY (dt string) stored as parquet location '/warehouse/gmall/dwd/dwd_error_log/' TBLPROPERTIES('parquet.compression'='lzo');
2)导入数据
insert overwrite table dwd_error_log PARTITION (dt='2020-03-10') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.errorBrief') errorBrief, get_json_object(event_json,'$.kv.errorDetail') errorDetail, server_time from dwd_base_event_log where dt='2020-03-10' and event_name='error';
3)测试
select * from dwd_error_log where dt='2020-03-10' limit 2;
1.3.11 DWD 层事件表加载数据脚本
1) vim ods_to_dwd_event_log.sh
在脚本中编写如下内容
#!/bin/bash # 定义变量方便修改 APP=gmall hive=/opt/modules/hive/bin/hive # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天 if [ -n "$1" ] ;then do_date=$1 else do_date=`date -d "-1 day" +%F` fi sql=" insert overwrite table "$APP".dwd_display_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.goodsid') goodsid, get_json_object(event_json,'$.kv.place') place, get_json_object(event_json,'$.kv.extend1') extend1, get_json_object(event_json,'$.kv.category') category, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='display'; insert overwrite table "$APP".dwd_newsdetail_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.entry') entry, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.goodsid') goodsid, get_json_object(event_json,'$.kv.showtype') showtype, get_json_object(event_json,'$.kv.news_staytime') news_staytime, get_json_object(event_json,'$.kv.loading_time') loading_time, get_json_object(event_json,'$.kv.type1') type1, get_json_object(event_json,'$.kv.category') category, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='newsdetail'; insert overwrite table "$APP".dwd_loading_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.loading_time') loading_time, get_json_object(event_json,'$.kv.loading_way') loading_way, get_json_object(event_json,'$.kv.extend1') extend1, get_json_object(event_json,'$.kv.extend2') extend2, get_json_object(event_json,'$.kv.type') type, get_json_object(event_json,'$.kv.type1') type1, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='loading'; insert overwrite table "$APP".dwd_ad_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.entry') entry, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.contentType') contentType, get_json_object(event_json,'$.kv.displayMills') displayMills, get_json_object(event_json,'$.kv.itemId') itemId, get_json_object(event_json,'$.kv.activityId') activityId, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='ad'; insert overwrite table "$APP".dwd_notification_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.action') action, get_json_object(event_json,'$.kv.noti_type') noti_type, get_json_object(event_json,'$.kv.ap_time') ap_time, get_json_object(event_json,'$.kv.content') content, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='notification'; insert overwrite table "$APP".dwd_active_background_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.active_source') active_source, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='active_background'; insert overwrite table "$APP".dwd_comment_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.comment_id') comment_id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.p_comment_id') p_comment_id, get_json_object(event_json,'$.kv.content') content, get_json_object(event_json,'$.kv.addtime') addtime, get_json_object(event_json,'$.kv.other_id') other_id, get_json_object(event_json,'$.kv.praise_count') praise_count, get_json_object(event_json,'$.kv.reply_count') reply_count, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='comment'; insert overwrite table "$APP".dwd_favorites_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.id') id, get_json_object(event_json,'$.kv.course_id') course_id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.add_time') add_time, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='favorites'; insert overwrite table "$APP".dwd_praise_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.id') id, get_json_object(event_json,'$.kv.userid') userid, get_json_object(event_json,'$.kv.target_id') target_id, get_json_object(event_json,'$.kv.type') type, get_json_object(event_json,'$.kv.add_time') add_time, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='praise'; insert overwrite table "$APP".dwd_error_log PARTITION (dt='$do_date') select mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat, get_json_object(event_json,'$.kv.errorBrief') errorBrief, get_json_object(event_json,'$.kv.errorDetail') errorDetail, server_time from "$APP".dwd_base_event_log where dt='$do_date' and event_name='error'; " $hive -e "$sql"
2)增加脚本执行权限
chmod 770 ods_to_dwd_event_log.sh
3)脚本使用
ods_to_dwd_event_log.sh 2020-03-11
4)查询导入结果
select * from dwd_comment_log where dt='2020-03-11' limit 2;
结束语
本章对ODS层的用户行为数据进行了解析,构建并将数据导入到了DWD层,下章将会对ODS层的业务数据解析,导入DWD层!
注意:如果修改了自定义函数重新生成 jar 包怎么处理?只需要替换 HDFS 路径上的旧
jar 包,然后重启 Hive 客户端即可
1.2.4 解析事件日志基础明细表
1)解析事件日志基础明细表