zoukankan      html  css  js  c++  java
  • Hadoop_UDAF示例

    UDAF: 多进一出

    GenericUDAFEvaluator : 就是根据job的不同阶段执行不同的方法
    Hive通过GenericUDAFEvaluator.Modle来确定job的执行阶段
    PARTIAL1: 从原始数据到部分聚合,调用方法iterate和terminatePartial方法
    PARTIAL2: 从部分数据聚合到部分数据聚合,会调用merge和terminatePartial
    FINAL: 从部分数据聚合到全部数据聚合,会调用merge和terminate
    COMPLETE: 从原始数据全部聚合,会调用方法iterate和terminate
    除了上面提到的iterate,merge,terminatePartial以外,还有init(初始化并返回,返回值的类型)
    getNewAggregationBuffer(获取新的buffer,也就是方法间传递参数的对象),reset(重置buffer对象)
    需求: 实现一个自定义的sum函数,要求韩函数支持整型和浮点型的sum操作
    

    简单示例,重写SUM函数

    package com.hive.udaf;
    
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
    import org.apache.hadoop.hive.serde2.io.DoubleWritable;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
    import org.apache.hadoop.io.LongWritable;
    
    /**
     * @author liuwl
     * mysum support float & double
     */
    public class mysum extends AbstractGenericUDAFResolver{
    	
      public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
    		 
    	// parameters is all clomuns
    	if(info.isAllColumns()){
    	  throw new SemanticException("this function is not support all parameters");
    	}
    	// only one clomun parameter
    	ObjectInspector[] inspectors = info.getParameterObjectInspectors();
    	if(inspectors.length != 1){
    	  throw new SemanticException("the parameters is only one clomun");
    	}
    	if(inspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE){
    	  throw new SemanticException("the parameters must be Basic data types");
    	}
    	// input parameter's Category
    	AbstractPrimitiveWritableObjectInspector woi = (AbstractPrimitiveWritableObjectInspector)inspectors[0];
    	switch (woi.getPrimitiveCategory()) {
    	  case INT:
    	  case LONG:
    	  case BYTE:
    	  case SHORT:
    	    return new udafLong();
    	  case FLOAT:
    	  case DOUBLE:
    	    return new udafDouble();
    	  default:
    	    throw new SemanticException("the parameter's Category is not support");
    	 }
      }
    	 
      /**
      * sum the long data
      */
      public static class udafLong extends GenericUDAFEvaluator{
    
        // define data Category
        public PrimitiveObjectInspector longInputor;
    		 
        static class sumlongagg implements AggregationBuffer{
          long sum;
          boolean empty;
        }
    		 
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
    	
          super.init(m, parameters);
          if(parameters.length!=1){
            throw new UDFArgumentException("Argument Exception");
          }
          if(this.longInputor == null){
            this.longInputor=(PrimitiveObjectInspector)parameters[0];
          }
          return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        }
    
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    			
          sumlongagg slg = new sumlongagg();
          this.reset(slg);
          return slg;
        }
    
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
    			
          sumlongagg slg = (sumlongagg)agg;
          slg.sum = 0;
          slg.empty = true;
        }
    
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
    			
          if(parameters.length !=1 ){
            throw new UDFArgumentException("Argument Exception");
          }
          this.merge(agg, parameters[0]);
        }
    
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
          return this.terminate(agg);
        }
    
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
    			
          sumlongagg slg = (sumlongagg)agg;
          if(partial != null){
            slg.sum += PrimitiveObjectInspectorUtils.getLong(partial, longInputor);
            slg.empty = false;
          }
        }
    
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
    			
          sumlongagg slg = (sumlongagg)agg;
          if(slg.empty){
            return null;
          }
          return new LongWritable(slg.sum);
        }
    		 
      }
    	 
      /**
      * sum the double data
      */
      public static class udafDouble extends GenericUDAFEvaluator{
    		 
        // define data Category
        public PrimitiveObjectInspector doubleInputor;
    		 
          static class sumdoubleagg implements AggregationBuffer{
            double sum;
            boolean empty;
          }
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
    	 		
          super.init(m, parameters);
          if(parameters.length!=1){
            throw new UDFArgumentException("Argument Exception");
          }
          if(this.doubleInputor == null){
            this.doubleInputor=(PrimitiveObjectInspector)parameters[0];
          }
          return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;			}
    
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    			
          sumdoubleagg sdg = new sumdoubleagg();
            this.reset(sdg);
            return sdg;
          }
    
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
    			
          sumdoubleagg sdg = (sumdoubleagg)agg;
          sdg.sum = 0;
          sdg.empty = true;
        }
    
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
    			
          if(parameters.length !=1 ){
            throw new UDFArgumentException("Argument Exception");
          }
          this.merge(agg, parameters[0]);
        }
    
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
          return this.terminate(agg);
        }
    
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
    			
          sumdoubleagg sdg = (sumdoubleagg)agg;
          if(partial != null){
            sdg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, doubleInputor);
            sdg.empty = false;
          }
        }
    
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
    			
          sumdoubleagg sdg = (sumdoubleagg)agg;
          if(sdg.empty){
            return null;
          }
          return new DoubleWritable(sdg.sum);
        }		 
      }
    }
    

     测试

    hive (workdb)> add jar /home/liuwl/opt/datas/mysum.jar;
    hive (workdb)> create temporary function mysum as 'com.hive.udaf.mysum';
    hive (workdb)> select sum(deptno),mysum(deptno) from emp;
    结果: _c0  _c1
         310  310
    
  • 相关阅读:
    [LeetCode] 26 Remove Duplicates from Sorted Array
    归并排序
    插入排序
    选择排序
    冒泡排序
    单链表排序
    如何实现单链表反转
    Linux基础——centos 跳过管理员密码进行登录(单用户模式、救援模式)
    response.sendRedirect()与request.getRequestDispatcher().forward()区别
    json-lib——JsonConfig详细使用说明
  • 原文地址:https://www.cnblogs.com/eRrsr/p/6096989.html
Copyright © 2011-2022 走看看