Hands-On Project from 0 to 1 with Hive (39): Big Data E-Commerce Data Warehouse (User Behavior Data), Part 7

Chapter 9: Building the DWD Layer

This layer cleans the ODS data: removing nulls, dirty records, and values outside reasonable limits, converting row-oriented storage to columnar storage, and changing the compression format.
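
As a minimal sketch of what this cleaning amounts to (an illustration only; the actual queries for each table follow in this chapter), using the ods_start_log table and its single raw line column that appear below:

-- Drop records whose device id is missing or blank.
-- get_json_object returns NULL for invalid JSON or a missing path,
-- so this also filters out unparseable lines.
select line
from ods_start_log
where dt='2020-10-14'
  and get_json_object(line,'$.mid') is not null
  and get_json_object(line,'$.mid') <> '';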

9.1 Parsing Start-Log Data into the DWD Layer

9.1.1 Create the Start-Log Table

1) Table creation statement

    drop table if exists dwd_start_log;
    CREATE EXTERNAL TABLE dwd_start_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,  
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `open_ad_type` string,
    `action` string,
    `loading_time` string,
    `detail` string,
    `extend1` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_start_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

9.1.2 Load Data into the Start-Log Table

     insert overwrite table dwd_start_log
    PARTITION (dt='2020-10-14')
    select
      get_json_object(line,'$.mid') mid_id,
      get_json_object(line,'$.uid') user_id,
      get_json_object(line,'$.vc') version_code,
      get_json_object(line,'$.vn') version_name,
      get_json_object(line,'$.l') lang,
      get_json_object(line,'$.sr') source,
      get_json_object(line,'$.os') os,
      get_json_object(line,'$.ar') area,
      get_json_object(line,'$.md') model,
      get_json_object(line,'$.ba') brand,
      get_json_object(line,'$.sv') sdk_version,
      get_json_object(line,'$.g') gmail,
      get_json_object(line,'$.hw') height_width,
      get_json_object(line,'$.t') app_time,
      get_json_object(line,'$.nw') network,
      get_json_object(line,'$.ln') lng,
      get_json_object(line,'$.la') lat,
      get_json_object(line,'$.entry') entry,
      get_json_object(line,'$.open_ad_type') open_ad_type,
      get_json_object(line,'$.action') action,
      get_json_object(line,'$.loading_time') loading_time,
      get_json_object(line,'$.detail') detail,
      get_json_object(line,'$.extend1') extend1
    from ods_start_log
    where dt='2020-10-14';
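
get_json_object extracts one field from a JSON string by JSONPath, which is all the parsing the statement above needs. A quick standalone sanity check (the JSON literal here is made-up sample data, not project data):

select get_json_object('{"mid":"m123","uid":"u456"}', '$.mid');
-- returns m123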

2) Test

    select * from dwd_start_log limit 2;

9.1.3 Loading Script for the DWD Start-Log Table

1) Create the script in the /home/kgg/bin directory on hadoop102

    [kgg@hadoop102 bin]$ vim dwd_start_log.sh
Add the following content to the script:
    #!/bin/bash

# Define variables for easy modification
    APP=gmall
    hive=/opt/module/hive/bin/hive

# Use the date passed in as an argument; otherwise default to yesterday
    if [ -n "$1" ] ;then
       do_date=$1
    else
       do_date=`date -d "-1 day" +%F`  
    fi
echo "=== log date: $do_date ==="
    sql="
    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table "$APP".dwd_start_log
    PARTITION (dt='$do_date')
    select
      get_json_object(line,'$.mid') mid_id,
      get_json_object(line,'$.uid') user_id,
      get_json_object(line,'$.vc') version_code,
      get_json_object(line,'$.vn') version_name,
      get_json_object(line,'$.l') lang,
      get_json_object(line,'$.sr') source,
      get_json_object(line,'$.os') os,
      get_json_object(line,'$.ar') area,
      get_json_object(line,'$.md') model,
      get_json_object(line,'$.ba') brand,
      get_json_object(line,'$.sv') sdk_version,
      get_json_object(line,'$.g') gmail,
      get_json_object(line,'$.hw') height_width,
      get_json_object(line,'$.t') app_time,
      get_json_object(line,'$.nw') network,
      get_json_object(line,'$.ln') lng,
      get_json_object(line,'$.la') lat,
      get_json_object(line,'$.entry') entry,
      get_json_object(line,'$.open_ad_type') open_ad_type,
      get_json_object(line,'$.action') action,
      get_json_object(line,'$.loading_time') loading_time,
      get_json_object(line,'$.detail') detail,
      get_json_object(line,'$.extend1') extend1
    from "$APP".ods_start_log
    where dt='$do_date';"

    $hive -e "$sql"

2) Make the script executable

    [kgg@hadoop102 bin]$ chmod 777 dwd_start_log.sh

3) Run the script

    [kgg@hadoop102 module]$ dwd_start_log.sh 2019-02-11

4) Verify the results

    select * from dwd_start_log where dt='2019-02-11' limit 2;

5) Script schedule

In production this script typically runs daily between 00:30 and 01:00.

9.2 Parsing Event-Log Data into the DWD Layer

9.2.1 Create the Base Event Detail Table

The detail table stores the detailed records converted from the raw ODS-layer table.


1) Create the base event-log detail table

    drop table if exists dwd_base_event_log;
    CREATE EXTERNAL TABLE dwd_base_event_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `event_name` string,
    `event_json` string,
    `server_time` string)
    PARTITIONED BY (`dt` string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_base_event_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Notes

Here event_name and event_json hold the event name and the full event payload. This step splits the original one-to-many log lines into separate rows. Flattening the raw logs requires a custom UDF and UDTF, sketched below and built in the next two sections.
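
An illustrative sketch of the flattening (assuming the base_analizer UDF and flat_analizer UDTF that sections 9.2.2 and 9.2.3 build and register, and the sample partition dt='2020-10-14' used throughout this chapter): one raw line whose et array holds N events yields N output rows.

select
  base_analizer(line,'mid') as mid_id,
  event_name,
  event_json
from ods_event_log
lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name, event_json
where dt='2020-10-14'
limit 5;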

9.2.2 Custom UDF (Parses the Common Fields)


1) Create a Maven project: hivefunction. 2) Create the package com.kgg.udf. 3) Add the following to pom.xml:

    <properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       <hive.version>1.2.1</hive.version>
    </properties>

    <dependencies>
       <!--添加hive依赖-->
       <dependency>
           <groupId>org.apache.hive</groupId>
           <artifactId>hive-exec</artifactId>
           <version>${hive.version}</version>
       </dependency>
    </dependencies>

    <build>
       <plugins>
           <plugin>
               <artifactId>maven-compiler-plugin</artifactId>
               <version>2.3.2</version>
               <configuration>
                   <source>1.8</source>
                   <target>1.8</target>
               </configuration>
           </plugin>
           <plugin>
               <artifactId>maven-assembly-plugin</artifactId>
               <configuration>
                   <descriptorRefs>
                       <descriptorRef>jar-with-dependencies</descriptorRef>
                   </descriptorRefs>
               </configuration>
               <executions>
                   <execution>
                       <id>make-assembly</id>
                       <phase>package</phase>
                       <goals>
                           <goal>single</goal>
                       </goals>
                   </execution>
               </executions>
           </plugin>
       </plugins>
    </build>

4) The UDF parses the common fields:

    package com.kgg.udf;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.json.JSONException;
    import org.json.JSONObject;

    public class BaseFieldUDF extends UDF {

       public String evaluate(String line, String key) throws JSONException {

       // 1 Split the line: server time | json
       String[] log = line.split("\\|");

       // 2 Validity check
           if (log.length != 2 || StringUtils.isBlank(log[1])) {
               return "";
          }

       // 3 Parse the json
           JSONObject baseJson = new JSONObject(log[1].trim());

           String result = "";

       // 4 Look up the value for the given key
           if ("et".equals(key)) {
               if (baseJson.has("et")) {
                   result = baseJson.getString("et");
              }
          } else if ("st".equals(key)) {
               result = log[0].trim();
          } else {
            JSONObject cm = baseJson.getJSONObject("cm");
               if (cm.has(key)) {
                   result = cm.getString(key);
              }
          }
           return result;
      }

       public static void main(String[] args) throws JSONException {

        String line = "1541217850324|{\"cm\":{\"mid\":\"m7856\",\"uid\":\"u8739\",\"ln\":\"-74.8\",\"sv\":\"V2.2.2\",\"os\":\"8.1.3\",\"g\":\"P7XC9126@gmail.com\",\"nw\":\"3G\",\"l\":\"es\",\"vc\":\"6\",\"hw\":\"640*960\",\"ar\":\"MX\",\"t\":\"1541204134250\",\"la\":\"-31.7\",\"md\":\"huawei-17\",\"vn\":\"1.1.2\",\"sr\":\"O\",\"ba\":\"Huawei\"},\"ap\":\"weather\",\"et\":[{\"ett\":\"1541146624055\",\"en\":\"display\",\"kv\":{\"goodsid\":\"n4195\",\"copyright\":\"ESPN\",\"content_provider\":\"CNN\",\"extend2\":\"5\",\"action\":\"2\",\"extend1\":\"2\",\"place\":\"3\",\"showtype\":\"2\",\"category\":\"72\",\"newstype\":\"5\"}},{\"ett\":\"1541213331817\",\"en\":\"loading\",\"kv\":{\"extend2\":\"\",\"loading_time\":\"15\",\"action\":\"3\",\"extend1\":\"\",\"type1\":\"\",\"type\":\"3\",\"loading_way\":\"1\"}},{\"ett\":\"1541126195645\",\"en\":\"ad\",\"kv\":{\"entry\":\"3\",\"show_style\":\"0\",\"action\":\"2\",\"detail\":\"325\",\"source\":\"4\",\"behavior\":\"2\",\"content\":\"1\",\"newstype\":\"5\"}},{\"ett\":\"1541202678812\",\"en\":\"notification\",\"kv\":{\"ap_time\":\"1541184614380\",\"action\":\"3\",\"type\":\"4\",\"content\":\"\"}},{\"ett\":\"1541194686688\",\"en\":\"active_background\",\"kv\":{\"active_source\":\"3\"}}]}";
           String x = new BaseFieldUDF().evaluate(line, "mid");
           System.out.println(x);
      }
    }

Note: the main method is only used for testing with mock data.

9.2.3 Custom UDTF (Parses the Event-Specific Fields)

1) Create the package com.kgg.udtf. 2) Create the class EventJsonUDTF in that package. 3) It explodes the business fields:

    package com.kgg.udtf;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.json.JSONArray;
    import org.json.JSONException;

    import java.util.ArrayList;

    public class EventJsonUDTF extends GenericUDTF {

   //This method declares the names and types of the output columns:
       @Override
       public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {

           ArrayList<String> fieldNames = new ArrayList<String>();
           ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

           fieldNames.add("event_name");
           fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
           fieldNames.add("event_json");
       fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

           return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
      }

   //One input record may produce several output rows
       @Override
       public void process(Object[] objects) throws HiveException {

       // Get the incoming et string
           String input = objects[0].toString();

       // If the input is blank, return immediately to filter the record out
           if (StringUtils.isBlank(input)) {
               return;
          } else {

               try {
               // Parse the array of events (e.g. ad/favorites)
                   JSONArray ja = new JSONArray(input);

                   if (ja == null)
                       return;

               // Loop over every event
                   for (int i = 0; i < ja.length(); i++) {
                       String[] result = new String[2];

                       try {
                       // Event name (e.g. ad/favorites)
                           result[0] = ja.getJSONObject(i).getString("en");

                       // The whole event object as a string
                           result[1] = ja.getString(i);
                      } catch (JSONException e) {
                           continue;
                      }

                   // Emit the row
                       forward(result);
                  }
              } catch (JSONException e) {
                   e.printStackTrace();
              }
          }
      }

   //Called when there are no more records to process; used for cleanup or to emit extra output
       @Override
       public void close() throws HiveException {

      }
    }

4) Package the project. 5) Upload hivefunction-1.0-SNAPSHOT.jar to the /user/hive/jars directory on HDFS. 6) Create permanent functions bound to the compiled Java classes:

create function base_analizer as 'com.kgg.udf.BaseFieldUDF' using jar 'hdfs://hadoop101:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';

create function flat_analizer as 'com.kgg.udtf.EventJsonUDTF' using jar 'hdfs://hadoop101:9000/user/hive/jars/hivefunction-1.0-SNAPSHOT.jar';
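
Before running the full insert, a quick smoke test of the newly registered functions is worthwhile (assuming the sample partition dt='2020-10-14' exists in ods_event_log):

select
  base_analizer(line,'mid') as mid_id,
  base_analizer(line,'st') as server_time
from ods_event_log
where dt='2020-10-14'
limit 2;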

9.2.4 Populate the Event-Log Base Detail Table

1) Parse the event logs into the base detail table

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_base_event_log partition(dt='2020-10-14')
    select
      base_analizer(line,'mid') as mid_id,
      base_analizer(line,'uid') as user_id,
      base_analizer(line,'vc') as version_code,
      base_analizer(line,'vn') as version_name,
      base_analizer(line,'l') as lang,
      base_analizer(line,'sr') as source,
      base_analizer(line,'os') as os,
      base_analizer(line,'ar') as area,
      base_analizer(line,'md') as model,
      base_analizer(line,'ba') as brand,
      base_analizer(line,'sv') as sdk_version,
      base_analizer(line,'g') as gmail,
      base_analizer(line,'hw') as height_width,
      base_analizer(line,'t') as app_time,
      base_analizer(line,'nw') as network,
      base_analizer(line,'ln') as lng,
      base_analizer(line,'la') as lat,
      event_name,
      event_json,
      base_analizer(line,'st') as server_time
    from ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tmp_flat as event_name,event_json
    where dt='2020-10-14' and base_analizer(line,'et')<>'';

2) Test

    select * from dwd_base_event_log limit 2;
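
A further check worth running is the distribution of event names in the new partition; the values should match the event_name filters used in section 9.3 (display, newsdetail, loading, ad, notification, active_foreground, active_background, comment, favorites, praise, error):

select event_name, count(*) as cnt
from dwd_base_event_log
where dt='2020-10-14'
group by event_name;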

9.2.5 DWD-Layer Data Parsing Script

1) Create the script

    [kgg@hadoop102 bin]$ vim dwd_base_log.sh
Add the following content to the script:
    #!/bin/bash

# Define variables for easy modification
    APP=gmall
    hive=/opt/module/hive/bin/hive

# Use the date passed in as an argument; otherwise default to yesterday
    if [ -n "$1" ] ;then
       do_date=$1
    else
       do_date=`date -d "-1 day" +%F`  
    fi
echo "=== log date: $do_date ==="
    sql="
    use gmall;
    insert overwrite table "$APP".dwd_base_event_log partition(dt='$do_date')
    select
      base_analizer(line,'mid') as mid_id,
      base_analizer(line,'uid') as user_id,
      base_analizer(line,'vc') as version_code,
      base_analizer(line,'vn') as version_name,
      base_analizer(line,'l') as lang,
      base_analizer(line,'sr') as source,
      base_analizer(line,'os') as os,
      base_analizer(line,'ar') as area,
      base_analizer(line,'md') as model,
      base_analizer(line,'ba') as brand,
      base_analizer(line,'sv') as sdk_version,
      base_analizer(line,'g') as gmail,
      base_analizer(line,'hw') as height_width,
      base_analizer(line,'t') as app_time,
      base_analizer(line,'nw') as network,
      base_analizer(line,'ln') as lng,
      base_analizer(line,'la') as lat,
      event_name,
      event_json,
      base_analizer(line,'st') as server_time
    from "$APP".ods_event_log lateral view flat_analizer(base_analizer(line,'et')) tem_flat as event_name,event_json
    where dt='$do_date' and base_analizer(line,'et')<>'';
    "

    $hive -e "$sql"

2) Make the script executable

    chmod 777 dwd_base_log.sh

3) Run the script

    dwd_base_log.sh 2019-02-11

4) Verify the results

    select * from dwd_base_event_log where dt='2019-02-11' limit 2;

5) Script schedule

In production this script typically runs daily between 00:30 and 01:00.

9.3 Deriving the DWD Event Tables


9.3.1 Product Click Table


1) Table creation statement

    drop table if exists dwd_display_log;
    CREATE EXTERNAL TABLE dwd_display_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `goodsid` string,
    `place` string,
    `extend1` string,
    `category` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_display_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_display_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.place') place,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='display';

3) Test

    select * from dwd_display_log limit 2;

9.3.2 Product Detail Page Table

1) Table creation statement

    drop table if exists dwd_newsdetail_log;
    CREATE EXTERNAL TABLE dwd_newsdetail_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,  
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `action` string,
    `goodsid` string,
    `showtype` string,
    `news_staytime` string,
    `loading_time` string,
    `type1` string,
    `category` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_newsdetail_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_newsdetail_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.showtype') showtype,
    get_json_object(event_json,'$.kv.news_staytime') news_staytime,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.type1') type1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='newsdetail';

3) Test

    select * from dwd_newsdetail_log limit 2;

9.3.3 Product List Page Table

1) Table creation statement

    drop table if exists dwd_loading_log;
    CREATE EXTERNAL TABLE dwd_loading_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,  
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `loading_time` string,
    `loading_way` string,
    `extend1` string,
    `extend2` string,
    `type` string,
    `type1` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_loading_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_loading_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.loading_way') loading_way,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.extend2') extend2,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.type1') type1,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='loading';

3) Test

    select * from dwd_loading_log limit 2;

9.3.4 Ad Table

1) Table creation statement

    drop table if exists dwd_ad_log;
    CREATE EXTERNAL TABLE dwd_ad_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,  
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `entry` string,
    `action` string,
    `content` string,
    `detail` string,
    `ad_source` string,
    `behavior` string,
    `newstype` string,
    `show_style` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_ad_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_ad_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.content') content,
    get_json_object(event_json,'$.kv.detail') detail,
    get_json_object(event_json,'$.kv.source') ad_source,
    get_json_object(event_json,'$.kv.behavior') behavior,
    get_json_object(event_json,'$.kv.newstype') newstype,
    get_json_object(event_json,'$.kv.show_style') show_style,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='ad';

3) Test

    select * from dwd_ad_log limit 2;

9.3.5 Notification Table

1) Table creation statement

    drop table if exists dwd_notification_log;
    CREATE EXTERNAL TABLE dwd_notification_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,  
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `action` string,
    `noti_type` string,
    `ap_time` string,
    `content` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_notification_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_notification_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.noti_type') noti_type,
    get_json_object(event_json,'$.kv.ap_time') ap_time,
    get_json_object(event_json,'$.kv.content') content,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='notification';

3) Test

    select * from dwd_notification_log limit 2;

9.3.6 User Foreground-Activity Table

1) Table creation statement

    drop table if exists dwd_active_foreground_log;
    CREATE EXTERNAL TABLE dwd_active_foreground_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `push_id` string,
    `access` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_foreground_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_active_foreground_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.push_id') push_id,
    get_json_object(event_json,'$.kv.access') access,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='active_foreground';

3) Test

    select * from dwd_active_foreground_log limit 2;

9.3.7 User Background-Activity Table

1) Table creation statement

    drop table if exists dwd_active_background_log;
    CREATE EXTERNAL TABLE dwd_active_background_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `active_source` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_background_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_active_background_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.active_source') active_source,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='active_background';

3) Test

    select * from dwd_active_background_log limit 2;

9.3.8 Comment Table

1) Table creation statement

    drop table if exists dwd_comment_log;
    CREATE EXTERNAL TABLE dwd_comment_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `comment_id` int,
    `userid` int,
    `p_comment_id` int,
    `content` string,
    `addtime` string,
    `other_id` int,
    `praise_count` int,
    `reply_count` int,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_comment_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_comment_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.comment_id') comment_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
    get_json_object(event_json,'$.kv.content') content,
    get_json_object(event_json,'$.kv.addtime') addtime,
    get_json_object(event_json,'$.kv.other_id') other_id,
    get_json_object(event_json,'$.kv.praise_count') praise_count,
    get_json_object(event_json,'$.kv.reply_count') reply_count,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='comment';

3) Test

    select * from dwd_comment_log limit 2;

9.3.9 Favorites Table

1) Table creation statement

    drop table if exists dwd_favorites_log;
    CREATE EXTERNAL TABLE dwd_favorites_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,  
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `id` int,
    `course_id` int,
    `userid` int,
    `add_time` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_favorites_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_favorites_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.course_id') course_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='favorites';

3) Test

     select * from dwd_favorites_log limit 2;

9.3.10 Praise (Like) Table

1) Table creation statement

    drop table if exists dwd_praise_log;
    CREATE EXTERNAL TABLE dwd_praise_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,  
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `id` string,
    `userid` string,
    `target_id` string,
    `type` string,
    `add_time` string,
    `server_time` string
    )
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_praise_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_praise_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.target_id') target_id,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='praise';

3) Test

    select * from dwd_praise_log limit 2;

9.3.11 Error Log Table

1) Table creation statement

    drop table if exists dwd_error_log;
    CREATE EXTERNAL TABLE dwd_error_log(
    `mid_id` string,
    `user_id` string,
    `version_code` string,
    `version_name` string,
    `lang` string,
    `source` string,
    `os` string,
    `area` string,
    `model` string,
    `brand` string,
    `sdk_version` string,
    `gmail` string,
    `height_width` string,  
    `app_time` string,
    `network` string,
    `lng` string,
    `lat` string,
    `errorBrief` string,
    `errorDetail` string,
    `server_time` string)
    PARTITIONED BY (dt string)
    stored as parquet
    location '/warehouse/gmall/dwd/dwd_error_log/'
    TBLPROPERTIES('parquet.compression'='lzo');

2) Load data

    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table dwd_error_log
    PARTITION (dt='2020-10-14')
    select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    get_json_object(event_json,'$.kv.errorBrief') errorBrief,
    get_json_object(event_json,'$.kv.errorDetail') errorDetail,
    server_time
    from dwd_base_event_log
    where dt='2020-10-14' and event_name='error';

3) Test

    select * from dwd_error_log limit 2;

9.3.12 Loading Script for the DWD Event Tables

1) Create the script

    vim dwd_event_log.sh
Add the following content to the script:
    #!/bin/bash

# Define variables for easy modification
    APP=gmall
    hive=/opt/module/hive/bin/hive

# Use the date passed in as an argument; otherwise default to yesterday
    if [ -n "$1" ] ;then
       do_date=$1
    else
       do_date=`date -d "-1 day" +%F`  
    fi
echo "=== log date: $do_date ==="
    sql="
    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table "$APP".dwd_display_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.action') action,
      get_json_object(event_json,'$.kv.goodsid') goodsid,
      get_json_object(event_json,'$.kv.place') place,
      get_json_object(event_json,'$.kv.extend1') extend1,
      get_json_object(event_json,'$.kv.category') category,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='display';


    insert overwrite table "$APP".dwd_newsdetail_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.entry') entry,
      get_json_object(event_json,'$.kv.action') action,
      get_json_object(event_json,'$.kv.goodsid') goodsid,
      get_json_object(event_json,'$.kv.showtype') showtype,
      get_json_object(event_json,'$.kv.news_staytime') news_staytime,
      get_json_object(event_json,'$.kv.loading_time') loading_time,
      get_json_object(event_json,'$.kv.type1') type1,
      get_json_object(event_json,'$.kv.category') category,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='newsdetail';


    insert overwrite table "$APP".dwd_loading_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.action') action,
      get_json_object(event_json,'$.kv.loading_time') loading_time,
      get_json_object(event_json,'$.kv.loading_way') loading_way,
      get_json_object(event_json,'$.kv.extend1') extend1,
      get_json_object(event_json,'$.kv.extend2') extend2,
      get_json_object(event_json,'$.kv.type') type,
      get_json_object(event_json,'$.kv.type1') type1,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='loading';


    insert overwrite table "$APP".dwd_ad_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.entry') entry,
      get_json_object(event_json,'$.kv.action') action,
      get_json_object(event_json,'$.kv.content') content,
      get_json_object(event_json,'$.kv.detail') detail,
      get_json_object(event_json,'$.kv.source') ad_source,
      get_json_object(event_json,'$.kv.behavior') behavior,
      get_json_object(event_json,'$.kv.newstype') newstype,
      get_json_object(event_json,'$.kv.show_style') show_style,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='ad';


    insert overwrite table "$APP".dwd_notification_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.action') action,
      get_json_object(event_json,'$.kv.noti_type') noti_type,
      get_json_object(event_json,'$.kv.ap_time') ap_time,
      get_json_object(event_json,'$.kv.content') content,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='notification';


    insert overwrite table "$APP".dwd_active_foreground_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
    get_json_object(event_json,'$.kv.push_id') push_id,
    get_json_object(event_json,'$.kv.access') access,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='active_foreground';


    insert overwrite table "$APP".dwd_active_background_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.active_source') active_source,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='active_background';


    insert overwrite table "$APP".dwd_comment_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.comment_id') comment_id,
      get_json_object(event_json,'$.kv.userid') userid,
      get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
      get_json_object(event_json,'$.kv.content') content,
      get_json_object(event_json,'$.kv.addtime') addtime,
      get_json_object(event_json,'$.kv.other_id') other_id,
      get_json_object(event_json,'$.kv.praise_count') praise_count,
      get_json_object(event_json,'$.kv.reply_count') reply_count,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='comment';


    insert overwrite table "$APP".dwd_favorites_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.id') id,
      get_json_object(event_json,'$.kv.course_id') course_id,
      get_json_object(event_json,'$.kv.userid') userid,
      get_json_object(event_json,'$.kv.add_time') add_time,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='favorites';


    insert overwrite table "$APP".dwd_praise_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.id') id,
      get_json_object(event_json,'$.kv.userid') userid,
      get_json_object(event_json,'$.kv.target_id') target_id,
      get_json_object(event_json,'$.kv.type') type,
      get_json_object(event_json,'$.kv.add_time') add_time,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='praise';


    insert overwrite table "$APP".dwd_error_log
    PARTITION (dt='$do_date')
    select
      mid_id,
      user_id,
      version_code,
      version_name,
      lang,
       source,
      os,
      area,
      model,
      brand,
      sdk_version,
      gmail,
      height_width,
      app_time,
      network,
      lng,
      lat,
      get_json_object(event_json,'$.kv.errorBrief') errorBrief,
      get_json_object(event_json,'$.kv.errorDetail') errorDetail,
      server_time
    from "$APP".dwd_base_event_log
    where dt='$do_date' and event_name='error';
    "

    $hive -e "$sql"

2) Make the script executable

     chmod 777 dwd_event_log.sh

3) Run the script

     dwd_event_log.sh 2019-02-11

4) Verify the results

    select * from dwd_comment_log where dt='2019-02-11' limit 2;

5) Script schedule

In production this script typically runs daily between 00:30 and 01:00.
