  • Flink Watermark

    Original article; please credit the source when reposting: https://www.cnblogs.com/agilestyle/p/15161679.html

    Event Time & Processing Time

    • Event Time: the time at which the event was created
    • Processing Time: the local system time of the machine that executes the operator
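To make the two notions concrete, here is a minimal plain-Java sketch (no Flink dependency). The `Reading` type and the method names are hypothetical and exist only for illustration: event time is read from the record, while processing time is read from the clock of whichever machine happens to run the code.

```java
import java.time.Instant;

// Hypothetical illustration; none of these names come from Flink or from the demo below.
class TimeNotionsSketch {
    static final class Reading {
        final String id;
        final long eventTimeMillis; // stamped by the producer when the event is created

        Reading(String id, long eventTimeMillis) {
            this.id = id;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    // Event time: taken from the record itself, so it is the same on every machine and every replay.
    static long eventTime(Reading reading) {
        return reading.eventTimeMillis;
    }

    // Processing time: the wall clock of the machine executing the operator at this moment.
    static long processingTime() {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        Reading reading = new Reading("1", 1628754405000L);
        System.out.println("event time:      " + Instant.ofEpochMilli(eventTime(reading)));
        System.out.println("processing time: " + Instant.ofEpochMilli(processingTime()));
    }
}
```

Replaying the same record tomorrow would print the same event time but a different processing time, which is why event time gives reproducible window results.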

    For the authoritative explanation, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#notions-of-time-event-time-and-processing-time

    In real business scenarios we usually care more about Event Time. Since Flink 1.12, the default stream time characteristic is TimeCharacteristic.EventTime.

    Watermark

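    When Flink processes a stream in Event Time mode, time-based operators work from the timestamps carried in the data. Network jitter, the distributed architecture, and similar factors often cause events to arrive out of order, which makes window computations inaccurate.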

    To avoid inaccurate window results caused by out-of-order data, Flink introduced the Watermark mechanism.

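    • A Watermark marks the progress of Event Time
    • A Watermark follows the Event Time of the DataStream and carries its own timestamp
    • A Watermark declares that all earlier events have (probably) arrived
    • A Watermark is itself a special kind of event that flows through the stream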

    For the authoritative explanation, see the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#event-time-and-watermarks

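    In Flink, watermarks are generated by the application developer, which usually requires some understanding of how far out of order the upstream data can be. If the watermark lags too far behind, results arrive slowly; one remedy is to emit an approximate result before the watermark arrives. If the watermark arrives too early, results may be wrong, although Flink's mechanisms for handling late data can compensate for this.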
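The trade-off above is easier to see with numbers. The following is a minimal plain-Java sketch of a bounded-out-of-orderness watermark; it has no Flink dependency, and the class and method names are hypothetical. It assumes the common rule "watermark = highest timestamp seen so far, minus the tolerated delay, minus 1 ms".

```java
// Plain-Java sketch (no Flink dependency); names are hypothetical.
class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampMillis;

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark equals Long.MIN_VALUE without underflow.
        this.maxTimestampMillis = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    // Track the running maximum so an out-of-order event never pulls the watermark back.
    void onEvent(long eventTimestampMillis) {
        maxTimestampMillis = Math.max(maxTimestampMillis, eventTimestampMillis);
    }

    // Everything with a timestamp <= this value is assumed to have arrived.
    long currentWatermark() {
        return maxTimestampMillis - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch generator = new BoundedWatermarkSketch(2000); // tolerate 2 s of disorder
        long[] arrivals = {1628754405000L, 1628754420000L, 1628754422000L, 1628754406000L};
        for (long timestamp : arrivals) {
            generator.onEvent(timestamp);
            System.out.println("event " + timestamp + " -> watermark " + generator.currentWatermark());
        }
    }
}
```

A larger delay makes the watermark trail further behind the newest event (slower but more complete results); a smaller delay does the opposite. In a real job, Flink's built-in `WatermarkStrategy.forBoundedOutOfOrderness(Duration)` provides this kind of strategy without a hand-written generator.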

    Demo

    Maven Dependency

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.fool</groupId>
        <artifactId>flink</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.20</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.26</version>
            </dependency>
        </dependencies>
    
    </project>

    SRC

    src/main/java/org/fool/flink/contract/Sensor.java

    package org.fool.flink.contract;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Sensor {
        private String id;
        private Long timestamp;
        private Double temperature;
    }

    src/main/java/org/fool/flink/window/WindowWatermarkTest.java

    package org.fool.flink.window;
    
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;
    import org.fool.flink.contract.Sensor;
    
    public class WindowWatermarkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setParallelism(1);
            // environment.setParallelism(4);
    
            DataStream<String> inputStream = environment.socketTextStream("localhost", 7878);
    
            DataStream<Sensor> dataStream = inputStream.map(new MapFunction<String, Sensor>() {
                @Override
                public Sensor map(String value) throws Exception {
                    String[] fields = value.split(",");
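                    // Input format: id,epochSeconds,temperature (valueOf replaces the deprecated boxing constructors)
                    return new Sensor(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));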
                }
            }).assignTimestampsAndWatermarks(new WatermarkStrategy<Sensor>() {
                @Override
                public WatermarkGenerator<Sensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new WatermarkGenerator<Sensor>() {
                        private final long maxOutOfOrderness = 2000; // 2 seconds, in milliseconds
    
                        // Highest event timestamp seen so far, in milliseconds
                        private long currentMaxTimestamp;
    
                        @Override
                        public void onEvent(Sensor sensor, long eventTimestamp, WatermarkOutput output) {
                            // eventTimestamp is the value produced by the timestamp assigner below
                            // (sensor.getTimestamp() * 1000L). Keep the running maximum so that an
                            // out-of-order event never lowers the value the watermark is derived from.
                            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
                        }
    
                        @Override
                        public void onPeriodicEmit(WatermarkOutput output) {
                            // Watermark = highest timestamp seen - tolerated out-of-orderness - 1 ms
                            output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                        }
                    };
                }
            }.withTimestampAssigner(new SerializableTimestampAssigner<Sensor>() {
                @Override
                public long extractTimestamp(Sensor sensor, long recordTimestamp) {
                    return sensor.getTimestamp() * 1000L;
                }
            }));
    
            OutputTag<Sensor> lateTag = new OutputTag<>("late", TypeInformation.of(Sensor.class));
    
            SingleOutputStreamOperator<Sensor> minStream = dataStream.keyBy(new KeySelector<Sensor, String>() {
                @Override
                public String getKey(Sensor sensor) throws Exception {
                    return sensor.getId();
                }
            }).window(TumblingEventTimeWindows.of(Time.seconds(15)))
                    .allowedLateness(Time.minutes(1))
                    .sideOutputLateData(lateTag)
                    .minBy("temperature");
    
            minStream.print("min temp");
    
            minStream.getSideOutput(lateTag).print("late");
            environment.execute();
        }
    
    }

    Note: the parallelism is currently 1, and the watermark delay (maximum out-of-orderness) is 2 seconds

    environment.setParallelism(1);

    Run

    Socket Input

    1,1628754405,35.8
    1,1628754420,34.8
    1,1628754422,33.8

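    Note: the event with timestamp 1628754422 triggers computation of the window [05, 20), because the watermark becomes 22 s - 2 s - 1 ms = 19.999 s, which reaches the end of that window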

    Console Output

    min temp> Sensor(id=1, timestamp=1628754405, temperature=35.8)
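The arithmetic behind this first firing can be checked by hand. Below is a minimal plain-Java sketch, not Flink code; the class and method names are hypothetical. It assumes tumbling windows aligned to the epoch with zero offset, and assumes an event-time window fires once the watermark reaches the window's last millisecond (window end - 1).

```java
// Plain-Java sketch (no Flink dependency); names are hypothetical.
class TumblingWindowSketch {
    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    // A window fires once the watermark has reached its last millisecond.
    static boolean fires(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long size = 15_000L;   // 15-second window, as in the demo
        long delay = 2_000L;   // 2-second out-of-orderness, as in the demo
        long first = 1628754405000L;
        long end = windowEnd(first, size);
        System.out.println("window: [" + windowStart(first, size) + ", " + end + ")");

        // Watermark after each of the first three inputs: timestamp - delay - 1 ms.
        long[] inputs = {1628754405000L, 1628754420000L, 1628754422000L};
        for (long timestamp : inputs) {
            long watermark = timestamp - delay - 1;
            System.out.println("after " + timestamp + ": watermark=" + watermark
                    + ", fires=" + fires(end, watermark));
        }
    }
}
```

Only the third input pushes the watermark to 1628754419999, the last millisecond of window [1628754405000, 1628754420000), which matches the single result printed above.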

    Socket Input

    1,1628754406,30.8
    1,1628754407,31.8

    Note: these events are entered after 1628754422; 1628754406 and 1628754407 each still trigger the window computation

    Console Output

    min temp> Sensor(id=1, timestamp=1628754406, temperature=30.8)
    min temp> Sensor(id=1, timestamp=1628754406, temperature=30.8)

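    Note: because an allowedLateness of 1 minute is configured, the two late events 1628754406 and 1628754407 still trigger the computation of window [05, 20), even though that window has already fired once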

    allowedLateness(Time.minutes(1))

    Socket Input

    1,1628754482,28.8

    Note: this event is entered after 1628754407

    Console Output

    min temp> Sensor(id=1, timestamp=1628754422, temperature=33.8)

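    Note: at 1628754482 the 1-minute allowedLateness period of window [05, 20) expires and that window is closed; the same watermark advance fires window [20, 35), which produces the output above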

    Socket Input

    1,1628754411,30.3
    1,1628754412,31.3

    Note: these events are entered after 1628754482, i.e. after the 1-minute allowedLateness period of the window has expired

    Console Output

    late> Sensor(id=1, timestamp=1628754411, temperature=30.3)
    late> Sensor(id=1, timestamp=1628754412, temperature=31.3)

    Note: once the 1-minute allowedLateness period has expired, the two late events 1628754411 and 1628754412 go to the side output
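The whole run can be replayed without Flink to see why each input lands where it does. This is a simplified plain-Java sketch with hypothetical names. It assumes the watermark is refreshed after every event (in the real job it is emitted periodically, which is equivalent when input is typed by hand), that a window's state is kept until watermark >= window end - 1 + allowedLateness, and that an event arriving after the window's last millisecond has passed causes an immediate re-firing.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependency); names are hypothetical.
class LatenessSketch {
    enum Outcome { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    static final long WINDOW_SIZE = 15_000L;          // 15-second tumbling window
    static final long ALLOWED_LATENESS = 60_000L;     // 1 minute
    static final long MAX_OUT_OF_ORDERNESS = 2_000L;  // 2 seconds

    // Decide what happens to an event given the watermark at the moment it arrives.
    static Outcome classify(long timestampMillis, long watermarkMillis) {
        long start = timestampMillis - Math.floorMod(timestampMillis, WINDOW_SIZE);
        long maxTimestamp = start + WINDOW_SIZE - 1;
        if (maxTimestamp + ALLOWED_LATENESS <= watermarkMillis) {
            return Outcome.SIDE_OUTPUT;   // window state already cleaned up
        }
        if (maxTimestamp <= watermarkMillis) {
            return Outcome.LATE_FIRING;   // window already fired, but still accepts the event
        }
        return Outcome.ON_TIME;           // window has not fired yet
    }

    // Replay events (epoch seconds, as typed into the socket) in arrival order.
    static List<Outcome> replay(long[] eventSeconds) {
        List<Outcome> outcomes = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS + 1;
        long watermark = Long.MIN_VALUE;
        for (long seconds : eventSeconds) {
            long timestamp = seconds * 1000L;
            outcomes.add(classify(timestamp, watermark));
            maxTimestamp = Math.max(maxTimestamp, timestamp);
            watermark = maxTimestamp - MAX_OUT_OF_ORDERNESS - 1;
        }
        return outcomes;
    }

    public static void main(String[] args) {
        long[] demoInput = {
            1628754405L, 1628754420L, 1628754422L, 1628754406L,
            1628754407L, 1628754482L, 1628754411L, 1628754412L
        };
        System.out.println(replay(demoInput));
    }
}
```

For the demo input, 1628754406 and 1628754407 are classified as late firings and 1628754411 and 1628754412 as side output, which matches the console output shown above.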

    [Screenshots: complete Socket Input and complete Console Output]

    Key Point

    Everything above was run with a parallelism of 1. With a parallelism other than 1, for example 4, the results differ.

    environment.setParallelism(4);

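    With a parallelism other than 1, watermarks are passed between upstream and downstream tasks according to this rule: a task's current watermark is the minimum of the watermarks received from all of its input partitions, so every partition's watermark must advance before the window aggregation is triggered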
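The minimum rule can be sketched in a few lines of plain Java. This is an illustration with hypothetical names, not Flink's internal implementation: the downstream task remembers the latest watermark per input partition and only advances its own watermark to the smallest of them.

```java
import java.util.Arrays;

// Plain-Java sketch (no Flink dependency); names are hypothetical.
class MinWatermarkSketch {
    private final long[] partitionWatermarks;
    private long combinedWatermark = Long.MIN_VALUE;

    MinWatermarkSketch(int partitions) {
        partitionWatermarks = new long[partitions];
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Record a watermark from one partition and return the task's resulting watermark.
    long onWatermark(int partition, long watermarkMillis) {
        partitionWatermarks[partition] = Math.max(partitionWatermarks[partition], watermarkMillis);
        long min = Arrays.stream(partitionWatermarks).min().getAsLong();
        combinedWatermark = Math.max(combinedWatermark, min); // never moves backwards
        return combinedWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkSketch task = new MinWatermarkSketch(4);
        long watermark = 1628754419999L; // derived from an event at 1628754422 with a 2 s delay
        for (int partition = 0; partition < 4; partition++) {
            long combined = task.onWatermark(partition, watermark);
            System.out.println("partition " + partition + " advanced -> task watermark " + combined);
        }
    }
}
```

The task's watermark stays at its initial value until the fourth partition reports, which is why a single event at 1628754422 is no longer enough to fire the window when the parallelism is 4.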

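    [Screenshots: Socket Input and Console Output with parallelism 4]

    Note: the window aggregation is triggered only after the watermarks of all 4 partitions have advanced to 1628754422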

    Reference

    https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/

