zoukankan      html  css  js  c++  java
  • flink 处理实时数据的三重保障

    flink 处理实时数据的三重保障

    1. window+watermark 来处理乱序数据
      对于 TumblingEventTimeWindows window 的元数据startTime,endTime 和程序启动时间无关,当你指定出 window.size 时, window的startTime,endTime就分配好了

    2. allowedLateness 来处理迟到的数据
      相当于延迟了window 的生命周期, 【startTime,endTime) -> [startTime,endTime+ allowedLateness]

    3. sideOutput 是最后的兜底策略, 当window 的生命周期结束后, 延迟的数据可以通过侧输出收集起来,自定义后续的处理流程

    测试

    1. 程序
    import java.util.Date
    
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
    
    object LastElement {
    
      case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) {
        override def toString: String = s"Goods(id=$id,count=$count,time=$time)"
      }
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
    
        // 创建延迟数据 OutputTag, 标记为 late-data
        val lateOutputTag = OutputTag[Goods]("late-data")
    
        val stream = env
          .socketTextStream("localhost", 9999)
          .filter(_.nonEmpty)
          .map(x => {
            val arr = x.split(",")
            Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) // id,count,time
          })
          .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] {
            val maxOutOfOrderness = 2L // 最大无序数据到达的时间,用来生成水印2ms
            var currentMaxTimestamp: Long = _
    
            override def getCurrentWatermark: Watermark = {
              new Watermark(currentMaxTimestamp - maxOutOfOrderness)
            }
    
            override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp)
              element.time
            }
          })
    
        val streamFunc = stream
          .keyBy(_.id)
          .timeWindow(Time.milliseconds(10))
          .trigger(EventTimeTrigger.create())
          .allowedLateness(Time.milliseconds(3)) //  允许延时的最大时间
          .sideOutputLateData(lateOutputTag) // 对延时数据进行标记
          .reduce { (v1, v2) => Goods(v1.id, v1.count + v2.count, v2.time) }
    
        // lateOutputTag 从窗口结果中获取迟到数据局产生的统计结果
        val lateStream = streamFunc.getSideOutput(lateOutputTag)
    
        stream
          .print()
    
        streamFunc
          .map(("_________sum: ", _))
          .print()
    
        lateStream
          .map(("+++++++++++late: ", _))
          .print()
    
        env.execute(this.getClass.getSimpleName)
      }
    }

    input:

    1,1,0
    1,1,9
    1,2,10
    1,1,5
    1,2,11
    1,1,8
    1,2,13
    1,1,2
    1,2,17
    1,1,3
    1,3,20
    1,3,21

    output:

    Goods(id=1,count=1,time=0)
    Goods(id=1,count=1,time=9)
    Goods(id=1,count=2,time=10)
    Goods(id=1,count=1,time=5)
    Goods(id=1,count=2,time=11)
    (_________sum: ,Goods(id=1,count=3,time=5))
    Goods(id=1,count=1,time=8)
    (_________sum: ,Goods(id=1,count=4,time=8))
    Goods(id=1,count=2,time=13)
    Goods(id=1,count=1,time=2)
    (_________sum: ,Goods(id=1,count=5,time=2))
    Goods(id=1,count=2,time=17)
    Goods(id=1,count=1,time=3)
    (+++++++++++late: ,Goods(id=1,count=1,time=3))
    Goods(id=1,count=3,time=20)
    Goods(id=1,count=3,time=21)
    (_________sum: ,Goods(id=1,count=8,time=17))

    分析:

    1,1,0  // win1 start
    1,1,9  // win1 end 注意此时win1 没有关闭
    1,2,10 // win2 start
    1,1,5  // win1 这一条数据属于win1无序的数据,此时 watermark=7 < win1.endTime=9.
    1,2,11 // win2 && win1 触发计算,原因是 watermark=9 >= win1.endTime=9 && win1中有数据。如果没有 allowedLateness(3ms)的话此时就已经关闭 win1 了,但是有延时3ms 所以还没有关闭
    1,1,8  // win1 由于有 allowedLateness(3ms),这一条数据属于win1无序的数据,并触发 update;而不是 win1的 sideOutput 数据
    1,2,13 // win2 && win1 处于 close 边缘,win1 真正的生命周期从 [0,9+2) -> [0,9+2+3]
    1,1,2  // win1 allowedLateness(3ms) 导致 update
    1,2,17 // win2 && win1 close
    1,1,3  // win1 此时win1 已经close, 这条数据属于win1 的 sideOutput
    1,3,20 // win3 start
    1,3,21 // win3 && win2 触发计算
    
    // 所以最后的结果:
    win1: 1,5,2   + sideOutPut: 1,1,3
    win2: 1,8,17
    win3: 1,6,21
  • 相关阅读:
    数据结构与算法分析-Code Blocks中出现的找不到头文件的问题
    数据结构与算法分析-用C语言实现栈(数组方式)
    数据结构与算法分析-用C语言实现栈(链表方式)
    数据结构与算法分析-用C语言实现单链表
    C语言经典算法100例-结束语
    C++ Primer 7.33 练习编写成员函数
    C语言经典算法100例-073-链表逆序插入节点
    C语言经典算法100例-072-创建一个链表
    LintCode-编辑距离
    LintCode-乘积最大子序列
  • 原文地址:https://www.cnblogs.com/feiquan/p/13842149.html
Copyright © 2011-2022 走看看