ReduceFunction
Example 1
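Example: compute the minimum temperature of each sensor over 15-second windows
val minTempPerWindow = sensorData
  .map(r => (r.id, r.temperature))                 // project each reading to (sensorId, temperature)
  .keyBy(_._1)                                     // partition the stream by sensor id
  .timeWindow(Time.seconds(15))                    // 15-second tumbling windows
  .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))   // keep the sensor id and the lower temperature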
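To see what the Scala reduce lambda computes inside one window, here is a plain-Java sketch with no Flink dependency. It applies the same combine rule per key to a hypothetical batch of (sensorId, temperature) readings that are assumed to fall into a single 15-second window. The class MinTempSketch, the Reading record and the sample values are made up for illustration; the code uses Java records (Java 16+).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical illustration, not Flink code: per-key reduce over one window's elements
public class MinTempSketch {
    // Stand-in for the Scala tuple (r.id, r.temperature)
    record Reading(String id, double temperature) {}

    // Same rule as the Scala lambda: keep the first element's id, take the lower temperature
    static final BinaryOperator<Reading> MIN_TEMP =
        (r1, r2) -> new Reading(r1.id(), Math.min(r1.temperature(), r2.temperature()));

    // Folds the readings of one window into one result per sensor id
    static Map<String, Reading> reducePerKey(List<Reading> window) {
        Map<String, Reading> acc = new LinkedHashMap<>();
        for (Reading r : window) {
            // First reading of a key is stored as-is; later ones are combined with MIN_TEMP
            acc.merge(r.id(), r, MIN_TEMP);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Reading> window = List.of(
            new Reading("sensor_1", 35.8),
            new Reading("sensor_2", 15.4),
            new Reading("sensor_1", 32.1),
            new Reading("sensor_2", 18.0));
        reducePerKey(window).values().forEach(System.out::println);
    }
}
```

Map.merge stands in for the keyed, incremental state Flink keeps per window: each new reading is combined with the current partial minimum instead of being buffered.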
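Example 2
A ReduceFunction defines how two input elements are combined into one output element of the same type. Flink uses a ReduceFunction to aggregate the elements of a window incrementally.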
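Incremental aggregation means the window does not have to buffer all of its elements: each arriving element is combined with the current partial result right away, so the window state is a single value. The plain-Java sketch below models only that idea; IncrementalWindow is a hypothetical class, not Flink's internal window-state implementation.

```java
import java.util.function.BinaryOperator;

// Hypothetical model of a reduce-based window: keeps one running value, never a buffer
public class IncrementalWindow<T> {
    private final BinaryOperator<T> reducer; // plays the role of ReduceFunction.reduce(value1, value2)
    private T accumulator;                   // the only element-derived state: the running result
    private int count;                       // how many elements have been folded in

    public IncrementalWindow(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    // Called once per arriving element; the element is folded in and not stored
    public void add(T element) {
        accumulator = (count == 0) ? element : reducer.apply(accumulator, element);
        count++;
    }

    public T result() { return accumulator; }

    public int count() { return count; }

    public static void main(String[] args) {
        IncrementalWindow<Integer> total = new IncrementalWindow<>(Integer::sum);
        for (int score : new int[] {100, 30, 70}) {
            total.add(score);
            System.out.println("after " + total.count() + " element(s): " + total.result());
        }
    }
}
```

The first element becomes the accumulator unchanged, and every later call passes the accumulator as value1 and the new element as value2.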
package com.lynch.stream.window;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Tests a ReduceFunction on a count window.
 */
public class TestReduceFunctionOnWindow {
    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read the data
        DataStream<Tuple3<String, String, Integer>> input = env.fromElements(ENGLISH);
        // keyBy(0): key by class to compute each class's total score (field 0 is the class)
        // countWindow(2): slice each key's stream by element count; the window fires once it holds 2 elements
        DataStream<Tuple3<String, String, Integer>> totalPoints = input
                .keyBy(0)
                .countWindow(2)
                .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1,
                                                                  Tuple3<String, String, Integer> value2) throws Exception {
                        // Printed output looks like:
                        // (class1,张三,100)
                        // (class1,李四,30)
                        // ==============
                        System.out.println(value1);
                        System.out.println(value2);
                        System.out.println("==============");
                        // Keep the first record's class and name, add up the scores
                        return new Tuple3<>(value1.f0, value1.f1, value1.f2 + value2.f2);
                    }
                });
        // Print the result
        // Output looks like:
        // 2> (class1,张三,130)
        totalPoints.print();
        env.execute("TestReduceFunctionOnWindow");
    }

    /**
     * Class records as triples.
     */
    public static final Tuple3[] ENGLISH = new Tuple3[]{
            // class, name, score
            Tuple3.of("class1", "张三", 100),
            Tuple3.of("class1", "李四", 30),
            Tuple3.of("class1", "王五", 70),
            Tuple3.of("class2", "赵六", 50),
            Tuple3.of("class2", "小七", 40),
            Tuple3.of("class2", "小八", 10),
    };
}
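Output of the ReduceFunction job. Each class has three records, but countWindow(2) fires only once a key has collected two elements, so the third record of each class (王五, 小八) opens a window that never fills and is never emitted. The 1> and 2> prefixes are added by print() and identify the parallel subtask that emitted the result.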
(class2,赵六,50)
(class2,小七,40)
==============
1> (class2,赵六,90)
(class1,张三,100)
(class1,李四,30)
==============
2> (class1,张三,130)
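The per-key count-window behaviour can be reproduced without Flink. This plain-Java sketch runs the same six records through a simplified model of keyBy(class).countWindow(2).reduce(...): per key it folds elements incrementally and emits the result when the count reaches the window size, then starts over. CountWindowSketch and Score are hypothetical names, and the model ignores parallelism, so results come out in input order rather than in the subtask order seen above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical simplified model of a keyed count window with a reduce function (not Flink code)
public class CountWindowSketch {
    record Score(String clazz, String name, int points) {}

    // Same rule as the Flink example: keep the first record's class and name, add the points
    static final BinaryOperator<Score> SUM_POINTS =
        (v1, v2) -> new Score(v1.clazz(), v1.name(), v1.points() + v2.points());

    // Emits one reduced record per key every `size` elements; leftovers are never emitted
    static List<Score> run(List<Score> input, int size) {
        Map<String, Score> partial = new HashMap<>();  // running result per key
        Map<String, Integer> counts = new HashMap<>(); // elements in the current window per key
        List<Score> fired = new ArrayList<>();
        for (Score s : input) {
            partial.merge(s.clazz(), s, SUM_POINTS);
            int n = counts.merge(s.clazz(), 1, Integer::sum);
            if (n == size) {
                // Window is full: emit the result and clear the window for this key
                fired.add(partial.remove(s.clazz()));
                counts.remove(s.clazz());
            }
        }
        // Whatever is still in `partial` belongs to windows that never reached `size` elements
        return fired;
    }

    public static void main(String[] args) {
        List<Score> english = List.of(
            new Score("class1", "张三", 100),
            new Score("class1", "李四", 30),
            new Score("class1", "王五", 70),
            new Score("class2", "赵六", 50),
            new Score("class2", "小七", 40),
            new Score("class2", "小八", 10));
        run(english, 2).forEach(System.out::println);
    }
}
```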
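Example 3
// Test data: a user browsed a product at a given moment, together with the product's price
// {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
// API
// T: type of both the input and the output elements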
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}
// Example: within each time window (window size), for each user (keyBy), keep the record of the highest-priced product that user browsed (ReduceFunction)
kafkaStream
        // Parse the JSON records read from Kafka into Java beans
        .process(new KafkaProcessFunction())
        // Extract timestamps and generate watermarks
        .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds)))
        // Group by user
        .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
        // Build a time window
        .timeWindow(Time.seconds(windowLengthSeconds))
        // Window function: for each user, keep the record of the highest-priced product browsed in this window
        .reduce(new ReduceFunction<UserActionLog>() {
            @Override
            public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
                // Keep the higher-priced record; on a tie, value2 is kept
                return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
            }
        })
        .print();
# Result
UserActionLog{userID='user_4', eventTime='2019-11-09 12:51:25', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_2', eventTime='2019-11-09 12:51:29', eventType='browse', productID='product_2', productPrice=20}
UserActionLog{userID='user_1', eventTime='2019-11-09 12:51:22', eventType='browse', productID='product_3', productPrice=30}
UserActionLog{userID='user_5', eventTime='2019-11-09 12:51:21', eventType='browse', productID='product_3', productPrice=30}
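Example 3 depends on the author's Kafka source and custom classes, so it cannot run on its own. The plain-Java sketch below imitates its logic under stated assumptions: Event is a hypothetical stand-in for UserActionLog with the event time already in epoch milliseconds, watermarks and late data are ignored, and the window start is computed as timestamp minus (timestamp modulo window size), which matches tumbling windows for non-negative timestamps with no window offset.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical simulation of keyBy(user) + tumbling time window + reduce (not Flink code)
public class MaxPricePerWindowSketch {
    // Stand-in for UserActionLog: only the fields this sketch needs
    record Event(String userId, long timestampMs, String productId, int productPrice) {}

    // Same rule as the Flink example: keep the higher-priced record, value2 wins ties
    static final BinaryOperator<Event> MAX_PRICE =
        (v1, v2) -> v1.productPrice() > v2.productPrice() ? v1 : v2;

    // Start of the tumbling window that contains ts (assumes ts >= 0 and zero offset)
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Groups events by (user, window start) and reduces each group incrementally
    static Map<String, Event> run(List<Event> events, long sizeMs) {
        Map<String, Event> result = new LinkedHashMap<>();
        for (Event e : events) {
            String key = e.userId() + "@" + windowStart(e.timestampMs(), sizeMs);
            // The existing partial result is passed as v1, the new event as v2
            result.merge(key, e, MAX_PRICE);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("user_1", 1_000, "product_1", 10),
            new Event("user_1", 4_000, "product_3", 30),
            new Event("user_2", 2_000, "product_2", 20),
            new Event("user_1", 12_000, "product_2", 20));
        run(events, 10_000).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

With a 10-second window, user_1's first two events share the window starting at 0 and reduce to the product_3 record, while the event at 12 s falls into the next window and is emitted on its own.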
Note: a ReduceFunction's input and output elements must be of the same type.
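Because the types must match, a reduce step cannot turn temperatures directly into an average. One common workaround is to map each element into an accumulator type first, the same way Example 1 maps readings to tuples before reducing, and to convert to the final type only after the reduce; Flink's AggregateFunction is the window function designed for cases where the types differ. The plain-Java sketch below shows the map-then-reduce pattern with java.util.stream; SumCount is a hypothetical accumulator, not a Flink type.

```java
import java.util.List;

// Hypothetical illustration of "map to the accumulator type, then reduce" (not Flink code)
public class AverageViaReduceSketch {
    // Accumulator type: both input and output of the reduce step
    record SumCount(double sum, long count) {
        static SumCount of(double value) { return new SumCount(value, 1); }

        SumCount plus(SumCount other) { return new SumCount(sum + other.sum, count + other.count); }

        double average() { return sum / count; }
    }

    static double average(List<Double> temperatures) {
        return temperatures.stream()
            .map(SumCount::of)       // lift each element into the accumulator type
            .reduce(SumCount::plus)  // (SumCount, SumCount) -> SumCount: same type in and out
            .orElseThrow()           // an empty input has no average
            .average();              // convert to the final type only after reducing
    }

    public static void main(String[] args) {
        System.out.println(average(List.of(20.0, 22.0, 27.0)));
    }
}
```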