zoukankan      html  css  js  c++  java
  • Flink 自定义触发器

    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.api.common.typeutils.base.IntSerializer;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @Auther WeiJiQian
     * @描述 CountAndTimeTrigger : 满足一定条数和时间触发
     *  * 条数的触发使用计数器计数
     *  * 时间的触发,使用 flink 的 timerServer,注册触发器触发
     */
    public class CountAndTimeTrigger<W extends Window> extends Trigger<Object, W> {
        private Logger logger = LoggerFactory.getLogger(this.getClass());
        // 触发的条数
        private final long size;
        // 触发的时长
        private final long interval;
        private static final long serialVersionUID = 1L;
        // 条数计数器
        private final ReducingStateDescriptor<Integer> countStateDesc =
                new ReducingStateDescriptor<>("count", new ReduceSum(), IntSerializer.INSTANCE);
        // 时间计数器,保存下一次触发的时间
        private final ReducingStateDescriptor<Long> timeStateDesc =
                new ReducingStateDescriptor<>("fire-interval", new ReduceMin(), LongSerializer.INSTANCE);
    
        public CountAndTimeTrigger(long size, long interval) {
            this.size = size;
            this.interval = interval;
        }
    
        // 每条元素到来时.
        @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
            // 注册窗口结束的触发器, 不需要会自动触发
    //        ctx.registerProcessingTimeTimer(window.maxTimestamp());
            // count
            ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
            //interval
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
            // 每条数据 counter + 1
            count.add(1);
            if (count.get() >= size) {
                System.out.println("窗口结束: 计数器触发 count : {}"+ count.get());
                // 满足条数的触发条件,先清 0 条数计数器
                count.clear();
                // 满足条数时也需要清除时间的触发器,如果不是创建结束的触发器
                if (fireTimestamp.get() != window.maxTimestamp()) {
    //                logger.info("delete trigger : {}, {}", sdf.format(fireTimestamp.get()), fireTimestamp.get());
                    ctx.deleteProcessingTimeTimer(fireTimestamp.get());
                }
                fireTimestamp.clear();
                // fire 触发计算
                return TriggerResult.FIRE;
            }
    
            // 触发之后,下一条数据进来才设置时间计数器注册下一次触发的时间
    
            timestamp = ctx.getCurrentProcessingTime();
    //        timestamp = System.currentTimeMillis();
            if (fireTimestamp.get() == null) {
    //            long start = timestamp - (timestamp % interval);
                long nextFireTimestamp = timestamp + interval;
    //            logger.info("register trigger : {}, {}", sdf.format(nextFireTimestamp), nextFireTimestamp);
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
                fireTimestamp.add(nextFireTimestamp);
            }
            return TriggerResult.CONTINUE;
        }
    
        // 处理时间到的时候,开始处理
        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    
            // count
            ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
            //interval
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
    
            // time trigger and window end
            if (fireTimestamp.get() != null && time == window.maxTimestamp()) {
                System.out.println("窗口结束: 正常结束 {}" + time);
                // 窗口结束,清0条数和时间的计数器
                count.clear();
                ctx.deleteProcessingTimeTimer(fireTimestamp.get());
                fireTimestamp.clear();
                return TriggerResult.FIRE_AND_PURGE;
            } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
                System.out.println("窗口结束:时间计数器触发, time : {}" + time);
                // 时间计数器触发,清0条数和时间计数器
                count.clear();
                fireTimestamp.clear();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
            // count
            ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
            //interval
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
    
            // time trigger and window end
            if (time == window.maxTimestamp()) {
                System.out.println("窗口结束 : {}"+ time);
                // 窗口结束,清0条数和时间的计数器
                count.clear();
                ctx.deleteProcessingTimeTimer(fireTimestamp.get());
                fireTimestamp.clear();
                return TriggerResult.FIRE_AND_PURGE;
            } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
                System.out.println("时间计数器触发, time : {}"+ time);
                // 时间计数器触发,清0条数和时间计数器
                count.clear();
                fireTimestamp.clear();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public void clear(W window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(countStateDesc).clear();
            ctx.getPartitionedState(timeStateDesc).clear();
        }
    
    
        // 多个slot 中的 数据合并.
        @Override
        public void onMerge(W window, OnMergeContext ctx) throws Exception {
            super.onMerge(window, ctx);
            ctx.mergePartitionedState(timeStateDesc);
            ctx.mergePartitionedState(countStateDesc);
        }
    }
    
    
  • 相关阅读:
    python中常用的数据类型之整型(int),浮点型(float), 布尔值(bool), 复数(complex)
    requests库的基础使用
    socket,urllib,urllib3,request多种方法请求网页首页
    nginx日志切割
    gitlab社区版安装
    批量修改文件编码
    lvm磁盘扩展及添加磁盘lvm分区
    函数
    集合
    数据类型练习题
  • 原文地址:https://www.cnblogs.com/weijiqian/p/14034426.html
Copyright © 2011-2022 走看看