我们还可以将ReduceFunction/AggregateFunction和ProcessWindowFunction结合起来使用。ReduceFunction/AggregateFunction做增量聚合,ProcessWindowFunction提供更多的对数据流的访问权限。如果只使用ProcessWindowFunction(底层的实现为将事件都保存在ListState中),将会非常占用空间。分配到某个窗口的元素将被提前聚合,而当窗口的trigger触发时,也就是窗口收集完数据关闭时,将会把聚合结果发送到ProcessWindowFunction中,这时Iterable参数将会只有一个值,就是前面聚合的值。
实例一
input
.keyBy(...)
.timeWindow(...)
.reduce(
incrAggregator: ReduceFunction[IN],
function: ProcessWindowFunction[IN, OUT, K, W])
input
.keyBy(...)
.timeWindow(...)
.aggregate(
incrAggregator: AggregateFunction[IN, ACC, V],
windowFunction: ProcessWindowFunction[V, OUT, K, W])
我们把之前的需求重新使用以上两种方法实现一下。
case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long) val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData .map(r => (r.id, r.temperature, r.temperature)) .keyBy(_._1) .timeWindow(Time.seconds(5)) .reduce( (r1: (String, Double, Double), r2: (String, Double, Double)) => { (r1._1, r1._2.min(r2._2), r1._3.max(r2._3)) }, new AssignWindowEndProcessFunction ) class AssignWindowEndProcessFunction extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp, String, TimeWindow] { override def process(key: String, ctx: Context, minMaxIt: Iterable[(String, Double, Double)], out: Collector[MinMaxTemp]): Unit = { val minMax = minMaxIt.head val windowEnd = ctx.window.getEnd out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd)) } }
实例二
- 可将
ProcessWindowFunction
与增量聚合函数ReduceFunction
、AggregateFunction
结合。 - 元素到达窗口时增量聚合,当窗口关闭时对增量聚合的结果用
ProcessWindowFunction
再进行全量聚合。 - 既可以增量聚合,也可以访问窗口的元数据信息(如开始结束时间、状态等)。
ProcessWindowFunction与ReduceFunction结合
// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值 // {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10} // API: 如上ReduceFunction与ProcessWindowFunction // 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的商品的最大价值的那条记录(ReduceFunction),并获得Key和Window信息。 kafkaStream // 将从Kafka获取的JSON数据解析成Java Bean .process(new KafkaProcessFunction()) // 提取时间戳生成水印 .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds))) // 按用户分组 .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID) // 构造TimeWindow .timeWindow(Time.seconds(windowLengthSeconds)) // 窗口函数: 获取这段窗口时间内每个用户浏览的商品的最大价值对应的那条记录 .reduce( new ReduceFunction<UserActionLog>() { @Override public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception { return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2; } }, new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception { UserActionLog max = elements.iterator().next(); String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"); String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"); String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的最大价值对应的那条记录: "+max; out.collect(record); } } ) .print(); // 结果 Key: user_2 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_2', eventTime='2019-11-09 13:54:10', eventType='browse', productID='product_3', productPrice=30} Key: user_4 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_4', eventTime='2019-11-09 13:54:15', eventType='browse', productID='product_3', productPrice=30} Key: user_3 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_3', eventTime='2019-11-09 13:54:12', eventType='browse', productID='product_2', productPrice=20} Key: user_5 窗口开始时间: 2019-11-09 13:54:10 窗口结束时间: 2019-11-09 13:54:20 浏览的商品的最大价值对应的那条记录: UserActionLog{userID='user_5', eventTime='2019-11-09 13:54:17', eventType='browse', productID='product_2', productPrice=20}
ProcessWindowFunction与AggregateFunction结合
// 测试数据: 某个用户在某个时刻浏览了某个商品,以及商品的价值 // {"userID": "user_4", "eventTime": "2019-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10} // API: 如上AggregateFunction与ProcessWindowFunction // 示例: 获取一段时间内(Window Size)每个用户(KeyBy)浏览的平均价值(AggregateFunction),并获得Key和Window信息。 kafkaStream // 将从Kafka获取的JSON数据解析成Java Bean .process(new KafkaProcessFunction()) // 提取时间戳生成水印 .assignTimestampsAndWatermarks(new MyCustomBoundedOutOfOrdernessTimestampExtractor(Time.seconds(maxOutOfOrdernessSeconds))) // 按用户分组 .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID) // 构造TimeWindow .timeWindow(Time.seconds(windowLengthSeconds)) // 窗口函数: 获取这段窗口时间内,每个用户浏览的商品的平均价值,并发出Key和Window信息 .aggregate( new AggregateFunction<UserActionLog, Tuple2<Long, Long>, Double>() { // 1、初始值 // 定义累加器初始值 @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } // 2、累加 // 定义累加器如何基于输入数据进行累加 @Override public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) { accumulator.f0 += 1; accumulator.f1 += value.getProductPrice(); return accumulator; } // 3、合并 // 定义累加器如何和State中的累加器进行合并 @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) { acc1.f0 += acc2.f0; acc1.f1 += acc2.f1; return acc1; } // 4、输出 // 定义如何输出数据 @Override public Double getResult(Tuple2<Long, Long> accumulator) { return accumulator.f1 / (accumulator.f0 * 1.0); } }, new ProcessWindowFunction<Double, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<Double> elements, Collector<String> out) throws Exception { Double avg = elements.iterator().next(); String windowStart=new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"); String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss"); String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的平均价值: "+String.format("%.2f",avg); out.collect(record); } } ) .print(); //结果 Key: user_2 窗口开始时间: 2019-11-09 14:05:40 窗口结束时间: 2019-11-09 14:05:50 浏览的商品的平均价值: 13.33 Key: user_3 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 25.00 Key: user_4 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 20.00 Key: user_2 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 30.00 Key: user_5 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 20.00 Key: user_1 窗口开始时间: 2019-11-09 14:05:50 窗口结束时间: 2019-11-09 14:06:00 浏览的商品的平均价值: 23.33