1. 概述
UDF函数其实就是一个简单的函数,执行过程就是在Hive转换成MapReduce程序后,执行java方法,类似于像MapReduce执行过程中加入一个插件,方便扩展。UDF只能实现一进一出的操作,如果需要实现多进一出,则需要实现UDAF。
Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。
2. UDF类型
Hive中有3种UDF:
UDF:操作单个数据行,产生单个数据行;
UDAF:操作多个数据行,产生一个数据行;
UDTF:操作一个数据行,产生多个数据行一个表作为输出;
3. 如何构建UDF
用户构建的UDF使用过程如下:
- 继承UDF或者UDAF或者UDTF,实现特定的方法;
- 将写好的类打包为jar,如LowerUDF.jar;
- 进入到Hive shell环境中,输入命令add jar /home/hadoop/LowerUDF.jar注册该jar文件;或者把LowerUDF.jar上传到hdfs,hadoop fs -put LowerUDF.jar /home/hadoop/LowerUDF.jar,再输入命令add jar hdfs://hadoop01:8020/user/home/LowerUDF.jar;
- 为该类起一个别名,create temporary function lower_udf as 'UDF.lowerUDF';注意,这里UDF只是为这个Hive会话临时定义的;
- 在select中使用lower_udf();
4. 自定义UDF
4.1 pom.xml依赖
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency>
4.2 编写UDF代码
package UDF; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text; public class LowerUDF extends UDF{ /** * 1. Implement one or more methods named "evaluate" which will be called by Hive. * * 2. "evaluate" should never be a void method. However it can return "null" if needed. */ public Text evaluate(Text str){ // input parameter validate if(null == str){ return null ; } // validate if(StringUtils.isBlank(str.toString())){ return null ; } // lower return new Text(str.toString().toLowerCase()) ; } }
4.3 打包
注意:工程所用的jdk要与Hadoop集群使用的jdk是同一个版本。
4.4 注册UDF
hive> add jar /home/hadoop/LowerUDF.jar hive> create temporary function lower_udf as "UDF.LowerUDF";
4.5 测试
hive> create table test (id int ,name string); hive> insert into test values(1,'TEST'); hive> select lower_udf(name) from test; OK test
注意事项:
- 一个用户UDF必须org.apache.hadoop.hive.ql.exec.UDF;
- 一个UDF必须要包含有evaluate()方法,但是该方法并不存在于UDF中。evaluate的参数个数以及类型都是用户自定义的。在使用的时候,Hive会调用UDF的evaluate()方法。
5. 自定义UDAF
UDAF是聚合函数,相当于reduce,将表中多行数据聚合成一行结果
UDAF是需要在hive的sql语句和group by联合使用,hive的group by 对于每个分组,只能返回一条记录,这点和mysql不一样。
开发通用UDAF有两个步骤:
- resolver负责类型检查,操作符重载,里面创建evaluator类对象;
- evaluator真正实现UDAF的逻辑;
5.1 继承AbstractAggregationBuffer和实现evaluator
package cn.wisec.meerkat.analyseOnHive; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; /** * @Author: Yang JianQiu * @Date: 2019/7/16 10:48 * * 开发通用UDAF有两个步骤: * 第一个是编写resolver类,第二个是编写evaluator类 * resolver负责类型检查,操作符重载 * evaluator真正实现UDAF的逻辑。 * 通常来说,顶层UDAF类继承{@link org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2} * 里面编写嵌套类evaluator实现UDAF的逻辑 * * resolver通常继承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,但是更建议继承AbstractGenericUDAFResolver,隔离将来hive接口的变化 * GenericUDAFResolver和GenericUDAFResolver2接口的区别是: 后面的允许evaluator实现利用GenericUDAFParameterInfo可以访问更多的信息,例如DISTINCT限定符,通配符(*)。 */ public class CountUDAF extends AbstractGenericUDAFResolver { /** * 构建方法,传入的是函数指定的列 * @param params * @return * @throws SemanticException */ @Override public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException { if (params.length > 1){ throw new UDFArgumentLengthException("Exactly one argument is expected"); } return new CountUDAFEvaluator(); } /** * 这个构建方法可以判输入的参数是*号或者distinct * @param info * @return * @throws SemanticException */ @Override public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException { ObjectInspector[] parameters = info.getParameterObjectInspectors(); boolean isAllColumns = false; if (parameters.length == 0){ if (!info.isAllColumns()){ throw new UDFArgumentException("Argument expected"); } if (info.isDistinct()){ throw new UDFArgumentException("DISTINCT not supported with"); } isAllColumns = true; }else if (parameters.length != 1){ throw new UDFArgumentLengthException("Exactly one argument is expected."); } return new CountUDAFEvaluator(isAllColumns); } /** * GenericUDAFEvaluator类实现UDAF的逻辑 * * enum Mode运行阶段枚举类 * PARTIAL1; * 这个是mapreduce的map阶段:从原始数据到部分数据聚合 * 将会调用iterate()和terminatePartial() * * PARTIAL2: * 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据:部分数据聚合 * 将会调用merge()和terminatePartial() * * FINAL: * mapreduce的reduce阶段:从部分数据的聚合到完全聚合 * 将会调用merge()和terminate() * * COMPLETE: * 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了;从原始数据直接到完全聚合 * 将会调用iterate()和terminate() */ public static class CountUDAFEvaluator extends GenericUDAFEvaluator{ private boolean isAllColumns = false; /** * 合并结果的类型 */ private LongObjectInspector aggOI; private LongWritable result; public CountUDAFEvaluator() { } public CountUDAFEvaluator(boolean isAllColumns) { this.isAllColumns = isAllColumns; } /** * 负责初始化计算函数并设置它的内部状态,result是存放最终结果的 * @param m 代表此时在map-reduce哪个阶段,因为不同的阶段可能在不同的机器上执行,需要重新创建对象partial1,partial2,final,complete * @param parameters partial1或complete阶段传入的parameters类型是原始输入数据的类型 * partial2和final阶段(执行合并)的parameters类型是partial-aggregations(既合并返回结果的类型),此时parameters长度肯定只有1了 * @return ObjectInspector * 在partial1和partial2阶段返回局部合并结果的类型,既terminatePartial的类型 * 在complete或final阶段返回总结果的类型,既terminate的类型 * @throws HiveException */ @Override public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException { super.init(m, parameters); //当是combiner和reduce阶段时,获取合并结果的类型,因为需要执行merge方法 //merge方法需要部分合并的结果类型来取得值 if (m == Mode.PARTIAL2 || m == Mode.FINAL){ aggOI = (LongObjectInspector) parameters[0]; } //保存总结果 result = new LongWritable(0); //局部合并结果的类型和总合并结果的类型都是long return PrimitiveObjectInspectorFactory.writableLongObjectInspector; } /** * 定义一个AbstractAggregationBuffer类来缓存合并值 */ static class CountAgg extends AbstractAggregationBuffer{ long value; /** * 返回类型占的字节数,long为8 * @return */ @Override public int estimate() { return JavaDataModel.PRIMITIVES2; } } /** * 创建缓存合并值的buffer * @return * @throws HiveException */ @Override public AggregationBuffer getNewAggregationBuffer() throws HiveException { CountAgg countAgg = new CountAgg(); reset(countAgg); return countAgg; } /** * 重置合并值 * @param agg * @throws HiveException */ @Override public void reset(AggregationBuffer agg) throws HiveException { ((CountAgg) agg).value = 0; } /** * map时执行,迭代数据 * @param agg * @param parameters * @throws HiveException */ @Override public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException { //parameters为输入数据 //parameters == null means the input table/split is empty if (parameters == null){ return; } if (isAllColumns){ ((CountAgg) agg).value ++; }else { boolean countThisRow = true; for (Object nextParam: parameters){ if (nextParam == null){ countThisRow = false; break; } } if (countThisRow){ ((CountAgg) agg).value++; } } } /** * 返回buffer中部分聚合结果,map结束和combiner结束执行 * @param agg * @return * @throws HiveException */ @Override public Object terminatePartial(AggregationBuffer agg) throws HiveException { return terminate(agg); } /** * 合并结果,combiner或reduce时执行 * @param agg * @param partial * @throws HiveException */ @Override public void merge(AggregationBuffer agg, Object partial) throws HiveException { if (partial != null){ //累加部分聚合的结果 ((CountAgg) agg).value += aggOI.get(partial); } } /** * 返回buffer中总结果,reduce结束执行或者没有reduce时map结束执行 * @param agg * @return * @throws HiveException */ @Override public Object terminate(AggregationBuffer agg) throws HiveException { //每一组执行一次(group by) result.set(((CountAgg) agg).value); //返回writable类型 return result; } } }
使用:
hive> add jar /root/udf.jar hive> create temporary function mycount as 'udf.CountUDAF' hive> select call, mycount(*) as cn from beauty group by call order by cn desc hive> select tag, mycount(tag) as cn from beauty lateral view explode(tags) lve_beauty as tag group by tag order by cn desc
6. 自定义UDTF
UDTF用来解决输入一行输出多行的需求。
限制:
- No other expressions are allowed in SELECT不能和其他字段一起使用:SELECT pageid,explode(adid_list) AS myCol... is not supported
- UDTF's can't be nested 不能嵌套:SELECT explode(explode(adid_list)) AS myCol..... is not supported
- GROUP BY/ CLUSTER BY/ DISTRIBUTE BY/ SORT BY is not supported:SELECT explode(adid_list) AS myCol.....GROUP BY myCol is not supported
继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize,process,close三个方法。
执行过程:
- UDTF首先会调用initialize方法,此方法返回UDTF的输出行的信息(输出列个数与类型);
- 初始化完成后,会调用process方法,真正的处理过程在process函数中:在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
- 最后close()方法调用,对需要清理的方法进行清理。
下面是实现一个explode函数的例子:
explode会将一个数组中每个元素都输出一行,map中每对key-value都输出一行,实现对数据展开
package cn.wisec.meerkat.analyseOnHive; import org.apache.hadoop.hive.ql.exec.TaskExecutionException; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.*; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * @Author: Yang JianQiu * @Date: 2019/7/16 17:08 */ public class MyExplodeUDTF extends GenericUDTF { private transient ObjectInspector inputOI = null; /** * 初始化 * 构建一个StructObjectInspector类型用于输出 * 其中struct的字段构成输出的一行 * 字段名称不重要,因为它们将被用户提供的列别名覆盖 * @param argOIs * @return * @throws UDFArgumentException */ @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { //得到结构体的字段 List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs(); ObjectInspector[] udfInputOIs = new ObjectInspector[inputFields.size()]; for (int i = 0; i < inputFields.size(); i++){ //字段类型 udfInputOIs[i] = inputFields.get(i).getFieldObjectInspector(); } if (udfInputOIs.length != 1){ throw new UDFArgumentLengthException("explode() takes only one argument"); } List<String> fieldNames = new ArrayList<>(); List<ObjectInspector> fieldOIs = new ArrayList<>(); switch (udfInputOIs[0].getCategory()){ case LIST: inputOI = udfInputOIs[0]; //指定list生成的列名,可在as后覆写 fieldNames.add("col"); //获取list元素的类型 fieldOIs.add(((ListObjectInspector) inputOI).getListElementObjectInspector()); break; case MAP: inputOI = udfInputOIs[0]; //指定map中key的生成的列名,可在as后覆写 fieldNames.add("key"); //指定map中value的生成的列名,可在as后覆写 fieldNames.add("value"); //得到map中key的类型 fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector()); //得到map中value的类型 fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector()); break; default: throw new UDFArgumentException("explode() takes an array or a map as a parameter"); } //创建一个Struct类型返回 return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } //输出list private transient Object[] forwardListObj = new Object[1]; //输出map private transient Object[] forwardMapObj = new Object[2]; /** * 每行执行一次,输入数据args * 每调用forward,输出一行 * @param args * @throws HiveException */ @Override public void process(Object[] args) throws HiveException { switch (inputOI.getCategory()){ case LIST: ListObjectInspector listOI = (ListObjectInspector) inputOI; List<?> list = listOI.getList(args[0]); if (list == null){ return; } //list中每个元素输出一行 for (Object o: list){ forwardListObj[0] = o; forward(forwardListObj); } break; case MAP: MapObjectInspector mapOI = (MapObjectInspector) inputOI; Map<?, ?> map = mapOI.getMap(args[0]); if (map == null){ return; } //map中每一对输出一行 for (Map.Entry<?, ?> entry: map.entrySet()){ forwardMapObj[0] = entry.getKey(); forwardMapObj[1] = entry.getValue(); forward(forwardMapObj); } break; default: throw new TaskExecutionException("explode() can only operate on an array or a map"); } } @Override public void close() throws HiveException { } }
使用:
hive> add jar /root/udtf.jar hive> create temporary function myexplode as 'udf.MyExplodeUDTF' hive> select myexplode(tags) as tag from beauty hive> select myexplode(props) as (k,v) from beauty hive> select tag, count(tag) as cn from beauty lateral view myexplode(tags) lve_beauty as tag group by tag order by cn desc
【参考资料】
https://blog.csdn.net/wypersist/article/details/80314352
https://blog.csdn.net/zmywei_20160707/article/details/81698542