zoukankan      html  css  js  c++  java
  • flink

    Watermark 更新策略验证

    前置环境
    服务器监听 7777 端口并把并把控制台输入发送到socket流

    配置
    env.setParallelism(1); //并发数为1, waterMark共用于keyBy后的每个窗口
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(100); //基于处理时间语义而言, 100ms后来的第一个数据才更新到waterMark上

    new OutputTag<SensorReading>("late-sensor"){}; //侧输出流是个抽象类

    timeWindow(Time.seconds(10)) //时间窗口10s
    new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) //有界乱序时间戳提取器
    其中有三个重要变量
    currentMaxTimestamp 数据流当前最大时间戳,
    对象初始化时赋值 Long.MIN_VALUE + this.maxOutOfOrderness
    数据流来数据时 如果当前数据事件时间 > currentMaxTimestamp 则更新
    lastEmittedWatermark 当前窗口上次发出数据的 Watermark, 默认 Long.MIN_VALUE
    maxOutOfOrderness 最大乱序程度, 数据延迟时间, Timer 类型, 构造器入参决定
    计算 Watermark = currentMaxTimestamp - maxOutOfOrderness
    此处 maxOutOfOrderness = 2s, 允许2s数据延迟
    根据当前数据的最大事件时间 - 2s 得到 :
    waterMark < 窗口上限时, 窗口接收所有 事件时间 < 窗口上限 + 2s 的数据,
    waterMark >= 窗口上限时, 窗口关窗, 窗口拒绝所有 事件时间 < 窗口上限 + 2s 的数据, 这部分数据可以走侧输出流
    需要理解 BoundedOutOfOrdernessTimestampExtractor 源码实现

    关于窗口处理细节参考
    org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement
    package galaxy.flink.window;
    
    import galaxy.flink.model.SensorReading;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    public class WaterMarkTrance {
    
        public static void main(String[] args) throws Exception {
    
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.getConfig().setAutoWatermarkInterval(100);  //100ms 周期性自动更新watermark, 默认200ms
    
            //迟到数据输出, 时间戳所属窗口已关闭后来到窗口处理器的数据
            OutputTag<SensorReading> late = new OutputTag<SensorReading>("late-sensor"){};
            DataStreamSource<String> sensorStream = env.socketTextStream("localhost", 7777);
    
            //转换为 SensorReading 类型
            SingleOutputStreamOperator<SensorReading> beforWindStream = sensorStream.map(line -> {
                String[] split = line.split(",");
                return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
            }).returns(TypeInformation.of(new TypeHint<SensorReading>() {
    
            //分配时间戳
            })).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                @Override
                public long extractTimestamp(SensorReading element) {
                    return element.getTimestamp() * 1000;
                }
            });
    
            //基于事件时间的开窗
            SingleOutputStreamOperator<SensorReading> winStream = beforWindStream.keyBy("id")
                    .timeWindow(Time.seconds(10))
    //                .minBy("temperature")
                    .sideOutputLateData(late)
                    .process(new ProcessWindowFunction<SensorReading, SensorReading, Tuple, TimeWindow>() {
                        @Override
                        public void process(Tuple tuple, Context context, Iterable<SensorReading> elements, Collector<SensorReading> out) throws Exception {
                            System.out.println(context.window().getStart() / 1000 + " -> " + context.window().getEnd() / 1000
                                    + ", curr: " + context.currentWatermark() / 1000);
                            elements.forEach(out::collect);
                        }
                    });
            winStream.print("win");
            winStream.getSideOutput(late).print("late");
    
            env.execute("数据源测试");
        }
    }
    package galaxy.flink.model;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class SensorReading {
        private String id;
        private Long timestamp;
        private Double temperature;
    }
    测试数逐条输入  验证Watermark更新
    sensor_1,1609765810,15.4
    sensor_4,1609765821,12.4
    sensor_5,1609765822,52.1
    sensor_8,1609765818,56.2
    sensor_8,1609765819,65.2
    sensor_5,1609765826,25.2
    sensor_4,1609765831,45.0
    sensor_4,1609765832,24.52
    sensor_7,1609765817,45.0
    sensor_4,1609765844,56.2
    sensor_4,1609765854,15.8
    
    输出
    1609765810 -> 1609765820, curr: 1609765820
    win> SensorReading(id=sensor_1, timestamp=1609765810, temperature=15.4)
    //由 1609765822 触发, currWaterMark = 1609765822 - 2  > 1609765820 达到窗口关闭条件
    
    late> SensorReading(id=sensor_8, timestamp=1609765818, temperature=56.2)
    late> SensorReading(id=sensor_8, timestamp=1609765819, temperature=65.2)
    //窗口关闭后由延迟数据到达触发
    
    1609765820 -> 1609765830, curr: 1609765830
    win> SensorReading(id=sensor_5, timestamp=1609765822, temperature=52.1)
    win> SensorReading(id=sensor_5, timestamp=1609765826, temperature=25.2)
    1609765820 -> 1609765830, curr: 1609765830
    win> SensorReading(id=sensor_4, timestamp=1609765821, temperature=12.4)
    // 1609765832 到达触发
    
    late> SensorReading(id=sensor_7, timestamp=1609765817, temperature=45.0)
    
    1609765830 -> 1609765840, curr: 1609765842
    win> SensorReading(id=sensor_4, timestamp=1609765831, temperature=45.0)
    win> SensorReading(id=sensor_4, timestamp=1609765832, temperature=24.52)
    // 1609765844 触发
    
    1609765840 -> 1609765850, curr: 1609765852
    win> SensorReading(id=sensor_4, timestamp=1609765844, temperature=56.2)
    // 1609765854 触发
    一次性复制输入     验证Watermark 100ms内未更新, 需要更直观可放大周期
    sensor_1,1609765810,15.4
    sensor_4,1609765821,12.4
    sensor_5,1609765823,52.1
    sensor_8,1609765818,56.2
    sensor_1,1609765819,65.2
    sensor_4,1609765826,25.2
    
    输出
    1609765810 -> 1609765820, curr: 1609765824
    win> SensorReading(id=sensor_1, timestamp=1609765810, temperature=15.4)
    win> SensorReading(id=sensor_1, timestamp=1609765819, temperature=65.2)
    1609765810 -> 1609765820, curr: 1609765824
    win> SensorReading(id=sensor_8, timestamp=1609765818, temperature=56.2)
    
    所有数据在100ms内到达:
    currentMaxTimestamp = 1609765823 时, 窗口处理器的处理时间并没有达到 100ms 周期, 没有计算 waterMark
    currentMaxTimestamp = 1609765826 已输入完所有数据,  窗口处理器的处理时间已达到100ms 周期,
    计算 waterMark = 1609765826 - 2
    配置
        env.setParallelism(2)
    
    逐条输入
    sensor_1,1609765810,15.4
    sensor_4,1609765821,12.4
    sensor_5,1609765822,52.1
    sensor_1,1609765820,15.45
    sensor_8,1609765818,56.2
    sensor_8,1609765819,65.2
    sensor_5,1609765826,25.2
    sensor_4,1609765831,45.0
    sensor_4,1609765832,24.52
    sensor_7,1609765817,45.0
    sensor_4,1609765844,56.2
    
    输出结果
    1609765810 -> 1609765820, curr: 1609765824
    win:2> SensorReading(id=sensor_1, timestamp=1609765810, temperature=15.4)
    1609765810 -> 1609765820, curr: 1609765824
    win:1> SensorReading(id=sensor_8, timestamp=1609765818, temperature=56.2)
    win:1> SensorReading(id=sensor_8, timestamp=1609765819, temperature=65.2)
    //数据默认轮询发送到两个 并行 任务
    //1609765822 使 task1 使 1窗[...10, ...20) 达到关窗条件
    //1609765826 使 task2 使 1窗 达到关窗条件 此时所有task可关窗, 触发窗口计算
    
    //1609765832 使 task1 2窗[...20, ...30) 达到关窗条件
    
    late:2> SensorReading(id=sensor_7, timestamp=1609765817, temperature=45.0)
    //1609765817到达时所属 1窗 已关闭
    
    1609765820 -> 1609765830, curr: 1609765842
    win:2> SensorReading(id=sensor_4, timestamp=1609765821, temperature=12.4)
    1609765820 -> 1609765830, curr: 1609765842
    win:2> SensorReading(id=sensor_1, timestamp=1609765820, temperature=15.45)
    1609765820 -> 1609765830, curr: 1609765842
    win:2> SensorReading(id=sensor_5, timestamp=1609765822, temperature=52.1)
    win:2> SensorReading(id=sensor_5, timestamp=1609765826, temperature=25.2)
    1609765830 -> 1609765840, curr: 1609765842
    win:2> SensorReading(id=sensor_4, timestamp=1609765831, temperature=45.0)
    win:2> SensorReading(id=sensor_4, timestamp=1609765832, temperature=24.52)
    //1609765844 使  task2 2窗 达到关闭条件, 所有task 2窗 可关, 触发窗口计算
  • 相关阅读:
    Leetcode: K-th Smallest in Lexicographical Order
    Leetcode: Minimum Number of Arrows to Burst Balloons
    Leetcode: Minimum Moves to Equal Array Elements
    Leetcode: Number of Boomerangs
    Leetcode: Arranging Coins
    Leetcode: Path Sum III
    Leetcode: All O`one Data Structure
    Leetcode: Find Right Interval
    Leetcode: Non-overlapping Intervals
    Socket网络编程--简单Web服务器(3)
  • 原文地址:https://www.cnblogs.com/tyxuanCX/p/14233219.html
Copyright © 2011-2022 走看看