zoukankan      html  css  js  c++  java
  • FLINK基础(110): DS算子与窗口(21)窗口 (6) 自定义窗口(2)触发器(Triggers)

    触发器定义了window何时会被求值以及何时发送求值结果。触发器可以到了特定的时间触发也可以碰到特定的事件触发。例如:观察到事件数量符合一定条件或者观察到了特定的事件。

    默认的触发器将会在两种情况下触发

    • 处理时间:机器时间到达处理时间
    • 事件时间:水位线超过了窗口的结束时间

      触发器可以访问流的时间属性以及定时器,还可以对state状态编程。所以触发器和process function一样强大。例如我们可以实现一个触发逻辑:当窗口接收到一定数量的元素时,触发器触发。再比如当窗口接收到一个特定元素时,触发器触发。还有就是当窗口接收到的元素里面包含特定模式(5秒钟内接收到了两个同样类型的事件),触发器也可以触发。在一个事件时间的窗口中,一个自定义的触发器可以提前(在水位线没过窗口结束时间之前)计算和发射计算结果。这是一个常见的低延迟计算策略,尽管计算不完全,但不像默认的那样需要等待水位线没过窗口结束时间。

    每次调用触发器都会产生一个TriggerResult来决定窗口接下来发生什么。TriggerResult可以取以下结果:

    • CONTINUE:什么都不做
    • FIRE:如果window operator有ProcessWindowFunction这个参数,将会调用这个ProcessWindowFunction。如果窗口仅有增量聚合函数(ReduceFunction或者AggregateFunction)作为参数,那么当前的聚合结果将会被发送。窗口的state不变。
    • PURGE:窗口所有内容包括窗口的元数据都将被丢弃。
    • FIRE_AND_PURGE:先对窗口进行求值,再将窗口中的内容丢弃。

    TriggerResult可能的取值使得我们可以实现很复杂的窗口逻辑。一个自定义触发器可以触发多次,可以计算或者更新结果,可以在发送结果之前清空窗口。

    接下来我们看一下Trigger API:

    复制代码
    public abstract class Trigger<T, W extends Window> implements Serializable {
    
      TriggerResult onElement(
        long timestamp,
        W window,
        TriggerContext ctx);
    
      public abstract TriggerResult onProcessingTime(
        long timestamp,
        W window,
        TriggerContext ctx);
    
      public abstract TriggerResult onEventTime(
        long timestamp,
        W window,
        TriggerContext ctx);
    
      public boolean canMerge();
    
      public void onMerge(W window, OnMergeContext ctx);
    
      public abstract void clear(W window, TriggerContext ctx);
    }
    
    public interface TriggerContext {
    
      long getCurrentProcessingTime();
    
      long getCurrentWatermark();
    
      void registerProcessingTimeTimer(long time);
    
      void registerEventTimeTimer(long time);
    
      void deleteProcessingTimeTimer(long time);
    
      void deleteEventTimeTimer(long time);
    
      <S extends State> S getPartitionedState(
        StateDescriptor<S, ?> stateDescriptor);
    }
    
    public interface OnMergeContext extends TriggerContext {
    
      void mergePartitionedState(
        StateDescriptor<S, ?> stateDescriptor
      );
    }
    复制代码

    触发器接口有五种方法,允许触发器对不同的事件作出反应

    1. onElement()添加到每个窗口的元素都会调用此方法。
    2. onEventTime()当注册的事件时间计时器触发时,将调用此方法。
    3. onProcessingTime()当注册的处理时间计时器触发时,将调用此方法。
    4. onMerge()与有状态触发器相关,并在两个触发器对应的窗口合并时合并它们的状态,例如在使用会话窗口时。(目前没使用过,了解不多)
    5. clear()执行删除相应窗口时所需的任何操作。(一般是删除定义的状态、定时器等)

    这里要注意两个地方:清空state和merging合并触发器。

    当在触发器中使用per-window state时,这里我们需要保证当窗口被删除时state也要被删除,否则随着时间的推移,window operator将会积累越来越多的数据,最终可能使应用崩溃。

    当窗口被删除时,为了清空所有状态,触发器的clear()方法需要需要删掉所有的自定义per-window state,以及使用TriggerContext对象将处理时间和事件时间的定时器都删除。

    实例一

      下面的例子展示了一个触发器在窗口结束时间之前触发。当第一个事件被分配到窗口时,这个触发器注册了一个定时器,定时时间为水位线之前一秒钟。当定时事件执行,将会注册一个新的定时事件,这样,这个触发器每秒钟最多触发一次。

    scala version

    复制代码
    class OneSecondIntervalTrigger extends Trigger[SensorReading, TimeWindow] {
    
      override def onElement(
        SensorReading r,
        timestamp: Long,
        window: TimeWindow,
        ctx: Trigger.TriggerContext
      ): TriggerResult = {
        val firstSeen: ValueState[Boolean] = ctx
          .getPartitionedState(
            new ValueStateDescriptor[Boolean](
              "firstSeen", classOf[Boolean]
            )
          )
    
        if (!firstSeen.value()) {
          val t = ctx.getCurrentWatermark
           + (1000 - (ctx.getCurrentWatermark % 1000))
          ctx.registerEventTimeTimer(t)
          ctx.registerEventTimeTimer(window.getEnd)
          firstSeen.update(true)
        }
    
        TriggerResult.CONTINUE
      }
    
      override def onEventTime(
        timestamp: Long,
        window: TimeWindow,
        ctx: Trigger.TriggerContext
      ): TriggerResult = {
        if (timestamp == window.getEnd) {
          TriggerResult.FIRE_AND_PURGE
        } else {
          val t = ctx.getCurrentWatermark
           + (1000 - (ctx.getCurrentWatermark % 1000))
          if (t < window.getEnd) {
            ctx.registerEventTimeTimer(t)
          }
          TriggerResult.FIRE
        }
      }
    
      override def onProcessingTime(
        timestamp: Long,
        window: TimeWindow,
        ctx: Trigger.TriggerContext
      ): TriggerResult = {
        TriggerResult.CONTINUE
      }
    
      override def clear(
        window: TimeWindow,
        ctx: Trigger.TriggerContext
      ): Unit = {
        val firstSeen: ValueState[Boolean] = ctx
          .getPartitionedState(
            new ValueStateDescriptor[Boolean](
              "firstSeen", classOf[Boolean]
            )
          )
        firstSeen.clear()
      }
    }
    复制代码

    java version

    复制代码
    public class TriggerExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            env
                    .socketTextStream("localhost", 9999)
                    .map(new MapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(String s) throws Exception {
                            String[] arr = s.split(" ");
                            return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                        }
                    })
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                    return stringLongTuple2.f1;
                                }
                            })
                    )
                    .keyBy(r -> r.f0)
                    .timeWindow(Time.seconds(5))
                    .trigger(new OneSecondIntervalTrigger())
                    .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                        @Override
                        public void process(String s, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {
                            long count = 0L;
                            for (Tuple2<String, Long> i : iterable) count += 1;
                            collector.collect("窗口中有 " + count + " 条元素");
                        }
                    })
                    .print();
    
            env.execute();
        }
    
        public static class OneSecondIntervalTrigger extends Trigger<Tuple2<String, Long>, TimeWindow> {
            // 来一条调用一次
            @Override
            public TriggerResult onElement(Tuple2<String, Long> r, long l, TimeWindow window, TriggerContext ctx) throws Exception {
                ValueState<Boolean> firstSeen = ctx.getPartitionedState(
                        new ValueStateDescriptor<Boolean>("first-seen", Types.BOOLEAN)
                );
    
                if (firstSeen.value() == null) {
                    // 4999 + (1000 - 4999 % 1000) = 5000
                    System.out.println("第一条数据来的时候 ctx.getCurrentWatermark() 的值是 " + ctx.getCurrentWatermark());
                    long t = ctx.getCurrentWatermark() + (1000L - ctx.getCurrentWatermark() % 1000L);
                    ctx.registerEventTimeTimer(t);
                    ctx.registerEventTimeTimer(window.getEnd());
                    firstSeen.update(true);
                }
                return TriggerResult.CONTINUE;
            }
    
            // 定时器逻辑
            @Override
            public TriggerResult onEventTime(long ts, TimeWindow window, TriggerContext ctx) throws Exception {
                if (ts == window.getEnd()) {
                    return TriggerResult.FIRE_AND_PURGE;
                } else {
                    System.out.println("当前水位线是:" + ctx.getCurrentWatermark());
                    long t = ctx.getCurrentWatermark() + (1000L - ctx.getCurrentWatermark() % 1000L);
                    if (t < window.getEnd()) {
                        ctx.registerEventTimeTimer(t);
                    }
                    return TriggerResult.FIRE;
                }
            }
    
            @Override
            public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {
                ValueState<Boolean> firstSeen = ctx.getPartitionedState(
                        new ValueStateDescriptor<Boolean>("first-seen", Types.BOOLEAN)
                );
                firstSeen.clear();
            }
        }
    }
    复制代码

    实例二

    https://www.cnblogs.com/lillcol/p/12303023.html

    • 需求
    1. 当窗口中的数据量达到一定数量的时候触发计算
    2. 根据执行时间每隔一定时间且窗口中有数据触发计算,如果没有数据不触发计算
    3. 窗口关闭的时候清除数据

    //调用
    dStream
          .keyBy(_.event_id)
          .window(TumblingEventTimeWindows.of(Time.hours(1)))
          .trigger(new CustomTrigger(10, 1 * 60 * 1000L))
    
    //-------------------------------------------------------------------------
    package com.meda.demo
    
    import java.text.SimpleDateFormat
    
    import com.meda.utils.DatePattern
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.common.state.ReducingStateDescriptor
    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    
    class CustomTrigger extends Trigger[eventInputDT, TimeWindow] {
      //触发计算的最大数量
      private var maxCount: Long = _
      //定时触发间隔时长 (ms)
      private var interval: Long = 60 * 1000
      //记录当前数量的状态
      private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
      //记录执行时间定时触发时间的状态
      private lazy val processTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("processTimer", new Update, classOf[Long])
      //记录时间时间定时器的状态
      private lazy val eventTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("eventTimer", new Update, classOf[Long])
    
      def this(maxCount: Int) {
        this()
        this.maxCount = maxCount
      }
    
      def this(maxCount: Int, interval: Long) {
        this(maxCount)
        this.interval = interval
      }
    
      override def onElement(element: eventInputDT, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        val countState = ctx.getPartitionedState(countStateDescriptor)
        //计数状态加1
        countState.add(1L)
    
        //如果没有设置事件时间定时器,需要设置一个窗口最大时间触发器,这个目的是为了在窗口清除的时候 利用时间时间触发计算,否则可能会缺少部分数据
        if (ctx.getPartitionedState(eventTimerStateDescriptor).get() == 0L) {
          ctx.getPartitionedState(eventTimerStateDescriptor).add(window.maxTimestamp())
          ctx.registerEventTimeTimer(window.maxTimestamp())
        }
    
        if (countState.get() >= this.maxCount) {
          //达到指定指定数量
          //删除事件时间定时触发的状态
          ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
          //清空计数状态
          countState.clear()
          //触发计算
          TriggerResult.FIRE
        } else if (ctx.getPartitionedState(processTimerStateDescriptor).get() == 0L) {
          //未达到指定数量,且没有指定定时器,需要指定定时器
          //当前定时器状态值加上间隔值
          ctx.getPartitionedState(processTimerStateDescriptor).add(ctx.getCurrentProcessingTime + interval)
          //注册定执行时间定时器
          ctx.registerProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
          TriggerResult.CONTINUE
        } else {
          TriggerResult.CONTINUE
        }
      }
    
      // 执行时间定时器触发
      override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        if (ctx.getPartitionedState(countStateDescriptor).get() > 0 && (ctx.getPartitionedState(processTimerStateDescriptor).get() == time)) {
          println(s"数据量未达到 $maxCount ,由执行时间触发器 ctx.getPartitionedState(processTimerStateDescriptor).get()) 触发计算")
          ctx.getPartitionedState(processTimerStateDescriptor).clear()
          ctx.getPartitionedState(countStateDescriptor).clear()
          TriggerResult.FIRE
        } else {
          TriggerResult.CONTINUE
        }
      }
    
      //事件时间定时器触发
      override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() > 0L)) { //还有未触发计算的数据
          println(s"事件时间到达最大的窗口时间,并且窗口中还有未计算的数据:${ctx.getPartitionedState(countStateDescriptor).get()},触发计算并清除窗口")
          ctx.getPartitionedState(eventTimerStateDescriptor).clear()
          TriggerResult.FIRE_AND_PURGE
        } else if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() == 0L)) { //没有未触发计算的数据
          println("事件时间到达最大的窗口时间,但是窗口中没有有未计算的数据,清除窗口 但是不触发计算")
          TriggerResult.PURGE
        } else {
          TriggerResult.CONTINUE
    
        }
      }
    
      //窗口结束时清空状态
      override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
        // println(s"清除窗口状态,定时器")
        ctx.deleteEventTimeTimer(ctx.getPartitionedState(eventTimerStateDescriptor).get())
        ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
        ctx.getPartitionedState(processTimerStateDescriptor).clear()
        ctx.getPartitionedState(eventTimerStateDescriptor).clear()
        ctx.getPartitionedState(countStateDescriptor).clear()
      }
    
      //更新状态为累加值
      class Sum extends ReduceFunction[Long] {
        override def reduce(value1: Long, value2: Long): Long = value1 + value2
      }
    
      //更新状态为取新的值
      class Update extends ReduceFunction[Long] {
        override def reduce(value1: Long, value2: Long): Long = value2
      }
    
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13782405.html

  • 相关阅读:
    o gdb
    net -t struct ifreq
    file -x stat()-fstat()-lstat()
    I/O -x fcntl()
    I/O -x dup() dup2()
    14条---注意点
    user -x userid
    err -x perror() strerror()
    苹果电脑挂载NTFS移动硬盘
    ASP.NET Core之EF Core学习笔记
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13782405.html
Copyright © 2011-2022 走看看