Why custom UDF and UDTF functions are needed
Because the event_log JSON data is rather complex, processing event_log requires first extracting its fields with a UDF and a UDTF,
then loading the result into the dwd_base_event_log table, and finally loading the kv data into the corresponding event tables according to en.
Sample event JSON data:
1613956146081|
{"cm":
{"ln":"-69.2","sv":"V2.3.0","os":"8.1.1","g":"G0MLZ2K4@gmail.com","mid":"1","nw":"4G","l":"pt","vc":"10","hw":"1080*1920","ar":"MX","uid":"1","t":"1613861466406","la":"11.9","md":"sumsung-4","vn":"1.0.9","ba":"Sumsung","sr":"V"},
"ap":"app",
"et":[{"ett":"1613872348614",
"en":"newsdetail",
"kv":{"entry":"3","goodsid":"0","news_staytime":"24","loading_time":"0","action":"2","showtype":"1","category":"9","type1":""}},
{"ett":"1613873812304",
"en":"loading",
"kv":{"extend2":"","loading_time":"6","action":"3","extend1":"","type":"1","type1":"325","loading_way":"2"}},
]}
The JSON record splits into four parts: the timestamp, cm, ap, and et.
What the UDF and UDTF are for: parsing this complex JSON.
UDF: given a key, returns any one of the four parts, i.e. one in, one out.
UDTF: takes the et array returned by the UDF and, for each event, emits en and et[i], i.e. one in, many out (see the worked example below).
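For instance, with the sample line above, the calls behave roughly like this (values taken from the sample data):

base_analizer(line, 'st')  -> 1613956146081   (the server timestamp before the '|')
base_analizer(line, 'mid') -> 1               (a common field looked up inside cm)
base_analizer(line, 'et')  -> the JSON array under et
flat_analizer(et)          -> two rows: (newsdetail, {...full newsdetail event...}) and (loading, {...full loading event...})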
Create a Maven project
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.ldy</groupId> <artifactId>hivefunc</artifactId> <version>1.0-SNAPSHOT</version> <properties> <hive.version>2.3.0</hive.version> </properties> <dependencies> <!--添加 hive 依赖--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Custom UDF:
package com.ldy.udf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

/**
 * UDF: one row in, one value out.
 */
public class BaseFieldUDF extends UDF {

    public String evaluate(String line, String key) throws JSONException {
        // 1 Split the line: server timestamp | json
        //   ('|' is a regex metacharacter, so it must be escaped)
        String[] log = line.split("\\|");

        // 2 Validity check; isBlank catches null, "" and whitespace-only strings
        if (log.length != 2 || StringUtils.isBlank(log[1])) {
            return "";
        }

        // 3 Parse the json, trimming leading/trailing whitespace first
        JSONObject baseJson = new JSONObject(log[1].trim());

        String result = "";

        // 4 Look up the value for the given key
        if ("et".equals(key)) {
            if (baseJson.has("et")) {
                // return the json array of events
                result = baseJson.getString("et");
            }
        } else if ("st".equals(key)) {
            // return the server timestamp
            result = log[0].trim();
        } else {
            // return a common field from cm
            JSONObject cm = baseJson.getJSONObject("cm");
            if (cm.has(key)) {
                result = cm.getString(key);
            }
        }
        return result;
    }
}
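Before packaging, the UDF can be sanity-checked with a plain main method (a minimal sketch; the test class and the shortened sample line are illustrative, not part of the production code):

package com.ldy.udf;

import org.json.JSONException;

public class BaseFieldUDFTest {
    public static void main(String[] args) throws JSONException {
        // a shortened version of the sample line above
        String line = "1613956146081|{\"cm\":{\"mid\":\"1\",\"uid\":\"1\"},\"ap\":\"app\","
                + "\"et\":[{\"ett\":\"1613872348614\",\"en\":\"newsdetail\",\"kv\":{}}]}";

        BaseFieldUDF udf = new BaseFieldUDF();
        System.out.println(udf.evaluate(line, "st"));   // server timestamp: 1613956146081
        System.out.println(udf.evaluate(line, "mid"));  // common field from cm: 1
        System.out.println(udf.evaluate(line, "et"));   // the et json array as a string
    }
}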
Custom UDTF:
package com.ldy.udtf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;

public class EventJsonUDTF extends GenericUDTF {

    // Here we declare the names and types of the output columns
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("event_name");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("event_json");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // One input record, several output rows
    @Override
    public void process(Object[] objects) throws HiveException {
        // the argument passed in is et
        String input = objects[0].toString();

        // if the input is blank, return immediately to filter the record out
        if (StringUtils.isBlank(input)) {
            return;
        } else {
            try {
                // parse the array to see how many events there are (e.g. ad/favorites)
                JSONArray ja = new JSONArray(input);
                if (ja.length() == 0) return;

                // iterate over each event
                for (int i = 0; i < ja.length(); i++) {
                    String[] result = new String[2];
                    try {
                        // extract the event name (en)
                        result[0] = ja.getJSONObject(i).getString("en");
                        // extract the whole i-th event as a string
                        result[1] = ja.getString(i);
                    } catch (JSONException e) {
                        continue;
                    }
                    // emit the result row
                    this.forward(result);
                }
            } catch (JSONException e) {
                e.printStackTrace();
            }
        }
    }

    // Called when there are no more records to process; used for cleanup
    // or emitting extra output
    @Override
    public void close() throws HiveException {
    }
}
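The UDTF can likewise be exercised without a Hive session by plugging in a Collector, the sink that forward() writes to (a minimal sketch, relying on the public setCollector method of GenericUDTF; the test class and the shortened sample array are illustrative only):

package com.ldy.udtf;

import org.apache.hadoop.hive.ql.metadata.HiveException;

public class EventJsonUDTFTest {
    public static void main(String[] args) throws HiveException {
        // a shortened et array with two events, mirroring the sample above
        String et = "[{\"ett\":\"1613872348614\",\"en\":\"newsdetail\",\"kv\":{}},"
                + "{\"ett\":\"1613873812304\",\"en\":\"loading\",\"kv\":{}}]";

        EventJsonUDTF udtf = new EventJsonUDTF();
        // forward() hands each output row to the registered Collector
        udtf.setCollector(row -> {
            String[] fields = (String[]) row;
            System.out.println(fields[0] + " -> " + fields[1]);
        });
        udtf.process(new Object[]{et});
        // prints one line per event: "newsdetail -> {...}" then "loading -> {...}"
    }
}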
Then package the project and put the jar under the /user/hive/jars path on HDFS.
Start Hive and register the jar with the functions (remember to adjust the package names and paths below to your own):
hive (gmall)>
create function base_analizer as 'com.ldy.udf.BaseFieldUDF'
using jar 'hdfs://hadoop102:9000/user/hive/jars/hivefunc-1.0-SNAPSHOT-jar-with-dependencies.jar';
create function flat_analizer as 'com.ldy.udtf.EventJsonUDTF'
using jar 'hdfs://hadoop102:9000/user/hive/jars/hivefunc-1.0-SNAPSHOT-jar-with-dependencies.jar';
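Once registered, a quick smoke test confirms the functions resolve (assuming an ods_event_log table, as in the script below, exists in the current database): select base_analizer(line,'mid') from ods_event_log limit 1; should return a mid value.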
Usage:
#!/bin/bash
hive=/apps/hive/bin/hive
APP=ecdw

if [[ -n "$1" ]]; then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

sql="
use "$APP";
insert overwrite table "$APP".dwd_base_event_log partition(dt='$do_date')
select
    base_analizer(line,'mid') as mid_id,
    base_analizer(line,'uid') as user_id,
    base_analizer(line,'vc') as version_code,
    base_analizer(line,'vn') as version_name,
    base_analizer(line,'l') as lang,
    base_analizer(line,'sr') as source,
    base_analizer(line,'os') as os,
    base_analizer(line,'ar') as area,
    base_analizer(line,'md') as model,
    base_analizer(line,'ba') as brand,
    base_analizer(line,'sv') as sdk_version,
    base_analizer(line,'g') as gmail,
    base_analizer(line,'hw') as height_width,
    base_analizer(line,'t') as app_time,
    base_analizer(line,'nw') as network,
    base_analizer(line,'ln') as lng,
    base_analizer(line,'la') as lat,
    event_name,
    event_json,
    base_analizer(line,'st') as server_time
from "$APP".ods_event_log
lateral view flat_analizer(base_analizer(line,'et')) tem_flat as event_name,event_json
where dt='$do_date' and base_analizer(line,'et')<>'';
"

$hive -e "$sql"
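To run it for a given day, pass the date as the first argument (assuming the script is saved as, say, dwd_base_event_log.sh and made executable; the filename is illustrative): dwd_base_event_log.sh 2021-02-22. With no argument it processes yesterday's partition.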
The overall flow:
The UDF base_analizer returns et (a JSON array); the UDTF flat_analizer receives an Object array, where Object[0] is that et, applies the business logic, and returns event_name (en) and et[i].