AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。
输入类型是输入流中的元素类型,AggregateFunction有一个add方
法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。
package com.lynch.stream.window;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 测试AggFunction——求各个班级英语成绩平均分
*
*/
public class TestAggFunctionOnWindow {
public static void main(String[] args) throws Exception {
// 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据
DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH);
// 求各个班级英语成绩平均分
DataStream<Double> avgScore = input.keyBy(0).countWindow(3).aggregate(new AverageAggrate());
avgScore.print();
env.execute("TestAggFunctionOnWindow");
}
public static final Tuple3[] ENGLISH = new Tuple3[] {
Tuple3.of("class1", "张三", 100L),
Tuple3.of("class1", "李四", 40L),
Tuple3.of("class1", "王五", 60L),
Tuple3.of("class2", "赵六", 20L),
Tuple3.of("class2", "小七", 30L),
Tuple3.of("class2", "小八", 50L),
};
//Tuple3<String, String, Long> 输入类型
//Tuple2<Long, Long> 累加器ACC类型,保存中间状态
//Double 输出类型
public static class AverageAggrate implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<Long, Long>, Double> {
/**
* 创建累加器保存中间状态(sum count)
*
* sum 英语总成绩
* count 学生个数
*
* @return
*/
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
/**
* 将元素添加到累加器并返回新的累加器
*
* @param value 输入类型
* @param acc 累加器ACC类型
*
* @return 返回新的累加器
*/
@Override
public Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> acc) {
//acc.f0 总成绩
//value.f2 表示成绩
//acc.f1 人数
return new Tuple2<>(acc.f0 + value.f2, acc.f1 + 1L);
}
/**
* 从累加器提取结果
*
* @param longLongTuple2
* @return
*/
@Override
public Double getResult(Tuple2<Long, Long> acc) {
return ((double) acc.f0) / acc.f1;
}
/**
* 累加器合并
*
* @param longLongTuple2
* @param acc1
* @return
*/
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
}
}
}