我们知道因为通常流是无限的(无界的),所以在流上的工作方式与批处理不同,使用相同的方式对流中的所有元素进行计数是不可能的。
但是很多时候又需要使用聚合事件(比如计数、求和)统计流上的数据,这个时候的聚合就用到了 window,因为需要由 window 来划定范围,比如 "计算过去的5分钟" , "统计最后100个元素的和" 等等。
window 窗口操作是一种可以把无限数据切割为有限数据块的手段。本文就用实际中的经典场景来看看 Flink 中窗口操作的编程套路。
本文结构如下:
1 窗口中重要的概念
-
Time 分类
-
EventTime 事件时间
-
Ingestion Time 事件进入 Flink 的时间
-
Processing Time 事件被处理时当前系统的时间
spark Streaming 用的的是Processing Time (Spark Streaming 不支持 EventTime ) ,使用处理时间是不准确的,因为当由延迟的数据涌入时导致数据量不准确。
实际中更多的情况下使用的是事件时间 EventTime,后面的案例也都是基于 Flink EventTime 进行总结的。
-
Watermark (水位)
Watermark 也就是常说的水位机制目的是处理乱序的数据。
窗口操作需要水位的原因:
在流式处理时间中,从事件产生,到流经source,再到 oprator ,中间是需要时间的。虽然在大部分情况下,流到 operator 的数据都是按事件产生的时间顺序来的,但是依然不排除由于网络延迟等原因导致乱序的产生,特别是在使用 kafka 的情况下,多个分区数据无法保证有序。
而在 window 计算的时候,我们又不会无限期的等下去,必须要有个机制来保证一个特定的时间后,必须要触发 window 计算,这个特别的机制就是 watermark。
-
窗口类型
window 分类:
-
tumbling windows 滚动窗口 没有重叠
-
sliding windows 滑动窗口 有重叠
-
session windows 会话窗口
-
global windows 没有窗口 ,自定义窗口
timeWindow : 与时间强相关,可能是 session windows ,tumbling windows, sliding windows 。
countWindow : 与数量有关,可能是 tumbling windows ,sliding windows。
小提示 :countWindow(3,2) 的意思是每隔 2 条元素 统计最新 3 条数据。
-
常用的函数
window 操作中常用的增量聚合函数,意思是 来一条数据处理一条。
常用的聚合算子有:
-
reduce(function)
-
aggregate(function)
-
sum() ,max() ,min()
window 全量聚合函数,意思是等属于窗口的数据到齐了才开始进行聚合计算(可以实现对窗口内的数据进行排序等需求)
-
apply(function)
-
process(function)
-
processWindowFunction 比 windowFunction 多提供了上下文信息,类似 map 与 richMap 的关系。
-
触发器和驱逐器
触发器用来触发程序的运行 ;驱逐器用来 驱逐 淘汰数据。
触发器 Trigger 返回值说明:
-
TriggerResult.CONTINUE:表示啥都不做;
-
TriggerResult.FIRE:表示触发计算,同时保留窗口中的数据;
-
TriggerResult.PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息;
-
TriggerResult.FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。(默认情况下,预先实现的触发器只触发而不清除窗口状态。)
2 窗口编程的套路
这部分选取了3种比较经典的场景进行编程套路的总结,分别是 求热门 TopN ,黑名单,统计日(月、年)总量 三种场景,在步骤套路上是类似的,只有些许的区别。
-
求 topN 场景
需求:每 5 秒统计近10分钟的 前三个 热门页面
样例数据:
ip | userId | eventTime | method | url |
---|---|---|---|---|
83.149.9.123 | - | 17/05/2020:10:05:03 +0000 | GET | presentations/logstash-kafkamonitor-2020/images/kibana-search.png |
需求步骤总结:
1 获取数据
//数据格式 模拟kafka 中的消费数据 // 83.149.9.123 - - 17/05/2020:10:05:03 +0000 GET /presentations/logstash-kafkamonitor-2020/images/kibana-search.png DataStreamSource<String> readTextFileSource = env.readTextFile(".\data\data2.log");
2 解析数据
//将日志数据转成自定义的 MyApacheLogEvent 对象 方便后续处理 SingleOutputStreamOperator<MyApacheLogEvent> mapResult = readTextFileSource.map(new MyParseLogFunction());
3 添加水位
//添加水位 是为了处理延迟过来的数据 ,并且这一步是固定模式 ,可以直接复用 SingleOutputStreamOperator<MyApacheLogEvent> watermarkResult = mapResult.assignTimestampsAndWatermarks( WatermarkStrategy.forGenerator((context) -> new MyPeriodWatermarkGenerator())//指定水位计算方式,单位毫秒 .withTimestampAssigner((context -> new MyTimeStampExtactor()))//指定时间字段,单位毫秒 );
4 根据业务分组
// 这里需求用到日志中最后的字段 url 统计热门页面 以 url 分组 KeyedStream<MyApacheLogEvent, String> urlResult = watermarkResult.keyBy(log -> log.getUrl());
5 滑动窗口/滚动窗口
//需求每5秒统计近10分钟的数据 使用 滑动窗口 SlidingEventTimeWindows WindowedStream<MyApacheLogEvent, String, TimeWindow> windowResult = urlResult.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5)));
6 窗口分组
//先使用增量聚合函数统计每个 URL 出现的次数 SingleOutputStreamOperator<MyUrlView> aggregateResult = windowResult.aggregate(new MyPageCountAgg(), new MyPageWndowResult()); //因为需求为 topN ,统计的肯定是同一窗口期的数据 ,所以需求将窗口分组 KeyedStream<MyUrlView, Long> windowKeyByResult = aggregateResult.keyBy(urlView -> urlView.getWindowEnd());//按窗口分组
7 求topN
// 使用全量聚合函数 process 求 topN ,使用 MapState 进行统计,详见完整代码 SingleOutputStreamOperator<String> process = windowKeyByResult.process(new MyTopNHotPage(3));
完整代码:
//由于完整代码太长影响阅读,这里只贴出主类 ,完整版见文末 public static void main(String[] args) throws Exception { //1 获取数据 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2 解析数据 //模拟读 kafka //数据格式 // 83.149.9.123 - - 17/05/2020:10:05:03 +0000 GET /presentations/logstash-kafkamonitor-2020/images/kibana-search.png DataStreamSource<String> readTextFileSource = env.readTextFile(".\data\data2.log"); SingleOutputStreamOperator<MyApacheLogEvent> mapResult = readTextFileSource.map(new MyParseLogFunction()); //3 添加水位 //context WatermarkGenerator.context 这里添加水位一定要加 context SingleOutputStreamOperator<MyApacheLogEvent> watermarkResult = mapResult.assignTimestampsAndWatermarks( WatermarkStrategy.forGenerator((context) -> new MyPeriodWatermarkGenerator())//指定水位计算方式 .withTimestampAssigner((context -> new MyTimeStampExtactor()))//指定时间字段 ); //4 按url分组 KeyedStream<MyApacheLogEvent, String> urlResult = watermarkResult.keyBy(log -> log.getUrl()); //5 实现滑动窗口 WindowedStream<MyApacheLogEvent, String, TimeWindow> windowResult = urlResult.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5))); //每 5秒 统计 近 10分钟的数据 //6 按窗口分组 两个参数 第一个是 计算函数 ,第二个是 窗口函数 SingleOutputStreamOperator<MyUrlView> aggregateResult = windowResult.aggregate(new MyPageCountAgg(), new MyPageWndowResult());//统计每个 URL 出现的次数 KeyedStream<MyUrlView, Long> windowKeyByResult = aggregateResult.keyBy(urlView -> urlView.getWindowEnd());//按窗口分组 //7 求TopN SingleOutputStreamOperator<String> process = windowKeyByResult.process(new MyTopNHotPage(3)); //8 打印需求 process.print(); env.execute("HotPage"); }
-
黑名单场景
需求:每个用户 每个广告 点击次数 大于 100
样例数据:
用户ID | 广告ID | 省份 | 城市 | 时间 |
---|---|---|---|---|
662867 | 2244074 | 广东 | 广州 | 1511658060 |
需求步骤总结:
1 获取数据
//加载测试数据 模拟kafka消费 DataStreamSource<String> streamSource = env.readTextFile(".\data\data3.csv");
2 数据转换
//数据转换 将日志转为对象 --> MyAdClickBean
SingleOutputStreamOperator<MyAdClickBean> mapStream = streamSource.map(new MyAdClickMapFuntion());
3 添加水位
//添加水位 是为了处理延迟过来的数据 ,并且这一步是固定模式 ,可以直接复用
SingleOutputStreamOperator<MyAdClickBean> adStreamWatermark = mapStream.assignTimestampsAndWatermarks(
WatermarkStrategy.forGenerator((context) -> new MyWatermarkBlackAgg())//指定水位计算方式,单位毫秒
.withTimestampAssigner((ctx) -> new MyWatermarkBlackAssgin())//指定水位计算方式 ,单位毫秒
);
4 按业务分组
//由于需要统计每个用户对每个广告的点击,这里使用自定义分组策略 ,按用户id、广告id 分组 KeyedStream<MyAdClickBean, Tuple2<String, String>> keyedStream = adStreamWatermark.keyBy(new MyBlackKey());
5 统计次数
//使用全量聚合函数 统计每个分组中的数据量,这里没有进行窗口分组,而是在处理过程中使用了定时器机制,于每天凌晨进行统计,详见完整代码 SingleOutputStreamOperator<MyAdClickBean> blackListOutput = keyedStream.process(new MyFilterBlackListUser(100));
完整代码:
//1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2
DataStreamSource<String> streamSource = env.readTextFile("D:\fpy\workspace\study\flink-window-stream\src\main\data\data3.csv");
SingleOutputStreamOperator<MyAdClickBean> outPut = streamSource.map(new MyAdClickMapFuntion())
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((context) -> new MyWatermarkBlackAgg()).withTimestampAssigner((ctx) -> new MyWatermarkBlackAssgin()))
.keyBy(new MyBlackKey()) //没有窗口
.process(new MyFilterBlackListUser(100));
//将上游结果输出到侧输出流 方便下游使用
outPut.getSideOutput(blackListOutputTag).print();
env.execute("AdClickCount");
-
统计日(月/年)指标场景
需求:统计每日的订单金额数量
样例数据:
订单编号 | 订单金额 | 订单状态 | 用户id | 支付方式 | 支付流水 | 创建时间 | 支付时间 |
---|---|---|---|---|---|---|---|
6623957652 | 248800 | 18 | 3887511420 | CFT | null | 2020-09-18 00:00:02 | 2020-09-18 00:30:02 |
需求步骤总结:
1 获取数据
//加载测试数据 模拟kafka消费 DataStreamSource<String> dataStreamSource = env.readTextFile(".\data\order_test");
2 解析数据
// 日志数据转成三元组 类似对象 Tuple3<String, String, Double> <时间(截取到日),完整时间,金额> SingleOutputStreamOperator<Tuple3<String, String, Double>> orderMapStream = dataStreamSource.map(new MyOrderInfoMoneyFunction());
3 添加水位
//添加水位 是为了处理延迟过来的数据 ,并且这一步是固定模式 ,可以直接复用 SingleOutputStreamOperator<Tuple3<String, String, Double>> orderStreamAddWatermark = orderMapStream.assignTimestampsAndWatermarks( WatermarkStrategy.forGenerator((context) -> new MyWaterMarkAggOrd())//指定水位计算方式,单位毫秒 . withTimestampAssigner((context) -> new OrderWatermarkAssign())//指定水位字段,单位毫秒 );
4 按需求分组
//按天分组 也就是三元组的第一个元素分组 KeyedStream<Tuple3<String, String, Double>, String> tuple3StringKeyedStream = orderStreamAddWatermark.keyBy(tuple3 -> tuple3.f0);
5 窗口操作
//由于需要按天统计 没有时间重叠部分 使用滚动窗口 //这里为什么 要用 Time.hours(-8)呢? Flink 使用的是时间纪元,而我们使用的是 东八区,所以FLink窗口的时间要比我们当前时间少8小时 // 有时候 TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)) 会报错 解决办法: // 改为 TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)) ,最新版本已经修复 WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> windowWindowedStream = tuple3StringKeyedStream.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)));
6 触发器
//每个5秒触发一次计算,数据会滚动一次 类似 实时大屏效果 WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> triggerCountStream = windowWindowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));//每5秒
7 驱逐器
//将老数据删除掉 意思就是将老的计算的数据丢掉,只保留最新计算的数据 WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> evictorCountStream = triggerCountStream.evictor(TimeEvictor.of(Time.seconds(0), true));
8 统计计算
//使用全量聚合函数 统计每天的订单金额 valueState 记录金额总量 ,详见完整代码 SingleOutputStreamOperator<String> orderSumStream = evictorCountStream.process(new OrderInfoSumMoney());
完整代码:
public static void main(String[] args) throws Exception { //1 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2 DataStreamSource<String> dataStreamSource = env.readTextFile("D:\fpy\workspace\study\flink-window-stream\src\main\data\order_test"); /* 分步操作 SingleOutputStreamOperator<Tuple3<String, String, Double>> orderMapStream = dataStreamSource.map(new MyOrderInfoMoneyFunction()); SingleOutputStreamOperator<Tuple3<String, String, Double>> orderStreamAddWatermark = orderMapStream.assignTimestampsAndWatermarks( WatermarkStrategy.forGenerator((context) -> new MyWaterMarkAggOrd()). withTimestampAssigner((context) -> new OrderWatermarkAssign()) ); KeyedStream<Tuple3<String, String, Double>, String> tuple3StringKeyedStream = orderStreamAddWatermark.keyBy(tuple3 -> tuple3.f0); WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> windowWindowedStream = tuple3StringKeyedStream.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))); WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> triggerCountStream = windowWindowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));//每5秒 WindowedStream<Tuple3<String, String, Double>, String, TimeWindow> evictorCountStream = triggerCountStream.evictor(TimeEvictor.of(Time.seconds(0), true)); SingleOutputStreamOperator<String> orderSumStream = evictorCountStream.process(new OrderInfoSumMoney());*/ DataStreamSink<String> streamSink = dataStreamSource.map(new MyOrderInfoMoneyFunction()) .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((context) -> new MyWaterMarkAggOrd()).withTimestampAssigner((context) -> new OrderWatermarkAssign())) .keyBy(tuple3 -> tuple3.f0) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) // 每天 凌晨刷新 最近1天的数据 .trigger(ContinuousEventTimeTrigger.of(Time.seconds(5))) .evictor(TimeEvictor.of(Time.seconds(0), true))// 去掉原来的数据 .process(new OrderInfoSumMoney()).print(); env.execute("sumcount"); }
这部分选取了3种比较经典的场景进行编程套路的总结,分别是 求热门 TopN ,黑名单,统计日(月、年)总量 三种场景,在步骤套路上是类似的,只有些许的区别。
大致可总结为如下几步:
1 获取数据
2 解析数据
3 添加水位
4 根据业务需求分组 (可以是多个角度分组)
5 实现滑动窗口/滚动
6 按窗口分组 (增量函数使用)
7 触发器和驱逐器
8 最终的全量数据处理 ( 可能使用到定时器 窗口结束的时候触发定时器)
PS: 公号后台回复 "Flink窗口或窗口",查看完整案例代码。
— THE END —