  • Custom UDF and UDTF Functions

    Why do we need custom UDF and UDTF functions?

    Because the JSON data in event_log is fairly complex, processing event_log requires first using UDF and UDTF functions to extract fields from it,

    then loading the results into the dwd_base_event_log table, and finally loading the kv data into the corresponding event tables according to en.

    Sample event JSON data:

    1613956146081|
            {"cm":
                {"ln":"-69.2","sv":"V2.3.0","os":"8.1.1","g":"G0MLZ2K4@gmail.com","mid":"1","nw":"4G","l":"pt","vc":"10","hw":"1080*1920","ar":"MX","uid":"1","t":"1613861466406","la":"11.9","md":"sumsung-4","vn":"1.0.9","ba":"Sumsung","sr":"V"},

             "ap":"app",
             "et":[{"ett":"1613872348614",
                    "en":"newsdetail",
                    "kv":{"entry":"3","goodsid":"0","news_staytime":"24","loading_time":"0","action":"2","showtype":"1","category":"9","type1":""}},

                   {"ett":"1613873812304",
                    "en":"loading",
                    "kv":{"extend2":"","loading_time":"6","action":"3","extend1":"","type":"1","type1":"325","loading_way":"2"}}
                  ]}

    The JSON data breaks down into four major parts: the server timestamp, cm, ap, and et.

    Role of the UDF and UDTF: processing this complex JSON data.

    UDF: given the whole line and a key, returns the corresponding one of the four parts; in short, one-to-one (one row in, one value out).

    UDTF: takes the et returned by the UDF and explodes it, returning en and et[i] for each event; in short, one-to-many (one array in, one row per event out).

    Create a Maven project:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.ldy</groupId>
        <artifactId>hivefunc</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <hive.version>2.3.0</hive.version>
        </properties>
    
        <dependencies>
            <!-- add the Hive dependency -->
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>${hive.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>

                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>

    Custom UDF:

    package com.ldy.udf;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.json.JSONException;
    import org.json.JSONObject;
    
    /**
     * UDF: one row in, one value out
     */
    public class BaseFieldUDF extends UDF {

        public String evaluate(String line, String key) throws JSONException {

            // 1. Split the line: server timestamp | json
            String[] log = line.split("\\|");

            // 2. Validity check: isBlank catches null, "" and whitespace-only strings
            if (log.length != 2 || StringUtils.isBlank(log[1])) {
                return "";
            }

            // 3. Parse the JSON, trimming leading and trailing whitespace first
            JSONObject baseJson = new JSONObject(log[1].trim());

            String result = "";

            // 4. Look up the value for the requested key
            if ("et".equals(key)) {
                if (baseJson.has("et")) {
                    // return the et JSON array as a string
                    result = baseJson.getString("et");
                }
            } else if ("st".equals(key)) {
                // return the server timestamp
                result = log[0].trim();
            } else {
                // return a common (cm) field
                JSONObject cm = baseJson.getJSONObject("cm");
                if (cm.has(key)) {
                    result = cm.getString(key);
                }
            }
    
            return result;
        }
    }
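
    For a quick local sanity check before registering the UDF in Hive, evaluate() can be called directly from a plain main method. The sketch below is illustrative and not part of the original post: the test class and the abbreviated sample line are invented here, and it assumes the org.json classes pulled in through hive-exec are on the classpath (note that getString() on the non-string et value stringifies it in the old org.json bundled with Hive, but throws in newer org.json releases).

    package com.ldy.udf;

    import org.json.JSONException;

    // Local smoke test for BaseFieldUDF (illustrative sketch only)
    public class BaseFieldUDFTest {

        public static void main(String[] args) throws JSONException {
            // abbreviated version of the sample event record shown earlier
            String line = "1613956146081|"
                    + "{\"cm\":{\"os\":\"8.1.1\",\"mid\":\"1\"},"
                    + "\"ap\":\"app\","
                    + "\"et\":[{\"ett\":\"1613872348614\",\"en\":\"newsdetail\",\"kv\":{}}]}";

            BaseFieldUDF udf = new BaseFieldUDF();

            System.out.println(udf.evaluate(line, "st")); // 1613956146081
            System.out.println(udf.evaluate(line, "os")); // 8.1.1
            System.out.println(udf.evaluate(line, "et")); // the et array as a string
        }
    }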

    Custom UDTF:

    package com.ldy.udtf;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.json.JSONArray;
    import org.json.JSONException;
    
    import java.util.ArrayList;
    
    public class EventJsonUDTF extends GenericUDTF {
    
        // In this method we declare the names and types of the output columns:
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
    
            ArrayList<String> fieldNames = new ArrayList<String>();
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    
            // declare the output column names and types
            fieldNames.add("event_name");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("event_json");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    
        // one input record in, zero or more output records
        @Override
        public void process(Object[] objects) throws HiveException {

            // get the incoming et string
            String input = objects[0].toString();

            // if the input is blank, filter this record out by returning immediately
            if (StringUtils.isBlank(input)) {
                return;
            } else {
                try {
                    // parse the event array (event types such as ad / favorites)
                    JSONArray ja = new JSONArray(input);
    
                    if (ja.length() == 0)
                        return;
    
                    // iterate over each event
                    for (int i = 0; i < ja.length(); i++) {
                        String[] result = new String[2];

                        try {
                            // extract the event name (en), e.g. ad / favorites
                            result[0] = ja.getJSONObject(i).getString("en");

                            // extract the whole event
                            result[1] = ja.getString(i); // the i-th element as a string
                        } catch (JSONException e) {
                            continue;
                        }

                        // emit one output row per event
                        this.forward(result);
                    }
                } catch (JSONException e) {
                    e.printStackTrace();
                }
            }
        }
    
        // called once there are no more records to process; used for cleanup or to emit additional output
        @Override
        public void close() throws HiveException {
    
        }
    
    }
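
    Because a GenericUDTF emits rows through a Collector instead of returning them, a quick local check can register a collector via setCollector() before calling process(). This is again an illustrative sketch, not from the original post; the test class and sample array are invented, and the expected output assumes the lenient org.json version that ships with hive-exec 2.3.0.

    package com.ldy.udtf;

    import org.apache.hadoop.hive.ql.metadata.HiveException;

    // Local smoke test for EventJsonUDTF (illustrative sketch only)
    public class EventJsonUDTFTest {

        public static void main(String[] args) throws HiveException {
            // a two-event et array, trimmed from the sample record above
            String et = "[{\"ett\":\"1613872348614\",\"en\":\"newsdetail\",\"kv\":{\"action\":\"2\"}},"
                    + "{\"ett\":\"1613873812304\",\"en\":\"loading\",\"kv\":{\"action\":\"3\"}}]";

            EventJsonUDTF udtf = new EventJsonUDTF();

            // capture each forwarded row: row[0] = event_name, row[1] = event_json
            udtf.setCollector(row -> {
                Object[] fields = (Object[]) row;
                System.out.println(fields[0] + " -> " + fields[1]);
            });

            udtf.process(new Object[]{et});
            // expected: one printed line for "newsdetail" and one for "loading"
        }
    }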

    Then package the project (mvn package) and upload the assembled jar to the /user/hive/jars directory on HDFS.

    Start Hive and register the functions from the jar (be sure to adjust the package names and paths below to match your own):

    hive (gmall)>
    create function base_analizer as 'com.ldy.udf.BaseFieldUDF'
    using jar 'hdfs://hadoop102:9000/user/hive/jars/hivefunc-1.0-SNAPSHOT.jar';

    create function flat_analizer as 'com.ldy.udtf.EventJsonUDTF'
    using jar 'hdfs://hadoop102:9000/user/hive/jars/hivefunc-1.0-SNAPSHOT.jar';

    Usage is as follows:

    #!/bin/bash
    hive=/apps/hive/bin/hive
    APP=ecdw
    
    if [[ -n "$1"  ]]; then
        do_date=$1
    else
        do_date=`date -d "-1 day" +%F`
    fi
    
    sql="
    use "$APP"; 
    insert overwrite table "$APP".dwd_base_event_log partition(dt='$do_date') 
    select base_analizer(line,'mid') as mid_id, 
    base_analizer(line,'uid') as user_id, 
    base_analizer(line,'vc') as version_code, 
    base_analizer(line,'vn') as version_name, 
    base_analizer(line,'l') as lang, 
    base_analizer(line,'sr') as source, 
    base_analizer(line,'os') as os, 
    base_analizer(line,'ar') as area,
    base_analizer(line,'md') as model, 
    base_analizer(line,'ba') as brand, 
    base_analizer(line,'sv') as sdk_version, 
    base_analizer(line,'g') as gmail, 
    base_analizer(line,'hw') as height_width, 
    base_analizer(line,'t') as app_time, 
    base_analizer(line,'nw') as network, 
    base_analizer(line,'ln') as lng, 
    base_analizer(line,'la') as lat, 
    event_name, 
    event_json, 
    base_analizer(line,'st') as server_time 
    from "$APP".ods_event_log 
    lateral view flat_analizer(base_analizer(line,'et')) tem_flat as event_name,event_json 
    where dt='$do_date' and base_analizer(line,'et')<>'';
    "
    $hive -e "$sql"

    The flow is as follows:

    The UDF base_analizer returns et (a JSON array); the UDTF flat_analizer receives an Object array whose Object[0] is et, applies the business logic, and returns event_name (en) and et[i] for each event.
