zoukankan      html  css  js  c++  java
  • 【源码解析】Flink 是如何基于事件时间生成Timestamp和Watermark

    生成Timestamp和Watermark 的三个重载方法介绍可参见上一篇博客: Flink assignAscendingTimestamps 生成水印的三个重载方法

     之前想研究下Flink是怎么处理乱序的数据,看了相关的源码,加上测试,发现得到了与预期完全不相同的结果。

    预期是:乱序到达的数据,flink可以基于数据的事件时间,自动整理数据,依次计算输出

    结果是:在assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]指派timestamp和watermark的情况下,乱序到达的数据:迟到的数据直接从侧边输出了超前的数据直接结束当前的窗口,开启超前数据对应的窗口,后面到达的正常数据,直接作为迟到数据处理了

     在得到上面的结果的过程中,仔细的研究了一下生产Timestamp和Watermark相关的源码。

    Flink DataStream API 目前只能通过 assignTimestampsAndWatermarks方法创建时间戳和水印有两种生成模式:

      1、基于事件时间创建每个事件的Timestamp 和 基于事件时间周期性的创建Watermark(默认周期为200ms)

      2、基于事件时间创建每个事件的Timestamp 和 基于事件时间每个事件都创建一个Watermark(如果新的Watermark大于当前的Watermark,才会发出)

     事件时间下,事件的Timestamp的创建都是直接依赖于事件携带的事件时间,而Watermark则是基于事件时间生成Watermark,所以有周期性创建Watermark和标记的Watermark(With Punctuated Watermarks)的区分(官网中基于Kafka 的分区时间作为Watermark 也是周期性的生成Watermark,只不过传入的事件时间改为事件在kafka中的timestamp了) 

    1、周期性的创建Watermark

    周期性的创建Watermark的有两种方法(kafka的分区时间的忽略):  

    assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]
    assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 

    1.1  assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] 对应源码 

    调用方法如下:

    .assignAscendingTimestamps(element => {
          // 方便打断点debug 
          println("xxxxxx : " + element.createTime)
          sdf.parse(element.createTime).getTime
        })

     周期性的创建Watermark 是在 TimestampsAndPeriodicWatermarksOperator 中生成、发出,对应的时间来源是调用不同的生成timestamp 和 Watermark 的实现类

    TimestampsAndPeriodicWatermarksOperator  相应代码如下:

      /*    
            处理事件元素: 获取对应的事件时间的时间戳,替换事件默认的时间戳(如果数据源是kafka,时间戳就是数据在kafka中的时间戳)
         */
        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                    element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    
            output.collect(element.replace(element.getValue(), newTimestamp));
        }
        /*
            处理时间(Watermark) : 获取当前时间对应的上一次的事件时间,生成新的watermark,新的watermark的时间戳大于当前的watermark,就发出新的watermark
         */
        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            // 从这里可以看到,每200ms 打印一次
            System.out.println("timestamp : " + timestamp + ", system.current : " + System.currentTimeMillis());
            // register next timer
            Watermark newWatermark = userFunction.getCurrentWatermark();
            if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
                currentWatermark = newWatermark.getTimestamp();
                // emit watermark
                output.emitWatermark(newWatermark);
            }
    
            long now = getProcessingTimeService().getCurrentProcessingTime();
    // 注册timer ,周期性的调用,下面会展开 getProcessingTimeService().registerTimer(now
    + watermarkInterval, this); }

    在这种生成timestamp 和 Watermark 的情况下,userFunction  对应的类是:AscendingTimestampExtractor

    对应源码如下:

    @Override
        public final long extractTimestamp(T element, long elementPrevTimestamp) {
                    // 调用 assignAscendingTimestamps 的参数函数
            final long newTimestamp = extractAscendingTimestamp(element);
            if (newTimestamp >= this.currentTimestamp) {
    // 这是为了下面生成Watermark的方法,总能得到 大于等于 当前Watermark的 时间戳
    this.currentTimestamp = newTimestamp; return newTimestamp; } else { violationHandler.handleViolation(newTimestamp, this.currentTimestamp); return newTimestamp; } } @Override public final Watermark getCurrentWatermark() { return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1); }

    timestamp的生成: TimestampsAndPeriodicWatermarksOperator#processElement  方法,调用AscendingTimestampExtractor#extractTimestamp 再调用 用户代码中具体生成timestamp 的方法,最终生成事件对应的timestamp,替换原有的timestamp

    Watermark的生成:TimestampsAndPeriodicWatermarksOperator#onProcessingTime 方法,调用 AscendingTimestampExtractor#getCurrentWatermark, 返回生成timestamp 时的 currentTimestamp -1 ,生成  Watermark,如果生成的Watermark的timestamp 大于当前的  Watermark的timestamp 就发出新的Watermark

    1.2 assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] 

    调用方法如下:

    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
          override def extractTimestamp(element: LateDataEvent): Long = {
            println("current timestamp : " + sdf.parse(element.createTime).getTime)
            sdf.parse(element.createTime).getTime
          }
        })

    在这种生成timestamp 和 Watermark 的情况下,userFunction  对应的类是:BoundedOutOfOrdernessTimestampExtractor

    对应源码

        @Override
        public final Watermark getCurrentWatermark() {
            // this guarantees that the watermark never goes backwards.
            long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
            if (potentialWM >= lastEmittedWatermark) {
                lastEmittedWatermark = potentialWM;
            }
            return new Watermark(lastEmittedWatermark);
        }
    
        @Override
        public final long extractTimestamp(T element, long previousElementTimestamp) {
            long timestamp = extractTimestamp(element);
            if (timestamp > currentMaxTimestamp) {
    // 这是为了上面上次Watermark的方法总能获取到 大于等于 当前Watermark的时间戳 currentMaxTimestamp
    = timestamp; } return timestamp; }

     基本上与上面相同,只是这种情况下,生成Watermark会 减去相应的 maxOutOfOrderness (允许延迟时间,就是代码中BoundedOutOfOrdernessTimestampExtractor对应的参数)

    之所以说是周期性的,是因为生成Watermark的方法是周期性调用的:

    // 注册timer 定期执行
    getProcessingTimeService().registerTimer(now + watermarkInterval, this); // 对应 watermarkInterval 来自与系统配置 watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); // 对应配置, 默认 200ms env.getConfig.setAutoWatermarkInterval(400)

    看代码可知,生成timestamp和Watermark是两条线,timestamp 是每个事件消息都会生成,而Watermark 是周期的

    2、标记的Watermark(With Punctuated Watermarks)

    这种Watermark的生成只有一种,对应代码如下:

    .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
          // check extractTimestamp emitted watermark is non-null and large than previously 生成当前事件的Watermark
          override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
            new Watermark(extractedTimestamp)
          }
    
          // generate next watermark 生成当前事件的timestamp
          override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
            val eventTime = sdf.parse(element.createTime).getTime
            eventTime
          }
        })

    对应上面生成添加时间戳到事件中和发出Watermark  在 TimestampsAndPunctuatedWatermarksOperator中具体如下:

    @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            final T value = element.getValue();
                    // extractTimestamp 方法就是assignTimestampsAndWatermarks 中的 extractTimestamp 生成事件的时间戳
            final long newTimestamp = userFunction.extractTimestamp(value,
                    element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    
            output.collect(element.replace(element.getValue(), newTimestamp));
                    // checkAndGetNextWatermark 方法就是assignTimestampsAndWatermarks 中的 checkAndGetNextWatermark,检查Watermark
            final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
    // 新的Watermark大于当前的Watermark才会发出
    if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } }

    这里可以看出,每条数据都会生成 tiemstamp 和 Watermark(不一定会发出,如果数据都是正常的,Watermark的消息会和事件的消息一样多,所以会影响性能)

    搞定。

     欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    植物大战僵尸课设个人报告
    MXNET框架基础2CVhotdog
    MXNET框架基础3GPU计算
    MXNET框架基础7BN
    MXNET框架基础0预备知识
    开课吧名企计算机视觉第一课
    MXNET框架基础5符号式编程
    MXNET框架基础1模型构建基本原理
    faceid算法原理简述
    MXNET框架基础6图像增广
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/11461328.html
Copyright © 2011-2022 走看看