zoukankan      html  css  js  c++  java
  • flink添加水位线

    flink1.11添加水位线

    /**
      * Flink 1.11 example: event-time windows with watermarks whose results are
      * re-emitted when late events arrive within the allowed lateness.
      *
      * Input is read from a socket, one record per line, in the form
      * `<key> <epoch-seconds>` separated by a single space.
      */
    object UpdateWindowResultWithLateEvent {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Single parallelism keeps the watermark progression easy to follow.
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val stream = env
          // Records are newline-delimited.
          .socketTextStream("db2", 9999, '\n')
          .map(r => {
            val arr = r.split(" ")
            // Second field is in seconds; Flink timestamps are in milliseconds.
            (arr(0), arr(1).toLong * 1000L)
          })
          .assignTimestampsAndWatermarks(
            WatermarkStrategy
              // Watermark trails the max seen timestamp by 5 seconds.
              .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(5))
              .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
                override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = element._2
              })
          )
          .keyBy(r => r._1)
          .timeWindow(Time.seconds(5))
          // Windows stay open for late events for another 5 seconds after the
          // watermark passes the window end; each late event re-fires the window.
          .allowedLateness(Time.seconds(5))
          .process(new CountWindow)

        stream.print()

        env.execute()
      }

      /**
        * Emits a marker string on every firing of a window: one message for the
        * first firing and a different one for each subsequent (late) firing.
        */
      class CountWindow extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
        // Shared by process() and clear() so both address the same state.
        private val isUpdateDescriptor =
          new ValueStateDescriptor[Boolean]("is-update", Types.of[Boolean])

        override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
          // Per-window state: scoped to the current key AND the current window.
          val isUpdate = context.windowState.getState(isUpdateDescriptor)

          // An unset ValueState returns null, which Scala unboxes to false,
          // so the first firing of a window takes this branch.
          if (!isUpdate.value()) {
            out.collect("first calculate window result!!!!")
            isUpdate.update(true)
          } else {
            out.collect("update window result!!!!")
          }
        }

        /**
          * Releases the per-window state when the window is purged; Flink does
          * not clean up `windowState` automatically.
          */
        override def clear(context: Context): Unit = {
          context.windowState.getState(isUpdateDescriptor).clear()
        }
      }
    }
  • 相关阅读:
    防止重复点击
    刷新当前页面的几种方法
    PHP删除数组中空值
    json转化数组
    两个不能同时共存的条件orWhere查询
    SQLSTATE[42000]
    laravel一个页面两个表格分页处理
    Hash::make与Hash::check
    unbind()清除指定元素绑定效果
    二级联动
  • 原文地址:https://www.cnblogs.com/ttyypjt/p/15152910.html
Copyright © 2011-2022 走看看