zoukankan      html  css  js  c++  java
  • Flink之Watermarks

    1、代码案例

    package window
    
    import com.yangwj.api.SensorReading
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
    import org.apache.flink.streaming.api.windowing.time.Time
    
    import scala.util.Random
    
    /**
     * @author yangwj
     * @date 2021/1/7 21:45
     * @version 1.0
     */
    object WaterMarksTime {
      /**
       * Runs an event-time windowing demo: readings from [[MySensorSource]] are
       * timestamped, keyed by sensor id, and reduced to the minimum temperature
       * per 15-second tumbling window. Elements dropped as too late are printed
       * from a side output.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Event timestamps (milliseconds) come from the reading itself; watermarks
        // tolerate up to 3 seconds of out-of-order data, so most elements still land
        // in the right window and the result is approximately correct.
        // For strictly ascending data, `.assignAscendingTimestamps(_.timestamp)` would do instead.
        val timestampExtractor: BoundedOutOfOrdernessTimestampExtractor[SensorReading] =
          new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
            override def extractTimestamp(reading: SensorReading): Long = reading.timestamp
          }

        val dataStream: DataStream[SensorReading] = env
          .addSource(new MySensorSource)
          .assignTimestampsAndWatermarks(timestampExtractor)

        // Tag identifying the side output that receives elements arriving too late.
        val lateTag = new OutputTag[(String, Double, Long)]("later")

        // Minimum temperature per sensor, computed once per 15-second window.
        // Alternative window definitions that could replace `.timeWindow(Time.seconds(15))`:
        //   .window(TumblingEventTimeWindows.of(Time.seconds(15)))                  // tumbling window
        //   .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(5)))   // sliding window
        //   .window(EventTimeSessionWindows.withGap(Time.seconds(5)))               // session window
        //   .timeWindow(Time.minutes(5), Time.seconds(5))                           // sliding window
        //   .countWindow(5)                                                         // count window
        // and `.minBy(1)` is an alternative to the explicit reduce below.
        val minTempPerWindow: DataStream[(String, Double, Long)] = dataStream
          .map(r => (r.id, r.temperature, r.timestamp))
          .keyBy(_._1) // group by the first tuple element (the sensor id)
          .timeWindow(Time.seconds(15)) // tumbling time window
          .allowedLateness(Time.minutes(1)) // picks up stragglers that the watermark let slip past
          .sideOutputLateData(lateTag)
          // Keep the key, the smaller temperature, and the newer element's timestamp.
          .reduce((acc, next) => (acc._1, acc._2.min(next._2), next._3))

        minTempPerWindow.print("windows")
        minTempPerWindow.getSideOutput(lateTag).print("outSide Stream")

        env.execute("reduce test")
      }
    }
    //自定义数据源
    /**
     * Custom source that emits simulated readings for four sensors
     * ("sensor1" .. "sensor4") roughly every three seconds until cancelled.
     */
    class MySensorSource extends SourceFunction[SensorReading] {

      // Flag telling the emit loop whether the source should keep producing data.
      // Marked @volatile because cancel() is invoked from a different thread than run().
      @volatile var running: Boolean = true

      /**
       * Emits readings in a loop until [[cancel]] clears the `running` flag.
       *
       * @param sourceContext context used to emit the generated readings
       */
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        val rand = new Random()
        // Initial temperature per sensor: uniformly random in [0, 100).
        var curTemp = 1.to(4).map(i => ("sensor" + i, rand.nextDouble() * 100))

        // Loop indefinitely, producing data until the source is cancelled.
        while (running) {
          // Random-walk each temperature by a Gaussian step.
          curTemp = curTemp.map(data => (data._1, data._2 + rand.nextGaussian()))

          // All readings of one round share the same wall-clock timestamp (milliseconds).
          val curTime = System.currentTimeMillis()
          println(s"输入值:${curTemp},时间为:${curTime}")
          curTemp.foreach(data => sourceContext.collect(SensorReading(data._1, curTime, data._2)))
          Thread.sleep(3000)
        }
      }

      /**
       * Requests the emit loop in [[run]] to stop.
       *
       * The previous body was the bare expression `false`, which was discarded
       * and left `running` untouched, so the loop could never terminate.
       */
      override def cancel(): Unit = {
        running = false
      }
    }
  • 相关阅读:
    oracle的安装与plsql的环境配置
    Working with MSDTC
    soapui-java.lang.Exception Failed to load url
    Oracle 一个owner访问另一个owner的table,不加owner
    Call API relation to TLS 1.2
    Call API HTTP header Authorization: Basic
    VS2008 .csproj cannot be opened.The project type is not supported by this installat
    The changes couldn't be completed.Please reboot your computer and try again.
    Create DB Table View Procedure
    DB Change
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14264944.html
Copyright © 2011-2022 走看看