zoukankan      html  css  js  c++  java
  • 064 UDF

    一:UDF

    1.自定义UDF

      

    二:UDAF 

    2.UDAF

      

    3.介绍AbstractGenericUDAFResolver

      

    4.介绍GenericUDAFEvaluator

      

    5.程序

      1 package org.apache.hadoop.hive_udf;
      2 
      3 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
      4 import org.apache.hadoop.hive.ql.metadata.HiveException;
      5 import org.apache.hadoop.hive.ql.parse.SemanticException;
      6 import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
      7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
      8 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
      9 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
     10 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
     11 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
     12 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
     13 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
     14 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
     15 import org.apache.hadoop.io.LongWritable;
     16 
     17 /**
     18  * 
     19  * 需求:实现sum函数,支持int和double类型
     20  *
     21  */
     22 
     23 public class UdafProject extends AbstractGenericUDAFResolver{
     24     public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
     25             throws SemanticException {
     26         //判断参数是否是全部列
     27         if(info.isAllColumns()){
     28             throw new SemanticException("不支持*的参数");
     29         }
     30         
     31         //判断是否只有一个参数
     32         ObjectInspector[] inspector = info.getParameterObjectInspectors();
     33         if(inspector.length != 1){
     34             throw new SemanticException("参数只能有一个");
     35         }
     36         //判断输入列的数据类型是否为基本类型
     37         if(inspector[0].getCategory() != ObjectInspector.Category.PRIMITIVE){
     38             throw new SemanticException("参数必须为基本数据类型");
     39         }
     40         
     41         AbstractPrimitiveWritableObjectInspector woi = (AbstractPrimitiveWritableObjectInspector) inspector[0];
     42         
     43         //判断是那种基本数据类型
     44         
     45         switch(woi.getPrimitiveCategory()){
     46         case INT:
     47         case LONG:
     48         case BYTE:
     49         case SHORT:
     50             return new udafLong();
     51         case FLOAT:
     52         case DOUBLE:
     53             return new udafDouble();
     54             default:
     55                 throw new SemanticException("参数必须是基本类型,且不能为string等类型");
     56         
     57         
     58         }
     59           
     60     }
     61     
     62     /**
     63      * 对整形数据进行求和
     64      */
     65     public static class udafLong extends  GenericUDAFEvaluator{
     66         
     67         //定义输入数据类型
     68         public  PrimitiveObjectInspector inputor;
     69         
     70         //实现自定义buffer
     71         static class sumlongagg implements AggregationBuffer{
     72             long sum;
     73             boolean empty;
     74         }
     75         
     76         //初始化方法
     77         @Override
     78         public ObjectInspector init(Mode m, ObjectInspector[] parameters)
     79                 throws HiveException {
     80             // TODO Auto-generated method stub
     81             
     82             super.init(m, parameters);
     83             if(parameters.length !=1 ){
     84                 throw new UDFArgumentException("参数异常");
     85             }
     86             if(inputor == null){
     87                 this.inputor = (PrimitiveObjectInspector) parameters[0];
     88             }
     89             //注意返回的类型要与最终sum的类型一致
     90             return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
     91         }
     92 
     93         @Override
     94         public AggregationBuffer getNewAggregationBuffer() throws HiveException {
     95             // TODO Auto-generated method stub
     96             sumlongagg slg = new sumlongagg();
     97             this.reset(slg);
     98             return slg;
     99         }
    100 
    101         @Override
    102         public void reset(AggregationBuffer agg) throws HiveException {
    103             // TODO Auto-generated method stub
    104             sumlongagg slg = (sumlongagg) agg;
    105             slg.sum=0;
    106             slg.empty=true;
    107         }
    108 
    109         @Override
    110         public void iterate(AggregationBuffer agg, Object[] parameters)
    111                 throws HiveException {
    112             // TODO Auto-generated method stub
    113             if(parameters.length != 1){
    114                 throw new UDFArgumentException("参数错误");
    115             }
    116             this.merge(agg, parameters[0]);
    117             
    118         }
    119 
    120         @Override
    121         public Object terminatePartial(AggregationBuffer agg)
    122                 throws HiveException {
    123             // TODO Auto-generated method stub
    124             return this.terminate(agg);
    125         }
    126 
    127         @Override
    128         public void merge(AggregationBuffer agg, Object partial)
    129                 throws HiveException {
    130             // TODO Auto-generated method stub
    131             sumlongagg slg = (sumlongagg) agg;
    132             if(partial != null){
    133                 slg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputor);
    134                 slg.empty=false;
    135             }
    136         }
    137 
    138         @Override
    139         public Object terminate(AggregationBuffer agg) throws HiveException {
    140             // TODO Auto-generated method stub
    141             sumlongagg slg = (sumlongagg) agg;
    142             if(slg.empty){
    143                 return null;
    144             }
    145             return new LongWritable(slg.sum);
    146         }
    147         
    148     }
    149     
    150     /**
    151      * 实现浮点型的求和
    152      */
    153     public static class udafDouble extends GenericUDAFEvaluator{
    154         
    155         //定义输入数据类型
    156         public  PrimitiveObjectInspector input;
    157         
    158         //实现自定义buffer
    159         static class sumdoubleagg implements AggregationBuffer{
    160             double sum;
    161             boolean empty;
    162         }
    163         
    164         //初始化方法
    165         @Override
    166         public ObjectInspector init(Mode m, ObjectInspector[] parameters)
    167                 throws HiveException {
    168             // TODO Auto-generated method stub
    169             
    170             super.init(m, parameters);
    171             if(parameters.length !=1 ){
    172                 throw new UDFArgumentException("参数异常");
    173             }
    174             if(input == null){
    175                 this.input = (PrimitiveObjectInspector) parameters[0];
    176             }
    177             //注意返回的类型要与最终sum的类型一致
    178             return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
    179         }
    180         
    181         
    182 
    183         @Override
    184         public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    185             // TODO Auto-generated method stub
    186             sumdoubleagg sdg = new sumdoubleagg();
    187             this.reset(sdg);
    188             return sdg;
    189         }
    190 
    191         @Override
    192         public void reset(AggregationBuffer agg) throws HiveException {
    193             // TODO Auto-generated method stub
    194             sumdoubleagg sdg = (sumdoubleagg) agg;
    195             sdg.sum=0;
    196             sdg.empty=true;
    197         }
    198 
    199         @Override
    200         public void iterate(AggregationBuffer agg, Object[] parameters)
    201                 throws HiveException {
    202             // TODO Auto-generated method stub
    203             if(parameters.length != 1){
    204                 throw new UDFArgumentException("参数错误");
    205             }
    206             this.merge(agg, parameters[0]);
    207         }
    208 
    209         @Override
    210         public Object terminatePartial(AggregationBuffer agg)
    211                 throws HiveException {
    212             // TODO Auto-generated method stub
    213             return this.terminate(agg);
    214         }
    215 
    216         @Override
    217         public void merge(AggregationBuffer agg, Object partial)
    218                 throws HiveException {
    219             // TODO Auto-generated method stub
    220             sumdoubleagg sdg =(sumdoubleagg) agg;
    221             if(partial != null){
    222                 sdg.sum += PrimitiveObjectInspectorUtils.getDouble(sdg, input);
    223                 sdg.empty=false;
    224             }
    225         }
    226 
    227         @Override
    228         public Object terminate(AggregationBuffer agg) throws HiveException {
    229             // TODO Auto-generated method stub
    230             sumdoubleagg sdg = (sumdoubleagg) agg;
    231             if (sdg.empty){
    232                 return null;
    233             }
    234             return new DoubleWritable(sdg.sum);
    235         }
    236         
    237     }
    238 
    239 }

    6.打成jar包

      并放入路径:/etc/opt/datas/

    7.添加jar到path

      格式:

        add jar linux_path;

      即:

        add jar /etc/opt/datas/af.jar;

    8.创建函数(注册临时函数)

      create temporary function af as 'org.apache.hadoop.hive_udf.UdafProject';

    9.在hive中运行

      select sum(id),af(id) from stu_info;

    三:UDTF

    1.UDTF

      

    2.程序 

     1 package org.apache.hadoop.hive.udf;
     2 
     3 import java.util.ArrayList;
     4 
     5 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
     6 import org.apache.hadoop.hive.ql.metadata.HiveException;
     7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
     8 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
     9 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    10 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    11 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    12 
    13 public class UDTFtest extends GenericUDTF {
    14 
    15     @Override
    16     public StructObjectInspector initialize(StructObjectInspector argOIs)
    17             throws UDFArgumentException {
    18         // TODO Auto-generated method stub
    19         if(argOIs.getAllStructFieldRefs().size() != 1){
    20             throw new UDFArgumentException("参数只能有一个");
    21         }
    22         ArrayList<String> fieldname = new ArrayList<String>();
    23         fieldname.add("name");
    24         fieldname.add("email");
    25         ArrayList<ObjectInspector> fieldio = new ArrayList<ObjectInspector>();
    26         fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    27         fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    28         
    29         return ObjectInspectorFactory.getStandardStructObjectInspector(fieldname, fieldio);
    30     }
    31     
    32     @Override
    33     public void process(Object[] args) throws HiveException {
    34         // TODO Auto-generated method stub
    35         if(args.length == 1){
    36             String name = args[0].toString();
    37             String email = name + "@ibeifneg.com";
    38             super.forward(new String[] {name,email});
    39         }
    40     }
    41 
    42     @Override
    43     public void close() throws HiveException {
    44         // TODO Auto-generated method stub
    45         super.forward(new String[] {"complete","finish"});
    46     }
    47 
    48 }

    3.按与UDAF相同的步骤打成jar包并add jar,然后注册函数:

      create temporary function tf as 'org.apache.hadoop.hive.udf.UDTFtest';

    4.在hive中运行

      select tf(ename) as (name,email) from emp;

  • 相关阅读:
    【从0安装】mysql
    Java面试题整理(待完善)
    Linux部署Java项目
    执行旧命令的几种方法
    SQL Server死锁报错分析
    枚举类中枚举值不存在.valueOf(enum) 抛异常处理
    InitializingBean的项目开发使用
    巧用枚举来干掉if-else,代码更优雅!
    使用Docker安装配置GitLab CE
    批量条件导出之---CSV
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6079378.html
Copyright © 2011-2022 走看看