zoukankan      html  css  js  c++  java
  • Flink学习(十五) 滑动事件时间窗口加上水位线开始窗口时间如何确定?(底层源码)

    先看上一节的代码程序

    package com.wyh.windowsApi
    
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    
    
    object WindowTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        //周期性生成watermark 默认是200毫秒
        env.getConfig.setAutoWatermarkInterval(100L)
    
        /**
          * 从文件中读取数据
          *
          *
          */
        //val stream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")
    
        val stream = env.socketTextStream("localhost", 7777)
    
        //Transform操作
        val dataStream: DataStream[SensorReading] = stream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })
          //===到来的数据是升序的,准时发车,用assignAscendingTimestamps
          //指定哪个字段是时间戳 需要的是毫秒 * 1000
          //      .assignAscendingTimestamps(_.timestamp * 1000)
          //===处理乱序数据
          //      .assignTimestampsAndWatermarks(new MyAssignerPeriodic())
          //==底层也是周期性生成的一个方法 处理乱序数据 延迟1秒种生成水位 同时分配水位和时间戳 括号里传的是等待延迟的时间
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(t: SensorReading): Long = {
            t.timestamp * 1000
          }
        })
    
        //统计10秒内的最小温度
        val minTemPerWindowStream = dataStream
          .map(data => (data.id, data.temperature))
          .keyBy(0)
          //      .timeWindow(Time.seconds(10)) //开时间窗口  滚动窗口 没有数据的窗口不会触发
          //左闭右开 包含开始 不包含结束 延迟1秒触发的那个时间的数据不包含
          //可以直接调用底层方法,第三个参数传offset代表时区
          //.window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.hours(-8)))
          .timeWindow(Time.seconds(15), Time.seconds(5)) //滑动窗口,每隔5秒输出一次
          .reduce((data1, data2) => (data1._1, data1._2.min(data2._2))) //用reduce做增量聚合
    
    
        minTemPerWindowStream.print("min temp")
    
        dataStream.print("input data")
    
        env.execute("window Test")
    
      }
    
    }
    
    
    //设置水位线(水印) 这里有两种方式实现
    //一种是周期性生成 一种是以数据的某种特性进行生成水位线(水印)
    /**
      * 周期性生成watermark 默认200毫秒
      */
    class MyAssignerPeriodic() extends AssignerWithPeriodicWatermarks[SensorReading] {
      val bound: Long = 60 * 1000
      var maxTs: Long = Long.MaxValue
    
      override def getCurrentWatermark: Watermark = {
        //定义一个规则进行生成
        new Watermark(maxTs - bound)
      }
    
      //用什么抽取这个时间戳
      override def extractTimestamp(t: SensorReading, l: Long): Long = {
        //保存当前最大的时间戳
        maxTs = maxTs.max(t.timestamp)
        t.timestamp * 1000
      }
    }
    
    
    /**
      * 乱序生成watermark
      * 每来一条数据就生成一个watermark
      */
    class MyAssignerPunctuated() extends AssignerWithPunctuatedWatermarks[SensorReading] {
      override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {
        new Watermark(l)
      }
    
      override def extractTimestamp(t: SensorReading, l: Long): Long = {
        t.timestamp * 1000
      }
    }

    开始点源码 Ctrl + 鼠标左键

     点进去发现是KededStream里面的其中一个方法,继续点

     我们发现实际上是封装了一层java代码,代码中TimeWindow本身就是一个简写,这里发现底层还是.window() 方法 传入窗口类型参数

     我们发现,如果窗口的时间是处理时间就调用滑动处理时间窗口,我们在代码中设置了事件时间,

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    所以这里是滑动事件处理时间窗口。

    继续点

     点进去我们就看到实现的方法了

     先来看最开始的时间是如何生成的,继续点

     我们就看到这样的一个计算公式

     来解释一下,我们可以看到这样一个参数,offset,它如果我们没有设置就默认为0。它本身是用来指定时间的时区的。注意:这里有个参数其实叫windowSize 其实传进来的是一个滑动步长!!!但是不影响结果

    如何在代码中添加这个offset呢:.window() 方法中传入

    SlidingEventTimeWindows.of()  第三个参数就是offset
    .window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5),Time.hours(-8)))

    在这里,就计算出第一个窗口开始时间。

    继续看调用的

    我们可以看到,一个for循环追加了好多窗口window.

    判断如果开始时间大于时间戳减去窗口的大小,那么就把当前这个窗口加上一个创建口大小,然后再减去一个滑动步长,再判断是否大于时间戳减去窗口的大小,以此类推,知道小于,就结束创建,就可以得出最早创建的窗口。

    如果是滚动窗口,传进来的就是最早结束的时间,直接加上窗口大小

  • 相关阅读:
    EntityFramework+MySql 笔记2
    EntityFramework+MySql 笔记1
    软件详细设计文档(终)
    软件测试文档(终)
    软件测试计划文档(初)
    软件概要设计文档(终)
    软件需求规格说明文档(终)
    例会记录(六)
    例会记录(五)
    例会记录(四)
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12943387.html
Copyright © 2011-2022 走看看