  • How to checkpoint the watermark in Flink so that sink data is not overwritten

    Data written by the Flink sink got overwritten???

    Production always throws up all kinds of inexplicable data, and a single oversight is enough to cause a disaster.


    The production sink is Elasticsearch (ES), and the ES document id is a custom one: id + windowStartTime.

    Assume window size = 10 min and a maximum watermark delay of 10 s. Event times in the data arrive out of order, and a record can be up to 30 min late.
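
    To make the overwrite risk concrete, here is a minimal sketch of how a tumbling window start and the document id could be derived. The helper names windowStart and docId and the "+" separator are assumptions for illustration (the post only says the id is id + windowStartTime), and timestamps are milliseconds since midnight rather than real epoch values.

    ```scala
    object WindowDocId {
      // Round a timestamp down to a multiple of the window size
      // (a tumbling window with zero offset).
      def windowStart(timestampMs: Long, windowSizeMs: Long): Long =
        timestampMs - Math.floorMod(timestampMs, windowSizeMs)

      // Hypothetical document id: the record id joined with the window start.
      def docId(id: Int, windowStartMs: Long): String = s"$id+$windowStartMs"

      // Milliseconds since midnight, standing in for real epoch timestamps.
      def at(h: Int, m: Int, s: Int): Long = ((h * 60L + m) * 60L + s) * 1000L

      val tenMinutes: Long = 10 * 60 * 1000L

      // Two records of the same id inside [09:40, 09:50) map to the same document.
      val first: String  = docId(1, windowStart(at(9, 41, 30), tenMinutes))
      val second: String = docId(1, windowStart(at(9, 45, 2), tenMinutes))
    }
    ```

    Because the id depends only on the record id and the window start, any window that fires a second time for the same start writes to the same ES document and replaces what was there.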

    The watermark generation function:

    assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
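            val maxOutOfOrderness = 2L // maximum out-of-orderness used to generate the watermark: 2 ms
            var currentMaxTimestamp: Long = _
            // Note: "sss" repeats the seconds field (hence 01.001 in the logs below); "SSS" would print milliseconds
            val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss")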
    
            override def getCurrentWatermark: Watermark = {
              println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
          })

    Suppose it is now 10:15 and the current window is [10:10, 10:20). That means the aggregates for the windows starting at 09:40, 09:50 and 10:00 have already been emitted.

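    At this point the job throws an exception, with checkpointing and a restart strategy configured. After the restart, does the watermark resume from where it left off? Is the window still [10:10, 10:20)?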

    The answer is no: the watermark starts growing from 0 again, and the windows start over as well.

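    If, unluckily, the first record after the restart has event time 09:45:02, the watermark becomes 09:44:52 (09:45:02 minus the 10 s delay) and the record falls into window [09:40, 09:50). After a while the data is aggregated again into an ES document [id+09:40], and when it is sunk the ES data written earlier is overwritten.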
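
    The scenario can be reproduced without Flink. The sketch below is a plain-Scala stand-in, not Flink code: MaxTimestampTracker mimics the assigner's currentMaxTimestamp field, and isLate assumes zero allowed lateness, where a record is dropped once the watermark has reached the last timestamp of its window. All names are hypothetical and timestamps are milliseconds since midnight.

    ```scala
    // Stand-in for the assigner's state: the largest event time seen so far.
    final class MaxTimestampTracker(maxOutOfOrdernessMs: Long) {
      private var currentMaxTimestamp = 0L // not checkpointed: a new instance starts at 0

      def observe(eventTimeMs: Long): Unit =
        currentMaxTimestamp = math.max(currentMaxTimestamp, eventTimeMs)

      def watermark: Long = currentMaxTimestamp - maxOutOfOrdernessMs
    }

    object RestartDemo {
      val windowSizeMs: Long = 10 * 60 * 1000L

      // Milliseconds since midnight, standing in for real epoch timestamps.
      def at(h: Int, m: Int, s: Int): Long = ((h * 60L + m) * 60L + s) * 1000L

      // Exclusive end of the tumbling window that contains the timestamp.
      def windowEnd(ts: Long): Long = ts - ts % windowSizeMs + windowSizeMs

      // Assumption: zero allowed lateness, so a record is late once the
      // watermark has reached the last timestamp of its window.
      def isLate(ts: Long, watermark: Long): Boolean = windowEnd(ts) - 1 <= watermark

      val lateEvent: Long = at(9, 45, 2)

      // Before the failure the job has already seen 10:15:00.
      val beforeFailure = new MaxTimestampTracker(10 * 1000L)
      beforeFailure.observe(at(10, 15, 0))
      val droppedBeforeFailure: Boolean = isLate(lateEvent, beforeFailure.watermark)

      // After the restart the tracker is rebuilt from scratch.
      val afterRestart = new MaxTimestampTracker(10 * 1000L)
      afterRestart.observe(lateEvent)
      val droppedAfterRestart: Boolean = isLate(lateEvent, afterRestart.watermark)
    }
    ```

    Before the failure the 09:45:02 record counts as late and is dropped. After the restart the fresh tracker puts the watermark at 09:44:52, so the same record is accepted and window [09:40, 09:50) is built and fired again.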

    Test:

    2020-10-21 23:57:01.001 -------watermark: -2
    input:Goods(id=1,count=10,time=10)               // input: 1,10,10
    ()
    2020-10-21 23:57:01.001 -------watermark: 8
    ....
    2020-10-21 23:57:04.004 -------watermark: 8      // input: 0,0,0 triggers the exception and a restart
    2020-10-21 23:57:09.009 -------watermark: -2     // the watermark starts over
    ....
    2020-10-21 23:57:17.017 -------watermark: -2
    input:Goods(id=1,count=10,time=10)
    ()
    2020-10-21 23:57:17.017 -------watermark: 8
    ...

    Solution:

    Here currentMaxTimestamp can essentially be treated as operator state, so it can be saved by implementing the CheckpointedFunction or ListCheckpointed interface.

    The modified watermark function:

    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
            val maxOutOfOrderness = 2L // maximum out-of-orderness used to generate the watermark: 2 ms
            var currentMaxTimestamp: Long = _
    
            override def getCurrentWatermark: Watermark = {
              println(s"watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
    
            override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] = {
              Collections.singletonList(currentMaxTimestamp)
            }
    
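            override def restoreState(state: util.List[JavaLong]): Unit = {
              // The restored list may be empty (e.g. after rescaling), and min on an empty collection throws
              if (!state.isEmpty) {
                val stateMin = state.asScala.min
                if (stateMin > 0) currentMaxTimestamp = stateMin
              }
            }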
          })

    Test:

    2020-10-22 00:39:00.000 -------watermark: -2
    input:Goods(id=1,count=10,time=10)      // input: 1,10,10
    ()
    2020-10-22 00:39:00.000 -------watermark: 8
    ...
    2020-10-22 00:39:03.003 -------watermark: 8
    input:Goods(id=0,count=0,time=0)        // input: 0,0,0 triggers the exception and a restart
    2020-10-22 00:39:08.008 -------watermark: 8  // state is restored from the checkpoint
    ...
    2020-10-22 00:39:23.023 -------watermark: 8
    input:Goods(id=1,count=20,time=20)   // input: 1,20,20
    ()
    2020-10-22 00:39:23.023 -------watermark: 18
    ....
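
    The text above also names CheckpointedFunction as an option. A minimal sketch of that variant follows, assuming the same pre-1.11 API used in this post and assuming the operator forwards the checkpoint hooks to the assigner the way it evidently does for ListCheckpointed in the test above. The class name CheckpointedMaxTimestampAssigner and the state name "current-max-timestamp" are made up for illustration, and Goods is redefined here only to keep the snippet self-contained.

    ```scala
    import java.lang.{Long => JavaLong}

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark

    import scala.collection.JavaConverters._

    // Mirrors the Goods type of the test program.
    case class Goods(id: Int, count: Int, time: Long)

    // Hypothetical assigner that keeps currentMaxTimestamp in operator list state.
    class CheckpointedMaxTimestampAssigner(maxOutOfOrderness: Long)
        extends AssignerWithPeriodicWatermarks[Goods] with CheckpointedFunction {

      private var currentMaxTimestamp: Long = 0L
      @transient private var maxTimestampState: ListState[JavaLong] = _

      override def getCurrentWatermark: Watermark =
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)

      override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
        currentMaxTimestamp = math.max(element.time, currentMaxTimestamp)
        element.time
      }

      // Called on every checkpoint: store the current maximum as the only entry.
      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        maxTimestampState.clear()
        maxTimestampState.add(currentMaxTimestamp)
      }

      // Called on start and on recovery: register the state and read it back if restored.
      override def initializeState(context: FunctionInitializationContext): Unit = {
        maxTimestampState = context.getOperatorStateStore.getListState(
          new ListStateDescriptor[JavaLong]("current-max-timestamp", classOf[JavaLong]))
        if (context.isRestored) {
          val restored = maxTimestampState.get().asScala.map(_.longValue)
          // Taking the minimum is the conservative choice if several entries arrive.
          if (restored.nonEmpty) currentMaxTimestamp = restored.min
        }
      }
    }
    ```

    It would be passed to assignTimestampsAndWatermarks(new CheckpointedMaxTimestampAssigner(2L)) in place of the anonymous class. Compared with ListCheckpointed, the state handle is registered explicitly and the restore path can check isRestored before reading.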

    Complete test program:

    import java.util.{Collections, Date}
    import java.util
    
    import scala.collection.JavaConverters._
    import java.lang.{Long => JavaLong}
    import java.text.SimpleDateFormat
    import java.util.concurrent.TimeUnit
    
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.api.scala._
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark
    
    /**
     * Test program: checkpoint the watermark assigner's state so the watermark survives a restart
     */
    object WatermarkCheckpoint {
    
      case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) {
        override def toString: String = s"Goods(id=$id,count=$count,time=$time)"
      }
    
      def main(args: Array[String]): Unit = {
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.sss")
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.enableCheckpointing(1000 * 10)
        env.getCheckpointConfig.setCheckpointTimeout(1000 * 60) // checkpoint timeout
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000 * 5) // minimum pause between two checkpoints
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // checkpointing mode
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) // number of concurrent checkpoints
        env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // keep the checkpoint when the job is cancelled
        env.getCheckpointConfig.setFailOnCheckpointingErrors(false) // a failed checkpoint does not fail the job
        // restart strategy
        env.setRestartStrategy(
          RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS))
        )
        // state backend
        val file_rocksdb = "file:///tmp/state/rocksdb"  // the path must be created beforehand
        env.setStateBackend(new RocksDBStateBackend(file_rocksdb, true))
        env.setParallelism(1)
    
        env.socketTextStream("localhost", 9999)
          .filter(_.nonEmpty)
          .map(x => {
            val arr = x.split(",")
            val g = Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) // id,count,time
            println(s"input:$g")
            g
          })
    
          // watermark without checkpointing
          /*.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
            val maxOutOfOrderness = 2L // maximum out-of-orderness used to generate the watermark: 2 ms
            var currentMaxTimestamp: Long = _
    
            override def getCurrentWatermark: Watermark = {
              println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
          })*/
    
          // watermark with checkpointing
          .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] with ListCheckpointed[JavaLong] {
            val maxOutOfOrderness = 2L // maximum out-of-orderness used to generate the watermark: 2 ms
            var currentMaxTimestamp: Long = _
    
            override def getCurrentWatermark: Watermark = {
              println(s"${dateFormat.format(new Date().getTime)} -------watermark: ${currentMaxTimestamp - maxOutOfOrderness}")
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
    
            override def snapshotState(checkpointId: Long, timestamp: Long): util.List[JavaLong] = {
              Collections.singletonList(currentMaxTimestamp)
            }
    
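            override def restoreState(state: util.List[JavaLong]): Unit = {
              // The restored list may be empty (e.g. after rescaling), and min on an empty collection throws
              if (!state.isEmpty) {
                val stateMin = state.asScala.min
                if (stateMin > 0) currentMaxTimestamp = stateMin
              }
            }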
          })
    
          .map(x => {
            if (x.id == 0) throw new RuntimeException("id is 0")
          })
          .print()
    
        env.execute(this.getClass.getSimpleName)
      }
    }
  • Original article: https://www.cnblogs.com/feiquan/p/13853105.html