zoukankan      html  css  js  c++  java
  • Hive高阶开发_自定义函数UDF/UDAF/UDTF

    Hive中编写自定义函数

    自定义函数有三类
     UDF
     UDAF
     UDTF
    

    Hive中编写UDF函数的方式

    Hive有两个不同的接口编写UDF程序。一个是基础的UDF接口,一个是复杂的GenericUDF接口。
        01.UDF
    	  重写 evaluate
    	2.GenericUDF :增强版的 udf ( 支持复杂类型的输入和输出 )
    	  01、继承GenericUDF
          02、实现initialize、evaluate、getDisplayString方法
    	     initialize        : 这个方法的目标是确定参数的返回类型
    	     evaluate          :实现主要逻辑
    	     getDisplayString  :显示函数的用法  
    

    Hive编写UDAF

    1.继承 AbstractGenericUDAFResolver
    2.继承 GenericUDAFEvaluator
    3.Evaluator 需要实现 init、iterate、terminatePartial、merge、terminate这几个函数 
       init 初始化
       iterate 函数处理读入的行数据
       terminatePartial 返回iterate处理的中间结果
       merge 合并上述处理结果
       terminate 返回最终值
    
     案例
     public static enum Mode {
        /**
         * PARTIAL1: from original data to partial aggregation data: iterate() and terminatePartial() will be called.
         * PARTIAL1: 从原始数据到部分聚合数据的过程,会调用 iterate()和 terminatePartial()
         * 可以理解为MapReduce过程中的map阶段
         */
        PARTIAL1,
            /**
         * PARTIAL2: from partial aggregation data to partial aggregation data: * merge() and terminatePartial() will be called.
         * PARTIAL2: 从部分聚合数据到部分聚合数据的过程(多次聚合),会调用 merge()和 terminatePartial()
         * 可以理解为MapReduce过程中的combine阶段
         */
        PARTIAL2,
            /**
         * FINAL: from partial aggregation to full aggregation: merge() and terminate() will be called.
         * FINAL: 从部分聚合数据到全部聚合数据的过程,会调用 merge()和 terminate()
         * 可以理解为MapReduce过程中的reduce阶段
         */
        FINAL,
            /**
         * COMPLETE: from original data directly to full aggregation: iterate() and terminate() will be called.
         * COMPLETE: 从原始数据直接到全部聚合数据的过程,会调用 iterate()和 terminate()
         * 可以理解为MapReduce过程中的直接map输出阶段,没有reduce阶段
         */
        COMPLETE
      };
    
    
    3.程序执行过程:
          1)PARTIAL1(阶段1:map):init() --> iterate() --> terminatePartial()
          2)PARTIAL2(阶段2:combine):init() --> merge() --> terminatePartial()
          3)FINAL (最终阶段:reduce):init() --> merge() --> terminate()
          4)COMPLETE(直接输出阶段:只有map):init() --> iterate() --> terminate()
          
          注:每个阶段都会执行init()初始化操作。
    一个GenericUDAF必须先了解以下两个抽象类: 
        org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver 
        org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
    过程理解:
       这个UDAF函数读取数据(mapper),
       聚集一堆mapper输出到部分聚集结果(combiner),
       并且最终创建一个最终的聚集结果(reducer)。
       因为我们跨域多个combiner进行聚集,所以我们需要保存部分聚集结果
    
    完整的UDAF逻辑是一个mapreduce过程,
        如果有mapper和reducer,             就会经历PARTIAL1(mapper),FINAL(reducer),
    	如果有mapper和reducer还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
        有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果
    

    Hive写UDTF

      POM文件
      	<dependency>
    		<groupId>org.apache.hive</groupId>
    		<artifactId>hive-exec</artifactId>
    		<version>2.3.4</version>
    		<scope>provided</scope>
    	 </dependency>
     类
       org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
    重写的方法
       initialize   process  close
      //该方法中,指定输入输出参数:输入参数的ObjectInspector 与输出参数的StructObjectInspector
       abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;  
       //处理一条输入记录,输出若干条结果记录
       abstract void process(Object[] record) throws HiveException; 
       //当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
       abstract void close() throws HiveException;
    

    Hive UDTF代码示例

     代码主要参考网上资料,做了注释用于学习
    `
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Iterator;
    
    public class GenericUDTFEBitMap extends GenericUDTF {
    /**
     *   输入数据
     */
    private transient BinaryObjectInspector binaryOI = null;
    
    /**
     *
     * @param args 输入参数的ObjectInspector
     * @return 输出参数的StructObjectInspector
     * @throws UDFArgumentException
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException  {
    
        if (args.length != 1) {
            throw new UDFArgumentException("GenericUDTFEBitMap() takes  only one argument");
        }
        /** 有两种数据类型
         *  two categories of Hive Data types that are primitive data type and complex data type
         *  PRIMITIVE : Numeric  Date/time String Miscellaneous
         *  Complex Type:  Array  Map  Struct Union
         * public static enum Category { PRIMITIVE, LIST, MAP, STRUCT, UNION; private Category() {}}
         */
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.BINARY) {
            throw new UDFArgumentException("GenericUDTFEBitMap() () takes a binary as a parameter");
        }
    
        // 输入格式(inspectors)
        binaryOI = (BinaryObjectInspector) args[0];
    
        // ObjectInspector生成的方式
        // 输出格式(inspectors) -- 有两个属性的对象
        // 静态属性获得列名和列的类型 定义 List<ObjectInspector>列表
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col_id");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        // 通过 ObjectInspectorFactory 构造ObjectInspector
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    
    /**
     *  forwardListObj forwardMapObj
     *  forward
     * @param record
     * @throws HiveException   处理一条输入记录,输出若干条结果记录
     */
    @Override
    public void process(Object[] record) throws HiveException {
        byte[] idBytes = this.binaryOI.getPrimitiveJavaObject(record[0]);
        if(idBytes !=null && idBytes.length>0){
            ImmutableRoaringBitmap other = new ImmutableRoaringBitmap(ByteBuffer.wrap(idBytes));
            Iterator<Integer> iterator = other.iterator();
            while (iterator.hasNext()){
                //   forward  Passes an output row to the collector.
                //   真正的处理过程在process函数中,在process中,每一次forward()调用产生一行
                forward(new Object[] { iterator.next() });
            }
        }
    }
    
    /**
     * 当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
     * @throws HiveException
     */
    @Override
    public void close() throws HiveException {
    }
    
    /**
     * toString
     */
    @Override
    public String toString() {
        return "GenericUDTFEBitMap";
    }
    
    }`
    

    参考

     Hive之自定义聚合函数UDAF  https://blog.csdn.net/weixin_39469127/article/details/89766266
     https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
     https://github.com/sunyaf/bitmapudf
     Hive Data Types – Primitive and Complex Data Types in Hive https://data-flair.training/blogs/hive-data-types/
  • 相关阅读:
    2019长安大学ACM校赛网络同步赛 J Binary Number(组合数学+贪心)
    棋盘问题
    DP待整理
    Monkey and Banana(DP)
    Ignatius and the Princess IV
    【[kuangbin带你飞]专题十二 基础DP1】Max Sum Plus Plus(DP+滚动数组)
    第三届山东ACM Pick apples
    第三届山东省ACM The Best Seat in ACM Contest
    第三届山东ACM省赛 Pixel density
    第三届山东ACM省赛 n a^o7 !
  • 原文地址:https://www.cnblogs.com/ytwang/p/13986913.html
Copyright © 2011-2022 走看看