  • Hands-on project from 0 to 1, Hive (25): building an enterprise data warehouse (7): setting up the DWD layer

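    I. Data warehouse setup: DWD layer

    • 1) Parse the user behavior data
    • 2) Filter out records whose core fields are null
    • 3) Remodel the business data with a dimensional model, i.e. dimension degeneration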


    1.1 DWD layer (parsing the user behavior startup log data)



    1.1.1 Create the startup table

    1) Table creation statement

    drop table if exists dwd_start_log;
    CREATE EXTERNAL TABLE dwd_start_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `open_ad_type` string,
    `action` string,
    `loading_time` string,
    `detail` string,
    `extend1` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_start_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    Note: the data is stored as Parquet, which is splittable, so no separate index needs to be created for the data.

    1.1.2 Using the get_json_object function

    1) Input data: xjson

    xjson=[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]

    2) Extract the first JSON object

    SELECT get_json_object(xjson,"$.[0]") FROM person;

    Result: {"name":"大郎","sex":"男","age":"25"}

    3) Extract the value of the age field of the first JSON object

    SELECT get_json_object(xjson,"$.[0].age") FROM person;

    Result: 25
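    The queries above assume a table named person with a string column xjson that holds the sample array. How that table was created is not shown, so the sketch below is only one hypothetical way to set it up: the column type and the use of insert ... values are assumptions, and the function name person_setup_sql is made up for illustration.

```shell
#!/bin/bash
# Hypothetical setup for the get_json_object examples: a one-column table
# named person whose xjson column holds the sample JSON array.
person_setup_sql() {
    # Quoted heredoc delimiter: the shell must not expand anything inside.
    cat <<'EOF'
drop table if exists person;
create table person(xjson string);
insert into table person values
('[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]');
EOF
}

# Print the statements; to run them, pass the output to the Hive CLI, e.g.
#   hive -e "$(person_setup_sql)"
person_setup_sql
```

    With the table in place, the two SELECT statements can be run as written.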

    1.1.3 Load data into the startup table

    insert overwrite table dwd_start_log
    PARTITION (dt='2020-03-10')
    select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
    from ods_start_log
    where dt='2020-03-10';

    3) Test

    select * from dwd_start_log where dt='2020-03-10' limit 2;

    1.1.4 DWD layer startup table loading script

    1) vim ods_to_dwd_log.sh
    Write the following in the script

    #!/bin/bash
    # Define variables so they are easy to change
    APP=gmall
    hive=/opt/modules/hive/bin/hive
    # Use the date passed as the first argument; if none is given, use yesterday
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table "$APP".dwd_start_log
    PARTITION (dt='$do_date')
    select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
    from "$APP".ods_start_log
    where dt='$do_date';
    "
    $hive -e "$sql"

    2) Make the script executable

    chmod 770 ods_to_dwd_log.sh

    3) Run the script

    ods_to_dwd_log.sh 2020-03-11

    4) Check the loaded result

    select * from dwd_start_log where dt='2020-03-11' limit 2;
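
    The script falls back to yesterday's date when no argument is given, but it accepts any non-empty argument as the partition date. Below is a minimal sketch of the same defaulting logic with a shape check added. The function name resolve_do_date and the validation are illustrative additions, not part of the original script, and date -d assumes GNU date.

```shell
#!/bin/bash
# Hypothetical helper (not in the original script): pick the partition date.
#   no argument   -> yesterday, as in ods_to_dwd_log.sh
#   YYYY-MM-DD    -> used as given (only the shape is checked, not the calendar)
#   anything else -> error message on stderr and a non-zero return code
resolve_do_date() {
    if [ -z "$1" ]; then
        date -d "-1 day" +%F        # GNU date; %F prints YYYY-MM-DD
        return 0
    fi
    case "$1" in
        [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9])
            echo "$1"
            ;;
        *)
            echo "invalid date '$1', expected YYYY-MM-DD" >&2
            return 1
            ;;
    esac
}

# Example: an explicit date is passed through unchanged
resolve_do_date 2020-03-11
```

    In the loading script this could replace the if/else block, for example do_date=$(resolve_do_date "$1") || exit 1, so that a mistyped argument stops the run before Hive is invoked.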

    1.2 DWD layer (parsing the user behavior event log data)




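    1.2.1 Create the base detail table

    The detail table stores the detail records converted from the raw ODS-layer table.

    1) Create the event log base detail table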

    drop table if exists dwd_base_event_log;
    CREATE EXTERNAL TABLE dwd_base_event_log(
      `mid_id` string,
      `user_id` string,
      `version_code` string,
      `version_name` string,
      `lang` string,
      `source` string,
      `os` string,
      `area` string,
      `model` string,
      `brand` string,
      `sdk_version` string,
      `gmail` string,
      `height_width` string,
      `app_time` string,
      `network` string,
      `lng` string,
      `lat` string,
      `event_name` string,
      `event_json` string,
      `server_time` string)
    PARTITIONED BY (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_base_event_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

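    2) Note: event_name and event_json hold the event name and the whole event, respectively. This is where the one-to-many structure of the raw log is split apart: each raw log line has to be flattened into one row per event, which requires a UDF and a UDTF.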

    1.2.2 Custom UDF (parsing the common fields)

    A UDF takes one row in and produces one row out: one in, one out.


    1) Create a Maven project: hivefunction
    2) Create the package: com.zsy.udf
    3) Add the following to pom.xml

    <properties>
            <hive.version>2.3.0</hive.version>
    </properties>
    
    <repositories>
            <repository>
                    <id>spring-plugin</id>
                    <url>https://repo.spring.io/plugins-release/</url>
            </repository>
    </repositories>
    
    <dependencies>
            <!-- Add the Hive dependency -->
            <dependency>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-exec</artifactId>
                    <version>${hive.version}</version>
            </dependency>
    </dependencies>
    
    <build>
            <plugins>
                    <plugin>
                            <artifactId>maven-compiler-plugin</artifactId>
                            <version>2.3.2</version>
                            <configuration>
                                    <source>1.8</source>
                                    <target>1.8</target>
                            </configuration>
                    </plugin>
                    <plugin>
                            <artifactId>maven-assembly-plugin</artifactId>
                            <configuration>
                                    <descriptorRefs>
                                            <descriptorRef>jar-with-dependencies</descriptorRef>
                                    </descriptorRefs>
                            </configuration>
                            <executions>
                                    <execution>
                                            <id>make-assembly</id>
                                            <phase>package</phase>
                                            <goals>
                                                    <goal>single</goal>
                                            </goals>
                                    </execution>
                            </executions>
                    </plugin>
            </plugins>
    </build>

    Note 1: if downloading the Hive jars fails, add the following parameters in IDEA

    -Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true
    -Dmaven.wagon.http.ssl.ignore.validity.dates=true




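    For details, see the blog post: ignoring SSL certificate validation when Maven downloads dependencies
    Note 2: if the following error appears during packaging, the thread stack in IDEA has overflowed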

    Exception in thread "main" java.lang.StackOverflowError

    Fix: add -Xss4m to the VM options in IDEA (the original post marks the exact field in a screenshot)


    4) UDF for parsing the common fields

    package com.zsy.udf;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.json.JSONObject;
    
    public class BaseFieldUDF extends UDF {
    
        public String evaluate(String line,String key){
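            // 1. Split the line into server time and JSON body ('|' must be escaped in the regex)
            String[] log = line.split("\\|");
    
            String result = "";
    
            // 2. Validate
            if(log.length != 2 || StringUtils.isBlank(log[1])){
                return result;
            }
    
            // 3. Parse the JSON body into a JSON object
            JSONObject json = new JSONObject(log[1].trim());
    
            // 4. Look up the value for the requested key
            if("st".equals(key)){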
                result = log[0].trim();
            }else if("et".equals(key)){
                if(json.has("et")){
                    result = json.getString("et");
                }
            }else{
                JSONObject cm = json.getJSONObject("cm");
                if(cm.has(key)){
                    result = cm.getString(key);
                }
            }
            return result;
        }
    
        /**
         * Test
         */
    //    public static void main(String[] args) {
    //        String line = "1583776132686|{"cm":{"ln":"-42.8","sv":"V2.3.9","os":"8.1.7","g":"X470IP70@gmail.com","mid":"0","nw":"4G","l":"en","vc":"13","hw":"1080*1920","ar":"MX","uid":"0","t":"1583758268106","la":"-31.3","md":"sumsung-18","vn":"1.1.1","ba":"Sumsung","sr":"M"},"ap":"app","et":[{"ett":"1583685512624","en":"display","kv":{"goodsid":"0","action":"2","extend1":"2","place":"1","category":"17"}},{"ett":"1583769686402","en":"newsdetail","kv":{"entry":"3","goodsid":"1","news_staytime":"16","loading_time":"0","action":"4","showtype":"5","category":"97","type1":""}},{"ett":"1583709065211","en":"ad","kv":{"activityId":"1","displayMills":"58537","entry":"1","action":"3","contentType":"0"}},{"ett":"1583693966746","en":"active_background","kv":{"active_source":"3"}},{"ett":"1583734521683","en":"error","kv":{"errorDetail":"java.lang.NullPointerException\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound","errorBrief":"at cn.lift.dfdf.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)"}},{"ett":"1583755388633","en":"praise","kv":{"target_id":0,"id":1,"type":3,"add_time":"1583713812739","userid":4}}]}";
    //        String result = new BaseFieldUDF().evaluate(line, "st");
    //        System.out.println(result);
    //    }
    }
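
    BaseFieldUDF expects each raw line to have the form server_time|json and returns an empty string for anything else. The shell function below is a simplified mirror of steps 1 and 2 plus the "st" branch, which can be handy for checking a raw log line before running the Hive job. The function name server_time_of is hypothetical, and edge cases such as a trailing | or surrounding whitespace are not handled exactly as in the Java code.

```shell
#!/bin/bash
# Hypothetical helper: print the server time of one raw log line,
# or an empty line when the line is not "<server_time>|<non-blank json>".
server_time_of() {
    case "$1" in
        *"|"*"|"*)
            echo ""                 # more than one separator: rejected
            ;;
        *"|"*)
            st="${1%%|*}"           # text before the separator
            body="${1#*|}"          # text after the separator
            # Reject a body that is empty or whitespace only
            if [ -z "$(printf '%s' "$body" | tr -d ' \t\n')" ]; then
                echo ""
            else
                echo "$st"
            fi
            ;;
        *)
            echo ""                 # no separator at all: rejected
            ;;
    esac
}

# Example with a shortened log line
server_time_of '1583776132686|{"cm":{"mid":"0"},"ap":"app","et":[]}'
```

    The part before the separator is what the UDF returns for the key "st"; the other keys are read from the JSON body.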

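    1.2.3 Custom UDTF (parsing the event fields)

    A UDTF takes one row in and produces multiple rows out: one in, many out.


    1) Create the package: com.zsy.udtf
    2) In the com.zsy.udtf package, create the class EventJsonUDTF
    3) The class expands the event (business) fields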

    package com.zsy.udtf;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.json.JSONArray;
    import org.json.JSONException;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class EventJsonUDTF extends GenericUDTF {
    
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
            // Define the names and types of the columns the UDTF returns
            List<String> fieldName = new ArrayList<>();
            List<ObjectInspector> fieldType = new ArrayList<>();
            fieldName.add("event_name");
            fieldName.add("event_json");
            fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldType);
        }
    
        @Override
        public void process(Object[] objects) throws HiveException {
            // 1. Get the input: the JSON array string that the UDF returns for key "et"
            String input = objects[0].toString();
    
            // 2. Validate
            if (StringUtils.isBlank(input)) {
                return;
            } else {
                JSONArray ja = new JSONArray(input);
                if (ja == null) {
                    return;
                }
                // Iterate over the array and wrap each element as (event name, event content)
                for (int i = 0; i < ja.length(); i++) {
                    String[] result = new String[2];
                    try {
                        result[0] = ja.getJSONObject(i).getString("en");
                        result[1] = ja.getString(i);
                    } catch (JSONException ex) {
                        continue;
                    }
                    // Emit the row
                    forward(result);
                }
            }
        }
    
        @Override
        public void close() throws HiveException {
    
        }
    }

    4) Package the project and upload the jar to /user/hive/jars on HDFS

    hdfs dfs -mkdir /user/hive/jars 
    
    hdfs dfs -put ./hivefunction-1.0-SNAPSHOT.jar /user/hive/jars

    Note: what if you change a custom function and rebuild the jar? Just replace the old
    jar at the HDFS path and restart the Hive client.
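
    The statements in section 1.2.4 call base_analizer and flat_analizer, but the step that registers them is not shown. Below is a hedged sketch of how such permanent functions could be created with Hive's create function ... using jar statement. The mapping of function names to classes is inferred from how the two functions are used later, the helper name build_register_sql is made up, and the jar URI (written without a NameNode address) is an assumption to adapt to your cluster.

```shell
#!/bin/bash
APP=gmall
# Assumption: jar location from the upload step; add host:port of your
# NameNode after hdfs:// if the default file system is not picked up.
JAR_URI="hdfs:///user/hive/jars/hivefunction-1.0-SNAPSHOT.jar"

# Print the statements that bind the Hive function names to the Java classes.
build_register_sql() {
    cat <<EOF
use $APP;
create function base_analizer as 'com.zsy.udf.BaseFieldUDF' using jar '$JAR_URI';
create function flat_analizer as 'com.zsy.udtf.EventJsonUDTF' using jar '$JAR_URI';
EOF
}

build_register_sql
# To execute: /opt/modules/hive/bin/hive -e "$(build_register_sql)"
```

    Permanent functions belong to the database in which they are created, so the use statement determines where base_analizer and flat_analizer can later be called without a database prefix.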

    1.2.4 Parse into the event log base detail table

    1) Parse the event logs into the base detail table

    insert overwrite table dwd_base_event_log partition(dt='2020-03-10')
    select
    base_analizer(line,'mid') as mid_id,
    base_analizer(line,'uid') as user_id,
    base_analizer(line,'vc') as version_code,
    base_analizer(line,'vn') as version_name,
    base_analizer(line,'l') as lang,
    base_analizer(line,'sr') as source,
    base_analizer(line,'os') as os,
    base_analizer(line,'ar') as area,
    base_analizer(line,'md') as model,
    base_analizer(line,'ba') as brand,
    base_analizer(line,'sv') as sdk_version,
    base_analizer(line,'g') as gmail,
    base_analizer(line,'hw') as height_width,
    base_analizer(line,'t') as app_time,
    base_analizer(line,'nw') as network,
    base_analizer(line,'ln') as lng,
    base_analizer(line,'la') as lat,
    event_name,
    event_json,
    base_analizer(line,'st') as server_time
    from ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as
    event_name,event_json
    where dt='2020-03-10' and base_analizer(line,'et')<>'';

    2) Test
    select * from dwd_base_event_log where dt='2020-03-10' limit 2;


    1.2.5 DWD layer data parsing script

    1) vim ods_to_dwd_base_log.sh
    Write the following in the script

    #!/bin/bash
    # Define variables so they are easy to change
    APP=gmall
    hive=/opt/modules/hive/bin/hive
    # Use the date passed as the first argument; if none is given, use yesterday
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    use gmall;
    insert overwrite table "$APP".dwd_base_event_log partition(dt='$do_date')
    select
    base_analizer(line,'mid') as mid_id,
    base_analizer(line,'uid') as user_id,
    base_analizer(line,'vc') as version_code,
    base_analizer(line,'vn') as version_name,
    base_analizer(line,'l') as lang,
    base_analizer(line,'sr') as source,
    base_analizer(line,'os') as os,
    base_analizer(line,'ar') as area,
    base_analizer(line,'md') as model,
    base_analizer(line,'ba') as brand,
    base_analizer(line,'sv') as sdk_version,
    base_analizer(line,'g') as gmail,
    base_analizer(line,'hw') as height_width,
    base_analizer(line,'t') as app_time,
    base_analizer(line,'nw') as network,
    base_analizer(line,'ln') as lng,
    base_analizer(line,'la') as lat,
    event_name,
    event_json,
    base_analizer(line,'st') as server_time
    from "$APP".ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as
    event_name,event_json
    where dt='$do_date' and base_analizer(line,'et')<>''; "
    $hive -e "$sql"

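    Note: when a script uses custom functions, switch to the database that holds them before calling them, for example: use gmall;

    2) Make the script executable

    chmod 770 ods_to_dwd_base_log.sh

    3) Run the script

    ods_to_dwd_base_log.sh 2020-03-11

    4) Check the loaded result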

    select * from dwd_base_event_log where dt='2020-03-11' limit 2;

    1.3 DWD layer (deriving the user behavior event tables)

    1.3.1 Product click table

    1) Table creation statement

    drop table if exists dwd_display_log;
    CREATE EXTERNAL TABLE dwd_display_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `goodsid` string,
    `place` string,
    `extend1` string,
    `category` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_display_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_display_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.place') place,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='display';

    3) Test

    select * from dwd_display_log where dt='2020-03-10' limit 2;

    1.3.2 Product detail page table

    1) Table creation statement

    drop table if exists dwd_newsdetail_log;
    CREATE EXTERNAL TABLE dwd_newsdetail_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `action` string,
    `goodsid` string,
    `showtype` string,
    `news_staytime` string,
    `loading_time` string,
    `type1` string,
    `category` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_newsdetail_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_newsdetail_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.showtype') showtype,
    get_json_object(event_json,'$.kv.news_staytime') news_staytime,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.type1') type1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='newsdetail';

    3) Test

    select * from dwd_newsdetail_log where dt='2020-03-10' limit 2;

    1.3.3 Product list page table

    1) Table creation statement

    drop table if exists dwd_loading_log;
    CREATE EXTERNAL TABLE dwd_loading_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `loading_time` string,
    `loading_way` string,
    `extend1` string,
    `extend2` string,
    `type` string,
    `type1` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_loading_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_loading_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.loading_way') loading_way,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.extend2') extend2,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.type1') type1,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='loading';

    3) Test

    select * from dwd_loading_log where dt='2020-03-10' limit 2;

    1.3.4 Ad table

    1) Table creation statement

    drop table if exists dwd_ad_log;
    CREATE EXTERNAL TABLE dwd_ad_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `action` string,
    `contentType` string,
    `displayMills` string,
    `itemId` string,
    `activityId` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_ad_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_ad_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.contentType') contentType,
    get_json_object(event_json,'$.kv.displayMills') displayMills,
    get_json_object(event_json,'$.kv.itemId') itemId,
    get_json_object(event_json,'$.kv.activityId') activityId,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='ad';

    3) Test

    select * from dwd_ad_log where dt='2020-03-10' limit 2;

    1.3.5 Message notification table

    1) Table creation statement

    drop table if exists dwd_notification_log;
    CREATE EXTERNAL TABLE dwd_notification_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `noti_type` string,
    `ap_time` string,
    `content` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_notification_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_notification_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.noti_type') noti_type,
    get_json_object(event_json,'$.kv.ap_time') ap_time,
    get_json_object(event_json,'$.kv.content') content,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='notification';

    3) Test

    select * from dwd_notification_log where dt='2020-03-10' limit 2;

    1.3.6 User background activity table

    1) Table creation statement

    drop table if exists dwd_active_background_log;
    CREATE EXTERNAL TABLE dwd_active_background_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `active_source` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_background_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_active_background_log PARTITION
    (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.active_source') active_source,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='active_background';

    3) Test

    select * from dwd_active_background_log where dt='2020-03-10' limit 2;

    1.3.7 Comment table

    1) Table creation statement

    drop table if exists dwd_comment_log;
    CREATE EXTERNAL TABLE dwd_comment_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `comment_id` int,
    `userid` int,
    `p_comment_id` int,
    `content` string,
    `addtime` string,
    `other_id` int,
    `praise_count` int,
    `reply_count` int,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_comment_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_comment_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.comment_id') comment_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
    get_json_object(event_json,'$.kv.content') content,
    get_json_object(event_json,'$.kv.addtime') addtime,
    get_json_object(event_json,'$.kv.other_id') other_id,
    get_json_object(event_json,'$.kv.praise_count') praise_count,
    get_json_object(event_json,'$.kv.reply_count') reply_count,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='comment';

    3) Test

    select * from dwd_comment_log where dt='2020-03-10' limit 2;

    1.3.8 Favorites table

    1) Table creation statement

    drop table if exists dwd_favorites_log;
    CREATE EXTERNAL TABLE dwd_favorites_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `id` int,
    `course_id` int,
    `userid` int,
    `add_time` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_favorites_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_favorites_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.course_id') course_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='favorites';

    3) Test

    select * from dwd_favorites_log where dt='2020-03-10' limit 2;

    1.3.9 Praise (like) table

    1) Table creation statement

    drop table if exists dwd_praise_log;
    CREATE EXTERNAL TABLE dwd_praise_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `id` string,
    `userid` string,
    `target_id` string,
    `type` string,
    `add_time` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_praise_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_praise_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.target_id') target_id,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='praise';

    3) Test

    select * from dwd_praise_log where dt='2020-03-10' limit 2;

    1.3.10 Error log table

    1) Table creation statement

    drop table if exists dwd_error_log;
    CREATE EXTERNAL TABLE dwd_error_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `errorBrief` string,
    `errorDetail` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_error_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

    2) Load data

    insert overwrite table dwd_error_log PARTITION (dt='2020-03-10')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.errorBrief') errorBrief,
    get_json_object(event_json,'$.kv.errorDetail') errorDetail,
    server_time
    from dwd_base_event_log
    where dt='2020-03-10' and event_name='error';

    3) Test

    select * from dwd_error_log where dt='2020-03-10' limit 2;
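
    All ten event tables above are loaded with the same pattern: the 17 common columns, then one get_json_object call per field under $.kv, then server_time, filtered by event_name. The sketch below generates such a statement from a table name, an event name, a date and a field list. It is an illustration of the pattern rather than the author's script, and the names COMMON_COLS and build_event_insert are hypothetical.

```shell
#!/bin/bash
APP=gmall
# The common columns shared by every event table in this section
COMMON_COLS="mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat"

# Usage: build_event_insert <table> <event_name> <date> <kv_field>...
# Prints one insert overwrite statement for the given event table.
build_event_insert() {
    table="$1"; event="$2"; dt="$3"
    shift 3
    echo "insert overwrite table $APP.$table partition (dt='$dt')"
    echo "select $COMMON_COLS,"
    for field in "$@"; do
        # \$ keeps the JSON path prefix literal inside double quotes
        echo "get_json_object(event_json,'\$.kv.$field') $field,"
    done
    echo "server_time"
    echo "from $APP.dwd_base_event_log"
    echo "where dt='$dt' and event_name='$event';"
}

# Example: the statement for the error log table
build_event_insert dwd_error_log error 2020-03-10 errorBrief errorDetail
```

    Generating the statements keeps the column list in one place, at the cost of making the final SQL less visible than the hand-written version used in the script.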

    1.3.11 DWD layer event table loading script

    1) vim ods_to_dwd_event_log.sh
    Write the following in the script

    #!/bin/bash
    # Define variables so they are easy to change
    APP=gmall
    hive=/opt/modules/hive/bin/hive
    # Use the date passed as the first argument; if none is given, use yesterday
    if [ -n "$1" ] ;then
    do_date=$1
    else
    do_date=`date -d "-1 day" +%F`
    fi
    sql="
    insert overwrite table "$APP".dwd_display_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.place') place,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='display';
    
    
    insert overwrite table "$APP".dwd_newsdetail_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.showtype') showtype,
    get_json_object(event_json,'$.kv.news_staytime')
    news_staytime,
    get_json_object(event_json,'$.kv.loading_time')
    loading_time,
    get_json_object(event_json,'$.kv.type1') type1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='newsdetail';
    
    
    insert overwrite table "$APP".dwd_loading_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.loading_time')
    loading_time,
    get_json_object(event_json,'$.kv.loading_way') loading_way,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.extend2') extend2,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.type1') type1,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='loading';
    
    
    insert overwrite table "$APP".dwd_ad_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.contentType') contentType,
    get_json_object(event_json,'$.kv.displayMills')
    displayMills,
    get_json_object(event_json,'$.kv.itemId') itemId,
    get_json_object(event_json,'$.kv.activityId') activityId,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='ad';
    
    
    insert overwrite table "$APP".dwd_notification_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.noti_type') noti_type,
    get_json_object(event_json,'$.kv.ap_time') ap_time,
    get_json_object(event_json,'$.kv.content') content,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='notification';
    
    
    insert overwrite table "$APP".dwd_active_background_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.active_source')
    active_source,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='active_background';
    
    
    insert overwrite table "$APP".dwd_comment_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.comment_id') comment_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.p_comment_id')
    p_comment_id,
    get_json_object(event_json,'$.kv.content') content,
    get_json_object(event_json,'$.kv.addtime') addtime,
    get_json_object(event_json,'$.kv.other_id') other_id,
    get_json_object(event_json,'$.kv.praise_count')
    praise_count,
    get_json_object(event_json,'$.kv.reply_count') reply_count,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='comment';
    
    
    insert overwrite table "$APP".dwd_favorites_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.course_id') course_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='favorites';
    
    
    insert overwrite table "$APP".dwd_praise_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.target_id') target_id,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='praise';
    
    
    insert overwrite table "$APP".dwd_error_log
    PARTITION (dt='$do_date')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.errorBrief') errorBrief,
    get_json_object(event_json,'$.kv.errorDetail') errorDetail,
    server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='error';
    "
    
    $hive -e "$sql"
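
The ten INSERT statements in the script differ only in the event name and the list of `$.kv` fields. As a minimal sketch, and not the author's method, the statements could be generated from a compact specification instead of being written out by hand. The helper `build_insert` and the variable `common_cols` are hypothetical names. The sketch assumes that every target table is named `dwd_<event>_log` and that each event-specific column has the same name as its JSON key, which holds for the tables above.

```shell
#!/bin/bash
# Sketch only: generates INSERT statements instead of writing each one by hand.
APP=gmall
# Use the first argument as the date, otherwise yesterday (GNU date).
do_date=${1:-$(date -d "-1 day" +%F)}

# Columns shared by every DWD event table.
common_cols="mid_id, user_id, version_code, version_name, lang, source, os, area,
model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat"

# build_insert EVENT FIELD... (hypothetical helper)
# Prints the INSERT statement that fills dwd_<EVENT>_log for $do_date.
build_insert() {
  local event=$1
  shift
  local kv_cols="" field
  for field in "$@"; do
    # \$ keeps a literal dollar sign for the JSON path '$.kv.<field>'.
    kv_cols="${kv_cols}get_json_object(event_json,'\$.kv.${field}') ${field},
"
  done
  cat <<EOF
insert overwrite table ${APP}.dwd_${event}_log
PARTITION (dt='${do_date}')
select
${common_cols},
${kv_cols}server_time
from ${APP}.dwd_base_event_log
where dt='${do_date}' and event_name='${event}';
EOF
}

# Two of the ten events, with the field lists taken from the script above.
sql="$(build_insert praise id userid target_id type add_time)
$(build_insert error errorBrief errorDetail)"

# Print for inspection; the real script would pass this to: $hive -e "$sql"
echo "$sql"
```

Printing the generated SQL before running it makes it easy to compare against the hand-written statements. The trade-off is that a table whose column names diverge from its JSON keys would need its own statement.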

    2) Add execute permission to the script

    chmod 770 ods_to_dwd_event_log.sh

    3) Run the script

    ods_to_dwd_event_log.sh 2020-03-11
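
The script takes the partition date as an optional first argument and falls back to yesterday. The sketch below isolates that logic and adds a format check that the original script does not perform. `resolve_date` is a hypothetical helper, and `date -d` assumes GNU date, as the original script does.

```shell
#!/bin/bash
# resolve_date [YYYY-MM-DD] (hypothetical helper)
# Prints the given date, or yesterday's date when no argument is passed.
# Returns non-zero if the argument is not a valid YYYY-MM-DD date.
resolve_date() {
  if [ -z "$1" ]; then
    date -d "-1 day" +%F
    return
  fi
  case $1 in
    [0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]) ;;
    *) echo "expected YYYY-MM-DD, got: $1" >&2; return 1 ;;
  esac
  # Let GNU date reject impossible dates such as month 13.
  date -d "$1" +%F 2>/dev/null || { echo "invalid date: $1" >&2; return 1; }
}

resolve_date 2020-03-11   # explicit date
resolve_date              # no argument: yesterday
```

Rejecting a malformed date up front avoids overwriting a partition with an unintended name such as `dt='20200311'`.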

    4) Query the loaded results

    select * from dwd_comment_log where dt='2020-03-11' limit 2;
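
Checking one table confirms that the script ran, but not that every event type was loaded. As a rough sketch, the loop below builds a row-count query for each of the ten tables the script fills. `count_queries` is a hypothetical helper, and the table list is taken from the script above.

```shell
#!/bin/bash
APP=gmall
# The ten event tables loaded by ods_to_dwd_event_log.sh.
tables="display newsdetail loading ad notification active_background comment favorites praise error"

# count_queries DATE (hypothetical helper)
# Prints one row-count query per DWD event table for the given partition.
count_queries() {
  local dt=$1 event
  for event in $tables; do
    echo "select '${event}' as event_name, count(*) as cnt from ${APP}.dwd_${event}_log where dt='${dt}';"
  done
}

count_queries 2020-03-11
```

The output can be passed to Hive in the same way the loading script does, for example `$hive -e "$(count_queries 2020-03-11)"`.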

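    Closing remarks
    This chapter parsed the user behavior data from the ODS layer, built the DWD-layer tables, and loaded the data into them. The next chapter parses the ODS-layer business data and loads it into the DWD layer.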

    Note: what if a custom function is modified and its jar is rebuilt? Just replace the old jar at its HDFS path, then restart the Hive client.
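
The replacement step described in the note can be sketched as a single HDFS upload. The jar name and HDFS directory below are hypothetical placeholders, since the note does not name them. `hadoop fs -put -f` overwrites an existing file at the destination. The helper `replace_udf_jar` is also hypothetical, and it only prints the command unless `RUN` is set to an empty string.

```shell
#!/bin/bash
# replace_udf_jar LOCAL_JAR HDFS_DIR (hypothetical helper)
# Overwrites the old jar in HDFS_DIR with the rebuilt LOCAL_JAR.
# By default the command is only printed; call with RUN= to execute it.
replace_udf_jar() {
  local jar=$1 hdfs_dir=$2
  ${RUN-echo} hadoop fs -put -f "$jar" "$hdfs_dir/"
}

# Placeholder names; substitute the jar and path used when the function was created.
replace_udf_jar my-udf.jar /path/to/hive/jars
```

After the upload, restart the Hive client so that it picks up the new jar, as the note says.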

    1.2.4 Parsing the event log base detail table

    1) Parse the event log base detail table

    Author: 大码王

  • Original article: https://www.cnblogs.com/huanghanyu/p/13718011.html