Hive中编写自定义函数
自定义函数有三类
UDF
UDAF
UDTF
Hive中编写UDF函数的方式
Hive有两个不同的接口编写UDF程序。一个是基础的UDF接口,一个是复杂的GenericUDF接口。
01.UDF
重写 evaluate
2.GenericUDF :增强版的 udf ( 支持复杂类型的输入和输出 )
01、继承GenericUDF
02、实现initialize、evaluate、getDisplayString方法
initialize : 这个方法的目标是确定参数的返回类型
evaluate :实现主要逻辑
getDisplayString :显示函数的用法
Hive编写UDAF
1.继承 AbstractGenericUDAFResolver
2.继承 GenericUDAFEvaluator
3.Evaluator 需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
init 初始化
iterate 函数处理读入的行数据
terminatePartial 返回iterate处理的中间结果
merge 合并上述处理结果
terminate 返回最终值
案例
public static enum Mode {
/**
* PARTIAL1: from original data to partial aggregation data: iterate() and terminatePartial() will be called.
* PARTIAL1: 从原始数据到部分聚合数据的过程,会调用 iterate()和 terminatePartial()
* 可以理解为MapReduce过程中的map阶段
*/
PARTIAL1,
/**
* PARTIAL2: from partial aggregation data to partial aggregation data: * merge() and terminatePartial() will be called.
* PARTIAL2: 从部分聚合数据到部分聚合数据的过程(多次聚合),会调用 merge()和 terminatePartial()
* 可以理解为MapReduce过程中的combine阶段
*/
PARTIAL2,
/**
* FINAL: from partial aggregation to full aggregation: merge() and terminate() will be called.
* FINAL: 从部分聚合数据到全部聚合数据的过程,会调用 merge()和 terminate()
* 可以理解为MapReduce过程中的reduce阶段
*/
FINAL,
/**
* COMPLETE: from original data directly to full aggregation: iterate() and terminate() will be called.
* COMPLETE: 从原始数据直接到全部聚合数据的过程,会调用 iterate()和 terminate()
* 可以理解为MapReduce过程中的直接map输出阶段,没有reduce阶段
*/
COMPLETE
};
3.程序执行过程:
1)PARTIAL1(阶段1:map):init() --> iterate() --> terminatePartial()
2)PARTIAL2(阶段2:combine):init() --> merge() --> terminatePartial()
3)FINAL (最终阶段:reduce):init() --> merge() --> terminate()
4)COMPLETE(直接输出阶段:只有map):init() --> iterate() --> terminate()
注:每个阶段都会执行init()初始化操作。
一个GenericUDAF必须先了解以下两个抽象类:
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
过程理解:
这个UDAF函数读取数据(mapper),
聚集一堆mapper输出到部分聚集结果(combiner),
并且最终创建一个最终的聚集结果(reducer)。
因为我们跨域多个combiner进行聚集,所以我们需要保存部分聚集结果
完整的UDAF逻辑是一个mapreduce过程,
如果有mapper和reducer, 就会经历PARTIAL1(mapper),FINAL(reducer),
如果有mapper和reducer还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果
Hive写UDTF
POM文件
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.4</version>
<scope>provided</scope>
</dependency>
类
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
重写的方法
initialize process close
//该方法中,指定输入输出参数:输入参数的ObjectInspector 与输出参数的StructObjectInspector
abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;
//处理一条输入记录,输出若干条结果记录
abstract void process(Object[] record) throws HiveException;
//当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
abstract void close() throws HiveException;
Hive UDTF代码示例
代码主要参考网上资料,做了注释用于学习
`
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
public class GenericUDTFEBitMap extends GenericUDTF {
/**
* 输入数据
*/
private transient BinaryObjectInspector binaryOI = null;
/**
*
* @param args 输入参数的ObjectInspector
* @return 输出参数的StructObjectInspector
* @throws UDFArgumentException
*/
@Override
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
if (args.length != 1) {
throw new UDFArgumentException("GenericUDTFEBitMap() takes only one argument");
}
/** 有两种数据类型
* two categories of Hive Data types that are primitive data type and complex data type
* PRIMITIVE : Numeric Date/time String Miscellaneous
* Complex Type: Array Map Struct Union
* public static enum Category { PRIMITIVE, LIST, MAP, STRUCT, UNION; private Category() {}}
*/
if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
&& ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.BINARY) {
throw new UDFArgumentException("GenericUDTFEBitMap() () takes a binary as a parameter");
}
// 输入格式(inspectors)
binaryOI = (BinaryObjectInspector) args[0];
// ObjectInspector生成的方式
// 输出格式(inspectors) -- 有两个属性的对象
// 静态属性获得列名和列的类型 定义 List<ObjectInspector>列表
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("col_id");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
// 通过 ObjectInspectorFactory 构造ObjectInspector
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
/**
* forwardListObj forwardMapObj
* forward
* @param record
* @throws HiveException 处理一条输入记录,输出若干条结果记录
*/
@Override
public void process(Object[] record) throws HiveException {
byte[] idBytes = this.binaryOI.getPrimitiveJavaObject(record[0]);
if(idBytes !=null && idBytes.length>0){
ImmutableRoaringBitmap other = new ImmutableRoaringBitmap(ByteBuffer.wrap(idBytes));
Iterator<Integer> iterator = other.iterator();
while (iterator.hasNext()){
// forward Passes an output row to the collector.
// 真正的处理过程在process函数中,在process中,每一次forward()调用产生一行
forward(new Object[] { iterator.next() });
}
}
}
/**
* 当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
* @throws HiveException
*/
@Override
public void close() throws HiveException {
}
/**
* toString
*/
@Override
public String toString() {
return "GenericUDTFEBitMap";
}
}`
参考
Hive之自定义聚合函数UDAF https://blog.csdn.net/weixin_39469127/article/details/89766266
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
https://github.com/sunyaf/bitmapudf
Hive Data Types – Primitive and Complex Data Types in Hive https://data-flair.training/blogs/hive-data-types/