  • A First Look at Hive UDFs

    1. Introduction

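    The previous post flattened the complex nested structures in a Hive table so that it could be imported into Kylin. After flattening, however, the impression PV computed from the ad logs is inflated: one user maps to several tags, so each impression is repeated once per tag. To compute impression PV correctly, we need a separate view.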
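    To make the inflation concrete, here is a minimal plain-Java sketch (not Hive code) that models each impression as one row carrying a list of tags. The class and field names are hypothetical. Flattening the tags the way explode() does yields one row per tag, so a row count over the flattened data counts the same impression several times.

```java
import java.util.ArrayList;
import java.util.List;

class FlattenInflation {
    // Hypothetical model of one ad-log row: a single impression carrying several tags.
    static final class Impression {
        final String uid;
        final String dsp;
        final List<String> tags;

        Impression(String uid, String dsp, List<String> tags) {
            this.uid = uid;
            this.dsp = dsp;
            this.tags = tags;
        }
    }

    // Flattens like explode(): one output row per (impression, tag) pair.
    static List<String[]> flatten(List<Impression> rows) {
        List<String[]> out = new ArrayList<>();
        for (Impression r : rows) {
            for (String tag : r.tags) {
                out.add(new String[] {r.uid, r.dsp, tag});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Impression> rows = List.of(
            new Impression("u1", "dspA", List.of("sports", "travel", "cars")),
            new Impression("u2", "dspA", List.of("music")));
        System.out.println("impressions (true PV): " + rows.size());          // 2
        System.out.println("rows after flattening: " + flatten(rows).size()); // 4
    }
}
```

    A count(*) over the flattened rows reports 4 even though only 2 impressions happened, which is why PV has to be computed from a view that keeps one row per impression.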

    Requirements:

    • impression PV per DSP, and tag-covered impression PV per DSP;
    • total impression PV, and total tag-covered impression PV.

    This amounts to cube(dsp, tag) + measure(pv). In HiveQL:

    select dsp, tag, count(*) as pv
    from ad_view
    where view = 'view' and day_time between '2016-04-18' and '2016-04-24'
    group by dsp, tag with cube;
    

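    GROUP BY ... WITH CUBE produces a count for every combination of the grouping columns, reporting a rolled-up column as NULL. As a hedged illustration, the plain-Java sketch below (class and method names are hypothetical, not part of Hive) expands each (dsp, tag) row into the four grouping sets of CUBE(dsp, tag) and counts rows per set. It assumes tag already holds TAGED or EMPTY.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CubeCount {
    // Counts rows for every grouping set of CUBE(dsp, tag):
    // (dsp, tag), (dsp, *), (*, tag), (*, *). A rolled-up column is null,
    // mirroring how Hive marks it in GROUP BY ... WITH CUBE output.
    static Map<List<String>, Long> cube(List<String[]> rows) {
        Map<List<String>, Long> pv = new HashMap<>();
        for (String[] r : rows) {
            String dsp = r[0];
            String tag = r[1];
            for (List<String> key : List.of(
                    Arrays.asList(dsp, tag),
                    Arrays.asList(dsp, (String) null),
                    Arrays.asList((String) null, tag),
                    Arrays.asList((String) null, (String) null))) {
                pv.merge(key, 1L, Long::sum);
            }
        }
        return pv;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
            new String[] {"dspA", "TAGED"},
            new String[] {"dspA", "EMPTY"},
            new String[] {"dspB", "TAGED"});
        Map<List<String>, Long> pv = cube(rows);
        System.out.println(pv.get(Arrays.asList("dspA", null)));  // 2: all PV on dspA
        System.out.println(pv.get(Arrays.asList(null, "TAGED"))); // 2: tagged PV across DSPs
        System.out.println(pv.get(Arrays.asList(null, null)));    // 3: total PV
    }
}
```

    Read against the requirements: (dsp, NULL) is PV per DSP, (dsp, TAGED) is tag-covered PV per DSP, (NULL, NULL) is total PV, and (NULL, TAGED) is total tag-covered PV. Hive also offers GROUPING__ID to tell a rolled-up NULL from a genuine NULL value; this sketch does not model that.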
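    The question is how to turn the original column tags array<struct<tag:string,label:string,src:string>> into a flag that says tagged (TAGED) or untagged (EMPTY). The obvious approach is to write a UDF over the tags column that checks whether it carries any tag.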

    2. Hands-on

    Basics

    The term user-defined function (UDF) covers:

    • functions that transform a single column value, such as round(), abs(), concat();
    • user-defined aggregate functions (UDAFs), such as sum(), avg();
    • user-defined table-generating functions (UDTFs), which produce multiple columns or rows, such as explode(), inline().
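    The three kinds differ in how many rows go in and come out. As an analogy only (plain Java streams, not Hive's API; all names are hypothetical), the sketch below shows the one-to-one, many-to-one and one-to-many shapes.

```java
import java.util.List;
import java.util.stream.Collectors;

class FunctionShapes {
    // UDF shape: one value in, one value out per row (like abs()).
    static List<Integer> udfAbs(List<Integer> column) {
        return column.stream().map(v -> Math.abs(v)).collect(Collectors.toList());
    }

    // UDAF shape: many rows in, one value out per group (like sum()).
    static int udafSum(List<Integer> column) {
        return column.stream().mapToInt(Integer::intValue).sum();
    }

    // UDTF shape: one row in, zero or more rows out (like explode()).
    static List<String> udtfExplode(List<List<String>> column) {
        return column.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(udfAbs(List.of(-1, 2, -3)));                                // [1, 2, 3]
        System.out.println(udafSum(List.of(-1, 2, -3)));                               // -2
        System.out.println(udtfExplode(List.of(List.of("a", "b"), List.<String>of()))); // [a, b]
    }
}
```

    Note that the empty inner list contributes no output rows, just as explode() emits nothing for an empty array.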

    UDTFs are restricted in SELECT statements; for example, they cannot be combined with other columns:

    hive> SELECT name, explode(subordinates) FROM employees;
    FAILED: Error in semantic analysis: UDTF's are not supported outside the SELECT clause, nor nested in expressions
    

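    Hive provides the LATERAL VIEW clause for this: it applies the UDTF to each input row and joins the generated rows back to that row, which achieves the desired column combination: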

    hive> SELECT name, sub
    > FROM employees
    > LATERAL VIEW explode(subordinates) subView AS sub;
    

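    The row-pairing behaviour of LATERAL VIEW can be modelled in plain Java. This is a sketch, not Hive internals; the class name and the sample employees are hypothetical. Each input row is paired with every row explode() generates for it. A row whose array is empty is dropped by plain LATERAL VIEW, while LATERAL VIEW OUTER keeps it with NULL in the generated column.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class LateralViewSketch {
    // Models: SELECT name, sub FROM employees LATERAL VIEW [OUTER] explode(subordinates) subView AS sub
    static List<String[]> lateralViewExplode(Map<String, List<String>> employees, boolean outer) {
        List<String[]> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : employees.entrySet()) {
            List<String> subs = e.getValue();
            if (subs.isEmpty()) {
                // Plain LATERAL VIEW drops the row; LATERAL VIEW OUTER keeps it with sub = NULL.
                if (outer) {
                    out.add(new String[] {e.getKey(), null});
                }
                continue;
            }
            for (String sub : subs) {
                out.add(new String[] {e.getKey(), sub}); // original column + generated column
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> employees = new LinkedHashMap<>();
        employees.put("John", List.of("Mary", "Todd"));
        employees.put("Bill", List.of());
        for (String[] row : lateralViewExplode(employees, false)) {
            System.out.println(row[0] + "\t" + row[1]); // John/Mary and John/Todd; Bill is dropped
        }
        System.out.println(lateralViewExplode(employees, true).size()); // 3
    }
}
```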
    UDF vs. GenericUDF

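    org.apache.hadoop.hive.ql.exec.UDF is the base class for column transformations on simple (primitive) data types. To implement one, you write one or more evaluate() methods, which Hive resolves by reflection from the argument types. Compared with UDF, org.apache.hadoop.hive.ql.udf.generic.GenericUDF supports more complex processing, such as complex-typed arguments, and requires three methods: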

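    • initialize(ObjectInspector[] arguments): checks the types of the input arguments and determines the return type;
    • evaluate(DeferredObject[] arguments): performs the transformation; the value it returns must match the return type declared by initialize();
    • getDisplayString(String[] children): returns the string Hive displays for this function call, for example in EXPLAIN output.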
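    The division of labour between the three methods is easier to see in a stripped-down model. The sketch below is hypothetical throughout: MiniGenericUdf, Type and StrLen are invented stand-ins that only mirror the shape of the GenericUDF contract; they are not Hive classes. The engine-side run() calls initialize() once to validate inputs and fix the output type, then evaluate() once per row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GenericUdfLifecycle {
    // Hypothetical stand-in for ObjectInspector: only records a type.
    enum Type { STRING, INT }

    // Hypothetical mini-interface shaped like GenericUDF's three methods (NOT Hive's API).
    interface MiniGenericUdf {
        Type initialize(Type[] argTypes);           // once: validate inputs, declare output type
        Object evaluate(Object[] args);             // per row: return a value of the declared type
        String getDisplayString(String[] children); // text shown when the plan is displayed
    }

    // Example function: string length, declared to return INT.
    static final class StrLen implements MiniGenericUdf {
        @Override
        public Type initialize(Type[] argTypes) {
            if (argTypes.length != 1 || argTypes[0] != Type.STRING) {
                throw new IllegalArgumentException("strlen takes exactly 1 string argument");
            }
            return Type.INT;
        }

        @Override
        public Object evaluate(Object[] args) {
            // NULL in, NULL out; otherwise an Integer, matching the declared INT type.
            return args[0] == null ? null : ((String) args[0]).length();
        }

        @Override
        public String getDisplayString(String[] children) {
            return "strlen(" + children[0] + ")";
        }
    }

    // Models the engine side: initialize once before any rows, then evaluate per row.
    static List<Object> run(MiniGenericUdf udf, Type argType, List<String> column) {
        udf.initialize(new Type[] {argType});
        List<Object> out = new ArrayList<>();
        for (String value : column) {
            out.add(udf.evaluate(new Object[] {value}));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new StrLen(), Type.STRING, Arrays.asList("ab", null, ""))); // [2, null, 0]
        System.out.println(new StrLen().getDisplayString(new String[] {"name"}));          // strlen(name)
    }
}
```

    Rejecting bad argument types in initialize() means a type error surfaces once, before any row is processed, rather than on every call to evaluate().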

    The UDF below checks whether tags array<struct<tag:string,label:string,src:string>> is empty (EMPTY):

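    package com.hive.udf;
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    
    @Description(name = "checkTag",
            value = "_FUNC_(array<struct>) - from the input array of struct "
                    + "returns TAGED or EMPTY (no tag).",
            extended = "Example:\n"
                    + "  > SELECT _FUNC_(tags_array) FROM src;")
    public class CheckTag extends GenericUDF {
      private ListObjectInspector listOI;
      private StructObjectInspector structOI;
      private StructField tagField;  // the struct's "tag" field, resolved once instead of per row
    
      @Override
      public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
          throw new UDFArgumentLengthException("takes exactly 1 argument: array<struct<tag:string,...>>");
        }
    
        ObjectInspector a = arguments[0];
        if (!(a instanceof ListObjectInspector)) {
          throw new UDFArgumentException("first argument must be a list / array");
        }
        listOI = (ListObjectInspector) a;
    
        if (!(listOI.getListElementObjectInspector() instanceof StructObjectInspector)) {
          throw new UDFArgumentException("first argument must be a list of struct");
        }
        structOI = (StructObjectInspector) listOI.getListElementObjectInspector();
        tagField = structOI.getStructFieldRef("tag");
    
        // evaluate() returns java.lang.String values, so declare the Java string inspector
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
      }
    
      @Override
      public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Object list = arguments[0].get();
        // A NULL or empty array carries no tag data
        int length = (list == null) ? 0 : listOI.getListLength(list);
        if (length <= 0) {
          return "null_field";
        }
    
        Object firstTag = structOI.getStructFieldData(listOI.getListElement(list, 0), tagField);
        // A single element whose tag is "EMPTY" means the user has no tag
        if (length == 1 && firstTag != null && "EMPTY".equals(firstTag.toString())) {
          return "EMPTY";
        }
        return "TAGED";
      }
    
      @Override
      public String getDisplayString(String[] children) {
        return "checkTag(" + children[0] + ")";
      }
    }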
    

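    Because the UDF itself needs a Hive runtime to exercise, it can help to check its decision rule separately. The sketch below restates that rule on plain Java values: a NULL or empty array gives null_field, a single element whose tag is EMPTY gives EMPTY, anything else gives TAGED. The class name is hypothetical, and modelling each struct as a Map with a "tag" key is an assumption made for illustration.

```java
import java.util.List;
import java.util.Map;

class CheckTagLogic {
    // Restates CheckTag's decision rule on plain Java values instead of ObjectInspectors.
    // Each struct is modelled as a Map holding a "tag" key (assumption for illustration).
    static String checkTag(List<Map<String, String>> tags) {
        if (tags == null || tags.isEmpty()) {
            return "null_field";                     // NULL or empty array
        }
        Map<String, String> first = tags.get(0);
        String tag = (first == null) ? null : first.get("tag");
        if (tags.size() == 1 && "EMPTY".equals(tag)) {
            return "EMPTY";                          // single placeholder tag: user has no tag
        }
        return "TAGED";
    }

    public static void main(String[] args) {
        System.out.println(checkTag(null));                                        // null_field
        System.out.println(checkTag(List.of(Map.of("tag", "EMPTY", "src", "x")))); // EMPTY
        System.out.println(checkTag(List.of(Map.of("tag", "sports"),
                                            Map.of("tag", "EMPTY"))));            // TAGED
    }
}
```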
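    The following dependencies are also needed (provided scope, since Hive and Hadoop supply these classes at runtime):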

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>0.14.0</version>
      <scope>provided</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.5.0-cdh5.3.2</version>
      <scope>provided</scope>
    </dependency>
    

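    After compiling, package the classes into a jar, put it on HDFS, and register it with add jar; the UDF can then be called. A temporary function exists only in the current session, so every session that queries the view below must run add jar and create temporary function first: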

    add jar hdfs://path/to/udf-1.0-SNAPSHOT.jar;
    create temporary function checktag as 'com.hive.udf.CheckTag';
    
    create view if not exists yooshu_view
    partitioned on (day_time)
    as
    select uid, dsp, view, click, checktag(tags) as tag, day_time
    from ad_base;
    
  • Original post: https://www.cnblogs.com/en-heng/p/5462796.html