zoukankan      html  css  js  c++  java
  • hive学习笔记之十一:UDTF

    欢迎访问我的GitHub

    https://github.com/zq2599/blog_demos

    内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

    本篇概览

    1. 本文是《hive学习笔记》系列的第十一篇,截至目前,一进一出的UDF、多进一出的UDAF咱们都学习过了,最后还有一进多出的UDTF留在本篇了,这也是本篇的主要内容;
    2. 一进多出的UDTF,名为用户自定义表生成函数(User-Defined Table-Generating Functions, UDTF);
    3. 前面的文章中,咱们曾经体验过explode就是hive内置的UDTF:
    hive> select explode(address) from t3;
    OK
    province	guangdong
    city	shenzhen
    province	jiangsu
    city	nanjing
    Time taken: 0.081 seconds, Fetched: 4 row(s)
    
    1. 本篇的UDTF一共有两个实例:把一列拆成多列、把一列拆成多行(每行多列);
    2. 接下来开始实战;

    源码下载

    1. 如果您不想编码,可以在GitHub下载所有源码,地址和链接信息如下表所示:
    名称 链接 备注
    项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
    git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
    git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
    1. 这个git项目中有多个文件夹,本章的应用在hiveudf文件夹下,如下图红框所示:

    在这里插入图片描述

    准备工作

    为了验证UDTF的功能,咱们要先把表和数据都准备好:

    1. 新建名为t16的表:
    create table t16(
    person_name  string,
    string_field string
    )
    row format delimited 
    fields terminated by '|'
    stored as textfile;
    
    1. 本地新建文本文件016.txt,内容如下:
    tom|1:province:guangdong
    jerry|2:city:shenzhen
    john|3
    
    1. 导入数据:
    load data 
    local inpath '/home/hadoop/temp/202010/25/016.txt' 
    overwrite into table t16;
    
    1. 数据准备完毕,开始编码;

    UDTF开发的关键点

    1. 需要继承GenericUDTF类;
    2. 重写initialize方法,该方法的入参只有一个,类型是StructObjectInspector,从这里可以取得UDTF作用了几个字段,以及字段类型;
    3. initialize的返回值是StructObjectInspector类型,UDTF生成的每个列的名称和类型都设置到返回值中;
    4. 重写process方法,该方法中是一进多出的逻辑代码,把每个列的数据准备好放在数组中,执行一次forward方法,就是一行记录;
    5. close方法不是必须的,如果业务逻辑执行完毕,可以将释放资源的代码放在这里执行;
    6. 接下来,就按照上述关键点开发UDTF;

    一列拆成多列

    • 接下来要开发的UDTF,名为udf_wordsplitsinglerow,作用是将入参拆分成多个列;
    • 下图红框中是t16表的一条原始记录的string_field字段,会被udf_wordsplitsinglerow处理:

    在这里插入图片描述

    • 上面红框中的字段被UDTF处理处理后,一列变成了三列,每一列的名称如下图黄框所示,每一列的值如红框所示:

    在这里插入图片描述

    • 以上就是咱们马上就要开发的功能;
    • 打开前文创建的hiveudf工程,新建WordSplitSingleRow.java:
    package com.bolingcavalry.hiveudf.udtf;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @Description: 把指定字段拆成多列
     * @author: willzhao E-mail: zq2599@gmail.com
     * @date: 2020/11/5 14:43
     */
    public class WordSplitSingleRow extends GenericUDTF {
    
        private PrimitiveObjectInspector stringOI = null;
    
        private final static String[] EMPTY_ARRAY = {"NULL", "NULL", "NULL"};
    
        /**
         * 一列拆成多列的逻辑在此
         * @param args
         * @throws HiveException
         */
        @Override
        public void process(Object[] args) throws HiveException {
    
            String input = stringOI.getPrimitiveJavaObject(args[0]).toString();
    
            // 无效字符串
            if(StringUtils.isBlank(input)) {
                forward(EMPTY_ARRAY);
            } else {
    
                // 分割字符串
                String[] array = input.split(":");
    
                // 如果字符串数组不合法,就返回原始字符串和错误提示
                if(null==array || array.length<3) {
                    String[] errRlt = new String[3];
                    errRlt[0] = input;
                    errRlt[1] = "can not split to valid array";
                    errRlt[2] = "-";
    
                    forward(errRlt);
                } else {
                    forward(array);
                }
            }
        }
    
        /**
         * 释放资源在此执行,本例没有资源需要释放
         * @throws HiveException
         */
        @Override
        public void close() throws HiveException {
    
        }
    
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
    
            List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
    
            // 当前UDTF只处理一个参数,在此判断传入的是不是一个参数
            if (1 != inputFields.size()) {
                throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
            }
    
            // 此UDTF只处理字符串类型
            if(!Category.PRIMITIVE.equals(inputFields.get(0).getFieldObjectInspector().getCategory())) {
                throw new UDFArgumentException("ExplodeMap takes string as a parameter");
            }
    
            stringOI = (PrimitiveObjectInspector)inputFields.get(0).getFieldObjectInspector();
    
            //列名集合
            ArrayList<String> fieldNames = new ArrayList<String>();
    
            //列对应的value值
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    
            // 第一列的列名
            fieldNames.add("id");
            // 第一列的inspector类型为string型
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            // 第二列的列名
            fieldNames.add("key");
            // 第二列的inspector类型为string型
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            // 第三列的列名
            fieldNames.add("value");
            // 第三列的inspector类型为string型
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    }
    
    
    • 上述代码中的重点是process方法,取得入参后用冒号分割字符串,得到数组,再调用forward方法,就生成了一行记录,该记录有三列;

    验证UDTF

    接下来将WordSplitSingleRow.java部署成临时函数并验证;

    1. 编码完成后,在pom.xml所在目录执行命令mvn clean package -U
    2. 在target目录得到文件hiveudf-1.0-SNAPSHOT.jar
    3. 将jar下载到hive服务器,我这里放在此目录:/home/hadoop/udf/
    4. 在hive会话模式执行以下命令添加本地jar:
    add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
    
    1. 部署临时函数:
    create temporary function udf_wordsplitsinglerow as 'com.bolingcavalry.hiveudf.udtf.WordSplitSingleRow';
    
    1. 执行以下SQL验证:
    select udf_wordsplitsinglerow(string_field) from t16;
    
    1. 结果如下,可见每一行记录的string_field字段都被分割成了id、key、value三个字段:
    hive> select udf_wordsplitsinglerow(string_field) from t16;
    OK
    id	key	value
    1	province	guangdong
    2	city	shenzhen
    3	can not split to valid array	-
    Time taken: 0.066 seconds, Fetched: 3 row(s)
    

    关键点要注意

    • 值得注意的是,UDTF不能和其他字段同时出现在select语句中,例如以下的SQL会执行失败:
    select person_name,udf_wordsplitsinglerow(string_field) from t16;
    
    • 错误信息如下:
    hive> select person_name,udf_wordsplitsinglerow(string_field) from t16;
    FAILED: SemanticException [Error 10081]: UDTF's are not supported outside the SELECT clause, nor nested in expressions
    
    • 如果希望得到UDTF和其他字段的结果,可以使用LATERAL VIEW语法,完整SQL如下:
    select t.person_name, udtf_id, udtf_key, udtf_value
    from (
        select person_name, string_field 
        from  t16
    ) t LATERAL VIEW udf_wordsplitsinglerow(t.string_field) v as  udtf_id, udtf_key, udtf_value;
    
    • 查询结果如下,可见指定字段和UDTF都能显示:
    hive> select t.person_name, udtf_id, udtf_key, udtf_value
        > from (
        >     select person_name, string_field 
        >     from  t16
        > ) t LATERAL VIEW udf_wordsplitsinglerow(t.string_field) v as  udtf_id, udtf_key, udtf_value;
    OK
    t.person_name	udtf_id	udtf_key	udtf_value
    tom	1	province	guangdong
    jerry	2	city	shenzhen
    john	3	can not split to valid array	-
    Time taken: 0.122 seconds, Fetched: 3 row(s)
    

    一列拆成多行(每行多列)

    • 前面咱们试过了将string_field字段拆分成idkeyvalue三个字段,不过拆分后总行数还是不变,接下来的UDTF,是把string_field拆分成多条记录,然后每条记录都有三个字段;
    • 需要导入新的数据到t16表,新建文本文件016_multi.txt,内容如下:
    tom|1:province:guangdong,4:city:yangjiang
    jerry|2:city:shenzhen
    john|3
    
    • 在hive会话窗口执行以下命令,会用016_multi.txt的内容覆盖t16表已有内容:
    load data 
    local inpath '/home/hadoop/temp/202010/25/016_multi.txt' 
    overwrite into table t16;
    
    • 此时的数据如下图所示,红框中是一条记录的string_field字段值,咱们接下来要开发的UDTF,会先用逗号分隔,得到的就是1:province:guangdong4:city:yangjiang这两个字符串,接下来对每个字符串用冒号分隔,就会得到两条idkeyvalue这样的记录,也就是多行多列:

    在这里插入图片描述

    • 预期中的UDTF结果如下图所示,红框和黄框这两条记录都来自一条记录的string_field字段值:

    在这里插入图片描述

    • 接下来开始编码,新建WordSplitMultiRow.java,代码如下,可见和WordSplitSingleRow的差异仅在process方法,WordSplitMultiRow的process中执行了多次forward,因此有了多条记录:
    package com.bolingcavalry.hiveudf.udtf;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @Description: 把指定字段拆成多行,每行有多列
     * @author: willzhao E-mail: zq2599@gmail.com
     * @date: 2020/11/5 14:43
     */
    public class WordSplitMultiRow extends GenericUDTF {
    
        private PrimitiveObjectInspector stringOI = null;
    
    
        private final static String[] EMPTY_ARRAY = {"NULL", "NULL", "NULL"};
    
        /**
         * 一列拆成多列的逻辑在此
         * @param args
         * @throws HiveException
         */
        @Override
        public void process(Object[] args) throws HiveException {
            String input = stringOI.getPrimitiveJavaObject(args[0]).toString();
    
            // 无效字符串
            if(StringUtils.isBlank(input)) {
                forward(EMPTY_ARRAY);
            } else {
    
                // 用逗号分隔
                String[] rowArray = input.split(",");
    
                // 处理异常
                if(null==rowArray || rowArray.length<1) {
                    String[] errRlt = new String[3];
                    errRlt[0] = input;
                    errRlt[1] = "can not split to valid row array";
                    errRlt[2] = "-";
    
                    forward(errRlt);
                } else {
                    // rowArray的每个元素,都是"id:key:value"这样的字符串
                    for(String singleRow : rowArray) {
    
                        // 要确保字符串有效
                        if(StringUtils.isBlank(singleRow)) {
                            forward(EMPTY_ARRAY);
                        } else {
                            // 分割字符串
                            String[] array = singleRow.split(":");
    
                            // 如果字符串数组不合法,就返回原始字符串和错误提示
                            if(null==array || array.length<3) {
                                String[] errRlt = new String[3];
                                errRlt[0] = input;
                                errRlt[1] = "can not split to valid array";
                                errRlt[2] = "-";
    
                                forward(errRlt);
                            } else {
                                forward(array);
                            }
                        }
                    }
    
                }
            }
        }
    
        /**
         * 释放资源在此执行,本例没有资源需要释放
         * @throws HiveException
         */
        @Override
        public void close() throws HiveException {
    
        }
    
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
    
            List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
    
            // 当前UDTF只处理一个参数,在此判断传入的是不是一个参数
            if (1 != inputFields.size()) {
                throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
            }
    
            // 此UDTF只处理字符串类型
            if(!Category.PRIMITIVE.equals(inputFields.get(0).getFieldObjectInspector().getCategory())) {
                throw new UDFArgumentException("ExplodeMap takes string as a parameter");
            }
    
            stringOI = (PrimitiveObjectInspector)inputFields.get(0).getFieldObjectInspector();
    
            //列名集合
            ArrayList<String> fieldNames = new ArrayList<String>();
    
            //列对应的value值
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    
            // 第一列的列名
            fieldNames.add("id");
            // 第一列的inspector类型为string型
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            // 第二列的列名
            fieldNames.add("key");
            // 第二列的inspector类型为string型
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            // 第三列的列名
            fieldNames.add("value");
            // 第三列的inspector类型为string型
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    }
    

    验证UDTF

    接下来将WordSplitMultiRow.java部署成临时函数并验证;

    1. 编码完成后,在pom.xml所在目录执行命令mvn clean package -U
    2. 在target目录得到文件hiveudf-1.0-SNAPSHOT.jar
    3. 将jar下载到hive服务器,我这里放在此目录:/home/hadoop/udf/
    4. 如果还在同一个hive会话模式,需要先清理掉之前的jar和函数:
    drop temporary function if exists udf_wordsplitsinglerow;
    delete jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
    
    1. 在hive会话模式执行以下命令添加本地jar:
    add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
    
    1. 部署临时函数:
    create temporary function udf_wordsplitmultirow as 'com.bolingcavalry.hiveudf.udtf.WordSplitMultiRow';
    
    1. 执行以下SQL验证:
    select udf_wordsplitmultirow(string_field) from t16;
    
    1. 结果如下,可见每一行记录的string_field字段都被分割成了id、key、value三个字段:
    hive> select udf_wordsplitmultirow(string_field) from t16;
    OK
    id	key	value
    1	province	guangdong
    4	city	yangjiang
    2	city	shenzhen
    3	can not split to valid array	-
    Time taken: 0.041 seconds, Fetched: 4 row(s)
    
    1. LATERAL VIEW语法尝试将其他字段也查出来,SQL如下:
    select t.person_name, udtf_id, udtf_key, udtf_value
    from (
        select person_name, string_field 
        from  t16
    ) t LATERAL VIEW udf_wordsplitmultirow(t.string_field) v as  udtf_id, udtf_key, udtf_value;
    
    1. 结果如下,符合预期:
    hive> select t.person_name, udtf_id, udtf_key, udtf_value
        > from (
        >     select person_name, string_field 
        >     from  t16
        > ) t LATERAL VIEW udf_wordsplitmultirow(t.string_field) v as  udtf_id, udtf_key, udtf_value;
    OK
    t.person_name	udtf_id	udtf_key	udtf_value
    tom	1	province	guangdong
    tom	4	city	yangjiang
    jerry	2	city	shenzhen
    john	3	can not split to valid array	-
    Time taken: 0.056 seconds, Fetched: 4 row(s)
    
    • 至此,HIVE的三种用户自定义函数咱们都学习和实践完成了,希望这些内容能给您的实践带来一些参考;

    你不孤单,欣宸原创一路相伴

    1. Java系列
    2. Spring系列
    3. Docker系列
    4. kubernetes系列
    5. 数据库+中间件系列
    6. DevOps系列

    欢迎关注公众号:程序员欣宸

    微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
    https://github.com/zq2599/blog_demos

  • 相关阅读:
    css3 文本超出后出现省略号
    Bootstrap 开关(switch)控件需要注意的问题
    angularJs 使用中遇到的问题小结【一:关于传参】
    ctrl+enter提交留言
    div a块状布局
    模态框 快速选定合适的布局
    ionic 项目分享No.2——简化版【转】
    jquery判断div是否显示或者隐藏
    phpsotrm 设置命名空间
    win10 cmd 替换 powershell
  • 原文地址:https://www.cnblogs.com/bolingcavalry/p/14998215.html
Copyright © 2011-2022 走看看