  • Flink Periodic Watermarks: custom periodic watermark generators

    1. BoundedOutOfOrdernessGenerator

    /**
     * This generator generates watermarks assuming that elements arrive out of order,
     * but only to a certain degree. The latest elements for a certain timestamp t will arrive
     * at most n milliseconds after the earliest elements for timestamp t.
     */
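    // Imports for the (legacy) periodic watermark assigner API used here:
    // import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    // import org.apache.flink.streaming.api.watermark.Watermark;
    public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {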
    
        private final long maxOutOfOrderness = 3000; // 3.0 seconds
    
        private long currentMaxTimestamp;
    
        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            long timestamp = element.getCreationTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }
    
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }

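    How it behaves: the watermark always trails the highest event timestamp seen so far by maxOutOfOrderness (3 seconds), so elements that arrive up to 3 seconds behind that maximum are still ahead of the watermark.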
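
To make the behavior concrete, here is a minimal plain-Java sketch that replays the same arithmetic without any Flink dependency. The class name `BoundedOutOfOrdernessDemo`, the method `watermarksAfterEach`, and the sample timestamps are illustrative assumptions, not part of the original post. It also computes a watermark after every element for readability, whereas Flink calls `getCurrentWatermark()` periodically rather than once per element.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Flink dependency) of the bounded-out-of-orderness logic. */
final class BoundedOutOfOrdernessDemo {

    // Same bound as maxOutOfOrderness in the generator above: 3 seconds.
    static final long MAX_OUT_OF_ORDERNESS = 3000;

    /** Returns the watermark value that would be reported after each timestamp is seen. */
    static List<Long> watermarksAfterEach(long[] eventTimestamps) {
        long currentMaxTimestamp = 0; // same default as the uninitialized field above
        List<Long> watermarks = new ArrayList<>();
        for (long timestamp : eventTimestamps) {
            // Mirrors extractTimestamp(): track the highest timestamp seen so far.
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            // Mirrors getCurrentWatermark(): highest timestamp minus the bound.
            watermarks.add(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Hypothetical event times in milliseconds; 8000 arrives after 12000.
        long[] timestamps = {10_000, 12_000, 8_000, 15_000};
        System.out.println(watermarksAfterEach(timestamps)); // [7000, 9000, 9000, 12000]
    }
}
```

In this run the out-of-order element with timestamp 8000 does not move the watermark backwards: it stays at 9000. Because 8000 is more than 3 seconds behind the maximum of 12000, that element already lies behind the watermark, which is the case a 3-second bound does not cover.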

    2. TimeLagWatermarkGenerator

    /**
     * This generator generates watermarks that are lagging behind processing time by a fixed amount.
     * It assumes that elements arrive in Flink after a bounded delay.
     */
    // Needs the same two imports: AssignerWithPeriodicWatermarks and Watermark.
    public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    
        private final long maxTimeLag = 3000; // 3 seconds
    
        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            return element.getCreationTime();
        }
    
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current time minus the maximum time lag
            return new Watermark(System.currentTimeMillis() - maxTimeLag);
        }
    }

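    How it behaves: the watermark is tied to the machine's wall clock rather than to the data. It always equals the current processing time minus maxTimeLag (3 seconds), so it keeps advancing even when no elements arrive.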
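
The sketch below replays this in plain Java with an injected clock so that the result is repeatable. `TimeLagDemo`, `currentWatermark`, `isBehindWatermark`, and the fixed clock value are hypothetical names and values chosen for illustration. The real generator reads `System.currentTimeMillis()` directly.

```java
import java.util.function.LongSupplier;

/** Plain-Java simulation (no Flink dependency) of the time-lag watermark logic. */
final class TimeLagDemo {

    // Same lag as maxTimeLag in the generator above: 3 seconds.
    static final long MAX_TIME_LAG = 3000;

    /** Mirrors getCurrentWatermark(): processing time minus the fixed lag. */
    static long currentWatermark(LongSupplier clock) {
        return clock.getAsLong() - MAX_TIME_LAG;
    }

    /** True if the element's event time is at or behind the given watermark. */
    static boolean isBehindWatermark(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }

    public static void main(String[] args) {
        // Assumed fixed clock standing in for System.currentTimeMillis().
        LongSupplier fixedClock = () -> 20_000L;
        long watermark = currentWatermark(fixedClock);
        System.out.println(watermark);                            // 17000
        System.out.println(isBehindWatermark(18_000, watermark)); // false
        System.out.println(isBehindWatermark(16_000, watermark)); // true
    }
}
```

With this generator, whether an element falls behind the watermark depends only on how long it took to reach Flink, not on the timestamps of other elements. That is why it suits sources whose delivery delay is bounded.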

  • Original article: https://www.cnblogs.com/asker009/p/11318290.html