zoukankan      html  css  js  c++  java
  • Hive学习之自己定义聚合函数

          Hive支持用户自己定义聚合函数(UDAF),这样的类型的函数提供了更加强大的数据处理功能。

    Hive支持两种类型的UDAF:简单型和通用型。正如名称所暗示的,简单型UDAF的实现很easy,但因为使用了反射的原因会出现性能的损耗,而且不支持长度可变的參数列表等特征。而通用型UDAF尽管支持长度可变的參数等特征。但不像简单型那么easy编写。

          这篇文章将学习编写UDAF的规则,比方须要实现哪些接口,继承哪些类,定义哪些方法等。 实现通用型UDAF须要编写两个类:解析器和计算器。解析器负责UDAF的參数检查。操作符的重载以及对于给定的一组參数类型查找正确的计算器。计算器实现实际UDAF的计算逻辑。通常解析器能够实现org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2接口,但建议继承org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver抽象类,该类实现了GenericUDAFResolver2接口。计算器须要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator抽象类。并做为解析器的内部静态类实现。

          解析器的类型检查确保用户传递正确的參数。比方UDAF的參数为Integer类型。那么用户传递Double就须要抛出异常。操作符重载则同意为不同类型的參数定义不同的UDAF逻辑。

    在编码之前。先了解一下AbstractGenericUDAFResolver类,该类有两个重载的方法public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfoinfo)和public GenericUDAFEvaluatorgetEvaluator(TypeInfo[] info),当中前者不再建议使用,这样继承该类时仅仅覆盖第二个方法就可以。

    该方法的參数类型为TypeInfo[],返回值为GenericUDAFEvaluator。在该方法中完毕參数的检查,不仅包括參数的数量还有參数的类型。TypeInfo位于包org.apache.hadoop.hive.serde2.typeinfo中。该类存储类型信息。Hive眼下支持5种类型:基本类型(String。Number等)、List对象、Map对象、Struct对象和Union对象。

    该类的getCategory()方法返回类型信息的类别。详细为枚举类ObjectInspector.Category,该枚举类包括了相应上述5种类型的枚举常量,分别为:PRIMITIVE、LIST、MAP、STRUCT和UNION。getEvaluator(TypeInfo[] info)的详细实现例如以下:

    @Override
      public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
        throws SemanticException {
        if (parameters.length != 1) {
          throw new UDFArgumentTypeException(parameters.length - 1,
              "Exactly one argument is expected.");
        }
        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
        if (!ObjectInspectorUtils.compareSupported(oi)) {
          throw new UDFArgumentTypeException(parameters.length - 1,
              "Cannot support comparison of map<> type or complex type containing map<>.");
        }
        return new GenericUDAFMaxEvaluator();
      }
    

          假设想实现操作符重载,须要创建与操作符数目同样的计算器内部类,比方有两个重载方法,那么须要创建两个计算器,然后依据输入參数的不同返回不同的计算器。

          正如上面提到的计算器须要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator抽象类。该类提供了几个须要被子类实现的抽象方法。这些方法建立了处理UDAF语义的过程。在详细学习怎样编写计算器之前,先了解一下计算器的4种模式,这些模式由枚举类GenericUDAFEvaluator.Mode定义:

    public static enum Mode {
        PARTIAL1,
        PARTIAL2,	
        FINAL,
        COMPLETE
      };
    

          PARTIAL1模式是从原始数据到部分聚合数据的过程,将调用方法iterate() 和terminatePartial()。PARTIAL2模式是从部分聚合数据到部分聚合数据的过程。将调用方法merge() 和terminatePartial()。FINAL模式是从部分聚合到所有聚合的过程,将调用merge()和 terminate()。最后一种模式为COMPLETE,该模式为从原始数据直接到所有聚合的过程,将调用merge() 和 terminate()。

          在了解了计算器的模式后。详细看看计算器必须实现的方法。

    GenericUDAFEvaluator类提供了以下几个抽象方法:

    • getNewAggregationBuffer():用于返回存储暂时聚合结果的 GenericUDAFEvaluator.AggregationBuffer对象。

    • reset(GenericUDAFEvaluator.AggregationBuffer agg):重置聚合,该方法在重用同样的聚合时非常实用。

    • iterate(GenericUDAFEvaluator.AggregationBuffer agg,Object[] parameters):迭代parameters表示的原始数据并保存到agg中。
    • terminatePartial(GenericUDAFEvaluator.AggregationBuffer agg):以持久化的方式返回agg表示部分聚合结果,这里的持久化意味着返回值仅仅能Java基础类型、数组、基础类型包装器、Hadoop的Writables、Lists和Maps。即使实现了java.io.Serializable,也不要使用自己定义的类。
    • merge(GenericUDAFEvaluator.AggregationBuffer agg,Object partial):合并由partial表示的部分聚合结果到agg中。
    •  terminate(GenericUDAFEvaluator.AggregationBuffer agg):返回由agg表示的终于结果。

          除了上述抽象方法,GenericUDAFEvaluato另一个尽管不是抽象方法但通常也须要覆盖的方法ObjectInspector  init(GenericUDAFEvaluator.Mode m,ObjectInspector[] parameters),该方法用于初始化计算器,在不同的模式下第二參数的含义是不同的,比方m为PARTIAL1 和 COMPLETE时,第二个參数为原始数据。m为PARTIAL2 和 FINAL时。该參数仅为部分聚合数据(该数组总是仅仅有一个元素)。在PARTIAL1和PARTIAL2模式下,ObjectInspector 用于terminatePartial方法的返回值,在FINAL和COMPLETE模式下ObjectInspector 用于terminate方法的返回值。

          上述这些方法基本依照init、getNewAggregationBuffer、iterate、terminatePartial、merge、terminate的顺序调用。另一点须要明白的是聚合计算必须在数据上是随意可分的。

          能够參考Hive自带的聚合函数。比方求最大值的max函数,其计算器的源码例如以下所看到的。

    在计算器中必须注意的是ObjectInspector及其子类的使用。该类表示特定的类型及怎样在内存中存储该类型的数据,详细的用法能够參考API。

    public static class GenericUDAFMaxEvaluator extends GenericUDAFEvaluator {
        private transient ObjectInspector inputOI;
        private transient ObjectInspector outputOI;
    
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
            throws HiveException {
          assert (parameters.length == 1);
          super.init(m, parameters);
          inputOI = parameters[0];
          // Copy to Java object because that saves object creation time.
          // Note that on average the number of copies is log(N) so that's not
          // very important.
          outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
              ObjectInspectorCopyOption.JAVA);
          return outputOI;
        }
        /** class for storing the current max value */
        static class MaxAgg extends AbstractAggregationBuffer {
          Object o;
        }
    
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
          MaxAgg result = new MaxAgg();
          return result;
        }
    
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
          MaxAgg myagg = (MaxAgg) agg;
          myagg.o = null;
        }
    
        boolean warned = false;
    
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
            throws HiveException {
          assert (parameters.length == 1);
          merge(agg, parameters[0]);
        }
        @Override
    public Object terminatePartial(AggregationBuffer agg) 
    throws HiveException {
          return terminate(agg);
        }
        @Override
        public void merge(AggregationBuffer agg, Object partial)
            throws HiveException {
          if (partial != null) {
            MaxAgg myagg = (MaxAgg) agg;
            int r = ObjectInspectorUtils.compare(myagg.o, outputOI, partial, inputOI);
            if (myagg.o == null || r < 0) {
              myagg.o = ObjectInspectorUtils.copyToStandardObject(partial, inputOI,ObjectInspectorCopyOption.JAVA);
            }
          }
        }
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
          MaxAgg myagg = (MaxAgg) agg;
          return myagg.o;
        }
      }

  • 相关阅读:
    kill命令
    linux grep命令
    ps命令详解
    Linux chmod命令详解
    Linux netstat命令详解
    多线程同步机制的几种方法
    C++_运算符重载 总结
    malloc/free与new/delete的区别
    python发送邮件
    linux之nfs
  • 原文地址:https://www.cnblogs.com/lcchuguo/p/5391931.html
Copyright © 2011-2022 走看看