zoukankan      html  css  js  c++  java
  • Flink 动态窗口统计面试题-实现

    之前分享了一个 Flink 的面试题,这里简单回顾下内容:

    有两个输入源,一个是命令流,一个是数据流
    需要将命令流进行广播,然后和数据流进行connect,根据命令流指定的命令进行统计
    实现一个输出到终端的 sink,将统计结果打印出来,每一条记录包括 taskId, targetAttr, periodStartTime(周期开始时间), value (统计后的值,double类型)

    面试题原文链接: https://mp.weixin.qq.com/s/iKx0EE-xvnOyncCIhN6MeA

    实现流程

    1、命令流使用从 kafka 输入,方便手动发送命令,map 解析成对象,广播
    2、数据流实现 SourceFunction 自动生成数据,map 解析成对象
    3、使用数据流关联 命令流,输出数据与命令组合的 tuple
    4、生成 timestamp 和 周期性的 watermark(flink 自带)
    5、数据通过 DynamicTumblingEventTimeWindows.assignWindows 指定动态窗口
    6、使用窗口函数根据命令的 methed 计算对应的结果

    具体实现请移步 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/questing/dynamicWindow

    动态窗口实现

    这个问题的难点就在: 根据命令流的规则进行窗口统计(而命令的规则中,指定了统计的目标,也指定了 窗口的长度和开始时间)

    Flink 原生的翻滚、滑动、session 窗口都是固定时间长度的窗口(session 窗口特殊的是,指定的长度不是窗口的长度而是 session timeout 的时间)

    看下 翻滚窗口(TumblingEventTimeWindows)的源码

    public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
        // 指定窗口的长度
        private final long size;
        // 指定窗口的开始时间的偏移长度,如: Flink 的窗口都是整点的,按天的窗口都是从 0 点开始(UTC0),指定 offset = 8 小时,就称为北京时间的 0 点了
        private final long offset;
    
        protected TumblingEventTimeWindows(long size, long offset) {
            if (Math.abs(offset) >= size) {
                throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
            }
    
            this.size = size;
            this.offset = offset;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                // Long.MIN_VALUE is currently assigned when no timestamp is present
                // 每条数据进来会根据 当前的 timestam 使用 offset 和 size 计算窗口对于的开始时间,结束时间就是 开始时间 + size 
                long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
                return Collections.singletonList(new TimeWindow(start, start + size));
            } else {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }
    
    }
    
    TimeWindow.java
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    Flink 在使用 TumblingEventTimeWindows 功能的时候,每条数据都会进入 TumblingEventTimeWindows.assignWindows 方法,计算数据属于的窗口(知道窗口的长度,基于 0 的偏移值,任何一个 正常的 timestam 都可以通过上面的 getWindowStartWithOffset 函数计算出该 timestamp 对应窗口的 开始时间和结束时间)。

    动态窗口的实现也是基于 TumblingEventTimeWindows 修改的,主要是"根据每条输入数据的命令,修改 窗口的 size 和 offset" 使窗口称为动态窗口

    核心代码如下:

    /**
     * flink dynamic tumbling event window
     */
    @PublicEvolving
    public class DynamicTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        // not final, dynamic modify
        private long size;
        private long offset;
    
        protected DynamicTumblingEventTimeWindows() {
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                Tuple2<DataEntity, Command> element1 = (Tuple2<DataEntity, Command>) element;
                Command command = element1._2;
                // cal new window size
                // 大于当前时间的情况又怎么处理呢: 窗口开始时间大于 timestamp,下一窗口命令还未开始,数据属于上一窗口命令,所以不修改 size 与 offset
                if (command.startTime() < timestamp) {
                    long millis = command.startTime() % 999;
                    if ("minute".equalsIgnoreCase(command.periodUnit())) {
                        this.size = command.periodLength() * 60 * 1000;
                        // offset 等于 命令开始时间的 秒值 + 毫秒值
                        long second = command.startTime() / 1000 % 60;
                        offset = second * 1000 + millis;
                    } else {
                        this.size = command.periodLength() * 1000;
                        // offset 等于 命令开始时间的 毫秒值
                        offset = millis;
                    }
                }
                // todo 窗口开始时间大于或者小于 当前 timestamp 的时候,需要处理
                // 小于当前时间,可以计算出当前timestamp 对应的窗口
                long start = getWindowStartWithOffset(timestamp, offset, size);
                return Collections.singletonList(new TimeWindow(start, start + size));
            } else {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }
    
        /**
         * cal window start time
         */
        public long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            return timestamp - (timestamp + offset - windowSize) % windowSize;
        }
    
        public static DynamicTumblingEventTimeWindows of() {
            return new DynamicTumblingEventTimeWindows();
        }
    
    }

    DynamicTumblingEventTimeWindows.of 生成窗口的时候,不再指定固定的 size 和 offset(动态窗口的规则中,有指定 对于的 属于进行统计,所以就不指定默认窗口 size 和 offset)

    根据输入数据的命令部分,所以命令大于当前 timestamp 的数据(小于 timestamp 的命令说明该命令尚未开始,数据还是属于上一窗口,前面 数据流与命令流 关联的时候,已经做了处理,这里只是多加一层判断),根据命令中的 startTime 计算命令对应窗口基于 0 毫秒的偏移值(如果是分钟的窗口还有加上 秒 的偏移值),窗口的长度是 periodLength 属性对应的值,这里就得到了命令对应的窗口的 size 和 offset,后面的流程就和 Flink 原生窗口(TumblingEventTimeWindows)一样了,计算下 窗口的开始时间,结束时间

    命令开始时间处理

    对于命令的开始时间,其实也是一个处理的难点

    命令的开始时间可能是小于、等于、大于当前时间的,其中小于和等于的命令,意味着窗口马上就要开始,使用对应的属性计算窗口的 size 和 offset 即可

    对于命令的开始大于当前时间的命令,需要做下特殊处理,大于当前时间,意味着命令还不能生效,不能替换当前命令,当前的数据,是属于上一个正在执行的命令

    在实现的时候,我使用了两个 map 的对应,一个存储当前正在执行命令,一个存储最新的命令(为了简单,假设基于每个属性的命令一次只会有一个在执行)

    核心代码如下:

    new BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]() {
    
        // 存放当前命令的 map(非 keyBy 的不能使用 keyState,用 hashmap 将就了)
        var currentCommand: util.HashMap[String, Command] = _
        // 存放新命令的 map
        var commandState: MapStateDescriptor[String, Command] = _
    
        override def open(parameters: Configuration): Unit = {
    
          currentCommand = new util.HashMap[String, Command]()
          commandState = new MapStateDescriptor[String, Command]("commandState", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint[Command]() {}))
        }
    
        override def processElement(element: DataEntity, ctx: BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]#ReadOnlyContext, out: Collector[(DataEntity, Command)]): Unit = {
          // 命令可以是大于/小于当前时间
          // 小于当前时间的,直接添加即可,之前命令的窗口不会收到新数据,新数据直接进新命令的窗口
          // 大于当前时间的命令,不能直接与流一起往下游输出,等时间小于当前的 processTime 时间后,才会开始新窗口
          val command = ctx.getBroadcastState(commandState).get(element.attr)
          val current = currentCommand.get(element.attr)
          if (command != null && command.startTime <= ctx.currentProcessingTime()) {
            // 当新命令的时间小于当前的处理时间,替换旧命令
            currentCommand.put(element.attr, command)
          }
          // 如果当前命令为空,数据就不往下发送了
          if (current != null) {
            out.collect((element, current))
          }
          // command not exists, ignore it
        }
    
        override def processBroadcastElement(element: Command, ctx: BroadcastProcessFunction[DataEntity, Command, (DataEntity, Command)]#Context, out: Collector[(DataEntity, Command)]): Unit = {
          // only one command are new accepted, cover old command
          logger.info("receive command : " + element)
          ctx.getBroadcastState(commandState).put(element.targetAttr, element)
        }
      }

    代码都有注释,不再赘述

    全部代码,请移步 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/questing/dynamicWindow

    测试

    从 kafka 数据 readme.md 中对应的命令,查看 输出结果

    2020-09-09 20:12:08,812 INFO  - receive command : Command(task1,attr2,sum,SECOND,20,1598596980000)
    2020-09-09 20:12:08,812 INFO  - receive command : Command(task2,attr1,sum,MINUTE,1,1598596980000)
    2020-09-09 20:12:08,812 INFO  - receive command : Command(task3,attr2,max,SECOND,30,1598596980000)
    2020-09-09 20:12:09,816 INFO  - receive command : Command(task4,attr3,min,MINUTE,1,1599640669628)
    sum> {"method":"min","periodStartTime":"20:11:10","targetAttr":"attr3","periodEndTime":"20:12:10","value":"18.0","taskId":"task4"}
    sum> {"method":"max","periodStartTime":"20:11:59","targetAttr":"attr2","periodEndTime":"20:12:29","value":"981.0","taskId":"task3"}
    sum> {"method":"max","periodStartTime":"20:12:29","targetAttr":"attr2","periodEndTime":"20:12:59","value":"937.0","taskId":"task3"}
    sum> {"method":"sum","periodStartTime":"20:11:59","targetAttr":"attr1","periodEndTime":"20:12:59","value":"26876.0","taskId":"task2"}
    sum> {"method":"min","periodStartTime":"20:12:10","targetAttr":"attr3","periodEndTime":"20:13:10","value":"32.0","taskId":"task4"}
    sum> {"method":"max","periodStartTime":"20:12:59","targetAttr":"attr2","periodEndTime":"20:13:29","value":"998.0","taskId":"task3"}
    2020-09-09 20:13:43,712 INFO  - receive command : Command(task5,attr2,sum,SECOND,20,1598596980000)
    2020-09-09 20:13:43,712 INFO  - receive command : Command(task6,attr1,sum,MINUTE,1,1598596980000)
    2020-09-09 20:13:43,712 INFO  - receive command : Command(task7,attr2,max,SECOND,30,1598596980000)
    2020-09-09 20:13:43,712 INFO  - receive command : Command(task8,attr3,min,MINUTE,1,1599640669628)
    sum> {"method":"max","periodStartTime":"20:13:29","targetAttr":"attr2","periodEndTime":"20:13:59","value":"995.0","taskId":"task3"}
    sum> {"method":"sum","periodStartTime":"20:12:59","targetAttr":"attr1","periodEndTime":"20:13:59","value":"31627.0","taskId":"task2"}
    sum> {"method":"min","periodStartTime":"20:13:10","targetAttr":"attr3","periodEndTime":"20:14:10","value":"90.0","taskId":"task4"}
    sum> {"method":"max","periodStartTime":"20:13:59","targetAttr":"attr2","periodEndTime":"20:14:29","value":"945.0","taskId":"task7"}
    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文
  • 相关阅读:
    Linux 小知识点
    Nginx 源码安装
    MySQL user表详解
    Python 资源
    Python 迭代dict的value
    著作权和专利权的区别
    软件设计师05-信息安全基础知识
    记录一次服务器突然宕机的排查
    支付宝微信拉取账单到本地
    软件设计师04-计算机网络
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/13641481.html
Copyright © 2011-2022 走看看