Flink的Watermaker
时间的分类
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
事件时间EventTime: 事件真真正正发生产生的时间
摄入时间IngestionTime: 事件到达Flink的时间
处理时间ProcessingTime: 事件真正被处理/计算的时间
我们一般更为关注事件时间,因为它是事件的发生时间
Watermaker水印机制/水位线机制
什么是Watermaker?
水印(WaterMark)是 Flink 框架中最晦涩难懂的概念之一,有很大一部分原因是因为翻译的原因。
WaterMark 在正常的英文翻译中是水位,但是在 Flink 框架中,翻译为“水位线”更为合理,它在本质上是一个时间戳。
在上面的时间类型中我们知道,Flink 中的时间:EventTime 每条数据都携带时间戳;
-
ProcessingTime 数据不携带任何时间戳的信息;
-
IngestionTime 和 EventTime 类似,不同的是 Flink 会使用系统时间作为时间戳绑定到每条数据,可以防止 Flink 内部处理数据是发生乱序的情况,但无法解决数据到达 Flink 之前发生的乱序问题。
所以,我们在处理消息乱序的情况时,会用 EventTime 和 WaterMark 进行配合使用。
水印的本质是什么
水印的出现是为了解决实时计算中的数据乱序问题,它的本质是 DataStream 中一个带有时间戳的元素。如果 Flink 系统中出现了一个 WaterMark T,那么就意味着 EventTime < T 的数据都已经到达,窗口的结束时间和 T 相同的那个窗口被触发进行计算了。
也就是说:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记。
在程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印。
如何计算Watermaker?
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
在这里要特别说明,Flink 在用时间 + 窗口 + 水印来解决实际生产中的数据乱序问题,有如下的触发条件:
-
watermark 时间 >= window_end_time;
-
在 [window_start_time,window_end_time) 中有数据存在,这个窗口是左闭右开的。
此外,因为 WaterMark 的生成是以对象的形式发送到下游,同样会消耗内存,因此水印的生成时间和频率都要进行严格控制,否则会影响我们的正常作业。
Watermaker有什么用?
之前的窗口都是按照系统时间来触发计算的,如: [10:00:00 ~ 10:00:10) 的窗口,一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!
那么现在有了Watermaker,窗口就可以按照Watermaker来触发计算! 也就是说Watermaker是用来触发窗口计算的!
Watermaker如何触发窗口计算的?
窗口计算的触发条件为:
- 1.窗口中有数据
- 2.Watermaker >= 窗口的结束时间
Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间
示例代码
/**
* @author WGR
* @create 2021/9/13 -- 15:18
*/
public class WindowTest3_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("192.168.1.180", 9998);
// 转换成SensorReading类型,分配时间戳和watermark
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
SensorReading sensorReading = new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
System.out.println(sensorReading.toString());
return sensorReading;
})
// 乱序数据设置时间戳和watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
// 基于事件时间的开窗聚合,统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.minBy("temperature");
minTempStream.print("minTemp");
env.execute();
}
}
SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
minTemp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
minTemp> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
minTemp> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
minTemp> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
SensorReading{id='sensor_1', timestamp=1547718226, temperature=30.1}
SensorReading{id='sensor_1', timestamp=1547718227, temperature=29.0}
minTemp> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
[195,210) 加上2秒延迟,所以212会回收
[210,225) 加上2秒延迟,所以227会回收
但是为什么一开始的窗口是195呢?
下面是源码,计算公式为timestamp - (timestamp - offset + windowSize) % windowSize;
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
如果在引入OutputTag+allowedLateness,测试如下:
/**
* @author WGR
* @create 2021/9/14 -- 11:04
*/
public class WindowTest4_EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
// 转换成SensorReading类型,分配时间戳和watermark
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
})
// 乱序数据设置时间戳和watermark
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
});
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};
// 基于事件时间的开窗聚合,统计15秒内温度的最小值
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(outputTag)
.minBy("temperature");
minTempStream.print("minTemp");
minTempStream.getSideOutput(outputTag).print("late");
env.execute();
}
}
SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.2}
minTemp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
minTemp> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
minTemp> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
SensorReading{id='sensor_1', timestamp=1547718213, temperature=23.0}
SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
minTemp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
SensorReading{id='sensor_1', timestamp=1547718226, temperature=24.0}
SensorReading{id='sensor_1', timestamp=1547718227, temperature=22.0}
minTemp> SensorReading{id='sensor_1', timestamp=1547718213, temperature=23.0}
SensorReading{id='sensor_1', timestamp=1547718209, temperature=31.0}
minTemp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=31.0}
SensorReading{id='sensor_7', timestamp=1547718203, temperature=5.0}
minTemp> SensorReading{id='sensor_7', timestamp=1547718203, temperature=5.0}
SensorReading{id='sensor_7', timestamp=1547718272, temperature=22.0}
minTemp> SensorReading{id='sensor_1', timestamp=1547718227, temperature=22.0}
SensorReading{id='sensor_7', timestamp=1547718206, temperature=23.0}
late> SensorReading{id='sensor_7', timestamp=1547718206, temperature=23.0}
由于一开始的水印窗口是[1547718195,1547718210),加上2秒的延迟,所以第一次会在1547718212关闭第一个窗口。
由于会在接收延迟一分钟的数据,然后在上传播1547718209还是会在聚合的,页面也还是会刷新的。
当输入1547718213的时候,会进入到第二个窗口,当输入到1547718227的时候,第二个窗口进行关闭,1547718227则进入第三个窗口。
当输入1547718272的时候,第三个窗口关闭,所以会打印一条记录,在输入1547718206的时候,由于已经过了一分钟了,所以会到旁路流中