zoukankan      html  css  js  c++  java
  • Flink Watermark

    原创转载请注明出处:https://www.cnblogs.com/agilestyle/p/15161679.html

    Event Time & Processing Time

    • Event Time:事件创建的时间
    • Processing Time:执行操作算子的当前机器的本地时间

    官网权威解释可以参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#notions-of-time-event-time-and-processing-time

    真实业务场景中,我们往往更关心事件时间(Event Time),Flink 从1.12起流的时间特性默认设置为 TimeCharacteristic.EventTime

    Watermark

    当 Flink 以 Event Time 模式处理数据流时,会根据数据里的时间戳来处理基于时间的算子,通常系统由于网络抖动、分布式架构等原因,会导致乱序数据的产生,从而导致窗口计算不精确。

    Fink 为了避免乱序数据带来的窗口计算不精确的问题,引入了 Watermark 机制。

    • Watermark 用于标记 Event Time 的前进过程
    • Watermark 跟随 DataStream Event Time 变动,并自身携带 TimeStamp
    • Watermark 用于表明所有较早的事件已经(可能)到达
    • Watermark 本身也属于特殊的事件

    官网权威解释可以参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#event-time-and-watermarks

    在 Flink 中,Watermark 由应用程序开发人员生成,这通常需要开发人员对业务的上下游数据乱序的程度有一定的了解;如果 Watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果;而如果 Watermark 到达的太早,则可能收到错误结果,不过可以通过 Flink 处理迟到数据的机制来解决这个问题。

    Demo

    Maven Dependency

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.fool</groupId>
        <artifactId>flink</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
                <version>1.12.5</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>1.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.20</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.26</version>
            </dependency>
        </dependencies>
    
    </project>

    SRC

    src/main/java/org/fool/flink/contract/Sensor.java

    package org.fool.flink.contract;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Sensor {
        private String id;
        private Long timestamp;
        private Double temperature;
    }

    src/main/java/org/fool/flink/window/WindowWatermarkTest.java

    package org.fool.flink.window;
    
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;
    import org.fool.flink.contract.Sensor;
    
    public class WindowWatermarkTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setParallelism(1);
            // environment.setParallelism(4);
    
            DataStream<String> inputStream = environment.socketTextStream("localhost", 7878);
    
            DataStream<Sensor> dataStream = inputStream.map(new MapFunction<String, Sensor>() {
                @Override
                public Sensor map(String value) throws Exception {
                    String[] fields = value.split(",");
                    return new Sensor(fields[0], new Long(fields[1]), new Double(fields[2]));
                }
            }).assignTimestampsAndWatermarks(new WatermarkStrategy<Sensor>() {
                @Override
                public WatermarkGenerator<Sensor> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                    return new WatermarkGenerator<Sensor>() {
                        private final long maxOutOfOrderness = 2000; // 2 seconds
    
                        private long currentMaxTimestamp;
    
                        @Override
                        public void onEvent(Sensor sensor, long eventTimestamp, WatermarkOutput output) {
                            // System.out.println("sensor.getTimestamp(): " + sensor.getTimestamp() * 1000L);
                            // System.out.println("eventTimestamp: " + eventTimestamp);
                            currentMaxTimestamp = Math.max(sensor.getTimestamp() * 1000L, eventTimestamp);
                            // System.out.println("currentMaxTimestamp1: " + currentMaxTimestamp);
                        }
    
                        @Override
                        public void onPeriodicEmit(WatermarkOutput output) {
                            // System.out.println("currentMaxTimestamp2: " + currentMaxTimestamp);
                            output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                        }
                    };
                }
            }.withTimestampAssigner(new SerializableTimestampAssigner<Sensor>() {
                @Override
                public long extractTimestamp(Sensor sensor, long recordTimestamp) {
                    return sensor.getTimestamp() * 1000L;
                }
            }));
    
            OutputTag<Sensor> lateTag = new OutputTag<>("late", TypeInformation.of(Sensor.class));
    
            SingleOutputStreamOperator<Sensor> minStream = dataStream.keyBy(new KeySelector<Sensor, String>() {
                @Override
                public String getKey(Sensor sensor) throws Exception {
                    return sensor.getId();
                }
            }).window(TumblingEventTimeWindows.of(Time.seconds(15)))
                    .allowedLateness(Time.minutes(1))
                    .sideOutputLateData(lateTag)
                    .minBy("temperature");
    
            minStream.print("min temp");
    
            minStream.getSideOutput(lateTag).print("late");
            environment.execute();
        }
    
    }

    Note: 当前并行度是 1,Watermark 设置为 2 秒

    environment.setParallelism(1);

    Run

    Socket Input

    1,1628754405,35.8
    1,1628754420,34.8
    1,1628754422,33.8

    Note:1628754422 这个时间点会触发窗口 [05, 20) 这个窗口计算

    Console Output

    min temp> Sensor(id=1, timestamp=1628754405, temperature=35.8)

    Socket Input

    1,1628754406,30.8
    1,1628754407,31.8

    Note:在 1628754422 这个时间点后继续输入, 1628754406、1628754407 后仍旧会触发窗口计算

    Console Output

    min temp> Sensor(id=1, timestamp=1628754406, temperature=30.8)
    min temp> Sensor(id=1, timestamp=1628754406, temperature=30.8)

    Note:因为设置了 1 分钟的 allowedLateness,1628754406、1628754407 这两个迟到的事件在 [05, 20) 这个窗口已经触发过计算后仍旧会触发窗口计算

    allowedLateness(Time.minutes(1))

    Socket Input

    1,1628754482,28.8

    Note:在 1628754407 这个时间点后继续输入

    Console Output

    min temp> Sensor(id=1, timestamp=1628754422, temperature=33.8)

    Note:1628754482 这个时间点,1 分钟的 allowedLateness 的窗口会关闭,触发窗口计算

    Socket Input

    1,1628754411,30.3
    1,1628754412,31.3

    Note:在 1628754482 这个时间点后继续输入,即 1 分钟的 allowedLateness 的窗口已经关闭

    Console Output

    late> Sensor(id=1, timestamp=1628754411, temperature=30.3)
    late> Sensor(id=1, timestamp=1628754412, temperature=31.3)

    Note:1 分钟的 allowedLateness 的窗口关闭后,1628754411、1628754412 这两个迟到的事件会进入 side output 

    完整的 Socket Input

    完整的 Console Output

    Key Point

    以上操作都是基于并行度为 1 的情况下进行的,当设置设置并行度不为 1 时,比如设置并行度为 4,结果会不一样。

    environment.setParallelism(4);

    并行度不为 1 的时候,测试输出的时候,Watermark 在上下游任务之间传递的规则:必须是每一个分区的 Watermark 都要上升,取最小的值才是当前的 Watermark,才会触发窗口聚合计算

    Socket Input

    Note:4 个分区的 Watermark 都到了 1628754422,才会触发窗口聚合计算

    Console Output

    Reference

    https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/


    欢迎点赞关注和收藏

    强者自救 圣者渡人
  • 相关阅读:
    vue中computed计算属性和methods区别
    java解决跨域问题
    redis服务端开启
    使用excel生成商品条形码
    MySQL主键自增时SQL写法/当前时间写法
    修改MySQL数据库密码
    MySQL5.6.42解压版安装教程
    完全卸载MySQL数据库
    IDEA快捷键及xml文件中网址报错
    IDEA导入外部包
  • 原文地址:https://www.cnblogs.com/agilestyle/p/15161679.html
Copyright © 2011-2022 走看看