一:UDF
1.自定义UDF
二:UDAF
2.UDAF
3.介绍AbstractGenericUDAFResolver
4.介绍GenericUDAFEvaluator
5.程序
1 package org.apache.hadoop.hive_udf; 2 3 import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 4 import org.apache.hadoop.hive.ql.metadata.HiveException; 5 import org.apache.hadoop.hive.ql.parse.SemanticException; 6 import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver; 7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; 8 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo; 9 import org.apache.hadoop.hive.serde2.io.DoubleWritable; 10 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 11 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; 12 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector; 13 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 14 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; 15 import org.apache.hadoop.io.LongWritable; 16 17 /** 18 * 19 * 需求:实现sum函数,支持int和double类型 20 * 21 */ 22 23 public class UdafProject extends AbstractGenericUDAFResolver{ 24 public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) 25 throws SemanticException { 26 //判断参数是否是全部列 27 if(info.isAllColumns()){ 28 throw new SemanticException("不支持*的参数"); 29 } 30 31 //判断是否只有一个参数 32 ObjectInspector[] inspector = info.getParameterObjectInspectors(); 33 if(inspector.length != 1){ 34 throw new SemanticException("参数只能有一个"); 35 } 36 //判断输入列的数据类型是否为基本类型 37 if(inspector[0].getCategory() != ObjectInspector.Category.PRIMITIVE){ 38 throw new SemanticException("参数必须为基本数据类型"); 39 } 40 41 AbstractPrimitiveWritableObjectInspector woi = (AbstractPrimitiveWritableObjectInspector) inspector[0]; 42 43 //判断是那种基本数据类型 44 45 switch(woi.getPrimitiveCategory()){ 46 case INT: 47 case LONG: 48 case BYTE: 49 case SHORT: 50 return new udafLong(); 51 case FLOAT: 52 case DOUBLE: 53 return new udafDouble(); 54 default: 55 throw new 
SemanticException("参数必须是基本类型,且不能为string等类型"); 56 57 58 } 59 60 } 61 62 /** 63 * 对整形数据进行求和 64 */ 65 public static class udafLong extends GenericUDAFEvaluator{ 66 67 //定义输入数据类型 68 public PrimitiveObjectInspector inputor; 69 70 //实现自定义buffer 71 static class sumlongagg implements AggregationBuffer{ 72 long sum; 73 boolean empty; 74 } 75 76 //初始化方法 77 @Override 78 public ObjectInspector init(Mode m, ObjectInspector[] parameters) 79 throws HiveException { 80 // TODO Auto-generated method stub 81 82 super.init(m, parameters); 83 if(parameters.length !=1 ){ 84 throw new UDFArgumentException("参数异常"); 85 } 86 if(inputor == null){ 87 this.inputor = (PrimitiveObjectInspector) parameters[0]; 88 } 89 //注意返回的类型要与最终sum的类型一致 90 return PrimitiveObjectInspectorFactory.writableLongObjectInspector; 91 } 92 93 @Override 94 public AggregationBuffer getNewAggregationBuffer() throws HiveException { 95 // TODO Auto-generated method stub 96 sumlongagg slg = new sumlongagg(); 97 this.reset(slg); 98 return slg; 99 } 100 101 @Override 102 public void reset(AggregationBuffer agg) throws HiveException { 103 // TODO Auto-generated method stub 104 sumlongagg slg = (sumlongagg) agg; 105 slg.sum=0; 106 slg.empty=true; 107 } 108 109 @Override 110 public void iterate(AggregationBuffer agg, Object[] parameters) 111 throws HiveException { 112 // TODO Auto-generated method stub 113 if(parameters.length != 1){ 114 throw new UDFArgumentException("参数错误"); 115 } 116 this.merge(agg, parameters[0]); 117 118 } 119 120 @Override 121 public Object terminatePartial(AggregationBuffer agg) 122 throws HiveException { 123 // TODO Auto-generated method stub 124 return this.terminate(agg); 125 } 126 127 @Override 128 public void merge(AggregationBuffer agg, Object partial) 129 throws HiveException { 130 // TODO Auto-generated method stub 131 sumlongagg slg = (sumlongagg) agg; 132 if(partial != null){ 133 slg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputor); 134 slg.empty=false; 135 } 136 } 137 138 @Override 
139 public Object terminate(AggregationBuffer agg) throws HiveException { 140 // TODO Auto-generated method stub 141 sumlongagg slg = (sumlongagg) agg; 142 if(slg.empty){ 143 return null; 144 } 145 return new LongWritable(slg.sum); 146 } 147 148 } 149 150 /** 151 * 实现浮点型的求和 152 */ 153 public static class udafDouble extends GenericUDAFEvaluator{ 154 155 //定义输入数据类型 156 public PrimitiveObjectInspector input; 157 158 //实现自定义buffer 159 static class sumdoubleagg implements AggregationBuffer{ 160 double sum; 161 boolean empty; 162 } 163 164 //初始化方法 165 @Override 166 public ObjectInspector init(Mode m, ObjectInspector[] parameters) 167 throws HiveException { 168 // TODO Auto-generated method stub 169 170 super.init(m, parameters); 171 if(parameters.length !=1 ){ 172 throw new UDFArgumentException("参数异常"); 173 } 174 if(input == null){ 175 this.input = (PrimitiveObjectInspector) parameters[0]; 176 } 177 //注意返回的类型要与最终sum的类型一致 178 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; 179 } 180 181 182 183 @Override 184 public AggregationBuffer getNewAggregationBuffer() throws HiveException { 185 // TODO Auto-generated method stub 186 sumdoubleagg sdg = new sumdoubleagg(); 187 this.reset(sdg); 188 return sdg; 189 } 190 191 @Override 192 public void reset(AggregationBuffer agg) throws HiveException { 193 // TODO Auto-generated method stub 194 sumdoubleagg sdg = (sumdoubleagg) agg; 195 sdg.sum=0; 196 sdg.empty=true; 197 } 198 199 @Override 200 public void iterate(AggregationBuffer agg, Object[] parameters) 201 throws HiveException { 202 // TODO Auto-generated method stub 203 if(parameters.length != 1){ 204 throw new UDFArgumentException("参数错误"); 205 } 206 this.merge(agg, parameters[0]); 207 } 208 209 @Override 210 public Object terminatePartial(AggregationBuffer agg) 211 throws HiveException { 212 // TODO Auto-generated method stub 213 return this.terminate(agg); 214 } 215 216 @Override 217 public void merge(AggregationBuffer agg, Object partial) 218 throws 
HiveException { 219 // TODO Auto-generated method stub 220 sumdoubleagg sdg =(sumdoubleagg) agg; 221 if(partial != null){ 222 sdg.sum += PrimitiveObjectInspectorUtils.getDouble(sdg, input); 223 sdg.empty=false; 224 } 225 } 226 227 @Override 228 public Object terminate(AggregationBuffer agg) throws HiveException { 229 // TODO Auto-generated method stub 230 sumdoubleagg sdg = (sumdoubleagg) agg; 231 if (sdg.empty){ 232 return null; 233 } 234 return new DoubleWritable(sdg.sum); 235 } 236 237 } 238 239 }
6.打成jar包
并放入路径:/etc/opt/datas/
7.添加jar到Hive的classpath
格式:
add jar linux_path;
即:
add jar /etc/opt/datas/af.jar;
8.创建方法
create temporary function af as 'org.apache.hadoop.hive_udf.UdafProject';
9.在hive中运行
select sum(id),af(id) from stu_info;
三:UDTF
1.UDTF
2.程序
1 package org.apache.hadoop.hive.udf; 2 3 import java.util.ArrayList; 4 5 import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 6 import org.apache.hadoop.hive.ql.metadata.HiveException; 7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; 8 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 9 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; 10 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; 11 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 12 13 public class UDTFtest extends GenericUDTF { 14 15 @Override 16 public StructObjectInspector initialize(StructObjectInspector argOIs) 17 throws UDFArgumentException { 18 // TODO Auto-generated method stub 19 if(argOIs.getAllStructFieldRefs().size() != 1){ 20 throw new UDFArgumentException("参数只能有一个"); 21 } 22 ArrayList<String> fieldname = new ArrayList<String>(); 23 fieldname.add("name"); 24 fieldname.add("email"); 25 ArrayList<ObjectInspector> fieldio = new ArrayList<ObjectInspector>(); 26 fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); 27 fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); 28 29 return ObjectInspectorFactory.getStandardStructObjectInspector(fieldname, fieldio); 30 } 31 32 @Override 33 public void process(Object[] args) throws HiveException { 34 // TODO Auto-generated method stub 35 if(args.length == 1){ 36 String name = args[0].toString(); 37 String email = name + "@ibeifneg.com"; 38 super.forward(new String[] {name,email}); 39 } 40 } 41 42 @Override 43 public void close() throws HiveException { 44 // TODO Auto-generated method stub 45 super.forward(new String[] {"complete","finish"}); 46 } 47 48 }
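3.同样的步骤:打成jar包,使用add jar添加到Hive,再创建临时函数,例如:create temporary function tf as 'org.apache.hadoop.hive.udf.UDTFtest';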
4.在hive中运行
select tf(ename) as (name,email) from emp;