zoukankan      html  css  js  c++  java
  • Hive中的UDF详解


    hive作为一个sql查询引擎,自带了一些基本的函数,比如count(计数),sum(求和),有时候这些基本函数满足不了我们的需求,这时候就要写hive hdf(user defined funation),又叫用户自定义函数。

    UDF 创建与使用步骤

    • 继承org.apache.hadoop.hive.ql.exec.UDF类,实现evaluate方法;
    • jar包上传到集群,通过create temporary function创建临时函数,不加temporary就创建了一个永久函数;
    • 通过select 语句使用;

    例一

    下面是一个判断hive表字段是否包含’100’这个子串的简单udf:

    package com.js.dataclean.hive.udf.hm2
    
    import org.apache.hadoop.hive.ql.exec.UDF;
    
    public class IsContains100 extends UDF{
    
    	public String evaluate(String s){
    
            if(s == null || s.length() == 0){
            	return "0";
            }
    
            return s.contains("100") ? "1" : "0";
        }
    }
    

    使用maven将其打包,进入hive cli,输入命令:

    add jar /home/hadoop/codejar/flash_format.jar;
    create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';
    

    创建完临时函数,即可使用这个函数了:

    select isContains100('abc100def') from table limit 1;
    1
    

    例二

    通过读取mysql数据库中的规则,为hive中的workflow返回对应的,类型:

    type workflow
    a	1
    a	2
    b	11
    b	22
    b	33
    

    需求:我们希望,将hive的workflow字段取值为,1,2的变为类型(type)a,取值为11,22,33的全部变为b,就是归类的意思。

    这个udf可以这么实现:

    package com.js.dataclean.hive.udf.hm2.workflow;
    
    import org.apache.hadoop.hive.ql.exec.UDF;
    
    import java.sql.*;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @ Author: keguang
     * @ Date: 2018/12/13 16:24
     * @ version: v1.0.0
     * @ description:
     */
    public class GetWorkflow extends UDF{
    
        private static final String host = "0.0.0.0";
        private static final String port = "3306";
        private static final String database = "root";
        private static final String userName = "root";
        private static final String password = "123456";
        private static String url = "";
        private static final String driver = "com.mysql.jdbc.Driver";
        private static Connection conn = null;
        private static Map<String, List<String>> workflowType = null;
    
        static {
            url = "jdbc:mysql://" + host + ":" + port + "/" + database;
            try {
                // Class.forName(driver);
                conn = DriverManager.getConnection(url, userName, password);
                workflowType = getWorkflowType(conn);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        private static Map<String, List<String>> getWorkflowType(Connection conn){
            Map<String, List<String>> workflowType = new HashMap<>();
            String sql = "select * from flash_player_workflow";
            PreparedStatement ps = null;
            try {
                ps = conn.prepareStatement(sql);
                ResultSet rs = ps.executeQuery();
                while (rs.next()){
                    String workflow = rs.getString("workflow");
                    String type = rs.getString("flag");
    
                    List<String> workflows = workflowType.get(type);
                    if(workflows == null){
                        workflows = new ArrayList<>();
                    }
                    workflows.add(workflow);
                    workflowType.put(type, workflows);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }finally {
    
                // 关闭链接
                if(conn != null){
                    try {
                        conn.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
            return workflowType;
    
        }
    
        public String evaluate(String s){
            assert workflowType != null;
    
            for(String type:workflowType.keySet()){
                List<String> workflows = workflowType.get(type);
                if(workflows.contains(s)){
                    return type;
                }
            }
    
            return s;
        }
    
    }
    

    打好jar包,创建函数: workflow2type(省略),然后使用:

    select workflow2type(workflow) from table;
    
    a
    a
    b
    b
    b
    

    这样就把很多取值归为几个大类了。

    查看hive function的用法

    查month 相关的函数

    show functions like '*month*';
    

    查看 add_months 函数的用法

    desc function add_months;
    

    查看 add_months 函数的详细说明并举例

    desc function extended add_months;
    

    hive 中的 UDAF

    可以看出,udf就是一个输入一个输出,输入一个性别,返回’男’或者’女’,如果我们想实现select date,count(1) from table,统计每天的流量呢?这就是一个分组统计,显然是多个输入,一个输出,这时候udf已经不能满足我们的需要,就需要写udaf,user defined aggregare function(用户自定义聚合函数)。

    这里写一个字符串连接函数,相当于concat的功能,将多行输入,合并为一个字符串,当然了hive中有字符串连接函数,这里是举例说明UDAF的用法:

    package com.js.dataclean.hive.udaf.hm2;
    
    import com.js.dataclean.utils.StringUtil;
    import org.apache.hadoop.hive.ql.exec.UDAF;
    import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
    
    /**
     * 实现字符串连接聚合的UDAF
     * @version v1.0.0
     * @Author:keguang
     * @Date:2018/10/22 14:36
     */
    public class MutiStringConcat extends UDAF{
        public static class SumState{
            private String sumStr;
        }
    
        public static class SumEvaluator implements UDAFEvaluator{
            SumState sumState;
    
            public SumEvaluator(){
                super();
                sumState = new SumState();
                init();
            }
    
            @Override
            public void init() {
                sumState.sumStr = "";
            }
    
            /**
             * 来了一行数据
             * @param s
             * @return
             */
            public boolean iterate(String s){
                if(!StringUtil.isNull(s)){
                    sumState.sumStr += s;
                }
                return true;
            }
    
            /**
             * 状态传递
             * @return
             */
            public SumState terminatePartial() {
                return sumState;
            }
    
            /**
             * 子任务合并
             * @param state
             * @return
             */
            public boolean merge(SumState state){
                if(state != null){
                    sumState.sumStr += state.sumStr;
                }
                return true;
            }
    
            /**
             * 返回最终结果
             * @return
             */
            public String terminate(){
                return sumState.sumStr;
            }
        }
    }
    

    用法,与udf一样,还是需要打包并且到hive cli中注册使用。

    关于UDAF开发注意点:

    • 需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,这两个包都是必须的
    • 函数类需要继承UDAF类,内部类Evaluator实现UDAFEvaluator接口
    • Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
      • init函数类似于构造函数,用于UDAF的初始化
      • iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean
      • terminatePartial无参数,其为iterate函数轮转结束后,返回乱转数据,iterate和terminatePartial类似于hadoop的Combiner
      • merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
      • terminate返回最终的聚集函数结果

    临时与永久函数

    Hive自定义函数分为临时与永久函数,顾名思义,分别是临时使用和永久有效使用的意思。

    临时函数

    临时函数,关闭会话就结束了生命周期,下次要想使用,需要重新注册。

    add jar /path/xx.jar(存储在本地磁盘)
    
    // 临时注册UDF函数(hive会话生效)
    create temporary function 函数名 as '包名.类名';
    

    删除临时函数:

    • drop temporary function 数据库名.函数名;

    永久函数

    永久函数一旦注册,可以在hive cli,远程连接hiveserver2等地方永久使用,步骤为:

    • 先上传jar包到HDFS

    • 永久注册:

    CREATE FUNCTION 函数名 AS '包名.类名' USING JAR 'hdfs:///path/xxxx.jar';
    

    注意:指定jar包路径需要是hdfs路径。

    • 删除永久函数:
    drop function 数据库名.函数名字;
    

    新增的永久函数,比如在hive cli命令行注册的,可能会在beeline或者hiveserver2远程连接时,提示不存在该函数。解决办法是,在无法使用UDF的HiveServer2上,执行reload function命令,将MetaStore中新增的UDF信息同步到HiveServer2内存中。

    场景

    UDF在hive中使用场景广泛,这里列举常用的使用场景。

    IP 转化为地址

    分词

    SQL 分析UDF

  • 相关阅读:
    试用solace 消息平台
    mqtt5 share subscription 简单说明
    文件批量重命名神器:Bulk Rename Utility
    Elasticsearch入门,这一篇就够了
    burp suite使用(一) --- 抓包,截包,改包
    BurpSuite安装和配置
    ORA-01779: 无法修改与非键值保存表对应的列”中涉及的概念和解决方法
    一个非常有用的函数——COALESCE
    ORA-01779: cannot modify a column which maps to a non-key-preserved table
    Oracle批量、大量Update方法总结
  • 原文地址:https://www.cnblogs.com/data-magnifier/p/14167382.html
Copyright © 2011-2022 走看看