zoukankan      html  css  js  c++  java
  • 数据仓库 用户行为数仓 DWD数据明细层操作示例

    DWD(Data Warehouse Detail):数据明细层,结构和粒度与原始表保持一致,对ODS层数据进行清洗(去除空值、脏数据、超过极限范围的数据)。

    DWD层的数据来源于ODS原始数据层,在原始数据层的Hive表里,只有一个字段,存储了原始的一条条日志信息,下面以事件(如商品点击事件,展示详情事件)日志来说明,原始日志如下:

    1593095829089|{
        "cm":{
            "ln":"-89.3",
            "sv":"V2.6.6",
            "os":"8.0.3",
            "g":"SU1Z29ZJ@gmail.com",
            "mid":"1",
            "nw":"3G",
            "l":"en",
            "vc":"3",
            "hw":"640*1136",
            "ar":"MX",
            "uid":"1",
            "t":"1593002588300",
            "la":"-16.2",
            "md":"sumsung-3",
            "vn":"1.2.2",
            "ba":"Sumsung",
            "sr":"D"
        },
        "ap":"app",
        "et":[
            {
                "ett":"1593077273840",
                "en":"display",
                "kv":{
                    "goodsid":"0",
                    "action":"2",
                    "extend1":"2",
                    "place":"1",
                    "category":"93"
                }
            },
            {
                "ett":"1593052169678",
                "en":"loading",
                "kv":{
                    "extend2":"",
                    "loading_time":"54",
                    "action":"1",
                    "extend1":"",
                    "type":"1",
                    "type1":"102",
                    "loading_way":"1"
                }
            },
            {
                "ett":"1593013890514",
                "en":"notification",
                "kv":{
                    "ap_time":"1593003516686",
                    "action":"4",
                    "type":"2",
                    "content":""
                }
            },
            {
                "ett":"1592999171192",
                "en":"error",
                "kv":{
                    "errorDetail":"java.lang.NullPointerException\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\n at cn.lift.dfdf.web.AbstractBaseController.validInbound",
                    "errorBrief":"at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"
                }
            },
            {
                "ett":"1593002958311",
                "en":"comment",
                "kv":{
                    "p_comment_id":1,
                    "addtime":"1593079427374",
                    "praise_count":188,
                    "other_id":0,
                    "comment_id":9,
                    "reply_count":193,
                    "userid":3,
                    "content":"涂士震嫩庙胞洪邮骗具捶赣锗塌舅捕沥爷"
                }
            },
            {
                "ett":"1593052803303",
                "en":"favorites",
                "kv":{
                    "course_id":4,
                    "id":0,
                    "add_time":"1593044515996",
                    "userid":7
                }
            },
            {
                "ett":"1593095771819",
                "en":"praise",
                "kv":{
                    "target_id":8,
                    "id":5,
                    "type":4,
                    "add_time":"1593000096852",
                    "userid":8
                }
            }]
    }

    数据格式为服务器时间|事件json,json中又包括公共字段cm,数据来源ap,以及事件数组et。由于事件是一段时间提交一次,是一个包含了多个不同类型事件的json数组,用en字段区分不同的事件,如display表示商品点击事件。因此在这里的处理需要经过两步,首先将ODS表中的长串json解析成一个个字段的DWD层的基础明细表,并且利用UDTF函数,将事件数组中的每个事件炸裂开,这些数据全部放在基础明细表里。然后针对不同的事件,将某一类事件过滤出来,并且取出事件中的kv值,放在特定的某一事件的DWD明细表中。

    一 基础事件明细表

    基础事件明细表包含了所有类型的事件数据,需要定义一个UDF函数,用来拆分长串的日志,将其处理成一个规则的格式,即以 \t(制表符)分隔的字符串,后续可以通过hive自带的split函数转化成数组,利用下标取值。

    public class BaseFieldUDF extends UDF {
    
        public String evaluate(String line,String keysStr){
            String[] keysArr = keysStr.split(",");
    
            //原始时间日志格式:时间|json日志
            String[] logContent = line.split("\|");
            if (logContent.length != 2 || StringUtils.isBlank(logContent[1])){
                return "";
            }
    
            StringBuffer sb = new StringBuffer();
            try {
                //拼接公共字段
                JSONObject jsonObject = new JSONObject(logContent[1]);
                JSONObject cm = jsonObject.getJSONObject("cm");
                for (int i = 0; i < keysArr.length; i++) {
                    String key = keysArr[i].trim();
                    if (cm.has(key)){
                        sb.append(cm.getString(key)).append("	");
                    }
    
                }
    
                //拼接事件字段
                sb.append(jsonObject.getString("et")).append("	");
    
                //拼接服务器时间
                sb.append(logContent[0]).append("	");
    
    
            } catch (JSONException e) {
                e.printStackTrace();
            }
    
            return sb.toString();
        }
        
    }

    然后定义一个UDTF函数,用来对事件数组进行炸裂。传入的是1行1列的事件数组,返回的是2列多行的数据,第1列是事件名,稍后利用这个事件名过滤出不同的事件明细表,第2列是事件的详情kv信息。

    /**
     * Hive UDTF that explodes the "et" event array: input is one string column
     * holding a JSON array of events; output is two string columns
     * (event_name, event_json) with ONE ROW PER EVENT in the array.
     */
    public class EventJsonUDTF extends GenericUDTF {

        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
            // Declare the output schema: (event_name string, event_json string).
            List<String> fieldNames = new ArrayList<>();
            List<ObjectInspector> fieldTypes = new ArrayList<>();

            fieldNames.add("event_name");
            fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

            fieldNames.add("event_json");
            fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldTypes);
        }

        @Override
        public void process(Object[] objects) throws HiveException {
            if (objects == null || objects.length == 0 || objects[0] == null) {
                return;
            }
            String input = objects[0].toString();
            if (StringUtils.isBlank(input)) {
                return;
            }
            try {
                JSONArray ja = new JSONArray(input);
                for (int i = 0; i < ja.length(); i++) {
                    String[] result = new String[2];
                    try {
                        // "en" discriminates the event type (display, loading, ...).
                        result[0] = ja.getJSONObject(i).getString("en");
                        result[1] = ja.getString(i);
                    } catch (JSONException e) {
                        // Skip a single malformed event without aborting the rest.
                        continue;
                    }
                    // FIX: forward() must run INSIDE the loop — once per event.
                    // The original called it after the loop, so only the last
                    // event of every array was ever emitted.
                    forward(result);
                }
            } catch (JSONException e) {
                // Whole array unparsable: emit nothing for this row.
                e.printStackTrace();
            }
        }

        @Override
        public void close() throws HiveException {
            // No resources to release.
        }
    }

    接下来就是创建存储事件基础明细需要的表。event_name和event_json字段就是利用UDTF函数得到的结果。

    -- Base event detail table: one row per single event, produced from the ODS
    -- raw log by base_analizer (UDF, splits the common fields) plus
    -- flat_analizer (UDTF, explodes the event array into event_name/event_json).
    drop table if exists dwd_base_event_log;
    CREATE EXTERNAL TABLE dwd_base_event_log(
        `mid_id`       string,   -- cm.mid
        `user_id`      string,   -- cm.uid
        `version_code` string,   -- cm.vc
        `version_name` string,   -- cm.vn
        `lang`         string,   -- cm.l
        `source`       string,   -- cm.sr
        `os`           string,   -- cm.os
        `area`         string,   -- cm.ar
        `model`        string,   -- cm.md
        `brand`        string,   -- cm.ba
        `sdk_version`  string,   -- cm.sv
        `gmail`        string,   -- cm.g
        `height_width` string,   -- cm.hw
        `app_time`     string,   -- cm.t
        `network`      string,   -- cm.nw
        `lng`          string,   -- cm.ln
        `lat`          string,   -- cm.la
        `event_name`   string,   -- UDTF output: event discriminator ("en")
        `event_json`   string,   -- UDTF output: full single-event JSON
        `server_time`  string)   -- timestamp prefix of the raw log line
    PARTITIONED BY (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_base_event_log/';

    然后利用脚本将数据导入到基础明细表。

    ①需要在执行的sql中添加自定义的UDF函数base_analizer,和UDTF函数flat_analizer。

    ②where条件中加了 base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')<>'',是因为在我们自定义的UDF函数中如果数据错误,会返回"",所以在这里将其过滤掉。

    ③因为分区字段直接写死为do_date(静态分区插入),其实并不需要设置非严格模式(nonstrict),该设置只在使用动态分区时才是必需的。

    ④UDTF函数返回2列的写法 lateral view flat_analizer(ops) tmp_k as event_name, event_json

    ⑤因为我们建的是分区表,因此insert overwrite只会覆盖当前分区的数据,并不会覆盖表中的全部分区的数据。

    #!/bin/bash
    # Load one day of ODS event logs into the DWD base event detail table.
    # Usage: script.sh [yyyy-mm-dd]   (defaults to yesterday)

    # Variables kept at the top so they are easy to change.
    APP=gmall
    hive=/opt/module/hive/bin/hive

    # Use the date argument if given; otherwise default to yesterday.
    if [ -n "$1" ] ;then
            do_date=$1
    else
            do_date=`date -d "-1 day" +%F`
    fi

    # Notes:
    # - base_analizer is evaluated ONCE per row in the inner subquery (the
    #   original called it 19 times per row, once per projected column).
    # - The outer column order matches the dwd_base_event_log DDL exactly:
    #   ... height_width, app_time, network, lng, lat, event_name, ...
    #   (the original selected network/lng/lat BEFORE app_time, shifting four
    #   columns into the wrong fields).
    # - Rows where the UDF returned '' (malformed logs) are filtered out.
    sql="
            add jar /opt/module/hive/hivefunction-1.0-SNAPSHOT.jar;

            create temporary function base_analizer as 'com.atguigu.udf.BaseFieldUDF';
            create temporary function flat_analizer as 'com.atguigu.udtf.EventJsonUDTF';

            set hive.exec.dynamic.partition.mode=nonstrict;

            insert overwrite table "$APP".dwd_base_event_log
            PARTITION (dt='$do_date')
            select
            cols[0]  as mid_id,
            cols[1]  as user_id,
            cols[2]  as version_code,
            cols[3]  as version_name,
            cols[4]  as lang,
            cols[5]  as source,
            cols[6]  as os,
            cols[7]  as area,
            cols[8]  as model,
            cols[9]  as brand,
            cols[10] as sdk_version,
            cols[11] as gmail,
            cols[12] as height_width,
            cols[13] as app_time,
            cols[14] as network,
            cols[15] as lng,
            cols[16] as lat,
            event_name,
            event_json,
            cols[18] as server_time
            from
            (
            select
            split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t') as cols
            from "$APP".ods_event_log
            where dt='$do_date'
            and base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la') <> ''
            ) sdk_log lateral view flat_analizer(cols[17]) tmp_k as event_name, event_json;
    "

    $hive -e "$sql"

    二 特定事件明细表

    特定事件明细表与基础事件明细表的字段大体一样,只是有2处改动

    ①去掉event_name字段,因为此表中存的就是这一类事件,不再需要event_name来区分。

    ②将event_json中描述事件详情的kv取出来,形成新的字段。

    以商品点击表为例,建表语句如下。去掉了event_name字段,新增了kv信息中的action,goodsid,place,extend1,category五个字段。

    -- Per-event detail table for the "display" (product click) event:
    -- same common columns as dwd_base_event_log, minus event_name, plus the
    -- five kv fields of the display event.
    drop table if exists dwd_display_log;
    CREATE EXTERNAL TABLE dwd_display_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `goodsid` string,
    `place` string,
    `extend1` string,
    `category` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    -- FIX: declare parquet storage explicitly, matching dwd_base_event_log;
    -- the original omitted it and would default to the text format.
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_display_log/';

    然后是利用脚本,将数据从事件基础明细表,导入到特定的事件明细表。下面是一个包含了商品点击,详情,列表,广告,消息通知等事件的完整脚本,虽然很长,但是每一种事件的处理逻辑都是一样的。

    ①get_json_object(event_json,'$.kv.action')是一个hive内置的函数,可以从json串中取值,$符号表示此json本身。

    ②where dt='$do_date' and event_name='display' 通过上一步处理的事件名称来区分,以导入不同的事件明细表。

    #!/bin/bash
    # Fan out one day's rows from the DWD base event detail table into the
    # per-event detail tables (display / newsdetail / loading / ad /
    # notification / active_foreground / active_background / comment /
    # favorites / praise / error). Each insert filters on event_name and pulls
    # its kv fields out of event_json with get_json_object.
    # Usage: script.sh [yyyy-mm-dd]   (defaults to yesterday)

    APP=gmall
    hive=/opt/module/hive/bin/hive

    # Partition date: first argument if present, otherwise yesterday.
    do_date=${1:-$(date -d "-1 day" +%F)}

    sql="
    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table "$APP".dwd_display_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.goodsid') goodsid,
        get_json_object(event_json,'$.kv.place') place,
        get_json_object(event_json,'$.kv.extend1') extend1,
        get_json_object(event_json,'$.kv.category') category,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='display';


    insert overwrite table "$APP".dwd_newsdetail_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.entry') entry,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.goodsid') goodsid,
        get_json_object(event_json,'$.kv.showtype') showtype,
        get_json_object(event_json,'$.kv.news_staytime') news_staytime,
        get_json_object(event_json,'$.kv.loading_time') loading_time,
        get_json_object(event_json,'$.kv.type1') type1,
        get_json_object(event_json,'$.kv.category') category,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='newsdetail';


    insert overwrite table "$APP".dwd_loading_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.loading_time') loading_time,
        get_json_object(event_json,'$.kv.loading_way') loading_way,
        get_json_object(event_json,'$.kv.extend1') extend1,
        get_json_object(event_json,'$.kv.extend2') extend2,
        get_json_object(event_json,'$.kv.type') type,
        get_json_object(event_json,'$.kv.type1') type1,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='loading';


    insert overwrite table "$APP".dwd_ad_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.entry') entry,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.content') content,
        get_json_object(event_json,'$.kv.detail') detail,
        get_json_object(event_json,'$.kv.source') ad_source,
        get_json_object(event_json,'$.kv.behavior') behavior,
        get_json_object(event_json,'$.kv.newstype') newstype,
        get_json_object(event_json,'$.kv.show_style') show_style,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='ad';


    insert overwrite table "$APP".dwd_notification_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.noti_type') noti_type,
        get_json_object(event_json,'$.kv.ap_time') ap_time,
        get_json_object(event_json,'$.kv.content') content,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='notification';


    insert overwrite table "$APP".dwd_active_foreground_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.push_id') push_id,
        get_json_object(event_json,'$.kv.access') access,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='active_foreground';


    insert overwrite table "$APP".dwd_active_background_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.active_source') active_source,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='active_background';


    insert overwrite table "$APP".dwd_comment_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.comment_id') comment_id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
        get_json_object(event_json,'$.kv.content') content,
        get_json_object(event_json,'$.kv.addtime') addtime,
        get_json_object(event_json,'$.kv.other_id') other_id,
        get_json_object(event_json,'$.kv.praise_count') praise_count,
        get_json_object(event_json,'$.kv.reply_count') reply_count,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='comment';


    insert overwrite table "$APP".dwd_favorites_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.id') id,
        get_json_object(event_json,'$.kv.course_id') course_id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.add_time') add_time,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='favorites';


    insert overwrite table "$APP".dwd_praise_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.id') id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.target_id') target_id,
        get_json_object(event_json,'$.kv.type') type,
        get_json_object(event_json,'$.kv.add_time') add_time,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='praise';


    insert overwrite table "$APP".dwd_error_log
    PARTITION (dt='$do_date')
    select
        mid_id, user_id, version_code, version_name, lang, source, os, area,
        model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat,
        get_json_object(event_json,'$.kv.errorBrief') errorBrief,
        get_json_object(event_json,'$.kv.errorDetail') errorDetail,
        server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='error';
    "

    $hive -e "$sql"
  • 相关阅读:
    summary
    谷歌浏览器Software Reporter Tool长时间占用CPU解决办法
    进栈 出栈 标准用法
    C语言保证,0永远不是有效的数据地址,因此,返回址0可用来表示发生的异常事件
    寄存器是内存阶层中的最顶端,也是系统获得操作资料的最快速途径。 存于寄存器内的地址可用来指向内存的某个位置,即寻址
    对内存分配的理解 自动变量 局部变量 临时变量 外部变量 字符串长度 C语言可以看成由一些列的外部对象构成
    ORM Active Record Data Mapper
    summary
    c预处理器
    #include<stdio.h> #include "stdio.h"
  • 原文地址:https://www.cnblogs.com/noyouth/p/13193473.html
Copyright © 2011-2022 走看看