zoukankan      html  css  js  c++  java
  • Flink实例(六十七):自定义时间和窗口的操作符(十二)Flink事件时间何时触发窗口计算


    思考:
        什么时候才会触发窗口计算?
        既然使用的是事件时间那么必然会涉及到水位线(water_mark),水位线在其中扮演的角色是什么?
        此时我们带着疑问,一步一步的探究
    注意:
    1、本篇博客中的所有解释都是在滚动窗口的前提下
    2、浏览本博客前观看本栏另外一篇博客“Flink时间概念与水位线”尤佳。

    Q:为什么要在滚动窗口的前提下进行解释?
    A:因为滚动窗口相比较滑动和会话来说更容易让大家理解,在本篇博客中着重的是讨论水位线在窗口触发下的场景,因此当然采用大家较容易理解的窗口来为大家解释。

    Q:那我要是想了解水位线在其他窗口下的场景呢?
    A:在本栏的其他博客有详细介绍。

    1.1 浏览本博客前你需要了解的知识点

        flink内部是如何划分窗口的?

        首先Windows的时间范围是一个自然时间范围,比如你定义了一个窗口:timeWindow(Time.seconds(3));那么其windows会将窗口中的事件按照3S进行划分(左闭右开)

    [10:11:00,10:11:03)
    [10:11:03,10:11:06)
    … …
    [10:11:21,10:11:24)
    … …


        当一个Event Time = 10:11:22的记录到来时就会生成如下窗口,此时这条消息就存放在这个窗口中;

    [10:11:21,10:11:24]

        触发的条件?

        a、water_mark时间 >= window_end_time只是第一个条件
        b、在[window_start_time,window_end_time)区间中还需要有数据存在,如果没有数据同样是不会触发的。

        何时第一次触发?

        当water_marker >= windows_end_time窗口结束时间,就会触发窗口操作。
    (最新的water_marker时间戳会在过去的windows_end_time窗口结束时间中逐一进行比较,如果发现有 >= 的情况就会触发窗口操作)

    1.2 示例:触发窗口计算

    示例
    最大乱序事件10秒,窗口时间3秒

        第一次触发计算

    输入数据:
    1,1538359882000
    2,1538359886000
    3,1538359892000
    4,1538359893000
    5,1538359894000

        定义下方输出数据中的water_mark

      val watermark: DataStream[(String, Long, String, Int)] = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long, String, Int)] {
        val maxOutOfOrderness = 10000L //最大允许的乱序时间是10s
        var currentMaxTimestamp = 0L
        var a: Watermark = _
    
        override def getCurrentWatermark: Watermark = {
          a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          a
        }
    
    
        override def extractTimestamp(element: (String, Long, String, Int), previousElementTimestamp: Long): Long = {
          val timestamp = element._2
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          val end = if (!a.toString.contains("-")) {
            val regEx = "[^0-9]";
            val p = Pattern.compile(regEx);
            val m = p.matcher(a.toString);
            val L_number = m.replaceAll("").trim()
            format.format(L_number.toLong)
          } else a.toString
          println("timestamp:" + element._1 + "," + element._2 + "," + element._3 + "|" +
            s"${a.toString}($end)" + "|" + format.format(currentMaxTimestamp - maxOutOfOrderness))
          val lll: Long = System.currentTimeMillis()
    
          timestamp
        }
      })

     当我们依次输入数据的时候,在输入完5,1538359894000这条数据后,最新的water_marker是:10:11:23。由于该条记录的水位线为:10:11:24 > 10:11:23,因此水位线会进行更新,变成10:11:24,而10:11:24 >= window end time(10:11:24),所以此时就会触发计算操作。

        此时的触发操作计算的是event_time(事件时间)在[10:11:21,10:11:24)窗口之间的数据(也就是只会计算 1,1538359882000 这一条数据)

    输出数据为:
    当前数据 | 当前数据的水位线 , 计算并更新水位线
    timestamp:1,1538359882000,10:11:22|Watermark@-10000(Watermark@-0000),10:11:12
    timestamp:2,1538359886000,10:11:26|Watermark@1538359872000(10:11:12),10:11:16
    timestamp:3,1538359892000,10:11:32|Watermark@1538359876000(10:11:16),10:11:22
    timestamp:4,1538359893000,10:11:33|Watermark@1538359882000(10:11:22), 10:11:23
    timestamp:5,1538359894000,10:11:34|Watermark@1538359883000(10:11:23), 10:11:24
    第一次触发计算: (1,1538359882000, 10:11:22,1)

        何时第二次触发计算

    输入数据:
    1,1538359882000
    2,1538359886000
    3,1538359892000
    4,1538359893000
    5,1538359894000
    6,1538359896000
    7,1538359897000
    输出数据为:
    timestamp:1,1538359882000,10:11:22|Watermark@-10000(Watermark@-0000),10:11:12
    timestamp:2,1538359886000,10:11:26|Watermark@1538359872000(10:11:12),10:11:16
    timestamp:3,1538359892000,10:11:32|Watermark@1538359876000(10:11:16),10:11:22
    timestamp:4,1538359893000,10:11:33|Watermark@1538359882000(10:11:22), 10:11:23
    timestamp:5,1538359894000,10:11:34|Watermark@1538359883000(10:11:23), 10:11:24
    第一次触发计算:> (1,1538359882000, 10:11:22,1)
    timestamp:6,1538359896000,10:11:36|Watermark@1538359884000(10:11:24),10:11:26
    timestamp:7,1538359897000,10:11:37|Watermark@1538359886000(10:11:26), 10:11:27
    第二次触发计算:> (2,1538359886000, 10:11:26,1)

     第一次的窗口[10:11:21,10:11:24)已经被使用了,那么程序下次触发的窗口则是[10:11:24,10:11:27),因此我输入的7,1538359897000这条数据,而该条数据中的water_mark则会更新成10:11:27而10:11:27 >= window_end_time(10:11:27),因此会触发操作。
        (此时计算的是[10:11:24,10:11:27)窗口内的数据,在上面的输入数据中只有2,1538359886000这条数据属于这个窗口,因此计算的是该条数据)

        何时触发多个窗口计算?

    输入数据:
    1,1538359882000
    2,1538359886000
    3,1538359892000
    4,1538359893000
    5,1538359894000
    6,1538359907000
    输出数据:
    timestamp:1,1538359882000,10:11:22|Watermark@-10000(Watermark@-0000),10:11:12
    timestamp:2,1538359886000,10:11:26|Watermark@1538359872000(10:11:12),10:11:16
    timestamp:3,1538359892000,10:11:32|Watermark@1538359876000(10:11:16),10:11:22
    timestamp:4,1538359893000,10:11:33|Watermark@1538359882000(10:11:22), 10:11:23
    timestamp:5,1538359894000,10:11:34|Watermark@1538359883000(10:11:23), 10:11:24
    第一次触发计算:> (1,1538359882000, 10:11:22,1)
    timestamp:6,1538359907000,10:11:47|Watermark@1538359884000(10:11:24),10:11:37
    触发计算:> (2,1538359886000, 10:11:26,1)
    触发计算:> (3,1538359892000, 10:11:32,1)
    触发计算:> (4,1538359893000, 10:11:33,1)
    触发计算:> (5,1538359894000, 10:11:34,1)

    窗口[10:11:21,10:11:24)之间的数据已经在第一次进行了触发 后面的 6,1538359907000 数据之所以会导致窗口操作是因为:10:11:37 >= 窗口时间中的windows_end_Time [10:11:34,10:11:37);
       而此时在[10:11:34,10:11:37)这个窗口之上还有

                          [10:11:21,10:11:24)已经在第一次进行触发了
    [10:11:24,10:11:27)
    [10:11:27,10:11:31)
    [10:11:31,10:11:34)
    [10:11:34,10:11:37)
    … ….


       由于timestamp 2,3,4,5的event_time 分别在这些窗口之间,因此输出的话就会在这次触发操作中全部进行输出了;计算的是每个窗口内的数据。

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13877789.html

  • 相关阅读:
    Django集成CAS
    JAVA命名规范
    Mybatis(5)——动态SQL
    Mybatis(4)——ResultMap
    Mybatis(3)——参数处理
    Mybatis(2)——Mapper映射文件
    Mybatis(1)——配置文件
    Gson(http://www.jianshu.com/p/e740196225a4)
    Gson中fromJson方法
    SessionAttributes和ModelAttribute
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13877789.html
Copyright © 2011-2022 走看看