zoukankan      html  css  js  c++  java
  • Flink之Watermarks

    1、代码案例

    package window
    
    import com.yangwj.api.SensorReading
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
    import org.apache.flink.streaming.api.windowing.time.Time
    
    import scala.util.Random
    
    /**
     * @author yangwj
     * @date 2021/1/7 21:45
     * @version 1.0
     */
    object WaterMarksTime {
      /**
       * Builds and runs a demo job that, per sensor id, reduces each 15-second event-time
       * tumbling window to (id, minimum temperature, timestamp of the last element reduced).
       * Elements that arrive after the window's allowed lateness (one minute) has expired
       * are routed to the "later" side output instead of being silently dropped.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Windows are driven by the timestamps carried in the records, not by wall-clock time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Event time = SensorReading.timestamp (milliseconds). The extractor emits watermarks that
        // trail the largest timestamp seen so far by 3 seconds, so most slightly-out-of-order
        // records still land in the right window (an approximately correct result).
        // For strictly ascending input, `.assignAscendingTimestamps(_.timestamp)` would also work.
        val timestamped: DataStream[SensorReading] = env
          .addSource(new MySensorSource)
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
              override def extractTimestamp(reading: SensorReading): Long = reading.timestamp
            })

        // Side-output channel for records arriving after the allowed lateness has expired.
        val lateTag = new OutputTag[(String, Double, Long)]("later")

        // Minimum temperature per sensor for every 15-second window.
        val windowMinimums: DataStream[(String, Double, Long)] = timestamped
          .map(reading => (reading.id, reading.temperature, reading.timestamp))
          .keyBy(_._1) // key by sensor id (first tuple field)
          // Alternative window assigners:
          //   .window(TumblingEventTimeWindows.of(Time.seconds(15)))                 // tumbling
          //   .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(5)))  // sliding
          //   .window(EventTimeSessionWindows.withGap(Time.seconds(5)))             // session
          //   .timeWindow(Time.minutes(5), Time.seconds(5))                          // sliding
          //   .countWindow(5)                                                        // count-based
          .timeWindow(Time.seconds(15)) // a single size argument => tumbling window
          // Keep window state one extra minute past the watermark so stragglers still update it.
          .allowedLateness(Time.minutes(1))
          .sideOutputLateData(lateTag)
          // Alternatively: .minBy(1)
          .reduce((acc, next) => (acc._1, acc._2.min(next._2), next._3))

        windowMinimums.print("windows")
        windowMinimums.getSideOutput(lateTag).print("outSide Stream")

        env.execute("reduce test")
      }
    }
    // Custom data source: every 3 seconds, emits one reading for each of four sensors whose
    // temperatures follow a random walk (Gaussian step), all stamped with the current wall-clock time.
    class MySensorSource extends SourceFunction[SensorReading]{

      // Whether run() should keep emitting data. Flink invokes cancel() from a different thread
      // than the one executing run(), so the flag is @volatile to make the write visible there.
      @volatile var running: Boolean = true

      /**
       * Emits readings through `sourceContext` until cancel() clears the running flag.
       *
       * @param sourceContext Flink context used to emit each SensorReading
       */
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        val rand = new Random()
        // Initial temperature for sensor1..sensor4, uniformly in [0, 100).
        var curTemp = 1.to(4).map(i => ("sensor" + i, rand.nextDouble() * 100))

        // Infinite loop producing data until cancelled.
        while (running) {
          curTemp = curTemp.map(data => (data._1, data._2 + rand.nextGaussian()))

          val curTime = System.currentTimeMillis()
          println(s"输入值:$curTemp,时间为:$curTime")
          curTemp.foreach(data => sourceContext.collect(SensorReading(data._1, curTime, data._2)))
          Thread.sleep(3000)
        }
      }

      /** Stops the emit loop in run(); previously this evaluated `false` and did nothing. */
      override def cancel(): Unit = {
        running = false
      }
    }
  • 相关阅读:
    <<C++ Primer>> 第三章 字符串, 向量和数组 术语表
    <<C++ Primer>> 第二章 变量和基本类型 术语表
    <<C++ Primer>> 第一章 开始 术语表
    PAT A1077 Kuchiguse (20)
    PAT A1035 Password (20)
    PAT A1005 Spell It Right (20)
    <<C++ Primer>> 术语表 (总) (待补充)
    PAT A1001 A+B Format (20 分)
    PAT B1048 数字加密 (20)
    Protocol
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14264944.html
Copyright © 2011-2022 走看看