zoukankan      html  css  js  c++  java
  • Flink的Watermaker

    Flink的Watermaker

    时间的分类

    在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:

    image-20210913143255043

    事件时间EventTime: 事件真真正正发生产生的时间

    摄入时间IngestionTime: 事件到达Flink的时间

    处理时间ProcessingTime: 事件真正被处理/计算的时间

    我们一般更为关注事件时间,因为它是事件的发生时间

    Watermaker水印机制/水位线机制

    什么是Watermaker?

    水印(WaterMark)是 Flink 框架中最晦涩难懂的概念之一,有很大一部分原因是因为翻译的原因。

    WaterMark 在正常的英文翻译中是水位,但是在 Flink 框架中,翻译为“水位线”更为合理,它在本质上是一个时间戳。

    在上面的时间类型中我们知道,Flink 中的时间:EventTime 每条数据都携带时间戳;

    • ProcessingTime 数据不携带任何时间戳的信息;

    • IngestionTime 和 EventTime 类似,不同的是 Flink 会使用系统时间作为时间戳绑定到每条数据,可以防止 Flink 内部处理数据是发生乱序的情况,但无法解决数据到达 Flink 之前发生的乱序问题。

    所以,我们在处理消息乱序的情况时,会用 EventTime 和 WaterMark 进行配合使用。

    水印的本质是什么

    水印的出现是为了解决实时计算中的数据乱序问题,它的本质是 DataStream 中一个带有时间戳的元素。如果 Flink 系统中出现了一个 WaterMark T,那么就意味着 EventTime < T 的数据都已经到达,窗口的结束时间和 T 相同的那个窗口被触发进行计算了。

    也就是说:水印是 Flink 判断迟到数据的标准,同时也是窗口触发的标记。

    在程序并行度大于 1 的情况下,会有多个流产生水印和窗口,这时候 Flink 会选取时间戳最小的水印。

    如何计算Watermaker?

    Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间

    在这里要特别说明,Flink 在用时间 + 窗口 + 水印来解决实际生产中的数据乱序问题,有如下的触发条件:

    • watermark 时间 >= window_end_time;

    • 在 [window_start_time,window_end_time) 中有数据存在,这个窗口是左闭右开的。

    此外,因为 WaterMark 的生成是以对象的形式发送到下游,同样会消耗内存,因此水印的生成时间和频率都要进行严格控制,否则会影响我们的正常作业。

    Watermaker有什么用?

    之前的窗口都是按照系统时间来触发计算的,如: [10:00:00 ~ 10:00:10) 的窗口,一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失!
    那么现在有了Watermaker,窗口就可以按照Watermaker来触发计算! 也就是说Watermaker是用来触发窗口计算的!

    Watermaker如何触发窗口计算的?

    窗口计算的触发条件为:

    • 1.窗口中有数据
    • 2.Watermaker >= 窗口的结束时间

    Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间

    image-20210914093748906

    示例代码

    /**
     * @author WGR
     * @create 2021/9/13 -- 15:18
     */
    public class WindowTest3_EventTimeWindow {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().setAutoWatermarkInterval(100);
    
            // socket文本流
            DataStream<String> inputStream = env.socketTextStream("192.168.1.180", 9998);
    
            // 转换成SensorReading类型,分配时间戳和watermark
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                SensorReading sensorReading = new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
                System.out.println(sensorReading.toString());
                return sensorReading;
            })
                    // 乱序数据设置时间戳和watermark
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                        @Override
                        public long extractTimestamp(SensorReading element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });
    
            // 基于事件时间的开窗聚合,统计15秒内温度的最小值
            SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
                    .timeWindow(Time.seconds(15))
                    .minBy("temperature");
    
            minTempStream.print("minTemp");
            env.execute();
        }
    }
    
    SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    minTemp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    minTemp> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    minTemp> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
    minTemp> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    SensorReading{id='sensor_1', timestamp=1547718226, temperature=30.1}
    SensorReading{id='sensor_1', timestamp=1547718227, temperature=29.0}
    minTemp> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
    

    [195,210) 加上2秒延迟,所以212会回收

    [210,225) 加上2秒延迟,所以227会回收

    但是为什么一开始的窗口是195呢?

    下面是源码,计算公式为timestamp - (timestamp - offset + windowSize) % windowSize;

    	@Override
    	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    		if (timestamp > Long.MIN_VALUE) {
    			// Long.MIN_VALUE is currently assigned when no timestamp is present
    			long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
    			return Collections.singletonList(new TimeWindow(start, start + size));
    		} else {
    			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
    					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
    					"'DataStream.assignTimestampsAndWatermarks(...)'?");
    		}
    	}
    
    	/**
    	 * Method to get the window start for a timestamp.
    	 *
    	 * @param timestamp epoch millisecond to get the window start.
    	 * @param offset The offset which window start would be shifted by.
    	 * @param windowSize The size of the generated windows.
    	 * @return window start
    	 */
    	public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    		return timestamp - (timestamp - offset + windowSize) % windowSize;
    	}
    
    image-20210914100257698

    如果在引入OutputTag+allowedLateness,测试如下:

    /**
     * @author WGR
     * @create 2021/9/14 -- 11:04
     */
    public class WindowTest4_EventTimeWindow {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //        env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().setAutoWatermarkInterval(100);
    
            // socket文本流
            DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
    
            // 转换成SensorReading类型,分配时间戳和watermark
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            })
                    // 乱序数据设置时间戳和watermark
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                        @Override
                        public long extractTimestamp(SensorReading element) {
                            return element.getTimestamp() * 1000L;
                        }
                    });
    
            OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
            };
    
            // 基于事件时间的开窗聚合,统计15秒内温度的最小值
            SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
                    .timeWindow(Time.seconds(15))
                    .allowedLateness(Time.minutes(1))
                    .sideOutputLateData(outputTag)
                    .minBy("temperature");
    
            minTempStream.print("minTemp");
            minTempStream.getSideOutput(outputTag).print("late");
    
            env.execute();
        }
    }
    
    SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
    SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
    SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.2}
    minTemp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    minTemp> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
    minTemp> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
    SensorReading{id='sensor_1', timestamp=1547718213, temperature=23.0}
    SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    minTemp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
    SensorReading{id='sensor_1', timestamp=1547718226, temperature=24.0}
    SensorReading{id='sensor_1', timestamp=1547718227, temperature=22.0}
    minTemp> SensorReading{id='sensor_1', timestamp=1547718213, temperature=23.0}
    SensorReading{id='sensor_1', timestamp=1547718209, temperature=31.0}
    minTemp> SensorReading{id='sensor_1', timestamp=1547718209, temperature=31.0}
    SensorReading{id='sensor_7', timestamp=1547718203, temperature=5.0}
    minTemp> SensorReading{id='sensor_7', timestamp=1547718203, temperature=5.0}
    SensorReading{id='sensor_7', timestamp=1547718272, temperature=22.0}
    minTemp> SensorReading{id='sensor_1', timestamp=1547718227, temperature=22.0}
    SensorReading{id='sensor_7', timestamp=1547718206, temperature=23.0}
    late> SensorReading{id='sensor_7', timestamp=1547718206, temperature=23.0}
    
    image-20210914160639079

    由于一开始的水印窗口是[1547718195,1547718210),加上2秒的延迟,所以第一次会在1547718212关闭第一个窗口。

    由于会在接收延迟一分钟的数据,然后在上传播1547718209还是会在聚合的,页面也还是会刷新的。

    当输入1547718213的时候,会进入到第二个窗口,当输入到1547718227的时候,第二个窗口进行关闭,1547718227则进入第三个窗口。

    当输入1547718272的时候,第三个窗口关闭,所以会打印一条记录,在输入1547718206的时候,由于已经过了一分钟了,所以会到旁路流中

  • 相关阅读:
    装饰器
    内置函数
    文件操作
    函数
    数据结构[总结笔记]
    汉诺塔解题思路
    springboot事物
    mysql5.7.29 zip包安装教程
    mysql常用语句【转载】
    springboot+mysql+jpa+sharding-jdbc+druid读写分离
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15268363.html
Copyright © 2011-2022 走看看