zoukankan      html  css  js  c++  java
  • hive学习笔记之十:用户自定义聚合函数(UDAF)

    欢迎访问我的GitHub

    这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

    本篇概览

    • 本文是《hive学习笔记》的第十篇,前文实践过UDF的开发、部署、使用,那个UDF适用于一进一出的场景,例如将每条记录的指定字段转为大写;
    • 除了一进一出,在使用group by的SQL中,多进一出也是常见场景,例如hive自带的avg、sum都是多进一出,这个场景的自定义函数叫做用户自定义聚合函数(User Defiend Aggregate Function,UDAF),UDAF的开发比一进一出要复杂一些,本篇文章就一起来实战UDAF开发;
    • 本文开发的UDAF名为udf_fieldlength ,用于group by的时候,统计指定字段在每个分组中的总长度;

    准备工作

    1. 在一些旧版的教程和文档中,都会提到UDAF开发的关键是继承UDAF.java;
    2. 打开hive-exec的1.2.2版本源码,却发现UDAF类已被注解为Deprecated
    3. UDAF类被废弃后,推荐的替代品有两种:实现GenericUDAFResolver2接口,或者继承AbstractGenericUDAFResolver类;
    4. 现在新问题来了:上述两种替代品,咱们在做UDAF的时候该用哪一种呢?
    5. 打开AbstractGenericUDAFResolver类的源码瞅一眼,如下所示,是否有种恍然大悟的感觉,这个类自身就是GenericUDAFResolver2接口的实现类:
    public abstract class AbstractGenericUDAFResolver
        implements GenericUDAFResolver2
    {
    
      @SuppressWarnings("deprecation")
      @Override
      public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
        throws SemanticException {
    
        if (info.isAllColumns()) {
          throw new SemanticException(
              "The specified syntax for UDAF invocation is invalid.");
        }
    
        return getEvaluator(info.getParameters());
      }
    
      @Override
      public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) 
        throws SemanticException {
        throw new SemanticException(
              "This UDAF does not support the deprecated getEvaluator() method.");
      }
    }
    
    1. 既然源码都看了,也就没啥好纠结的了,继承父类还是实现接口都可以,您自己看着选吧,我这里选的是继承AbstractGenericUDAFResolver类;

    关于UDAF的四个阶段

    • 在编码前,要先了解UDAF的四个阶段,定义在GenericUDAFEvaluator的Mode枚举中:
    1. COMPLETE:如果mapreduce只有map而没有reduce,就会进入这个阶段;
    2. PARTIAL1:正常mapreduce的map阶段;
    3. PARTIAL2:正常mapreduce的combiner阶段;
    4. FINAL:正常mapreduce的reduce阶段;

    每个阶段被调用的方法

    • 开发UDAF时,要继承抽象类GenericUDAFEvaluator,里面有多个抽象方法,在不同的阶段,会调用到这些方法中的一个或多个;
    • 下图对每个阶段调用了哪些方法说得很清楚:

    在这里插入图片描述

    • 下图对顺序执行的三个阶段和涉及方法做了详细说明:

    在这里插入图片描述

    源码下载

    1. 如果您不想编码,可以在GitHub下载所有源码,地址和链接信息如下表所示:
    名称 链接 备注
    项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
    git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
    git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
    1. 这个git项目中有多个文件夹,本章的应用在hiveudf文件夹下,如下图红框所示:

    在这里插入图片描述

    UDAF开发步骤简述

    开发UDAF分为以下几步:

    1. 新建类FieldLengthAggregationBuffer,用于保存中间结果,该类需继承AbstractAggregationBuffer;
    2. 新建类FieldLengthUDAFEvaluator,用于实现四个阶段中会被调用的方法,该类需继承GenericUDAFEvaluator;
    3. 新建类FieldLength,用于在hive中注册UDAF,里面会实例化FieldLengthUDAFEvaluator,该类需继承AbstractGenericUDAFResolver;
    4. 编译构建,得到jar;
    5. 在hive添加jar;
    6. 在hive注册函数;

    接下来就按照上述步骤开始操作;

    开发

    1. 打开前文新建的hiveudf工程,新建FieldLengthAggregationBuffer.java,这个类的作用是缓存中间计算结果,每次计算的结果都放入这里面,被传递给下个阶段,其成员变量value用来保存累加数据:
    package com.bolingcavalry.hiveudf.udaf;
    
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.ql.util.JavaDataModel;
    
    public class FieldLengthAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {
    
        private Integer value = 0;
    
        public Integer getValue() {
            return value;
        }
    
        public void setValue(Integer value) {
            this.value = value;
        }
    
        public void add(int addValue) {
            synchronized (value) {
                value += addValue;
            }
        }
    
        /**
         * 合并值缓冲区大小,这里是用来保存字符串长度,因此设为4byte
         * @return
         */
        @Override
        public int estimate() {
            return JavaDataModel.PRIMITIVES1;
        }
    }
    
    1. 新建FieldLengthUDAFEvaluator.java,里面是整个UDAF逻辑实现,关键代码已经添加了注释,请结合前面的图片来理解,核心思路是iterate将当前分组的字段处理完毕,merger把分散的数据合并起来,再由terminate决定当前分组计算结果:
    package com.bolingcavalry.hiveudf.udaf;
    
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    
    /**
     * @Description: 这里是UDAF的实际处理类
     * @author: willzhao E-mail: zq2599@gmail.com
     * @date: 2020/11/4 9:57
     */
    public class FieldLengthUDAFEvaluator extends GenericUDAFEvaluator {
    
        PrimitiveObjectInspector inputOI;
    
        ObjectInspector outputOI;
    
        PrimitiveObjectInspector integerOI;
    
        /**
         * 每个阶段都会被执行的方法,
         * 这里面主要是把每个阶段要用到的输入输出inspector好,其他方法被调用时就能直接使用了
         * @param m
         * @param parameters
         * @return
         * @throws HiveException
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
    
            // COMPLETE或者PARTIAL1,输入的都是数据库的原始数据
            if(Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                // PARTIAL2和FINAL阶段,都是基于前一个阶段init返回值作为parameters入参
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }
    
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(
                    Integer.class,
                    ObjectInspectorFactory.ObjectInspectorOptions.JAVA
            );
    
            // 给下一个阶段用的,即告诉下一个阶段,自己输出数据的类型
            return outputOI;
        }
    
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new FieldLengthAggregationBuffer();
        }
    
        /**
         * 重置,将总数清理掉
         * @param agg
         * @throws HiveException
         */
        public void reset(AggregationBuffer agg) throws HiveException {
            ((FieldLengthAggregationBuffer)agg).setValue(0);
        }
    
        /**
         * 不断被调用执行的方法,最终数据都保存在agg中
         * @param agg
         * @param parameters
         * @throws HiveException
         */
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if(null==parameters || parameters.length<1) {
                return;
            }
    
            Object javaObj = inputOI.getPrimitiveJavaObject(parameters[0]);
    
            ((FieldLengthAggregationBuffer)agg).add(String.valueOf(javaObj).length());
        }
    
        /**
         * group by的时候返回当前分组的最终结果
         * @param agg
         * @return
         * @throws HiveException
         */
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((FieldLengthAggregationBuffer)agg).getValue();
        }
    
        /**
         * 当前阶段结束时执行的方法,返回的是部分聚合的结果(map、combiner)
         * @param agg
         * @return
         * @throws HiveException
         */
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }
    
        /**
         * 合并数据,将总长度加入到缓存对象中(combiner或reduce)
         * @param agg
         * @param partial
         * @throws HiveException
         */
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
    
            ((FieldLengthAggregationBuffer) agg).add((Integer)integerOI.getPrimitiveJavaObject(partial));
        }
    }
    
    1. 最后是FieldLength.java,该类注册UDAF到hive时用到的,负责实例化FieldLengthUDAFEvaluator,给hive使用:
    package com.bolingcavalry.hiveudf.udaf;
    
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    
    public class FieldLength extends AbstractGenericUDAFResolver {
        @Override
        public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
            return new FieldLengthUDAFEvaluator();
        }
    
        @Override
        public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
            return new FieldLengthUDAFEvaluator();
        }
    }
    

    至此,编码完成,接下来是部署和体验;

    部署和体验

    本次部署的注册方式是临时函数,如果您想注册为永久函数,请参考前文;

    1. 在pom.xml所在目录执行mvn clean package -U,即可编译构建;
    2. 在target目录得到文件hiveudf-1.0-SNAPSHOT.jar
    3. 上传到hive服务器,我这里是放在/home/hadoop/udf目录;
    4. 进入hive会话,执行以下命令添加jar:
    add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
    
    1. 执行以下命令注册:
    create temporary function udf_fieldlength as 'com.bolingcavalry.hiveudf.udaf.FieldLength';
    
    1. 找一个适合执行group by的表试试,我这里是前面的文章中创建的address表,完整数据如下:
    hive> select * from address;
    OK
    1	guangdong	guangzhou
    2	guangdong	shenzhen
    3	shanxi	xian
    4	shanxi	hanzhong
    6	jiangshu	nanjing
    
    1. 执行下面的SQL:
    select province, count(city), udf_fieldlength(city) from address group by province;
    

    执行结果如下,可见guangdong的guangzhou和shenzhen总长度为17,jiangsu的nanjing为7,shanxi的xian和hanzhong总长度12,符合预期:

    Total MapReduce CPU Time Spent: 2 seconds 730 msec
    OK
    guangdong	2	17
    jiangshu	1	7
    shanxi	2	12
    Time taken: 28.484 seconds, Fetched: 3 row(s)
    

    至此,UDAF的学习和实践就完成了,咱们掌握了多进一出的函数开发,由于涉及到多个阶段和外部调用的逻辑,使得UDAF的开发难度略大,接下来的文章是一进多出的开发,会简单一些。

    你不孤单,欣宸原创一路相伴

    1. Java系列
    2. Spring系列
    3. Docker系列
    4. kubernetes系列
    5. 数据库+中间件系列
    6. DevOps系列

    欢迎关注公众号:程序员欣宸

    微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
    https://github.com/zq2599/blog_demos

  • 相关阅读:
    分段控制器UISegmentedControl的使用、同一个控制器中实现多个View的切换、addChildViewController等方法的使用
    警示框UIAlertController的使用(看完马上会用!!)
    断言NSAssert的使用
    概念篇(一)
    《iOS开发进阶》书籍目录
    《编写高质量iOS与OS X代码的52个有效方法》书籍目录
    《精通Objective-C》书籍目录
    《iOS设计模式解析》书籍目录
    《精通iOS开发》书籍目录
    常用的代码块
  • 原文地址:https://www.cnblogs.com/bolingcavalry/p/14988931.html
Copyright © 2011-2022 走看看