zoukankan      html  css  js  c++  java
  • 离线数仓(五)

    6章 数仓搭建-DWD

      1)对用户行为数据解析。

      2)对业务数据采用维度模型重新建模。

    6.1 DWD层(用户行为日志)

    6.1.1 日志格式回顾

      (1)页面埋点日志

      (2)启动日志

    6.1.2 get_json_object函数使用

      1)数据

    [{"name":"大郎","sex":"","age":"25"},{"name":"西门庆","sex":"","age":"47"}]

      2)取出第一个json对象

    select get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]','$[0]');

      3)取出第一个jsonage字段的值

    select get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]','$[0].age');

    6.1.3 启动日志表

      启动日志解析思路:启动日志表中每行数据对应一个启动记录,一个启动记录应该包含日志中的公共信息和启动信息。先将所有包含start字段的日志过滤出来,然后使用get_json_object函数解析每个字段。

      1)建表语句(数据采用parquet存储方式,是可以支持切片的,不需要再对数据创建索引。如果单纯的text方式存储数据,需要采用支持切片的,选用lzop压缩方式并创建索引)

    drop table if exists dwd_start_log;
    create external table dwd_start_log(
        `area_code` string comment '地区编码',
        `brand` string comment '手机品牌',
        `channel` string comment '渠道',
        `is_new` string comment '是否首次启动',
        `model` string comment '手机型号',
        `mid_id` string comment '设备id',
        `os` string comment '操作系统',
        `user_id` string comment '会员id',
        `version_code` string comment 'app版本号',
        `entry` string comment 'icon手机图标 notice 通知 install 安装后启动',
        `loading_time` bigint comment '启动加载时间',
        `open_ad_id` string comment '广告页ID ',
        `open_ad_ms` bigint comment '广告总共播放时间',
        `open_ad_skip_ms` bigint comment '用户跳过广告时点',
        `ts` bigint comment '时间'
    ) comment '启动日志表'
    partitioned by (`dt` string) --按时间分区
    stored as parquet   --采用parquet列式存储
    location '/warehouse/gmall/dwd/dwd_start_log' -- 指定在HDFS上存储位置
    tblproperties ('parquet.compression'='lzo') -- 采用LZO压缩

      2)数据导入

    insert overwrite table dwd_start_log partition (dt='2021-06-08')
    select
        --common中的字段
           get_json_object(line,'$.common.ar') area_code,
           get_json_object(line,'$.common.ba') brand,
           get_json_object(line,'$.common.ch') channel,
           get_json_object(line,'$.common.is_new') is_new,
           get_json_object(line,'$.common.md') model,
           get_json_object(line,'$.common.mid') mid_id,
           get_json_object(line,'$.common.os') os,
           get_json_object(line,'$.common.uid') user_id,
           get_json_object(line,'$.common.vc') version_code,
        --取start中的字段
           get_json_object(line,'$.start.entry') entry,
           get_json_object(line,'$.start.loading_time') loading_time,
           get_json_object(line,'$.start.open_ad_id') open_ad_id,
           get_json_object(line,'$.start.open_ad_ms') open_ad_ms,
           get_json_object(line,'$.start.open_ad_skip_ms') open_ad_skip_ms,
    
           get_json_object(line,'$.ts') ts
    from ods_log
    --过滤出启动日志,只有包含start属性的是启动日志
    where dt='2021-06-08'
    and get_json_object(line,'$.start') is not null;

      3)查看数据

    select * from dwd_start_log where dt='2021-06-08' limit 5;

    6.1.4 页面日志表

      页面日志解析思路:页面日志表中每行数据对应一个页面访问记录,一个页面访问记录应该包含日志中的公共信息和页面信息。先将所有包含page字段的日志过滤出来,然后使用get_json_object函数解析每个字段。

      1)建表语句

    drop table if exists dwd_page_log;
    create external table dwd_page_log(
        `area_code` string comment '地区编码',
        `brand` string comment '手机品牌',
        `channel` string comment '渠道',
        `is_new` string comment '是否首次启动',
        `model` string comment '手机型号',
        `mid_id` string comment '设备id',
        `os` string comment '操作系统',
        `user_id` string comment '会员id',
        `version_code` string comment 'app版本号',
        `during_time` bigint comment '持续时间毫秒',
        `page_item` string comment '目标id',
        `page_item_type` string comment '目标类型',
        `last_page_id` string comment '上页类型',
        `page_id` string comment '页面ID ',
        `source_type` string comment '来源类型',
        `ts` bigint comment '时间'
    ) comment '页面日志表'
    partitioned by (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_page_log'
    tblproperties('parquet.compression'='lzo');

      2)数据导入

    insert overwrite table dwd_page_log partition (dt='2021-06-08')
    select
        --common中的字段
           get_json_object(line,'$.common.ar') area_code,
           get_json_object(line,'$.common.ba') brand,
           get_json_object(line,'$.common.ch') channel,
           get_json_object(line,'$.common.is_new') is_new,
           get_json_object(line,'$.common.md') model,
           get_json_object(line,'$.common.mid') mid_id,
           get_json_object(line,'$.common.os') os,
           get_json_object(line,'$.common.uid') user_id,
           get_json_object(line,'$.common.vc') version_code,
        --取page中的字段
           get_json_object(line,'$.page.during_time') during_time,
           get_json_object(line,'$.page.item') page_item,
           get_json_object(line,'$.page.item_type') page_item_type,
           get_json_object(line,'$.page.last_page_id') last_page_id,
           get_json_object(line,'$.page.page_id') page_id,
           get_json_object(line,'$.page.source_type') source_type,
    
           get_json_object(line,'$.ts') ts
    from ods_log
    --过滤出页面日志,只有包含page属性的是yem日志
    where dt='2021-06-08'
    and get_json_object(line,'$.page') is not null;

      3)查看数据

    select * from dwd_page_log where dt='2021-06-08' limit 5;

    6.1.5 动作日志表

      动作日志解析思路:动作日志表中每行数据对应用户的一个动作记录,一个动作记录应当包含公共信息、页面信息以及动作信息。先将包含action字段的日志过滤出来,然后通过UDTF函数action数组“炸开”(类似于explode函数的效果),然后使用get_json_object函数解析每个字段

      1)建表语句

    create external table dwd_action_log(
        `area_code` string comment '地区编码',
        `brand` string comment '手机品牌',
        `channel` string comment '渠道',
        `is_new` string comment '是否首次启动',
        `model` string comment '手机型号',
        `mid_id` string comment '设备id',
        `os` string comment '操作系统',
        `user_id` string comment '会员id',
        `version_code` string comment 'app版本号',
        `during_time` bigint comment '持续时间毫秒',
        `page_item` string comment '目标id',
        `page_item_type` string comment '目标类型',
        `last_page_id` string comment '上页类型',
        `page_id` string comment '页面id',
        `source_type` string comment '来源类型',
        `action_id` string comment '动作id',
        `item` string comment '目标id ',
        `item_type` string comment '目标类型',
        `ts` bigint comment '时间'
    ) comment '动作日志表'
    partitioned by (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_action_log'
    tblproperties ('parquet.compression'='lzo');

      2)创建UDTF函数——设计思路

      3)创建UDTF函数——编写代码

        (1)创建一个maven工程hivefunction

        (2)创建包名:com.yuange.hive.udtf

        (3)引入如下依赖

    <dependencies>
        <!--添加hive依赖-->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

        (4)编码

    package com.yuange.hive;
    
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.json.JSONArray;
    import org.json.JSONObject;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @作者:袁哥
     * @时间:2021/6/14 13:19
     */
    public class TestHiveFunction extends GenericUDTF {
    
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
    
            // 获取输入参数的所有字段类型的引用
            List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
            //检查入参是否符合条件
            // 要求必须只能传入一列参数
            if (inputFields.size() != 1){
                throw  new UDFArgumentException("此函数只允许传入1列参数!");
            }
    
            // 要求传入的一列参数必须是string类型
            if (!"string".equals(inputFields.get(0).getFieldObjectInspector().getTypeName())){
                throw  new UDFArgumentException("此函数只允许传入1列string类型参数!");
            }
    
            //返回StructObjectInspector
            //为函数返回的每行每列的参数起个名称
            List<String> fieldNames=new ArrayList<String>();
            fieldNames.add("col1");
    
            //函数返回的每行每列的参数类型(ObjectInspector)
            List<ObjectInspector> fieldOIs=new ArrayList<ObjectInspector>();
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            return   ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
                    fieldOIs);
        }
    
    
        private String[] result=new String[ 1 ];
    
        /*
                    执行炸裂的逻辑,将炸裂的结果写出
    
                    Object[] objects: 函数传入的参数
         */
        @Override
        public void process(Object[] objects) throws HiveException {
    
            //取出传入的1列参数  [{},{},{}]
            String jsonArrayStr = objects[0].toString();
    
            //将jsonArrayStr转为 Java对象
            JSONArray jsonArray = new JSONArray(jsonArrayStr);
    
            for (int i=0; i < jsonArray.length() ; i++){
                // {}
                JSONObject jsonObject = jsonArray.getJSONObject(i);
    
                // 将 Java对象 JSONObject 转为 jsonObjectStr
                // result数组就代表写出的一行数据
                result[0] = jsonObject.toString();
    
                //将函数计算的结果输出
                forward(result);
            }
        }
    
        // 选择性
        @Override
        public void close() throws HiveException {
    
        }
    }

      4)创建函数

        (1)使用Maven打包

        (2hivefunction-1.0-SNAPSHOT.jar上传hadoop102的 $HIVE_HOME/auxlib目录(重启hive)

    mkdir auxlib

        (3)创建永久函数与开发好的java class关联

    CREATE function gmall.explode_jsonarray as 'com.yuange.hive.TestHiveFunction';
    desc function gmall.explode_jsonarray;

        (4)注意:如果修改了自定义函数重新生成jar包怎么处理?只需要替换HDFS路径上的旧jar包,然后重启Hive客户端即可。

      5)数据导入

    insert overwrite table dwd_action_log partition (dt='2021-06-08')
    select
        --common中的字段
           get_json_object(line,'$.common.ar') area_code,
           get_json_object(line,'$.common.ba') brand,
           get_json_object(line,'$.common.ch') channel,
           get_json_object(line,'$.common.is_new') is_new,
           get_json_object(line,'$.common.md') model,
           get_json_object(line,'$.common.mid') mid_id,
           get_json_object(line,'$.common.os') os,
           get_json_object(line,'$.common.uid') user_id,
           get_json_object(line,'$.common.vc') version_code,
        --取page中的字段
           get_json_object(line,'$.page.during_time') during_time,
           get_json_object(line,'$.page.item') page_item,
           get_json_object(line,'$.page.item_type') page_item_type,
           get_json_object(line,'$.page.last_page_id') last_page_id,
           get_json_object(line,'$.page.page_id') page_id,
           get_json_object(line,'$.page.source_type') source_type,
    
           get_json_object(jsonStr,'$.action_id') action_id,
           get_json_object(jsonStr,'$.item') item,
           get_json_object(jsonStr,'$.item_type') item_type,
           get_json_object(jsonStr,'$.ts') ts
    from ods_log
    lateral view explode_jsonarray(get_json_object(line,'$.actions')) tmp as jsonStr
    --过滤出动作日志,只有包含actions属性的是动作日志
    where dt='2021-06-08'
    and get_json_object(line,'$.actions') is not null;

      3)查看数据

    select * from dwd_action_log where dt='2021-06-08' limit 5;

    6.1.6 曝光日志表

      曝光日志解析思路:曝光日志表中每行数据对应一个曝光记录,一个曝光记录应当包含公共信息、页面信息以及曝光信息。先将包含display字段的日志过滤出来,然后通过UDTF函数,将display数组“炸开”(类似于explode函数的效果),然后使用get_json_object函数解析每个字段。

      1)建表语句

    drop table if exists dwd_display_log;
    create external table dwd_display_log(
        `area_code` string comment '地区编码',
        `brand` string comment '手机品牌',
        `channel` string comment '渠道',
        `is_new` string comment '是否首次启动',
        `model` string comment '手机型号',
        `mid_id` string comment '设备id',
        `os` string comment '操作系统',
        `user_id` string comment '会员id',
        `version_code` string comment 'app版本号',
        `during_time` bigint comment 'app版本号',
        `page_item` string comment '目标id ',
        `page_item_type` string comment '目标类型',
        `last_page_id` string comment '上页类型',
        `page_id` string comment '页面ID ',
        `source_type` string comment '来源类型',
        `ts` bigint comment 'app版本号',
        `display_type` string comment '曝光类型',
        `item` string comment '曝光对象id ',
        `item_type` string comment 'app版本号',
        `order` bigint comment '曝光顺序',
        `pos_id` bigint comment '曝光位置'
    ) comment '曝光日志表'
    partitioned by (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_display_log'
    tblproperties ('parquet.compression'='lzo');

      2)数据导入

    insert overwrite table dwd_display_log partition(dt='2021-06-08')
    select
        get_json_object(line,'$.common.ar'),
        get_json_object(line,'$.common.ba'),
        get_json_object(line,'$.common.ch'),
        get_json_object(line,'$.common.is_new'),
        get_json_object(line,'$.common.md'),
        get_json_object(line,'$.common.mid'),
        get_json_object(line,'$.common.os'),
        get_json_object(line,'$.common.uid'),
        get_json_object(line,'$.common.vc'),
        get_json_object(line,'$.page.during_time'),
        get_json_object(line,'$.page.item'),
        get_json_object(line,'$.page.item_type'),
        get_json_object(line,'$.page.last_page_id'),
        get_json_object(line,'$.page.page_id'),
        get_json_object(line,'$.page.source_type'),
        get_json_object(line,'$.ts'),
        get_json_object(display,'$.display_type'),
        get_json_object(display,'$.item'),
        get_json_object(display,'$.item_type'),
        get_json_object(display,'$.order'),
        get_json_object(display,'$.pos_id')
    from ods_log lateral view explode_jsonarray(get_json_object(line,'$.displays')) tmp as display
    where dt='2021-06-08'
    and get_json_object(line,'$.displays') is not null;

      3)查看数据

    select * from dwd_display_log where dt='2021-06-08' limit 5;

    6.1.7 错误日志表

      错误日志解析思路:错误日志表中每行数据对应一个错误记录,为方便定位错误,一个错误记录应当包含与之对应的公共信息、页面信息、曝光信息、动作信息、启动信息以及错误信息。先将包含err字段的日志过滤出来,然后使用get_json_object函数解析所有字段。

      1)建表语句

    drop table if exists dwd_error_log;
    CREATE EXTERNAL TABLE dwd_error_log(
        `area_code` STRING COMMENT '地区编码',
        `brand` STRING COMMENT '手机品牌',
        `channel` STRING COMMENT '渠道',
        `is_new` STRING COMMENT '是否首次启动',
        `model` STRING COMMENT '手机型号',
        `mid_id` STRING COMMENT '设备id',
        `os` STRING COMMENT '操作系统',
        `user_id` STRING COMMENT '会员id',
        `version_code` STRING COMMENT 'app版本号',
        `page_item` STRING COMMENT '目标id ',
        `page_item_type` STRING COMMENT '目标类型',
        `last_page_id` STRING COMMENT '上页类型',
        `page_id` STRING COMMENT '页面ID ',
        `source_type` STRING COMMENT '来源类型',
        `entry` STRING COMMENT ' icon手机图标  notice 通知 install 安装后启动',
        `loading_time` STRING COMMENT '启动加载时间',
        `open_ad_id` STRING COMMENT '广告页ID ',
        `open_ad_ms` STRING COMMENT '广告总共播放时间',
        `open_ad_skip_ms` STRING COMMENT '用户跳过广告时点',
        `actions` STRING COMMENT '动作',
        `displays` STRING COMMENT '曝光',
        `ts` STRING COMMENT '时间',
        `error_code` STRING COMMENT '错误码',
        `msg` STRING COMMENT '错误信息'
    ) COMMENT '错误日志表'
    partitioned by (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_error_log'
    tblproperties ('parquet.compression'='lzo');

        说明:此处为对动作数组和曝光数组做处理,如需分析错误与单个动作或曝光的关联,可先使用explode_jsonarray函数将数组“炸开”,再使用get_json_object函数获取具体字段

      2)数据导入

    insert overwrite table dwd_error_log partition (dt='2021-06-08')
    select
        --common中的字段
           get_json_object(line,'$.common.ar') area_code,
           get_json_object(line,'$.common.ba') brand,
           get_json_object(line,'$.common.ch') channel,
           get_json_object(line,'$.common.is_new') is_new,
           get_json_object(line,'$.common.md') model,
           get_json_object(line,'$.common.mid') mid_id,
           get_json_object(line,'$.common.os') os,
           get_json_object(line,'$.common.uid') user_id,
           get_json_object(line,'$.common.vc') version_code,
        --取page中的字段
           get_json_object(line,'$.page.item') page_item,
           get_json_object(line,'$.page.item_type') page_item_type,
           get_json_object(line,'$.page.last_page_id') last_page_id,
           get_json_object(line,'$.page.page_id') page_id,
           get_json_object(line,'$.page.source_type') source_type,
           --取start中的字段
            get_json_object(line,'$.start.entry') entry,
           get_json_object(line,'$.start.loading_time') loading_time,
           get_json_object(line,'$.start.open_ad_id') open_ad_id,
           get_json_object(line,'$.start.open_ad_ms') open_ad_ms,
           get_json_object(line,'$.start.open_ad_skip_ms') open_ad_skip_ms,
           get_json_object(line,'$.actions') actions,
           get_json_object(line,'$.displays') displays,
    
           get_json_object(line,'$.ts') ts,
           get_json_object(line,'$.err.error_code') error_code,
           get_json_object(line,'$.err.msg') msg
    from ods_log
    --过滤出错误日志,只有包含err属性的是错误日志
    where dt='2021-06-08'
    and get_json_object(line,'$.err') is not null;

      5)查看数据

    select * from dwd_error_log where dt='2021-06-08' limit 5;

    6.1.8 DWD用户行为数据加载脚本

      1)编写脚本

        (1)在hadoop102/home/atguigu/bin目录下创建脚本

    vim ods_to_dwd_log.sh
    #!/bin/bash
    
    APP=gmall
    # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
    if [ -n "$2" ] ;then
        do_date=$2
    else
        do_date=`date -d "-1 day" +%F`
    fi
    
    dwd_start_log="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_start_log partition(dt='$do_date')
    select
        get_json_object(line,'$.common.ar'),
        get_json_object(line,'$.common.ba'),
        get_json_object(line,'$.common.ch'),
        get_json_object(line,'$.common.is_new'),
        get_json_object(line,'$.common.md'),
        get_json_object(line,'$.common.mid'),
        get_json_object(line,'$.common.os'),
        get_json_object(line,'$.common.uid'),
        get_json_object(line,'$.common.vc'),
        get_json_object(line,'$.start.entry'),
        get_json_object(line,'$.start.loading_time'),
        get_json_object(line,'$.start.open_ad_id'),
        get_json_object(line,'$.start.open_ad_ms'),
        get_json_object(line,'$.start.open_ad_skip_ms'),
        get_json_object(line,'$.ts')
    from ${APP}.ods_log
    where dt='$do_date'
    and get_json_object(line,'$.start') is not null;"
    
    dwd_page_log="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_page_log partition(dt='$do_date')
    select
        get_json_object(line,'$.common.ar'),
        get_json_object(line,'$.common.ba'),
        get_json_object(line,'$.common.ch'),
        get_json_object(line,'$.common.is_new'),
        get_json_object(line,'$.common.md'),
        get_json_object(line,'$.common.mid'),
        get_json_object(line,'$.common.os'),
        get_json_object(line,'$.common.uid'),
        get_json_object(line,'$.common.vc'),
        get_json_object(line,'$.page.during_time'),
        get_json_object(line,'$.page.item'),
        get_json_object(line,'$.page.item_type'),
        get_json_object(line,'$.page.last_page_id'),
        get_json_object(line,'$.page.page_id'),
        get_json_object(line,'$.page.source_type'),
        get_json_object(line,'$.ts')
    from ${APP}.ods_log
    where dt='$do_date'
    and get_json_object(line,'$.page') is not null;"
    
    dwd_action_log="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_action_log partition(dt='$do_date')
    select
        get_json_object(line,'$.common.ar'),
        get_json_object(line,'$.common.ba'),
        get_json_object(line,'$.common.ch'),
        get_json_object(line,'$.common.is_new'),
        get_json_object(line,'$.common.md'),
        get_json_object(line,'$.common.mid'),
        get_json_object(line,'$.common.os'),
        get_json_object(line,'$.common.uid'),
        get_json_object(line,'$.common.vc'),
        get_json_object(line,'$.page.during_time'),
        get_json_object(line,'$.page.item'),
        get_json_object(line,'$.page.item_type'),
        get_json_object(line,'$.page.last_page_id'),
        get_json_object(line,'$.page.page_id'),
        get_json_object(line,'$.page.source_type'),
        get_json_object(action,'$.action_id'),
        get_json_object(action,'$.item'),
        get_json_object(action,'$.item_type'),
        get_json_object(action,'$.ts')
    from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.actions')) tmp as action
    where dt='$do_date'
    and get_json_object(line,'$.actions') is not null;"
    
    dwd_display_log="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_display_log partition(dt='$do_date')
    select
        get_json_object(line,'$.common.ar'),
        get_json_object(line,'$.common.ba'),
        get_json_object(line,'$.common.ch'),
        get_json_object(line,'$.common.is_new'),
        get_json_object(line,'$.common.md'),
        get_json_object(line,'$.common.mid'),
        get_json_object(line,'$.common.os'),
        get_json_object(line,'$.common.uid'),
        get_json_object(line,'$.common.vc'),
        get_json_object(line,'$.page.during_time'),
        get_json_object(line,'$.page.item'),
        get_json_object(line,'$.page.item_type'),
        get_json_object(line,'$.page.last_page_id'),
        get_json_object(line,'$.page.page_id'),
        get_json_object(line,'$.page.source_type'),
        get_json_object(line,'$.ts'),
        get_json_object(display,'$.display_type'),
        get_json_object(display,'$.item'),
        get_json_object(display,'$.item_type'),
        get_json_object(display,'$.order'),
        get_json_object(display,'$.pos_id')
    from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.displays')) tmp as display
    where dt='$do_date'
    and get_json_object(line,'$.displays') is not null;"
     
    dwd_error_log="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_error_log partition(dt='$do_date')
    select
        get_json_object(line,'$.common.ar'),
        get_json_object(line,'$.common.ba'),
        get_json_object(line,'$.common.ch'),
        get_json_object(line,'$.common.is_new'),
        get_json_object(line,'$.common.md'),
        get_json_object(line,'$.common.mid'),
        get_json_object(line,'$.common.os'),
        get_json_object(line,'$.common.uid'),
        get_json_object(line,'$.common.vc'),
        get_json_object(line,'$.page.item'),
        get_json_object(line,'$.page.item_type'),
        get_json_object(line,'$.page.last_page_id'),
        get_json_object(line,'$.page.page_id'),
        get_json_object(line,'$.page.source_type'),
        get_json_object(line,'$.start.entry'),
        get_json_object(line,'$.start.loading_time'),
        get_json_object(line,'$.start.open_ad_id'),
        get_json_object(line,'$.start.open_ad_ms'),
        get_json_object(line,'$.start.open_ad_skip_ms'),
        get_json_object(line,'$.actions'),
        get_json_object(line,'$.displays'),
        get_json_object(line,'$.ts'),
        get_json_object(line,'$.err.error_code'),
        get_json_object(line,'$.err.msg')
    from ${APP}.ods_log
    where dt='$do_date'
    and get_json_object(line,'$.err') is not null;"
    
    case $1 in
        dwd_start_log )
            hive -e "$dwd_start_log"
        ;;
        dwd_page_log )
            hive -e "$dwd_page_log"
        ;;
        dwd_action_log )
            hive -e "$dwd_action_log"
        ;;
        dwd_display_log )
            hive -e "$dwd_display_log"
        ;;
        dwd_error_log )
            hive -e "$dwd_error_log"
        ;;
        all )
            hive -e "$dwd_start_log$dwd_page_log$dwd_action_log$dwd_display_log$dwd_error_log"
        ;;
    esac
    ods_to_dwd_log.sh

        (2)增加脚本执行权限

    chmod 777 ods_to_dwd_log.sh

      2)脚本使用

        (1)执行脚本

    ods_to_dwd_log.sh all 2021-06-08

        (2)查询导入结果

    6.2 DWD层(业务数据)

      业务数据方面DWD层的搭建主要注意点在于维度建模。

    6.2.1 评价事实表(事务型事实表)

      1)建表语句

    drop table if exists dwd_comment_info;
    create external table dwd_comment_info(
    --来自ods_comment_info
    `id` string comment '编号',
    `user_id` string comment '用户ID',
    `sku_id` string comment '商品sku',
    `spu_id` string comment '商品spu',
    `order_id` string comment '订单ID',
    `appraise` string comment '评价(好评、中评、差评、默认评价)',
    `create_time` string comment '评价时间'
    ) comment '评价事实表'
    partitioned by (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_comment_info/'
    tblproperties ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table dwd_comment_info partition (dt)
    select id,
           user_id,
           sku_id,
           spu_id,
           order_id,
           appraise,
           create_time,
           date_format(create_time,'yyyy-MM-dd')
    from ods_comment_info
    where dt='2021-06-08';

        (2)每日装载

    insert overwrite table dwd_comment_info partition (dt='2021-06-09')
    select id,
           user_id,
           sku_id,
           spu_id,
           order_id,
           appraise,
           create_time
    from ods_comment_info
    where dt = '2021-06-09';

    6.2.2 订单明细事实表(事务型事实表)

      1)建表语句

    drop table if exists dwd_order_detail;
    CREATE EXTERNAL TABLE dwd_order_detail (
    --来自ods_order_detail
    `id` STRING COMMENT '订单编号',
    `order_id` STRING COMMENT '订单号',
    `sku_id` STRING COMMENT 'sku商品id',
    `sku_num` BIGINT COMMENT '商品数量',
    `create_time` STRING COMMENT '创建时间',
    `source_type` STRING COMMENT '来源类型',
    `source_id` STRING COMMENT '来源编号',
    `split_final_amount` DECIMAL(16,2) COMMENT '最终价格分摊',
    `split_activity_amount` DECIMAL(16,2) COMMENT '活动优惠分摊',
    `split_coupon_amount` DECIMAL(16,2) COMMENT '优惠券优惠分摊',
    --来自ods_order_info
    `province_id` STRING COMMENT '省份ID',
    `original_amount` DECIMAL(16,2) COMMENT '原始价格',
    `user_id` STRING COMMENT '用户id',
    --来自ods_order_detail_activity
    `activity_id` STRING COMMENT '活动ID',
    `activity_rule_id` STRING COMMENT '活动规则ID',
    --来自ods_order_detail_coupon
    `coupon_id` STRING COMMENT '优惠券ID'
    ) COMMENT '订单明细事实表表'
    PARTITIONED BY (`dt` STRING)
    STORED AS PARQUET
    LOCATION '/warehouse/gmall/dwd/dwd_order_detail/'
    TBLPROPERTIES ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    
    
    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table dwd_order_detail partition(dt)
    select
    od.id, --订单明细id
    od.order_id, --订单id
    od.sku_id, --skuid
    od.sku_num, --商品数量
    od.create_time, --创建时间
    od.source_type, --来源类型
    od.source_id, --来源编号
    od.split_final_amount, --分摊最终金额
    od.split_activity_amount, --分摊活动优惠
    od.split_coupon_amount, --分摊优惠券优惠
    oi.province_id, --省份id
    od.order_price*od.sku_num, --原始价格:商品价格*sku数量
    oi.user_id, --用户id
    oda.activity_id, --活动id
    oda.activity_rule_id, --活动规则id
    odc.coupon_id, --优惠券id
    date_format(create_time,'yyyy-MM-dd')
    from
    (
    select id, --订单明细id
    order_id, --订单id
    sku_id, --skuid
    sku_name, --skuname
    order_price, --商品价格
    sku_num, --商品数量
    create_time, --创建时间
    source_type, --来源类型
    source_id, --来源编号
    split_final_amount, --分摊最终金额
    split_activity_amount, --分摊活动优惠
    split_coupon_amount, --分摊优惠券优惠
    dt
    from ods_order_detail
    where dt='2021-06-08'
    )od
    left join
    (
    select
    id, --订单id
    user_id, --用户id
    province_id --省份id
    from ods_order_info
    where dt='2021-06-08'
    )oi
    on od.order_id=oi.id
    left join
    (
    select
    order_detail_id, --订单明细id
    activity_id, --活动id
    activity_rule_id --活动规则id
    from ods_order_detail_activity
    where dt='2021-06-08'
    )oda
    on od.id=oda.order_detail_id
    left join
    (
    select
    order_detail_id, --订单明细id
    coupon_id --优惠券id
    from ods_order_detail_coupon
    where dt='2021-06-08'
    )odc
    on od.id=odc.order_detail_id;

      (2)每日装载

    insert overwrite table dwd_order_detail partition(dt='2021-06-09')
    select
    od.id, --订单明细id
    od.order_id, --订单id
    od.sku_id, --skuid
    od.sku_num, --商品数量
    od.create_time, --创建时间
    od.source_type, --来源类型
    od.source_id, --来源编号
    od.split_final_amount, --分摊最终金额
    od.split_activity_amount, --分摊活动优惠
    od.split_coupon_amount, --分摊优惠券优惠
    oi.province_id, --省份id
    od.order_price*od.sku_num, --原始价格:商品价格*sku数量
    oi.user_id, --用户id
    oda.activity_id, --活动id
    oda.activity_rule_id, --活动规则id
    odc.coupon_id --优惠券id
    from
    (
    select id, --订单明细id
    order_id, --订单id
    sku_id, --skuid
    sku_name, --skuname
    order_price, --商品价格
    sku_num, --商品数量
    create_time, --创建时间
    source_type, --来源类型
    source_id, --来源编号
    split_final_amount, --分摊最终金额
    split_activity_amount, --分摊活动优惠
    split_coupon_amount, --分摊优惠券优惠
    dt
    from ods_order_detail
    where dt='2021-06-09'
    )od
    left join
    (
    select
    id, --订单id
    user_id, --用户id
    province_id --省份id
    from ods_order_info
    where dt='2021-06-09'
    )oi
    on od.order_id=oi.id
    left join
    (
    select
    order_detail_id, --订单明细id
    activity_id, --活动id
    activity_rule_id --活动规则id
    from ods_order_detail_activity
    where dt='2021-06-09'
    )oda
    on od.id=oda.order_detail_id
    left join
    (
    select
    order_detail_id, --订单明细id
    coupon_id --优惠券id
    from ods_order_detail_coupon
    where dt='2021-06-09'
    )odc
    on od.id=odc.order_detail_id;

    6.2.3 退事实表(事务型事实表)

      1)建表语句

    DROP TABLE IF EXISTS dwd_order_refund_info;
    CREATE EXTERNAL TABLE dwd_order_refund_info(
    --来自ods_order_refund_info
    `id` STRING COMMENT '编号',
    `user_id` STRING COMMENT '用户ID',
    `order_id` STRING COMMENT '订单ID',
    `sku_id` STRING COMMENT '商品ID',
    `refund_type` STRING COMMENT '退单类型',
    `refund_num` BIGINT COMMENT '退单件数',
    `refund_amount` DECIMAL(16,2) COMMENT '退单金额',
    `refund_reason_type` STRING COMMENT '退单原因类型',
    `create_time` STRING COMMENT '退单时间',
    --来自ods_order_info
    `province_id` STRING COMMENT '地区ID'
    ) COMMENT '退单事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS PARQUET
    LOCATION '/warehouse/gmall/dwd/dwd_order_refund_info/'
    TBLPROPERTIES ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    
    
    insert overwrite table dwd_order_refund_info partition(dt)
    select
    ri.id, --退单id
    ri.user_id, --用户id
    ri.order_id, --订单id
    ri.sku_id, --商品id
    ri.refund_type, --退单类型
    ri.refund_num, --退单件数
    ri.refund_amount, --退单金额
    ri.refund_reason_type, --退单原因类型
    ri.create_time, --退单时间
    oi.province_id, --省份id
    date_format(ri.create_time,'yyyy-MM-dd')
    from
    (
    select id, --退单id
    user_id, --用户id
    order_id, --订单id
    sku_id, --商品id
    refund_type, --退单类型
    refund_num, --退单件数
    refund_amount, --退单金额
    refund_reason_type, --退单原因类型
    create_time, --退单时间
    refund_status, --退单状态
    dt
    from ods_order_refund_info
    where dt='2021-06-08'
    )ri
    left join
    (
    select id, --订单id
    province_id --省份id
    from ods_order_info
    where dt='2021-06-08'
    )oi
    on ri.order_id=oi.id;

        (2)每日装载

    insert overwrite table dwd_order_refund_info partition(dt='2021-06-09')
    select
    ri.id, --退单id
    ri.user_id, --用户id
    ri.order_id, --订单id
    ri.sku_id, --商品id
    ri.refund_type, --退单类型
    ri.refund_num, --退单件数
    ri.refund_amount, --退单金额
    ri.refund_reason_type, --退单原因类型
    ri.create_time, --退单时间
    oi.province_id --省份id
    from
    (
    select id, --退单id
    user_id, --用户id
    order_id, --订单id
    sku_id, --商品id
    refund_type, --退单类型
    refund_num, --退单件数
    refund_amount, --退单金额
    refund_reason_type, --退单原因类型
    create_time, --退单时间
    refund_status, --退单状态
    dt
    from ods_order_refund_info
    where dt='2021-06-09'
    )ri
    left join
    (
    select id, --订单id
    province_id --省份id
    from ods_order_info
    where dt='2021-06-09'
    )oi
    on ri.order_id=oi.id;

        3)查询加载结果

    6.2.4 加购事实表周期型快照事实表,每日快照

      1)建表语句

    CREATE EXTERNAL TABLE dwd_cart_info(
    --来自ods_cart_info
    `id` STRING COMMENT '编号',
    `user_id` STRING COMMENT '用户ID',
    `sku_id` STRING COMMENT '商品ID',
    `cart_price` DECIMAL(16,2) COMMENT '加入购物车时的价格',
    `sku_num` BIGINT COMMENT '加购数量',
    `create_time` STRING COMMENT '创建时间',
    `operate_time` STRING COMMENT '修改时间',
    `is_ordered` STRING COMMENT '是否已下单',
    `order_time` STRING COMMENT '下单时间',
    `source_type` STRING COMMENT '来源类型',
    `source_id` STRING COMMENT '来源编号'
    ) COMMENT '加购事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS PARQUET
    LOCATION '/warehouse/gmall/dwd/dwd_cart_info/'
    TBLPROPERTIES ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    insert overwrite table dwd_cart_info partition(dt='2021-06-08')
    select
    id, --购物车id
    user_id, --用户id
    sku_id, --商品id
    cart_price, --加入购物车时的价格
    sku_num, --加购数量
    create_time, --创建时间
    operate_time, --修改时间
    is_ordered, --是否下单
    order_time, --下单时间
    source_type, --来源类型
    source_id --来源id
    from ods_cart_info
    where dt='2021-06-08';

        (2)每日装载

    insert overwrite table dwd_cart_info partition(dt='2021-06-09')
    select
    id, --购物车id
    user_id, --用户id
    sku_id, --商品id
    cart_price, --加入购物车时的价格
    sku_num, --加购数量
    create_time, --创建时间
    operate_time, --修改时间
    is_ordered, --是否下单
    order_time, --下单时间
    source_type, --来源类型
    source_id --来源id
    from ods_cart_info
    where dt='2021-06-09';

    6.2.5 收藏事实表周期型快照事实表,每日快照

      1)建表语句

    DROP TABLE IF EXISTS dwd_favor_info;
    CREATE EXTERNAL TABLE dwd_favor_info(
    --来自ods_favor_info
    `id` STRING COMMENT '编号',
    `user_id` STRING COMMENT '用户id',
    `sku_id` STRING COMMENT 'skuid',
    `spu_id` STRING COMMENT 'spuid',
    `is_cancel` STRING COMMENT '是否取消',
    `create_time` STRING COMMENT '收藏时间',
    `cancel_time` STRING COMMENT '取消时间'
    ) COMMENT '收藏事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS PARQUET
    LOCATION '/warehouse/gmall/dwd/dwd_favor_info/'
    TBLPROPERTIES ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    insert overwrite table dwd_favor_info partition(dt='2021-06-08')
    select
    id, --收藏id
    user_id, --用户id
    sku_id, --skuid
    spu_id, --spuid
    is_cancel, --是否取消
    create_time, --收藏时间
    cancel_time --取消时间
    from ods_favor_info
    where dt='2021-06-08';

        (2)每日装载

    insert overwrite table dwd_favor_info partition(dt='2021-06-09')
    select
    id, --收藏id
    user_id, --用户id
    sku_id, --skuid
    spu_id, --spuid
    is_cancel, --是否取消
    create_time, --收藏时间
    cancel_time --取消时间
    from ods_favor_info
    where dt='2021-06-09';

    6.2.6 优惠券领用事实表累积型快照事实表

      1)建表语句

    DROP TABLE IF EXISTS dwd_coupon_use;
    CREATE EXTERNAL TABLE dwd_coupon_use(
    --来自ods_coupon_use
    `id` STRING COMMENT '编号',
    `coupon_id` STRING COMMENT '优惠券ID',
    `user_id` STRING COMMENT 'userid',
    `order_id` STRING COMMENT '订单id',
    `coupon_status` STRING COMMENT '优惠券状态',
    `get_time` STRING COMMENT '领取时间',
    `using_time` STRING COMMENT '使用时间(下单)',
    `used_time` STRING COMMENT '使用时间(支付)',
    `expire_time` STRING COMMENT '过期时间'
    ) COMMENT '优惠券领用事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS PARQUET
    LOCATION '/warehouse/gmall/dwd/dwd_coupon_use/'
    TBLPROPERTIES ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table dwd_coupon_use partition(dt)
    select
    id, --优惠券使用id
    coupon_id, --优惠券ID
    user_id, --userid
    order_id, --订单id
    coupon_status, --优惠券状态
    get_time, --领取时间
    using_time, --使用时间(下单)
    used_time, --使用时间(支付)
    expire_time, --过期时间
    coalesce(date_format(used_time,'yyyy-MM-dd'),date_format(expire_time,'yyyy-MM-dd'),'9999-99-99')
    from ods_coupon_use
    where dt='2021-06-08';

        (2)每日装载

    insert overwrite table dwd_coupon_use partition(dt)
    select
    nvl(new.id,old.id),
    nvl(new.coupon_id,old.coupon_id),
    nvl(new.user_id,old.user_id),
    nvl(new.order_id,old.order_id),
    nvl(new.coupon_status,old.coupon_status),
    nvl(new.get_time,old.get_time),
    nvl(new.using_time,old.using_time),
    nvl(new.used_time,old.used_time),
    nvl(new.expire_time,old.expire_time),
    coalesce(date_format(nvl(new.used_time,old.used_time),'yyyy-MM-dd'),date_format(nvl(new.expire_time,old.expire_time),'yyyy-MM-dd'),'9999-99-99')
    from
    (
    select
    id, --优惠券使用id
    coupon_id, --优惠券ID
    user_id, --userid
    order_id, --订单id
    coupon_status, --优惠券状态
    get_time, --领取时间
    using_time, --使用时间(下单)
    used_time, --使用时间(支付)
    expire_time --过期时间
    from dwd_coupon_use
    where dt='9999-99-99'
    )old
    full outer join
    (
    select
    id, --优惠券使用id
    coupon_id, --优惠券ID
    user_id, --userid
    order_id, --订单id
    coupon_status, --优惠券状态
    get_time, --领取时间
    using_time, --使用时间(下单)
    used_time, --使用时间(支付)
    expire_time --过期时间
    from ods_coupon_use
    where dt='2021-06-09'
    )new
    on old.id=new.id;

    6.2.7 支付事实表(累积快照事实表)

      1)建表语句

    DROP TABLE IF EXISTS dwd_payment_info;
    CREATE EXTERNAL TABLE dwd_payment_info (
    --来自ods_payment_info
    `id` STRING COMMENT '编号',
    `out_trade_no` STRING COMMENT '对外交易编号',
    `order_id` STRING COMMENT '订单编号',
    `user_id` STRING COMMENT '用户编号',
    `payment_type` STRING COMMENT '支付类型',
    `trade_no` STRING COMMENT '交易编号',
    `payment_amount` DECIMAL(16,2) COMMENT '支付金额',
    `payment_status` STRING COMMENT '支付状态',
    `create_time` STRING COMMENT '创建时间',--调用第三方支付接口的时间
    `callback_time` STRING COMMENT '完成时间',--支付完成时间,即支付成功回调时间
    --来自ods_order_info
    `province_id` STRING COMMENT '地区ID'
    ) COMMENT '支付事实表表'
    PARTITIONED BY (`dt` STRING)
    STORED AS PARQUET
    LOCATION '/warehouse/gmall/dwd/dwd_payment_info/'
    TBLPROPERTIES ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    insert overwrite table dwd_payment_info partition(dt)
    select
    pi.id, --支付id
    pi.out_trade_no, --对外交易编号
    pi.order_id, --订单编号
    pi.user_id, --用户编号
    pi.payment_type, --支付类型
    pi.trade_no, --交易编号
    pi.payment_amount, --支付金额
    pi.payment_status, --支付状态
    pi.create_time, --创建时间
    pi.callback_time, --完成时间
    oi.province_id, ----地区ID
    nvl(date_format(pi.callback_time,'yyyy-MM-dd'),'9999-99-99')
    from
    (
    select id, --支付id
    out_trade_no, --对外交易编号
    order_id, --订单编号
    user_id, --用户编号
    payment_type, --支付类型
    trade_no, --交易编号
    payment_amount, --支付金额
    subject, --交易内容
    payment_status, --支付状态
    create_time, --创建时间
    callback_time, --完成时间
    dt
    from ods_payment_info
    where dt='2021-06-08'
    )pi
    left join
    (
    select id, --订单编号
    province_id --地区ID
    from ods_order_info
    where dt='2021-06-08'
    )oi on pi.order_id=oi.id;

        (2)每日装载

    insert overwrite table dwd_payment_info partition(dt)
    select
    nvl(new.id,old.id),
    nvl(new.out_trade_no,old.out_trade_no),
    nvl(new.order_id,old.order_id),
    nvl(new.user_id,old.user_id),
    nvl(new.payment_type,old.payment_type),
    nvl(new.trade_no,old.trade_no),
    nvl(new.payment_amount,old.payment_amount),
    nvl(new.payment_status,old.payment_status),
    nvl(new.create_time,old.create_time),
    nvl(new.callback_time,old.callback_time),
    nvl(new.province_id,old.province_id),
    nvl(date_format(nvl(new.callback_time,old.callback_time),'yyyy-MM-dd'),'9999-99-99')
    from
    (
    select id, --支付id
    out_trade_no, --对外交易编号
    order_id, --订单编号
    user_id, --用户编号
    payment_type, --支付类型
    trade_no, --交易编号
    payment_amount, --支付金额
    payment_status, --支付状态
    create_time, --创建时间
    callback_time, --完成时间
    province_id --地区ID
    from dwd_payment_info
    where dt = '9999-99-99'
    )old
    full outer join
    (
    select
    pi.id, --支付id
    pi.out_trade_no, --对外交易编号
    pi.order_id, --订单编号
    pi.user_id, --用户编号
    oi.province_id, --地区ID
    pi.payment_type, --支付类型
    pi.trade_no, --交易编号
    pi.payment_amount, --支付金额
    pi.payment_status, --支付状态
    pi.create_time, --创建时间
    pi.callback_time --完成时间
    from
    (
    select id, --支付id
    out_trade_no, --对外交易编号
    order_id, --订单编号
    user_id, --用户编号
    payment_type, --支付类型
    trade_no, --交易编号
    payment_amount, --支付金额
    subject, --交易内容
    payment_status, --支付状态
    create_time, --创建时间
    callback_time, --完成时间
    dt
    from ods_payment_info
    where dt='2021-06-09'
    )pi
    left join
    (
    select id, --订单编号
    province_id --地区ID
    from ods_order_info
    where dt='2021-06-09'
    )oi
    on pi.order_id=oi.id
    )new
    on old.id=new.id;

    6.2.8 退款事实表(累积快照事实表)

      1)建表语句

    DROP TABLE IF EXISTS dwd_refund_payment;
    CREATE EXTERNAL TABLE dwd_refund_payment (
    --来自ods_refund_payment
    `id` STRING COMMENT '编号',
    `out_trade_no` STRING COMMENT '对外交易编号',
    `order_id` STRING COMMENT '订单编号',
    `sku_id` STRING COMMENT 'SKU编号',
    `payment_type` STRING COMMENT '支付类型',
    `trade_no` STRING COMMENT '交易编号',
    `refund_amount` DECIMAL(16,2) COMMENT '退款金额',
    `refund_status` STRING COMMENT '退款状态',
    `create_time` STRING COMMENT '创建时间', --调用第三方支付接口的时间
    `callback_time` STRING COMMENT '回调时间', --支付接口回调时间,即支付成功时间
    --来自ods_order_info
    `user_id` STRING COMMENT '用户ID',
    `province_id` STRING COMMENT '地区ID'
    ) COMMENT '退款事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS PARQUET
    LOCATION '/warehouse/gmall/dwd/dwd_refund_payment/'
    TBLPROPERTIES ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    insert overwrite table dwd_refund_payment partition(dt)
    select
    rp.id, --退款id
    out_trade_no, --对外交易编号
    order_id, --订单编号
    sku_id, --SKU编号
    payment_type, --支付类型
    trade_no, --交易编号
    refund_amount, --退款金额
    refund_status, --退款状态
    create_time, --创建时间
    callback_time, --回调时间
    user_id, --用户ID
    province_id, --地区ID
    nvl(date_format(callback_time,'yyyy-MM-dd'),'9999-99-99')
    from
    (
    select
    id, --退款id
    out_trade_no, --对外交易编号
    order_id, --订单编号
    sku_id, --SKU编号
    payment_type, --支付类型
    trade_no, --交易编号
    refund_amount, --退款金额
    refund_status, --退款状态
    create_time, --创建时间
    callback_time --回调时间
    from ods_refund_payment
    where dt='2021-06-08'
    )rp
    left join
    (
    select
    id, --订单编号
    user_id, --用户ID
    province_id --地区ID
    from ods_order_info
    where dt='2021-06-08'
    )oi
    on rp.order_id=oi.id;

        (2)每日装载

    insert overwrite table dwd_refund_payment partition(dt)
    select
    nvl(new.id,old.id),
    nvl(new.out_trade_no,old.out_trade_no),
    nvl(new.order_id,old.order_id),
    nvl(new.sku_id,old.sku_id),
    nvl(new.payment_type,old.payment_type),
    nvl(new.trade_no,old.trade_no),
    nvl(new.refund_amount,old.refund_amount),
    nvl(new.refund_status,old.refund_status),
    nvl(new.create_time,old.create_time),
    nvl(new.callback_time,old.callback_time),
    nvl(new.user_id,old.user_id),
    nvl(new.province_id,old.province_id),
    nvl(date_format(nvl(new.callback_time,old.callback_time),'yyyy-MM-dd'),'9999-99-99')
    from
    (
    select
    id, --退款id
    out_trade_no, --对外交易编号
    order_id, --订单编号
    sku_id, --SKU编号
    payment_type, --支付类型
    trade_no, --交易编号
    refund_amount, --退款金额
    refund_status, --退款状态
    create_time, --创建时间
    callback_time, --回调时间
    user_id, --用户ID
    province_id --地区ID
    from dwd_refund_payment
    where dt='9999-99-99'
    )old
    full outer join
    (
    select
    rp.id, --退款id
    out_trade_no, --对外交易编号
    order_id, --订单编号
    sku_id, --SKU编号
    payment_type, --支付类型
    trade_no, --交易编号
    refund_amount, --退款金额
    refund_status, --退款状态
    create_time, --创建时间
    callback_time, --回调时间
    user_id, --用户ID
    province_id --地区ID
    from
    (
    select
    id, --退款id
    out_trade_no, --对外交易编号
    order_id, --订单编号
    sku_id, --SKU编号
    payment_type, --支付类型
    trade_no, --交易编号
    refund_amount, --退款金额
    refund_status, --退款状态
    create_time, --创建时间
    callback_time --回调时间
    from ods_refund_payment
    where dt='2021-06-09'
    )rp
    left join
    (
    select
    id, --订单编号
    user_id, --用户ID
    province_id --地区ID
    from ods_order_info
    where dt='2021-06-09'
    )oi
    on rp.order_id=oi.id
    )new
    on old.id=new.id;

      3)查询加载结果

    6.2.9 订单事实表累积型快照事实表

      1)建表语句

    DROP TABLE IF EXISTS dwd_order_info;
    CREATE EXTERNAL TABLE dwd_order_info(
    --来自ods_order_info
    `id` STRING COMMENT '编号',
    `final_amount` DECIMAL(16,2) COMMENT '订单最终价格',
    `order_status` STRING COMMENT '订单状态',
    `user_id` STRING COMMENT '用户ID',
    `payment_way` STRING COMMENT '支付方式',
    `delivery_address` STRING COMMENT '邮寄地址',
    `out_trade_no` STRING COMMENT '对外交易编号',
    `create_time` STRING COMMENT '创建时间(未支付状态)',
    `expire_time` STRING COMMENT '过期时间',
    `payment_time` STRING COMMENT '支付时间(已支付状态)',
    `cancel_time` STRING COMMENT '取消时间(已取消状态)',
    `finish_time` STRING COMMENT '完成时间(已完成状态)',
    `refund_time` STRING COMMENT '退款时间(退款中状态)',
    `refund_finish_time` STRING COMMENT '退款完成时间(退款完成状态)',
    `tracking_no` STRING COMMENT '物流单号',
    `province_id` STRING COMMENT '地区ID',
    `activity_reduce_amount` DECIMAL(16,2) COMMENT '活动减免',
    `coupon_reduce_amount` DECIMAL(16,2) COMMENT '优惠券减免',
    `original_amount` DECIMAL(16,2) COMMENT '订单原始价格',
    `feight_fee` DECIMAL(16,2) COMMENT '运费',
    `feight_fee_reduce` DECIMAL(16,2) COMMENT '运费减免'
    ) COMMENT '订单事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS PARQUET
    LOCATION '/warehouse/gmall/dwd/dwd_order_info/'
    TBLPROPERTIES ("parquet.compression"="lzo");

      2)数据装载

        (1)首日装载

    insert overwrite table dwd_order_info partition(dt)
    select
    oi.id, --订单号
    final_amount, --订单最终价格
    oi.order_status, --订单状态
    oi.user_id, --用户ID
    oi.payment_way, --支付方式
    oi.delivery_address, --邮寄地址
    oi.out_trade_no, --对外交易编号
    oi.create_time, --创建时间(未支付状态)
    oi.expire_time, --过期时间
    times.ts['1002'] payment_time, --支付时间(已支付状态)
    times.ts['1003'] cancel_time, --取消时间(已取消状态)
    times.ts['1004'] finish_time, --完成时间(已完成状态)
    times.ts['1005'] refund_time, --退款时间(退款中状态)
    times.ts['1006'] refund_finish_time, --退款完成时间(退款完成状态)
    oi.tracking_no, --物流单号
    oi.province_id, --地区ID
    activity_reduce_amount, --活动减免
    coupon_reduce_amount, --优惠券减免
    original_amount, --订单原始价格
    feight_fee, --运费
    feight_fee_reduce, ----运费减免
    case
    when times.ts['1003'] is not null then date_format(times.ts['1003'],'yyyy-MM-dd')
    when times.ts['1004'] is not null and date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)<='2021-06-08' and times.ts['1005'] is null then date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)
    when times.ts['1006'] is not null then date_format(times.ts['1006'],'yyyy-MM-dd')
    when oi.expire_time is not null then date_format(oi.expire_time,'yyyy-MM-dd')
    else '9999-99-99'
    end
    from
    (
    select id, --订单号
    final_amount, --订单最终价格
    order_status, --订单状态
    user_id, --用户ID
    payment_way, --支付方式
    delivery_address, --邮寄地址
    out_trade_no, --对外交易编号
    create_time, --创建时间(未支付状态)
    operate_time, --操作时间
    expire_time, --过期时间
    tracking_no, --物流单号
    province_id, --地区ID
    activity_reduce_amount, --活动减免
    coupon_reduce_amount, --优惠券减免
    original_amount, --订单原始价格
    feight_fee, --运费
    feight_fee_reduce, --运费减免
    dt
    from ods_order_info
    where dt='2021-06-08'
    )oi
    left join
    (
    select
    order_id, --订单编号
    --订单状态:修改时间
    str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') ts
    from ods_order_status_log
    where dt='2021-06-08'
    group by order_id
    )times
    on oi.id=times.order_id;

        (2)每日装载

    insert overwrite table dwd_order_info partition(dt)
    select
    oi.id, --订单号
    final_amount, --订单最终价格
    oi.order_status, --订单状态
    oi.user_id, --用户ID
    oi.payment_way, --支付方式
    oi.delivery_address, --邮寄地址
    oi.out_trade_no, --对外交易编号
    oi.create_time, --创建时间(未支付状态)
    oi.expire_time, --过期时间
    times.ts['1002'] payment_time, --支付时间(已支付状态)
    times.ts['1003'] cancel_time, --取消时间(已取消状态)
    times.ts['1004'] finish_time, --完成时间(已完成状态)
    times.ts['1005'] refund_time, --退款时间(退款中状态)
    times.ts['1006'] refund_finish_time, --退款完成时间(退款完成状态)
    oi.tracking_no, --物流单号
    oi.province_id, --地区ID
    activity_reduce_amount, --活动减免
    coupon_reduce_amount, --优惠券减免
    original_amount, --订单原始价格
    feight_fee, --运费
    feight_fee_reduce, ----运费减免
    case
    when times.ts['1003'] is not null then date_format(times.ts['1003'],'yyyy-MM-dd')
    when times.ts['1004'] is not null and date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)<='2021-06-09' and times.ts['1005'] is null then date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)
    when times.ts['1006'] is not null then date_format(times.ts['1006'],'yyyy-MM-dd')
    when oi.expire_time is not null then date_format(oi.expire_time,'yyyy-MM-dd')
    else '9999-99-99'
    end
    from
    (
    select id, --订单号
    final_amount, --订单最终价格
    order_status, --订单状态
    user_id, --用户ID
    payment_way, --支付方式
    delivery_address, --邮寄地址
    out_trade_no, --对外交易编号
    create_time, --创建时间(未支付状态)
    operate_time, --操作时间
    expire_time, --过期时间
    tracking_no, --物流单号
    province_id, --地区ID
    activity_reduce_amount, --活动减免
    coupon_reduce_amount, --优惠券减免
    original_amount, --订单原始价格
    feight_fee, --运费
    feight_fee_reduce, --运费减免
    dt
    from ods_order_info
    where dt='2021-06-09'
    )oi
    left join
    (
    select
    order_id, --订单编号
    --订单状态:修改时间
    str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') ts
    from ods_order_status_log
    where dt='2021-06-09'
    group by order_id
    )times
    on oi.id=times.order_id;

    6.2.10 DWD业务数据首日装载脚本

      1)编写脚本

        (1)在/home/atguigu/bin目录下创建脚本ods_to_dwd_db_init.sh

    vim ods_to_dwd_db_init.sh
    #!/bin/bash
    APP=gmall
    
    if [ -n "$2" ] ;then
       do_date=$2
    else 
       echo "请传入日期参数"
       exit
    fi 
    
    dwd_order_info="
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_order_info partition(dt)
    select
        oi.id,                      --订单号
        final_amount,               --订单最终价格
        oi.order_status,            --订单状态
        oi.user_id,                 --用户ID
        oi.payment_way,             --支付方式
        oi.delivery_address,        --邮寄地址
        oi.out_trade_no,            --对外交易编号
        oi.create_time,             --创建时间(未支付状态)
        oi.expire_time,             --过期时间
        times.ts['1002'] payment_time,  --支付时间(已支付状态)
        times.ts['1003'] cancel_time,   --取消时间(已取消状态)
        times.ts['1004'] finish_time,   --完成时间(已完成状态)
        times.ts['1005'] refund_time,   --退款时间(退款中状态)
        times.ts['1006'] refund_finish_time,    --退款完成时间(退款完成状态)
        oi.tracking_no,             --物流单号
        oi.province_id,             --地区ID
        activity_reduce_amount,     --活动减免
        coupon_reduce_amount,       --优惠券减免
        original_amount,            --订单原始价格
        feight_fee,                 --运费
        feight_fee_reduce,          ----运费减免
        case
            when times.ts['1003'] is not null then date_format(times.ts['1003'],'yyyy-MM-dd')
            when times.ts['1004'] is not null and date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)<='$do_date' and times.ts['1005'] is null then date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)
            when times.ts['1006'] is not null then date_format(times.ts['1006'],'yyyy-MM-dd')
            when oi.expire_time is not null then date_format(oi.expire_time,'yyyy-MM-dd')
            else '9999-99-99'
        end
    from
    (
        select id,                  --订单号
               final_amount,        --订单最终价格
               order_status,        --订单状态
               user_id,             --用户ID
               payment_way,         --支付方式
               delivery_address,    --邮寄地址
               out_trade_no,        --对外交易编号
               create_time,         --创建时间(未支付状态)
               operate_time,        --操作时间
               expire_time,         --过期时间
               tracking_no,         --物流单号
               province_id,         --地区ID
               activity_reduce_amount,  --活动减免
               coupon_reduce_amount,    --优惠券减免
               original_amount,         --订单原始价格
               feight_fee,              --运费
               feight_fee_reduce,       --运费减免
               dt
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi
    left join
    (
        select
            order_id,       --订单编号
            --订单状态:修改时间
            str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') ts
        from ${APP}.ods_order_status_log
        where dt='$do_date'
        group by order_id
    )times
    on oi.id=times.order_id;"
    
    dwd_order_detail="
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_order_detail partition(dt)
    select
        od.id,                              --订单明细id
        od.order_id,                        --订单id
        od.sku_id,                          --skuid
        od.sku_num,                         --商品数量
        od.create_time,                     --创建时间
        od.source_type,                     --来源类型
        od.source_id,                       --来源编号
        od.split_final_amount,              --分摊最终金额
        od.split_activity_amount,           --分摊活动优惠
        od.split_coupon_amount,             --分摊优惠券优惠
        oi.province_id,                     --省份id
        od.order_price*od.sku_num,          --原始价格:商品价格*sku数量
        oi.user_id,                         --用户id
        oda.activity_id,                    --活动id
        oda.activity_rule_id,               --活动规则id
        odc.coupon_id,                      --优惠券id
        date_format(create_time,'yyyy-MM-dd')
    from
    (
        select id,                      --订单明细id
               order_id,                --订单id
               sku_id,                  --skuid
               sku_name,                --skuname
               order_price,             --商品价格
               sku_num,                 --商品数量
               create_time,             --创建时间
               source_type,             --来源类型
               source_id,               --来源编号
               split_final_amount,      --分摊最终金额
               split_activity_amount,   --分摊活动优惠
               split_coupon_amount,     --分摊优惠券优惠
               dt
        from ${APP}.ods_order_detail
        where dt='$do_date'
    )od
    left join
    (
        select
            id,             --订单id
            user_id,        --用户id
            province_id     --省份id
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi
    on od.order_id=oi.id
    left join
    (
        select
            order_detail_id,    --订单明细id
            activity_id,        --活动id
            activity_rule_id    --活动规则id
        from ${APP}.ods_order_detail_activity
        where dt='$do_date'
    )oda
    on od.id=oda.order_detail_id
    left join
    (
        select
            order_detail_id,    --订单明细id
            coupon_id           --优惠券id
        from ${APP}.ods_order_detail_coupon
        where dt='$do_date'
    )odc
    on od.id=odc.order_detail_id;"
    
    dwd_payment_info="
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_payment_info partition(dt)
    select
        pi.id,              --支付id
        pi.out_trade_no,    --对外交易编号
        pi.order_id,        --订单编号
        pi.user_id,         --用户编号
        pi.payment_type,    --支付类型
        pi.trade_no,        --交易编号
        pi.payment_amount,  --支付金额
        pi.payment_status,  --支付状态
        pi.create_time,     --创建时间
        pi.callback_time,   --完成时间
        oi.province_id,     ----地区ID
        nvl(date_format(pi.callback_time,'yyyy-MM-dd'),'9999-99-99')
    from
    (
        select id,              --支付id
               out_trade_no,    --对外交易编号
               order_id,        --订单编号
               user_id,         --用户编号
               payment_type,    --支付类型
               trade_no,        --交易编号
               payment_amount,  --支付金额
               subject,         --交易内容
               payment_status,  --支付状态
               create_time,     --创建时间
               callback_time,   --完成时间
               dt
        from ${APP}.ods_payment_info
        where dt='$do_date'
    )pi
    left join
    (
        select id,          --订单编号
               province_id  --地区ID
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi on pi.order_id=oi.id;"
    
    dwd_cart_info="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_cart_info partition(dt='$do_date')
    select
        id,                 --购物车id
        user_id,            --用户id
        sku_id,             --商品id
        cart_price,         --加入购物车时的价格
        sku_num,            --加购数量
        create_time,        --创建时间
        operate_time,       --修改时间
        is_ordered,         --是否下单
        order_time,         --下单时间
        source_type,        --来源类型
        source_id           --来源id
    from ${APP}.ods_cart_info
    where dt='$do_date';"
    
    dwd_comment_info="
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_comment_info partition (dt)
    select id,
           user_id,
           sku_id,
           spu_id,
           order_id,
           appraise,
           create_time,
           date_format(create_time,'yyyy-MM-dd')
    from ${APP}.ods_comment_info
    where dt='$do_date';
    "
    
    dwd_favor_info="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_favor_info partition(dt='$do_date')
    select
        id,                 --收藏id
        user_id,            --用户id
        sku_id,             --skuid
        spu_id,             --spuid
        is_cancel,          --是否取消
        create_time,        --收藏时间
        cancel_time         --取消时间
    from ${APP}.ods_favor_info
    where dt='$do_date';"
    
    dwd_coupon_use="
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_coupon_use partition(dt)
    select
        id,                 --优惠券使用id
        coupon_id,          --优惠券ID
        user_id,            --userid
        order_id,           --订单id
        coupon_status,      --优惠券状态
        get_time,           --领取时间
        using_time,         --使用时间(下单)
        used_time,          --使用时间(支付)
        expire_time,        --过期时间
        coalesce(date_format(used_time,'yyyy-MM-dd'),date_format(expire_time,'yyyy-MM-dd'),'9999-99-99')
    from ${APP}.ods_coupon_use
    where dt='$do_date';"
    
    dwd_order_refund_info="
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_order_refund_info partition(dt)
    select
        ri.id,                      --退单id
        ri.user_id,                 --用户id
        ri.order_id,                --订单id
        ri.sku_id,                  --商品id
        ri.refund_type,             --退单类型
        ri.refund_num,              --退单件数
        ri.refund_amount,           --退单金额
        ri.refund_reason_type,      --退单原因类型
        ri.create_time,             --退单时间
        oi.province_id,             --省份id
        date_format(ri.create_time,'yyyy-MM-dd')
    from
    (
        select id,                  --退单id
               user_id,             --用户id
               order_id,            --订单id
               sku_id,              --商品id
               refund_type,         --退单类型
               refund_num,          --退单件数
               refund_amount,       --退单金额
               refund_reason_type,  --退单原因类型
               create_time,         --退单时间
               refund_status,       --退单状态
               dt
        from ${APP}.ods_order_refund_info
        where dt='$do_date'
    )ri
    left join
    (
        select id,              --订单id
               province_id      --省份id
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi
    on ri.order_id=oi.id;"
    
    dwd_refund_payment="
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_payment_info partition(dt)
    select
        pi.id,              --支付id
        pi.out_trade_no,    --对外交易编号
        pi.order_id,        --订单编号
        pi.user_id,         --用户编号
        pi.payment_type,    --支付类型
        pi.trade_no,        --交易编号
        pi.payment_amount,  --支付金额
        pi.payment_status,  --支付状态
        pi.create_time,     --创建时间
        pi.callback_time,   --完成时间
        oi.province_id,     ----地区ID
        nvl(date_format(pi.callback_time,'yyyy-MM-dd'),'9999-99-99')
    from
    (
        select id,              --支付id
               out_trade_no,    --对外交易编号
               order_id,        --订单编号
               user_id,         --用户编号
               payment_type,    --支付类型
               trade_no,        --交易编号
               payment_amount,  --支付金额
               subject,         --交易内容
               payment_status,  --支付状态
               create_time,     --创建时间
               callback_time,   --完成时间
               dt
        from ${APP}.ods_payment_info
        where dt='$do_date'
    )pi
    left join
    (
        select id,          --订单编号
               province_id  --地区ID
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi on pi.order_id=oi.id;"
    
    case $1 in
        dwd_order_info )
            hive -e "$dwd_order_info"
        ;;
        dwd_order_detail )
            hive -e "$dwd_order_detail"
        ;;
        dwd_payment_info )
            hive -e "$dwd_payment_info"
        ;;
        dwd_cart_info )
            hive -e "$dwd_cart_info"
        ;;
        dwd_comment_info )
            hive -e "$dwd_comment_info"
        ;;
        dwd_favor_info )
            hive -e "$dwd_favor_info"
        ;;
        dwd_coupon_use )
            hive -e "$dwd_coupon_use"
        ;;
        dwd_order_refund_info )
            hive -e "$dwd_order_refund_info"
        ;;
        dwd_refund_payment )
            hive -e "$dwd_refund_payment"
        ;;
        all )
            hive -e "$dwd_order_info$dwd_order_detail$dwd_payment_info$dwd_cart_info$dwd_comment_info$dwd_favor_info$dwd_coupon_use$dwd_order_refund_info$dwd_refund_payment"
        ;;
    esac

        (2)增加执行权限

    chmod +x ods_to_dwd_db_init.sh

      2)脚本使用

        (1)执行脚本

    ods_to_dwd_db_init.sh all 2021-06-08

        (2)查看数据是否导入成功

    6.2.11 DWD业务数据每日装载脚本

      1)编写脚本

        (1)在/home/atguigu/bin目录下创建脚本ods_to_dwd_db.sh

    vim ods_to_dwd_db.sh
    #!/bin/bash
    
    APP=gmall
    # 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
    if [ -n "$2" ] ;then
        do_date=$2
    else 
        do_date= date -d "-1 day" +%F
    fi
    
    
    # 假设某累积型快照事实表,某天所有的业务记录全部完成,则会导致9999-99-99分区的数据未被覆盖,从而导致数据重复,该函数根据9999-99-99分区的数据
    clear_data(){
        current_date=`date +%F`
        current_date_timestamp=`date -d "$current_date" +%s`
    
        last_modified_date=`hadoop fs -ls /warehouse/gmall/dwd/$1 | grep '9999-99-99' | awk '{print $6}'`
        last_modified_date_timestamp=`date -d "$last_modified_date" +%s`
    
        if [[ $last_modified_date_timestamp -lt $current_date_timestamp ]]; then
            echo "clear table $1 partition(dt=9999-99-99)"
            hadoop fs -rm -r -f /warehouse/gmall/dwd/$1/dt=9999-99-99/*
        fi
    }
    
    dwd_order_info="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table ${APP}.dwd_order_info partition(dt)
    select
        oi.id,                      --订单号
        final_amount,               --订单最终价格
        oi.order_status,            --订单状态
        oi.user_id,                 --用户ID
        oi.payment_way,             --支付方式
        oi.delivery_address,        --邮寄地址
        oi.out_trade_no,            --对外交易编号
        oi.create_time,             --创建时间(未支付状态)
        oi.expire_time,             --过期时间
        times.ts['1002'] payment_time,  --支付时间(已支付状态)
        times.ts['1003'] cancel_time,   --取消时间(已取消状态)
        times.ts['1004'] finish_time,   --完成时间(已完成状态)
        times.ts['1005'] refund_time,   --退款时间(退款中状态)
        times.ts['1006'] refund_finish_time,    --退款完成时间(退款完成状态)
        oi.tracking_no,             --物流单号
        oi.province_id,             --地区ID
        activity_reduce_amount,     --活动减免
        coupon_reduce_amount,       --优惠券减免
        original_amount,            --订单原始价格
        feight_fee,                 --运费
        feight_fee_reduce,          ----运费减免
        case
            when times.ts['1003'] is not null then date_format(times.ts['1003'],'yyyy-MM-dd')
            when times.ts['1004'] is not null and date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)<='$do_date' and times.ts['1005'] is null then date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)
            when times.ts['1006'] is not null then date_format(times.ts['1006'],'yyyy-MM-dd')
            when oi.expire_time is not null then date_format(oi.expire_time,'yyyy-MM-dd')
            else '9999-99-99'
        end
    from
    (
        select id,                  --订单号
               final_amount,        --订单最终价格
               order_status,        --订单状态
               user_id,             --用户ID
               payment_way,         --支付方式
               delivery_address,    --邮寄地址
               out_trade_no,        --对外交易编号
               create_time,         --创建时间(未支付状态)
               operate_time,        --操作时间
               expire_time,         --过期时间
               tracking_no,         --物流单号
               province_id,         --地区ID
               activity_reduce_amount,  --活动减免
               coupon_reduce_amount,    --优惠券减免
               original_amount,         --订单原始价格
               feight_fee,              --运费
               feight_fee_reduce,       --运费减免
               dt
        from ${APP}.ods_order_info
        where dt='2021-06-09'
    )oi
    left join
    (
        select
            order_id,       --订单编号
            --订单状态:修改时间
            str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') ts
        from ${APP}.ods_order_status_log
        where dt='$do_date'
        group by order_id
    )times
    on oi.id=times.order_id;"
    
    dwd_order_detail="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_order_detail partition(dt='$do_date')
    select
        od.id,                              --订单明细id
        od.order_id,                        --订单id
        od.sku_id,                          --skuid
        od.sku_num,                         --商品数量
        od.create_time,                     --创建时间
        od.source_type,                     --来源类型
        od.source_id,                       --来源编号
        od.split_final_amount,              --分摊最终金额
        od.split_activity_amount,           --分摊活动优惠
        od.split_coupon_amount,             --分摊优惠券优惠
        oi.province_id,                     --省份id
        od.order_price*od.sku_num,          --原始价格:商品价格*sku数量
        oi.user_id,                         --用户id
        oda.activity_id,                    --活动id
        oda.activity_rule_id,               --活动规则id
        odc.coupon_id                      --优惠券id
    from
    (
        select id,                      --订单明细id
               order_id,                --订单id
               sku_id,                  --skuid
               sku_name,                --skuname
               order_price,             --商品价格
               sku_num,                 --商品数量
               create_time,             --创建时间
               source_type,             --来源类型
               source_id,               --来源编号
               split_final_amount,      --分摊最终金额
               split_activity_amount,   --分摊活动优惠
               split_coupon_amount,     --分摊优惠券优惠
               dt
        from ${APP}.ods_order_detail
        where dt='$do_date'
    )od
    left join
    (
        select
            id,             --订单id
            user_id,        --用户id
            province_id     --省份id
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi
    on od.order_id=oi.id
    left join
    (
        select
            order_detail_id,    --订单明细id
            activity_id,        --活动id
            activity_rule_id    --活动规则id
        from ${APP}.ods_order_detail_activity
        where dt='$do_date'
    )oda
    on od.id=oda.order_detail_id
    left join
    (
        select
            order_detail_id,    --订单明细id
            coupon_id           --优惠券id
        from ${APP}.ods_order_detail_coupon
        where dt='$do_date'
    )odc
    on od.id=odc.order_detail_id;"
    
    
    dwd_payment_info="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table ${APP}.dwd_payment_info partition(dt)
    select
        nvl(new.id,old.id),
        nvl(new.out_trade_no,old.out_trade_no),
        nvl(new.order_id,old.order_id),
        nvl(new.user_id,old.user_id),
        nvl(new.payment_type,old.payment_type),
        nvl(new.trade_no,old.trade_no),
        nvl(new.payment_amount,old.payment_amount),
        nvl(new.payment_status,old.payment_status),
        nvl(new.create_time,old.create_time),
        nvl(new.callback_time,old.callback_time),
        nvl(new.province_id,old.province_id),
        nvl(date_format(nvl(new.callback_time,old.callback_time),'yyyy-MM-dd'),'9999-99-99')
    from
    (
        select id,              --支付id
               out_trade_no,    --对外交易编号
               order_id,        --订单编号
               user_id,         --用户编号
               payment_type,    --支付类型
               trade_no,        --交易编号
               payment_amount,  --支付金额
               payment_status,  --支付状态
               create_time,     --创建时间
               callback_time,   --完成时间
               province_id      --地区ID
        from ${APP}.dwd_payment_info
        where dt = '9999-99-99'
    )old
    full outer join
    (
        select
            pi.id,              --支付id
            pi.out_trade_no,    --对外交易编号
            pi.order_id,        --订单编号
            pi.user_id,         --用户编号
            oi.province_id,     --地区ID
            pi.payment_type,    --支付类型
            pi.trade_no,        --交易编号
            pi.payment_amount,  --支付金额
            pi.payment_status,  --支付状态
            pi.create_time,     --创建时间
            pi.callback_time    --完成时间
        from
        (
            select id,              --支付id
                   out_trade_no,    --对外交易编号
                   order_id,        --订单编号
                   user_id,         --用户编号
                   payment_type,    --支付类型
                   trade_no,        --交易编号
                   payment_amount,  --支付金额
                   subject,         --交易内容
                   payment_status,  --支付状态
                   create_time,     --创建时间
                   callback_time,   --完成时间
                   dt
            from ${APP}.ods_payment_info
            where dt='$do_date'
        )pi
        left join
        (
            select id,          --订单编号
                   province_id  --地区ID
            from ${APP}.ods_order_info
            where dt='$do_date'
        )oi
        on pi.order_id=oi.id
    )new
    on old.id=new.id;"
    
    dwd_cart_info="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_cart_info partition(dt='$do_date')
    select
        id,                 --购物车id
        user_id,            --用户id
        sku_id,             --商品id
        cart_price,         --加入购物车时的价格
        sku_num,            --加购数量
        create_time,        --创建时间
        operate_time,       --修改时间
        is_ordered,         --是否下单
        order_time,         --下单时间
        source_type,        --来源类型
        source_id           --来源id
    from ${APP}.ods_cart_info
    where dt='$do_date';"
    
    
    dwd_comment_info="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_comment_info partition (dt='$do_date')
    select id,
           user_id,
           sku_id,
           spu_id,
           order_id,
           appraise,
           create_time
    from ${APP}.ods_comment_info
    where dt = '$do_date';"
    
    
    dwd_favor_info="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_favor_info partition(dt='$do_date')
    select
        id,                 --收藏id
        user_id,            --用户id
        sku_id,             --skuid
        spu_id,             --spuid
        is_cancel,          --是否取消
        create_time,        --收藏时间
        cancel_time         --取消时间
    from ${APP}.ods_favor_info
    where dt='$do_date';"
    
    
    dwd_coupon_use="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table ${APP}.dwd_coupon_use partition(dt)
    select
        nvl(new.id,old.id),
        nvl(new.coupon_id,old.coupon_id),
        nvl(new.user_id,old.user_id),
        nvl(new.order_id,old.order_id),
        nvl(new.coupon_status,old.coupon_status),
        nvl(new.get_time,old.get_time),
        nvl(new.using_time,old.using_time),
        nvl(new.used_time,old.used_time),
        nvl(new.expire_time,old.expire_time),
        coalesce(date_format(nvl(new.used_time,old.used_time),'yyyy-MM-dd'),date_format(nvl(new.expire_time,old.expire_time),'yyyy-MM-dd'),'9999-99-99')
    from
    (
        select
            id,                 --优惠券使用id
            coupon_id,          --优惠券ID
            user_id,            --userid
            order_id,           --订单id
            coupon_status,      --优惠券状态
            get_time,           --领取时间
            using_time,         --使用时间(下单)
            used_time,          --使用时间(支付)
            expire_time        --过期时间
        from ${APP}.dwd_coupon_use
        where dt='9999-99-99'
    )old
    full outer join
    (
        select
            id,                 --优惠券使用id
            coupon_id,          --优惠券ID
            user_id,            --userid
            order_id,           --订单id
            coupon_status,      --优惠券状态
            get_time,           --领取时间
            using_time,         --使用时间(下单)
            used_time,          --使用时间(支付)
            expire_time        --过期时间
        from ${APP}.ods_coupon_use
        where dt='$do_date'
    )new
    on old.id=new.id;"
    
    dwd_order_refund_info="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    insert overwrite table ${APP}.dwd_order_refund_info partition(dt='$do_date')
    select
        ri.id,                      --退单id
        ri.user_id,                 --用户id
        ri.order_id,                --订单id
        ri.sku_id,                  --商品id
        ri.refund_type,             --退单类型
        ri.refund_num,              --退单件数
        ri.refund_amount,           --退单金额
        ri.refund_reason_type,      --退单原因类型
        ri.create_time,             --退单时间
        oi.province_id             --省份id
    from
    (
        select id,                  --退单id
               user_id,             --用户id
               order_id,            --订单id
               sku_id,              --商品id
               refund_type,         --退单类型
               refund_num,          --退单件数
               refund_amount,       --退单金额
               refund_reason_type,  --退单原因类型
               create_time,         --退单时间
               refund_status,       --退单状态
               dt
        from ${APP}.ods_order_refund_info
        where dt='$do_date'
    )ri
    left join
    (
        select id,              --订单id
               province_id      --省份id
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi
    on ri.order_id=oi.id;"
    
    
    dwd_refund_payment="
    set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
    set hive.exec.dynamic.partition.mode=nonstrict;
    insert overwrite table ${APP}.dwd_refund_payment partition(dt)
    select
        nvl(new.id,old.id),
        nvl(new.out_trade_no,old.out_trade_no),
        nvl(new.order_id,old.order_id),
        nvl(new.sku_id,old.sku_id),
        nvl(new.payment_type,old.payment_type),
        nvl(new.trade_no,old.trade_no),
        nvl(new.refund_amount,old.refund_amount),
        nvl(new.refund_status,old.refund_status),
        nvl(new.create_time,old.create_time),
        nvl(new.callback_time,old.callback_time),
        nvl(new.user_id,old.user_id),
        nvl(new.province_id,old.province_id),
        nvl(date_format(nvl(new.callback_time,old.callback_time),'yyyy-MM-dd'),'9999-99-99')
    from
    (
        select
            id,             --退款id
            out_trade_no,   --对外交易编号
            order_id,       --订单编号
            sku_id,         --SKU编号
            payment_type,   --支付类型
            trade_no,       --交易编号
            refund_amount,  --退款金额
            refund_status,  --退款状态
            create_time,    --创建时间
            callback_time,  --回调时间
            user_id,        --用户ID
            province_id    --地区ID
        from ${APP}.dwd_refund_payment
        where dt='9999-99-99'
    )old
    full outer join
    (
        select
            rp.id,          --退款id
            out_trade_no,   --对外交易编号
            order_id,       --订单编号
            sku_id,         --SKU编号
            payment_type,   --支付类型
            trade_no,       --交易编号
            refund_amount,  --退款金额
            refund_status,  --退款状态
            create_time,    --创建时间
            callback_time,  --回调时间
            user_id,        --用户ID
            province_id    --地区ID
        from
        (
            select
                id,             --退款id
                out_trade_no,   --对外交易编号
                order_id,       --订单编号
                sku_id,         --SKU编号
                payment_type,   --支付类型
                trade_no,       --交易编号
                refund_amount,  --退款金额
                refund_status,  --退款状态
                create_time,    --创建时间
                callback_time   --回调时间
            from ${APP}.ods_refund_payment
            where dt='$do_date'
        )rp
        left join
        (
            select
                id,             --订单编号
                user_id,        --用户ID
                province_id     --地区ID
            from ${APP}.ods_order_info
            where dt='$do_date'
        )oi
        on rp.order_id=oi.id
    )new
    on old.id=new.id;"
    
    case $1 in
        dwd_order_info )
            hive -e "$dwd_order_info"
            clear_data dwd_order_info
        ;;
        dwd_order_detail )
            hive -e "$dwd_order_detail"
        ;;
        dwd_payment_info )
            hive -e "$dwd_payment_info"
            clear_data dwd_payment_info
        ;;
        dwd_cart_info )
            hive -e "$dwd_cart_info"
        ;;
        dwd_comment_info )
            hive -e "$dwd_comment_info"
        ;;
        dwd_favor_info )
            hive -e "$dwd_favor_info"
        ;;
        dwd_coupon_use )
            hive -e "$dwd_coupon_use"
            clear_data dwd_coupon_use
        ;;
        dwd_order_refund_info )
            hive -e "$dwd_order_refund_info"
        ;;
        dwd_refund_payment )
            hive -e "$dwd_refund_payment"
            clear_data dwd_refund_payment
        ;;
        all )
            hive -e "$dwd_order_info$dwd_order_detail$dwd_payment_info$dwd_cart_info$dwd_comment_info$dwd_favor_info$dwd_coupon_use$dwd_order_refund_info$dwd_refund_payment"
            clear_data dwd_order_info
            clear_data dwd_payment_info
            clear_data dwd_coupon_use
            clear_data dwd_refund_payment
        ;;
    esac

        (2)增加脚本执行权限

    chmod 777 ods_to_dwd_db.sh

      2)脚本使用

        (1)执行脚本

    ods_to_dwd_db.sh all 2021-06-09

        (2)查看数据是否导入成功

  • 相关阅读:
    Codeforces 841 D
    Codeforces 838 B
    Codeforces 833 C
    Codeforces 101572 D
    Codeforces 101173 C
    Codeforces 444 C
    POJ 3076 Sudoku
    Codeforces 1025 D
    算法笔记--基环树
    Codeforces 1016 E
  • 原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/14875005.html
Copyright © 2011-2022 走看看