zoukankan      html  css  js  c++  java
  • 「Flink」事件时间与水印

    我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系。

    获取窗口开始时间Flink源代码

    获取窗口的开始时间为以下代码:

    org.apache.flink.streaming.api.windowing.windows.TimeWindow

    /**
    * Method to get the window start for a timestamp.
    *
    * @param timestamp epoch millisecond to get the window start.
    * @param offset The offset which window start would be shifted by.
    * @param windowSize The size of the generated windows.
    * @return window start
    */
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            return timestamp - (timestamp - offset + windowSize) % windowSize;
        }

    这一段代码,我们可以认为Flink并不是把时间戳直接作为窗口的开始时间,而是做了一些“对齐”操作,确保时间能够整除8。

    不同时间类型的窗口时间计算

    1、当TimeCharacteristic为ProcessingTime时

    窗口的开始时间:与窗口接收到的第一条消息的处理时间有关。例如:window operator是2020-02-06 22:02:33接收到的第一条消息,那么窗口的开始时间就是2020-02-06 22:02:33。

    窗口的结束时间:一旦窗口的开始时间确定了,因为窗口的长度是固定的。那么窗口的结束时间就确定下来了,例如:假设这里的时间窗口是3秒,那么窗口的结束时间就是2020-02-06 22:02:36。

    窗口的触发计算时间:假设有一条新的消息到达window operator,此时如果对应operator的系统时间,大于结束时间,就会触发计算。

    一旦窗口的开始时间确定了,那么后续窗口的开始时间,也就都确定下来了。

    问题:

    假设某个时间窗口,2020-2-6 22:12:20 - 2020-2-6 22:12:23,之间没有任何一条数据进来。Flink会如何处理?

    Flink会直接抛弃掉这个时间窗口,新来的事件消息会到其他的时间窗口中计算。


    2、当TimeCharacteristic为IngestionTime时

    窗口的开始时间:与source operator接收到的第一条消息有关。例如:source接收到这条消息的时间是2020-2-6 22:14:50,那么窗口的开始时间就是2020-2-6 22:14:50

    窗口的结束时间:与ProcessTime一致

    窗口的触发计算时间:假设有一条新的消息到达source operator,那么此时的时间如果大于结束时间,就会触发计算。


    除了窗口的开始时间、触发时间都是与source operator算子有关,其他与Processing Time是类似的。


    3、但TimeCharacteristic为EventTime时

    窗口的开始时间:与window operator接收到的第一条消息的事件时间有关,例如:如果这条消息的水印时间是2020-2-6 22:17:50,那么窗口的的开始时间就是2020-2-6 22:17:50

    窗口的结束时间:与ProcessTime一致

    窗口的触发计算时间:假设有一条新的消息到达window operator,如果该事件的水印时间大于窗口的结束时间,就会触发计算。

    通常,我们会让水印时间比事件时间允许延迟几秒钟。这样,如果是因为网络延迟消息晚到了几秒,也不会影响到统计结果了。

    public class WordCountWindow {
        public static void main(String[] args) throws Exception {
            // 1. 初始化流式运行环境
            Configuration conf = new Configuration();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    
            // 2. 设置时间处理类型,这里设置的方式处理时间
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            // 3. 定义数据源,每秒发送一个hadoop单词
            SingleOutputStreamOperator<Tuple2<String, Long>> wordDSWithWaterMark = env.addSource(new RichSourceFunction<Tuple2<String, Long>>() {
    
                private boolean isCanaled = false;
                private int TOTAL_NUM = 20;
    
                @Override
                public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
                    while (!isCanaled) {
                        ctx.collect(Tuple2.of("hadooop", System.currentTimeMillis()));
    
                        // 打印窗口开始、结束时间
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.println("事件发送时间:" + sdf.format(System.currentTimeMillis()));
                        Thread.sleep(1000);
                    }
                }
    
                @Override
                public void cancel() {
                    isCanaled = true;
                }
            }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element) {
                    return element.f1;
                }
            });
    
            // 4. 每5秒进行一次,分组统计
            // 4.1 转换为元组
            wordDSWithWaterMark.map(word -> {
                return Tuple2.of(word.f0, 1);
    
            })
                    // 指定返回类型
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    // 按照单词进行分组
                    .keyBy(t -> t.f0)
                    // 滚动窗口,3秒计算一次
                    .timeWindow(Time.seconds(3))
                    .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                        }
                    }, new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                        @Override
                        public void apply(String word, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
    
                            // 打印窗口开始、结束时间
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            System.out.println("窗口开始时间:" + sdf.format(window.getStart())
                                    + " 窗口结束时间:" + sdf.format(window.getEnd())
                                    + " 窗口计算时间:" + sdf.format(System.currentTimeMillis()));
    
                            int sum = 0;
                            Iterator<Tuple2<String, Integer>> iterator = input.iterator();
                            while(iterator.hasNext()) {
                                Integer count = iterator.next().f1;
                                sum += count;
                            }
                            out.collect(Tuple2.of(word, sum));
                        }
                    }).print();
    
            env.execute("app");
        }
    }

    输出结果如下:

    事件发送时间:2020-02-06 22:35:08
    事件发送时间:2020-02-06 22:35:09
    事件发送时间:2020-02-06 22:35:10
    事件发送时间:2020-02-06 22:35:11
    事件发送时间:2020-02-06 22:35:12
    事件发送时间:2020-02-06 22:35:13
    事件发送时间:2020-02-06 22:35:14
    窗口开始时间:2020-02-06 22:35:06 窗口结束时间:2020-02-06 22:35:09 窗口计算时间:2020-02-06 22:35:14
    4> (hadooop,1)

    事件发送时间:2020-02-06 22:35:15
    事件发送时间:2020-02-06 22:35:16
    事件发送时间:2020-02-06 22:35:17
    窗口开始时间:2020-02-06 22:35:09 窗口结束时间:2020-02-06 22:35:12 窗口计算时间:2020-02-06 22:35:17
    4> (hadooop,3)


    参考文件:

    https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/event_time.html

  • 相关阅读:
    [arm]虚拟机,2440开发板,主机三者互通
    Linux下的lds链接脚本简介(四)
    Linux下的lds链接脚本简介(三)
    Linux下的lds链接脚本简介(二)
    Linux下的lds链接脚本简介(一)
    程序员面试资源大收集
    Source Insight 3.50.0065使用详解
    DNW烧写FL2440 NAND Flash分区
    php isset()与empty()的使用
    JSON.parse()和JSON.stringify()的区别
  • 原文地址:https://www.cnblogs.com/ilovezihan/p/12271433.html
Copyright © 2011-2022 走看看