  • How to checkpoint the watermark in Flink to prevent data from being overwritten

    Data overwritten after the Flink sink???

    In production you always run into all sorts of inexplicable data, and a single oversight can end in disaster.


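    The production sink is Elasticsearch (ES), and the ES document ID is a custom value: id + windowStartTime.

    Assume window size = 10 min and a maximum watermark delay of 10 s. Event times in the data arrive out of order, and a record can be up to 30 min late.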
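    To make the ID scheme concrete, here is a minimal sketch of how an event time maps to the start of its 10-minute tumbling window and then to a document ID. The helper names `windowStart` and `docId` and the `-` separator are assumptions made for this illustration; the job itself only states that the ID is id + windowStartTime.

```scala
object WindowDocId {
  val WindowSizeMs: Long = 10 * 60 * 1000L // 10-minute tumbling window

  // Start of the tumbling window that contains timestampMs
  // (assumes non-negative epoch milliseconds and no window offset).
  def windowStart(timestampMs: Long, sizeMs: Long = WindowSizeMs): Long =
    timestampMs - (timestampMs % sizeMs)

  // Hypothetical document ID layout: "<id>-<windowStart>".
  def docId(id: Int, timestampMs: Long): String =
    s"$id-${windowStart(timestampMs)}"

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    // Two events of the same id inside the same window share one document ID,
    // so the later write replaces the earlier one.
    println(docId(1, 41 * minute))
    println(docId(1, 45 * minute + 2000L))
  }
}
```

    Because the ID depends only on the record id and the window start, any second result emitted for the same window is written to the same ES document.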

    Watermark generator function

    assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
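            val maxOutOfOrderness = 2L // maximum out-of-orderness used to generate the watermark: 2 ms
            var currentMaxTimestamp: Long = _
            // Note: "sss" repeats the seconds field (e.g. 01.001); "SSS" would print milliseconds.
            val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss")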
    
            override def getCurrentWatermark: Watermark = {
              println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
          })

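    Suppose it is now 10:15 and the current window is [10:10, 10:20). That means the aggregates for the windows starting at 09:40, 09:50, and 10:00 have already been emitted.

    At this point the job throws an exception, with checkpointing and a restart strategy configured. After the restart, does the watermark resume from where it left off? Is the window still [10:10, 10:20)?

    The answer is no. The assigner's currentMaxTimestamp is an ordinary field that is not part of the checkpoint, so after the restart it is back at its initial value of 0, the watermark starts growing from scratch, and the windows start over as well.

    If, after the restart, the first record unluckily has an event time of 09:45:02, the watermark becomes 09:44:52 (09:45:02 minus the 10 s maximum delay) and the record falls into window [09:40, 09:50). Some time later the data is aggregated again into an ES document [id+09:40], and when it is written to the sink the earlier ES document is overwritten.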
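    The reasoning above can be sketched without Flink. The check below follows the usual rule for event-time windows: a record is dropped as late once the watermark has passed the last timestamp of its window. It assumes an allowed lateness of 0, and the helpers `at`, `windowStart`, and `isLate` are hypothetical names used only for this illustration.

```scala
object RestartOverwriteDemo {
  val WindowSizeMs: Long = 10 * 60 * 1000L
  val MaxDelayMs: Long = 10 * 1000L // watermark delay of 10 s from the setup above

  // Time of day in milliseconds, to keep the example readable.
  def at(h: Int, m: Int, s: Int = 0): Long = ((h * 60L + m) * 60L + s) * 1000L

  def windowStart(ts: Long): Long = ts - (ts % WindowSizeMs)

  // A record is late when the watermark has reached the last timestamp
  // of its window (assumption: allowed lateness = 0).
  def isLate(eventTime: Long, watermark: Long): Boolean =
    windowStart(eventTime) + WindowSizeMs - 1 <= watermark

  def main(args: Array[String]): Unit = {
    val event = at(9, 45, 2)
    val watermarkBeforeFailure = at(10, 15) - MaxDelayMs // job has already seen 10:15
    val watermarkAfterRestart = event - MaxDelayMs       // assigner started from scratch

    println(isLate(event, watermarkBeforeFailure)) // late: the record is dropped
    println(isLate(event, watermarkAfterRestart))  // not late: window [09:40, 09:50) is rebuilt
  }
}
```

    Before the failure the late record is discarded and the existing document stays intact. After the restart the same record is accepted, and the partial aggregate it produces is written under the ID of the document that already exists.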

    Test:

    2020-10-21 23:57:01.001 -------watermark: -2
    input:Goods(id=1,count=10,time=10)               // input: 1,10,10
    ()
    2020-10-21 23:57:01.001 -------watermark: 8
    ....
    2020-10-21 23:57:04.004 -------watermark: 8      // input: 0,0,0 triggers the exception and a restart
    2020-10-21 23:57:09.009 -------watermark: -2     // the watermark starts over
    ....
    2020-10-21 23:57:17.017 -------watermark: -2
    input:Goods(id=1,count=10,time=10)
    ()
    2020-10-21 23:57:17.017 -------watermark: 8
    ...

    Solution:

    Here currentMaxTimestamp is essentially operator state, so it can be saved by implementing the CheckpointedFunction or ListCheckpointed interface.
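    For comparison, this is a minimal sketch of the CheckpointedFunction route, which keeps the maximum timestamp in operator list state. It assumes a Flink version in which AssignerWithPeriodicWatermarks is still available and in which state of the user function is snapshotted by the wrapping operator. The class name `CheckpointedWatermarkAssigner`, the state name `current-max-timestamp`, and the simplified `Goods` record are choices made for this example.

```scala
import java.lang.{Long => JavaLong}

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

import scala.collection.JavaConverters._

// Stand-in for the Goods record; only `time` matters here.
case class Goods(id: Int, count: Int, time: Long)

class CheckpointedWatermarkAssigner(maxOutOfOrderness: Long)
    extends AssignerWithPeriodicWatermarks[Goods] with CheckpointedFunction {

  private var currentMaxTimestamp: Long = 0L
  @transient private var maxTimestampState: ListState[JavaLong] = _

  override def getCurrentWatermark: Watermark =
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)

  override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
    currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
    element.time
  }

  // Called on every checkpoint: replace the stored value with the current maximum.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    maxTimestampState.clear()
    maxTimestampState.add(currentMaxTimestamp)
  }

  // Called on startup and on recovery: register the state and read it back if restored.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    maxTimestampState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[JavaLong]("current-max-timestamp", classOf[JavaLong]))
    if (context.isRestored) {
      val restored = maxTimestampState.get().asScala.map(_.longValue)
      // The minimum is the conservative choice if several entries arrive after rescaling.
      if (restored.nonEmpty) currentMaxTimestamp = restored.min
    }
  }
}
```

    It would be plugged in as `.assignTimestampsAndWatermarks(new CheckpointedWatermarkAssigner(2L))`. The ListCheckpointed version needs less code, which is probably why it is the one used in the rest of this post.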

    Modified watermark function

    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
            val maxOutOfOrderness = 2L // maximum out-of-orderness used to generate the watermark: 2 ms
            var currentMaxTimestamp: Long = _
    
            override def getCurrentWatermark: Watermark = {
              println("watermark", currentMaxTimestamp - maxOutOfOrderness)
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
    
            override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] = {
              Collections.singletonList(currentMaxTimestamp)
            }
    
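            override def restoreState(state: util.List[JavaLong]): Unit = {
              // After rescaling, a subtask may receive several saved values or none;
              // the minimum is the conservative choice for a watermark.
              if (!state.isEmpty) {
                val stateMin = state.asScala.min
                if (stateMin > 0) currentMaxTimestamp = stateMin
              }
            }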
          })
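    restoreState takes a list because list-style operator state can be redistributed when the job is rescaled, so a subtask may be handed one saved value, several, or none. The sketch below mirrors the restore logic outside Flink to show what taking the minimum does in each case. `restoreMaxTimestamp` is a hypothetical helper, and the guard for an empty list is an addition of this sketch.

```scala
object RestoreChoice {
  // Pick the smallest saved timestamp; keep the current value
  // when nothing usable was restored.
  def restoreMaxTimestamp(saved: Seq[Long], current: Long = 0L): Long =
    if (saved.isEmpty) current
    else {
      val stateMin = saved.min
      if (stateMin > 0) stateMin else current
    }

  def main(args: Array[String]): Unit = {
    println(restoreMaxTimestamp(Seq(10L)))      // one saved value, as with parallelism 1
    println(restoreMaxTimestamp(Seq(10L, 20L))) // two entries merged into one subtask
    println(restoreMaxTimestamp(Seq.empty))     // no state assigned to this subtask
  }
}
```

    Choosing the minimum keeps the restored watermark from jumping ahead of data that another subtask had not yet seen, at the cost of a slightly older watermark.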

    Test:

    2020-10-22 00:39:00.000 -------watermark: -2
    input:Goods(id=1,count=10,time=10)      // input: 1,10,10
    ()
    2020-10-22 00:39:00.000 -------watermark: 8
    ...
    2020-10-22 00:39:03.003 -------watermark: 8
    input:Goods(id=0,count=0,time=0)        // input: 0,0,0 triggers the exception and a restart
    2020-10-22 00:39:08.008 -------watermark: 8  // state restored from the checkpoint
    ...
    2020-10-22 00:39:23.023 -------watermark: 8
    input:Goods(id=1,count=20,time=20)   // input: 1,20,20
    ()
    2020-10-22 00:39:23.023 -------watermark: 18
    ....

    Complete test program

    import java.util.{Collections, Date}
    import java.util
    
    import scala.collection.JavaConverters._
    import java.lang.{Long => JavaLong}
    import java.text.SimpleDateFormat
    import java.util.concurrent.TimeUnit
    
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.api.scala._
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark
    
    /**
     * CheckpointCount
     */
    object WatermarkCheckpoint {
    
      case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) {
        override def toString: String = s"Goods(id=$id,count=$count,time=$time)"
      }
    
      def main(args: Array[String]): Unit = {
        // Note: "sss" repeats the seconds field (e.g. 01.001); "SSS" would print milliseconds.
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss")
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.enableCheckpointing(1000 * 10)
        env.getCheckpointConfig.setCheckpointTimeout(1000 * 60) // checkpoint timeout
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000 * 5) // minimum pause between two checkpoints
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // checkpointing mode
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) // maximum number of concurrent checkpoints
        env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // retain the externalized checkpoint when the job is cancelled
        env.getCheckpointConfig.setFailOnCheckpointingErrors(false) // a failed checkpoint does not fail the job
        // restart strategy
        env.setRestartStrategy(
          RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS))
        )
        // state backend
        val file_rocksdb = "file:///tmp/state/rocksdb"  // the path must be created in advance
        env.setStateBackend(new RocksDBStateBackend(file_rocksdb, true))
        env.setParallelism(1)
    
        env.socketTextStream("localhost", 9999)
          .filter(_.nonEmpty)
          .map(x => {
            val arr = x.split(",")
            val g = Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) // id,count,time
            println(s"input:$g")
            g
          })
    
          // watermark assigner without checkpointing
          /*.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
            val maxOutOfOrderness = 2L // maximum out-of-orderness used to generate the watermark: 2 ms
            var currentMaxTimestamp: Long = _
    
            override def getCurrentWatermark: Watermark = {
              println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
          })*/
    
          // watermark assigner with checkpointing
          .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
            val maxOutOfOrderness = 2L // maximum out-of-orderness used to generate the watermark: 2 ms
            var currentMaxTimestamp: Long = _
    
            override def getCurrentWatermark: Watermark = {
              println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
    
            override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] = {
              Collections.singletonList(currentMaxTimestamp)
            }
    
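            override def restoreState(state: util.List[JavaLong]): Unit = {
              // After rescaling, a subtask may receive several saved values or none;
              // the minimum is the conservative choice for a watermark.
              if (!state.isEmpty) {
                val stateMin = state.asScala.min
                if (stateMin > 0) currentMaxTimestamp = stateMin
              }
            }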
          })
    
          .map(x => {
            if (x.id == 0) throw new RuntimeException("id is 0")
          })
          .print()
    
        env.execute(this.getClass.getSimpleName)
      }
    }
  • Original article: https://www.cnblogs.com/feiquan/p/13853105.html