zoukankan      html  css  js  c++  java
  • FLINK基础(111): DS算子与窗口(22)窗口 (8) 自定义窗口(3)清理器(EVICTORS)

    evictor可以在window function求值之前或者之后移除窗口中的元素。

    我们看一下Evictor的接口定义:

    public interface Evictor<T, W extends Window> extends Serializable {
      void evictBefore(
        Iterable<TimestampedValue<T>> elements,
        int size,
        W window,
        EvictorContext evictorContext);
    
      void evictAfter(
        Iterable<TimestampedValue<T>> elements,
        int size,
        W window,
        EvictorContext evictorContext);
    
      interface EvictorContext {
    
        long getCurrentProcessingTime();
    
        long getCurrentWatermark();
      }
    }

      evictBefore()和evictAfter()分别在window function计算之前或者之后调用。

      Iterable迭代器包含了窗口所有的元素,size为窗口中元素的数量,window object和EvictorContext可以访问当前处理时间和水位线。

      可以对Iterator调用remove()方法来移除窗口中的元素。

      evictor也经常被用在GlobalWindow上,用来清除部分元素,而不是将窗口中的元素全部清空。

    -----------------------

    驱逐器能够在触发器触发之后,窗口函数使用之前或之后从窗口中清除元素。
      evictBefore()在窗口函数之前使用。而 evictAfter() 在窗口函数之后使用。在使用窗口函数之前被逐出的元素将不被处理。

    Flink带有三种内置驱逐器:
      CountEvictor:在窗口维护用户指定数量的元素,如果多于用户指定的数量,从窗口缓冲区的开头丢弃多余的元素。
      DeltaEvictor:使用 DeltaFunction 和一个阈值,来计算窗口缓冲区中的最后一个元素与其余每个元素之间的差值,并删除差值大于或等于阈值的元素。
      TimeEvictor:以毫秒为单位的时间间隔(interval)作为参数,对于给定的窗口,找到元素中的最大的时间戳max_ts,并删除时间戳小于max_ts - interval的所有元素。

    默认情况下,所有内置的驱逐器在窗口函数之前使用。指定驱逐器可以避免预聚合(pre-aggregation),因为窗口内所有元素必须在窗口计算之前传递给驱逐器。

    Flink 不保证窗口内元素的顺序。这意味着虽然驱逐器可以从窗口开头移除元素,但这些元素不一定是先到的还是后到的。

    实例一

    class MyEvictor() extends Evictor[MyTime, TimeWindow] {
      override def evictBefore(iterable: lang.Iterable[TimestampedValue[MyTime]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        val ite: util.Iterator[TimestampedValue[MyTime]] = iterable.iterator()
        while (ite.hasNext) {
          val elment: TimestampedValue[MyTime] = ite.next()
          //指定事件事件获取到的就是事件时间
          println("驱逐器获取到的时间:" + elment.getTimestamp)
          //模拟去掉非法参数数据
          if (elment.getValue.timestamp <= 0) {
            ite.remove()
          }
        }
      }
    
      override def evictAfter(iterable: lang.Iterable[TimestampedValue[MyTime]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
    
      }
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13782408.html

  • 相关阅读:
    Vue less使用scope时渗入修改子组件样式
    Spring容器初始话原理图
    Java的动态代理
    Spring_xml和注解混合方式开发
    Spring_xml方式开发
    Spring入门初体验
    数论
    虚拟IP和IP漂移
    字符串hash + 二分答案
    字符串hash
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13782408.html
Copyright © 2011-2022 走看看