zoukankan      html  css  js  c++  java
  • flink 自定义触发器 定时或达到数量触发

    触发器确定窗口(由窗口分配程序形成)何时准备由窗口函数处理。每个WindowAssigner都带有一个默认触发器。
    如果默认触发器不适合需求,我们就需要自定义触发器。

    主要方法

    触发器接口有五种方法,允许触发器对不同的事件作出反应

    1. onElement()添加到每个窗口的元素都会调用此方法。
    2. onEventTime()当注册的事件时间计时器触发时,将调用此方法。
    3. onProcessingTime()当注册的处理时间计时器触发时,将调用此方法。
    4. onMerge()与有状态触发器相关,并在两个触发器对应的窗口合并时合并它们的状态,例如在使用会话窗口时。(目前没使用过,了解不多)
    5. clear()执行删除相应窗口时所需的任何操作。(一般是删除定义的状态、定时器等)

    TriggerResult

    onElement(),onEventTime(),onProcessingTime()都要求返回一个TriggerResult

    TriggerResult包含以下内容

    1. CONTINUE:表示啥都不做。
    2. FIRE:表示触发计算,同时保留窗口中的数据
    3. PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息。
    4. FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。(默认情况下,预先实现的触发器只触发而不清除窗口状态。)

    案例

    • 需求
    1. 当窗口中的数据量达到一定数量的时候触发计算
    2. 根据执行时间每隔一定时间且窗口中有数据触发计算,如果没有数据不触发计算
    3. 窗口关闭的时候清除数据

    实现过程

    案例逻辑图.png

    • 依赖
     <properties>
            <hadoop.version>3.1.1.3.1.0.0-78</hadoop.version>
            <flink.version>1.9.1</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
            <scala.version>2.11.7</scala.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
        </dependencies>
    
    • 实现代码
    //调用
    dStream
          .keyBy(_.event_id)
          .window(TumblingEventTimeWindows.of(Time.hours(1)))
          .trigger(new CustomTrigger(10, 1 * 60 * 1000L))
    
    //-------------------------------------------------------------------------
    package com.meda.demo
    
    import java.text.SimpleDateFormat
    
    import com.meda.utils.DatePattern
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.state.ReducingStateDescriptor
    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    
    class CustomTrigger extends Trigger[eventInputDT, TimeWindow] {
      //触发计算的最大数量
      private var maxCount: Long = _
      //定时触发间隔时长 (ms)
      private var interval: Long = 60 * 1000
      //记录当前数量的状态
      private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
      //记录执行时间定时触发时间的状态
      private lazy val processTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("processTimer", new Update, classOf[Long])
      //记录时间时间定时器的状态
      private lazy val eventTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("eventTimer", new Update, classOf[Long])
    
      def this(maxCount: Int) {
        this()
        this.maxCount = maxCount
      }
    
      def this(maxCount: Int, interval: Long) {
        this(maxCount)
        this.interval = interval
      }
    
      override def onElement(element: eventInputDT, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        val countState = ctx.getPartitionedState(countStateDescriptor)
        //计数状态加1
        countState.add(1L)
    
        //如果没有设置事件时间定时器,需要设置一个窗口最大时间触发器,这个目的是为了在窗口清除的时候 利用时间时间触发计算,否则可能会缺少部分数据
        if (ctx.getPartitionedState(eventTimerStateDescriptor).get() == 0L) {
          ctx.getPartitionedState(eventTimerStateDescriptor).add(window.maxTimestamp())
          ctx.registerEventTimeTimer(window.maxTimestamp())
        }
    
        if (countState.get() >= this.maxCount) {
          //达到指定指定数量
          //删除事件时间定时触发的状态
          ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
          //清空计数状态
          countState.clear()
          //触发计算
          TriggerResult.FIRE
        } else if (ctx.getPartitionedState(processTimerStateDescriptor).get() == 0L) {
          //未达到指定数量,且没有指定定时器,需要指定定时器
          //当前定时器状态值加上间隔值
          ctx.getPartitionedState(processTimerStateDescriptor).add(ctx.getCurrentProcessingTime + interval)
          //注册定执行时间定时器
          ctx.registerProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
          TriggerResult.CONTINUE
        } else {
          TriggerResult.CONTINUE
        }
      }
    
      // 执行时间定时器触发
      override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        if (ctx.getPartitionedState(countStateDescriptor).get() > 0 && (ctx.getPartitionedState(processTimerStateDescriptor).get() == time)) {
          println(s"数据量未达到 $maxCount ,由执行时间触发器 ctx.getPartitionedState(processTimerStateDescriptor).get()) 触发计算")
          ctx.getPartitionedState(processTimerStateDescriptor).clear()
          ctx.getPartitionedState(countStateDescriptor).clear()
          TriggerResult.FIRE
        } else {
          TriggerResult.CONTINUE
        }
      }
    
      //事件时间定时器触发
      override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() > 0L)) { //还有未触发计算的数据
          println(s"事件时间到达最大的窗口时间,并且窗口中还有未计算的数据:${ctx.getPartitionedState(countStateDescriptor).get()},触发计算并清除窗口")
          ctx.getPartitionedState(eventTimerStateDescriptor).clear()
          TriggerResult.FIRE_AND_PURGE
        } else if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() == 0L)) { //没有未触发计算的数据
          println("事件时间到达最大的窗口时间,但是窗口中没有有未计算的数据,清除窗口 但是不触发计算")
          TriggerResult.PURGE
        } else {
          TriggerResult.CONTINUE
    
        }
      }
    
      //窗口结束时清空状态
      override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
        // println(s"清除窗口状态,定时器")
        ctx.deleteEventTimeTimer(ctx.getPartitionedState(eventTimerStateDescriptor).get())
        ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
        ctx.getPartitionedState(processTimerStateDescriptor).clear()
        ctx.getPartitionedState(eventTimerStateDescriptor).clear()
        ctx.getPartitionedState(countStateDescriptor).clear()
      }
    
      //更新状态为累加值
      class Sum extends ReduceFunction[Long] {
        override def reduce(value1: Long, value2: Long): Long = value1 + value2
      }
    
      //更新状态为取新的值
      class Update extends ReduceFunction[Long] {
        override def reduce(value1: Long, value2: Long): Long = value2
      }
    
    }
    
    

    留下的疑问:
    之前看资料的时候好像说定时器只能设置一个,你设置多个它也只会选择一个执行。
    但是我这里事件、执行时间定时器都设置,好像都生效了。这点还没看懂。
    后续研究下啥情况。

    本文为个人原创文章,转载请注明出处。!!!!

  • 相关阅读:
    cb快捷键
    N的阶乘的长度 V2(斯特林近似)
    最大子序列和(Max Sum ,Super Jumping! Jumping! Jumping! )
    关于莫比乌斯和莫比乌斯反演
    最少拦截系统
    set用法详解
    几种数学公式(环排列 母函数 唯一分解定理 卡特兰数 默慈金数 贝尔数 那罗延数)
    最小堆算法
    并查集算法
    dijkstra算法演示
  • 原文地址:https://www.cnblogs.com/lillcol/p/12303023.html
Copyright © 2011-2022 走看看