Spark UDAF
无泛型约束的UDAF extends UserDefinedAggregateFunction extends Aggregator dataframe设计的
有泛型约束的UDAF extends Aggregator 该UDAF时允许添加泛型,保障函数更加安全.但是这种UDAF不可直接在SQL中被调用运算 适用于强类型Datasets
01.在Spark中使用
1.编写UDAF<两种类型的UDAF都可以>
2. 在spark中注册UDAF,为其绑定一个名字,使用
02.在Spark SQL 中使用
1.编写UDAF<使用继承 UserDefinedAggregateFunction 类型编写>
2. 打Jar包,并上传
3. 注册临时聚合函数,并使用
ADD jar TestSpark.jar;
CREATE TEMPORARY FUNCTION mean_my AS 'com.test.structure.udaf.MeanMy';
select t1.data,mean_my(t1.age)
from (select 33 as age, '1' as data union all select 55 as age, '1' as data
union all select 66 as age, '2' as data)t1
group by t1.data;
Spark UDAF开发
`
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
public class MeanFloatUDAF extends UserDefinedAggregateFunction {
/**
* 聚合函数的输入数据结构
* 函数的参数列表,不过需要写成StructType的格式
*/
@Override
public StructType inputSchema() {
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField( "field_nm", DataTypes.DoubleType, true ));
return DataTypes.createStructType( structFields );
}
/**
* 聚缓存区数据结构 - 产生中间结果的数据类型
* 如果是求平均数,存储总和以及计数,总和及计数就是中间结果
* count buffer.getInt(0)
* sum_field buffer.getDouble(1)
*/
@Override
public StructType bufferSchema() {
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField( "count", DataTypes.IntegerType, true ));
structFields.add(DataTypes.createStructField( "sum_field", DataTypes.DoubleType, true ));
return DataTypes.createStructType( structFields );
}
/**
* 聚合函数返回值数据结构
*/
@Override
public DataType dataType() {
return DataTypes.DoubleType;
}
/**
* 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
*/
@Override
public boolean deterministic() {
return true;
}
/**
* 初始化缓冲区
* buffer是中间结果,是Row类的子类
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
//相加的初始值,这里的要和上边的中间结果的类型和位置相对应 - buffer.getInt(0)
buffer.update(0,0);
//参与运算数字个数的初始值
buffer.update(1,Double.valueOf(0.0) );
}
/**
* 给聚合函数传入一条新数据进行处理
* //每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的计算)
* buffer里面存放着累计的执行结果,input是当前的执行结果
*/
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
//个数加1
buffer.update(0,buffer.getInt(0)+1);
//每有一个数字参与运算就进行相加(包含中间结果)
buffer.update(1,buffer.getDouble(1)+Double.valueOf(input.getDouble(0)));
}
/**
* 合并聚合函数缓冲区 //全局聚合
*/
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0));
buffer1.update(1,buffer1.getDouble(1)+buffer2.getDouble(1));
}
/**
* 计算最终结果
*/
@Override
public Object evaluate(Row buffer) {
return buffer.getDouble(1)/buffer.getInt(0);
}
}
`
Spark聚合函数使用
在Spark中使用 extends UserDefinedAggregateFunction类型的UDAF的使用
`
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
public class MeanUDAFMain {
public static void main(String[] args){
try {
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL data sources example")
.config("spark.some.config.option", "some-value")
.master("local[2]")
.getOrCreate();
List<Row> dataExample = Arrays.asList(
RowFactory.create( "2019-0801", 4,9.2),
RowFactory.create( "2020-0802", 3,8.6),
RowFactory.create( "2021-0803",2,5.5),
RowFactory.create( "2021-0803",2,5.5),
RowFactory.create( "2021-0803",7,4.5)
);
StructType schema = new StructType(new StructField[]{
new StructField("date", DataTypes.StringType, false, Metadata.empty()),
new StructField("dist_mem", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("dm_mem", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> itemsDF = spark.createDataFrame(dataExample, schema);
itemsDF.printSchema();
itemsDF.createOrReplaceTempView("test_mean_table");
// 注册自定义聚合函数 -2. 在spark中注册UDAF,为其绑定一个名字
spark.udf().register("mymean",new MeanFloatUDAF ());
spark.sql("select dist_mem from test_mean_table").show();
spark.sql("select date,mymean(dm_mem) memdoubleMean from test_mean_table group by date").show();
} catch (Exception e) {
e.printStackTrace();
}
}
}
`
附录
在开发Spark 的UDAF中,查询了相关资料,涉及一些其他的逻辑设计,但没有相关的思路。而搜集的资料和示例也不是很清晰。此时,将复杂的问题进行拆解。
一是通过复习相关的基础点,对开发中涉及到的基础内容进行深入的学习和理解,结合当前的内容进行扩展,构建扎实的知识体系和对当前内容的理解。
二是对搜集的资料和示例进行修改,通过修改相应的内容来确认不是很清晰的概念。
本例就是使用第二种方式,来确认各个输入和输出之间的关系。通过对简单示例的学习修改和改进调试,不断深入。
总结: 资料和示例搜集很重要,不断改变关键词,搜集相关内容
资料的来源和准确性要进行确认和测试,不可盲信
原理和基础内容,看着用处不大,但实际上功底所在,自然而然。基础扎实,是走的顺和走的远的一个必要条件。
参考
Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版) https://www.cnblogs.com/xing901022/p/6436161.html
Spark笔记之使用UDAF(User Defined Aggregate Function) https://www.cnblogs.com/cc11001100/p/9471859.html
User Defined Aggregate Functions (UDAFs) http://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html
SparkSQL自定义聚合函数(UDAF)实现bitmap函数 https://blog.csdn.net/xiongbingcool/article/details/81282118