zoukankan      html  css  js  c++  java
  • Flink Streaming基于滚动窗口的事件时间分析

      使用flink-1.9.0进行的测试,在不同的并行度下,Flink对事件时间的处理逻辑不同。包括1.1在并行度为1的本地模式分析和1.2在多并行度的本地模式分析两部分。通过理论结合源码进行验证,得到具有说服力的结论。

    一、使用并行度为1的本地模式测试

    1.1、Flink时间时间窗口代码,使用SocketSource:

     1 package com.mengyao.flink.stream.window;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.ArrayList;
     5 import java.util.List;
     6 
     7 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     8 import org.apache.flink.api.common.typeinfo.Types;
     9 import org.apache.flink.api.java.tuple.Tuple;
    10 import org.apache.flink.api.java.tuple.Tuple4;
    11 import org.apache.flink.configuration.ConfigConstants;
    12 import org.apache.flink.configuration.Configuration;
    13 import org.apache.flink.configuration.RestOptions;
    14 import org.apache.flink.streaming.api.TimeCharacteristic;
    15 import org.apache.flink.streaming.api.datastream.DataStream;
    16 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    17 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    18 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    19 import org.apache.flink.streaming.api.watermark.Watermark;
    20 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    21 import org.apache.flink.streaming.api.windowing.time.Time;
    22 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    23 import org.apache.flink.util.Collector;
    24 
    25 import com.mengyao.flink.stream.utils.DateUtil;
    26 
    27 /**
    28  * 启动netcat:nc -L -p 9999 -v
    29  * 
    30  * Created by: mengyao
    31  * 2019年10月15日
    32  */
    33 public class SocketEventTimeWindowApp {
    34 
    35     private static String jobName = SocketEventTimeWindowApp.class.getSimpleName();
    36     
    37     
    38     public static void main(String[] args) throws Exception {
    39         Configuration conf = new Configuration();
    40         conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    41         conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
    42         final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
    43         // 设置重启策略,5次尝试,每次尝试的间隔为30秒
    44         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 30000));
    45         // 使用事件时间
    46         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    47         // 数据处理
    48         DataStream<Tuple4<String, String, String, Long>> inputDS = env.socketTextStream("localhost", 9999)
    49             .map(line -> {
    50                 String[] fields = line.split(",",3);
    51                 return Tuple4.of(fields[0], fields[1], fields[2], DateUtil.FMT05.get().parse((fields[2])).getTime());
    52             })
    53             .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG))
    54             .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<String, String, String, Long>>() {// 分配时间戳并定期生成水印
    55                 private static final long serialVersionUID = -4195773369390603522L;
    56                 private SimpleDateFormat formatter = DateUtil.FMT15.get();
    57                 long currentMaxTimestamp = 0L;
    58                 long maxOutOfOrderness = 0L;//允许最大的乱序时间是0秒
    59                 @Override
    60                 public long extractTimestamp(Tuple4<String, String, String, Long> ele, long prevEleTs) {
    61                     long timestamp = ele.f3;
    62                     currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    63                     System.out.println("事件: "+ele+", 最大时间戳:"+DateUtil.ts2DateStr(currentMaxTimestamp, formatter)+", 水印时间戳:"+DateUtil.ts2DateStr(getCurrentWatermark().getTimestamp(), formatter));
    64                     return timestamp;
    65                 }
    66                 @Override
    67                 public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
    68             });
    69         
    70         inputDS
    71             .keyBy(0)
    72             .window(TumblingEventTimeWindows.of(Time.seconds(2)))// 使用滚动窗口
    73             .apply(new WindowFunction<Tuple4<String,String,String,Long>, String, Tuple, TimeWindow>() {
    74                 private static final long serialVersionUID = -4990083905742822422L;
    75                 private SimpleDateFormat formatter = DateUtil.FMT15.get();
    76                 @Override
    77                 public void apply(Tuple key, TimeWindow window, Iterable<Tuple4<String, String, String, Long>> input,
    78                         Collector<String> out) throws Exception {
    79                     // 按照事件时间升序排序
    80                     List<Tuple4<String, String, String, Long>> list = new ArrayList<>();
    81                     input.forEach(t4->list.add(t4));
    82                     list.sort((e1,e2)->e1.f3.compareTo(e2.f3));
    83                     System.out.println("==== "+key+", 窗口开始:"+DateUtil.ts2DateStr(window.getStart(), formatter)+",窗口结束:"+DateUtil.ts2DateStr(window.getEnd(), formatter)+"; 窗口内的数据:"+list);
    84                 }
    85             })
    86             .print();
    87         
    88         env.execute(jobName);
    89     }
    90     
    91 }

    1.2、使用netcat启动SocketServer,发送数据到FlinkStreaming中(数据是有序的情况下)

    C:Usersmengyao>nc -l -p 9999 -v
    listening on [any] 9999 ...
    connect to [127.0.0.1] from DESKTOP-H7J35OJ [127.0.0.1] 63187
    1,1,20190101090000000
    1,1,20190101090001000
    1,1,20190101090001999
    1,1,20190101090002000
    1,1,20190101090003000
    1,1,20190101090004000
    1,1,20190101090005000
    1,1,20190101090005500
    1,1,20190101090007000

    1.3、程序控制台输出:

      log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner).
      log4j:WARN Please initialize the log4j system properly.
      log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
      事件: (1,1,20190101090000000,1546304400000), 最大时间戳:2019-01-01 09:00:00.000, 水印时间戳:2019-01-01 09:00:00.000
      事件: (1,1,20190101090001000,1546304401000), 最大时间戳:2019-01-01 09:00:01.000, 水印时间戳:2019-01-01 09:00:01.000
      事件: (1,1,20190101090001999,1546304401999), 最大时间戳:2019-01-01 09:00:01.999, 水印时间戳:2019-01-01 09:00:01.999
      ==== (1), 窗口开始:2019-01-01 09:00:00.000,窗口结束:2019-01-01 09:00:02.000; 窗口内的数据:[(1,1,20190101090000000,1546304400000), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999)]
      事件: (1,1,20190101090002000,1546304402000), 最大时间戳:2019-01-01 09:00:02.000, 水印时间戳:2019-01-01 09:00:02.000
      事件: (1,1,20190101090003000,1546304403000), 最大时间戳:2019-01-01 09:00:03.000, 水印时间戳:2019-01-01 09:00:03.000
      事件: (1,1,20190101090004000,1546304404000), 最大时间戳:2019-01-01 09:00:04.000, 水印时间戳:2019-01-01 09:00:04.000
      ==== (1), 窗口开始:2019-01-01 09:00:02.000,窗口结束:2019-01-01 09:00:04.000; 窗口内的数据:[(1,1,20190101090002000,1546304402000), (1,1,20190101090003000,1546304403000)]
      事件: (1,1,20190101090005000,1546304405000), 最大时间戳:2019-01-01 09:00:05.000, 水印时间戳:2019-01-01 09:00:05.000
      事件: (1,1,20190101090005500,1546304405500), 最大时间戳:2019-01-01 09:00:05.500, 水印时间戳:2019-01-01 09:00:05.500
      事件: (1,1,20190101090007000,1546304407000), 最大时间戳:2019-01-01 09:00:07.000, 水印时间戳:2019-01-01 09:00:07.000
      ==== (1), 窗口开始:2019-01-01 09:00:04.000,窗口结束:2019-01-01 09:00:06.000; 窗口内的数据:[(1,1,20190101090004000,1546304404000), (1,1,20190101090005000,1546304405000), (1,1,20190101090005500,1546304405500)]

    1.4、滚动窗口分析: 

     

    控制台打印如下:
    第1次触发窗口 (水印时间戳:2019-01-01 09:00:01.999)
    ==== (1), 窗口开始:2019-01-01 09:00:00.000,窗口结束:2019-01-01 09:00:02.000; 窗口内的数据:[(1,1,20190101090000000,1546304400000), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999)]
    解释:
    窗口的开始时间:2019-01-01 09:00:00.000
    窗口开始时间由事件数据决定,即接收到第一条事件数据是:(1,1,20190101090000000,1546304400000),所以窗口开始时间为:20190101090000000。
    结论是:
      窗口的开始时间 = 第一条事件数据的时间。
    窗口的结束时间:2019-01-01 09:00:02.000
    窗口开始时间是2019-01-01 09:00:00.000,窗口长度是2秒,那窗口结束时间 = 窗口开始时间+2秒 = 2019-01-01 09:00:02.000。
    结论是:

      窗口的结束时间 = 窗口开始时间+2秒 = 2019-01-01 09:00:02.000。
    窗口的长度:2秒
    在1.1代码的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滚动窗口,且窗口长度为2秒。
    结论是:
      窗口长度 =
    window(TumblingEventTimeWindows.of(Time.seconds(2)))代码中设置的2秒。
    进入窗口的数据:[
              (1,1,20190101090000000,1546304400000),
              (1,1,20190101090001000,1546304401000),
              (1,1,20190101090001999,1546304401999)
            ]
    因为窗口属于左闭右开(包前不包后),所以这个窗口的时间范围是从2019-01-01 09:00:00.000 到 2019-01-01 09:00:02.000 - 1。只要数据的事件时间属于该区间就会落在这个窗口中。
    窗口结束条件的源码如下:

    结论是:
      当数据的事件时间 >= 窗口开始时间 && <=窗口结束时间-1时,都会落在窗口内。
       窗口的水印时间:因延迟时间为0,所以水印时间 = 事件时间。
    在1.1代码的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
    currentMaxTimestamp是事件数据Tuple3(id,count,time)的时间f3字段,也就是事件时间。
    maxOutOfOrderness是允许最大乱序延迟时间,该值=0。
    所以,currentMaxTimestamp - maxOutOfOrderness 即 currentMaxTimestamp - 0。
    结论是:
      如果最大允许的乱序时间是0, 则:水印时间 = 事件时间。
       窗口的触发时机:窗口结束时间-1
    源码如下(TriggerResult枚举类的FIRE状态表示将窗口求值并发出结果,不清除窗口数据,会保留所有元素)。
    触发窗口计算的源码:

    触发器的触发状态定义源码:

    结论是:
      1. 当事件时间 = 水印时间的情况下:
        1.1、水印时间(最后一条事件数据的事件时间) = 窗口结束时间-1,会触发窗口计算;【因为最后一条数据的事件时间是20190101090001999满足窗口结束时间-1毫秒】
        1.2、水印时间(最后一条事件数据的事件时间 = 窗口结束时间), 会触发计算;
        1.3、水印时间(最后一条事件数据的事件时间 > 窗口结束时间), 会触发计算;


      

      


































































    第二次触发窗口计算
    控制台打印如下:
    第2次触发窗口 (水印时间戳:2019-01-01 09:00:04.000)
    ==== (1), 窗口开始:2019-01-01 09:00:02.000,窗口结束:2019-01-01 09:00:04.000; 窗口内的数据:[(1,1,20190101090002000,1546304402000), (1,1,20190101090003000,1546304403000)]
    解释:
    窗口的开始时间:2019-01-01 09:00:02.000
    窗口开始时间由事件数据决定,即接收到第一条事件数据是:(1,1,20190101090002000,1546304402000),所以窗口开始时间为:20190101090002000。
    结论是:
      窗口的开始时间 = 大于或等于前一个窗口结束时间的数据。
    窗口的结束时间:2019-01-01 09:00:04.000
    窗口开始时间是2019-01-01 09:00:02.000,窗口长度是2秒,那窗口结束时间 = 窗口开始时间+2秒 = 2019-01-01 09:00:04.000。
    结论是:

      窗口的结束时间 = 窗口开始时间+2秒 = 2019-01-01 09:00:04.000。
    窗口的长度:2秒
    在1.1代码的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滚动窗口,且窗口长度为2秒。
    结论是:
      窗口长度 = window(TumblingEventTimeWindows.of(Time.seconds(2)))代码中设置的2秒。
    进入窗口的数据:[
              (1,1,20190101090002000,1546304402000),
              (1,1,20190101090003000,1546304403000)
            ]
    因为窗口属于左闭右开(包前不包后),所以这个窗口的时间范围是从2019-01-01 09:00:02.000 到 2019-01-01 09:00:04.000 - 1。只要数据的事件时间属于该区间就会落在这个窗口中。
    窗口结束条件的源码如下:

    结论是:
      当数据的事件时间 >= 窗口开始时间 && <=窗口结束时间-1时,都会落在窗口内。
       窗口的水印时间:因延迟时间为0,所以事件时间 = 水印时间。
    在1.1代码的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
    currentMaxTimestamp是事件数据Tuple3(id,count,time)的时间f3字段,也就是事件时间。
    maxOutOfOrderness是允许最大乱序延迟时间,该值=0。
    所以,currentMaxTimestamp - maxOutOfOrderness 即 currentMaxTimestamp - 0。
    结论是:
      如果最大允许的乱序时间是0, 则:水印时间 = 事件时间。
       窗口的触发时机:窗口结束时间-1
    源码如下(TriggerResult枚举类的FIRE状态表示将窗口求值并发出结果,不清除窗口数据,会保留所有元素)。
    触发窗口计算的源码:

    触发器的触发状态定义源码:

    结论是:
      1. 当事件时间 = 水印时间的情况下:
        1.1、水印时间(最后一条事件数据的事件时间) = 窗口结束时间-1,会触发窗口计算;
        1.2、水印时间(最后一条事件数据的事件时间 = 窗口结束时间), 会触发窗口计算;【因为最后一条数据的事件时间是20190101090003000小于窗口结束时间无法触发,而下一条数据20190101090004000等于窗口结束时间,所以触发计算】
        1.3、水印时间(最后一条事件数据的事件时间 > 窗口结束时间), 会触发窗口计算;
























































































    第三次触发窗口计算:

    控制台打印如下:
    第3次触发窗口 (水印时间戳:2019-01-01 09:00:07.000)
    ==== (1), 窗口开始:2019-01-01 09:00:04.000,窗口结束:2019-01-01 09:00:06.000; 窗口内的数据:[(1,1,20190101090004000,1546304404000), (1,1,20190101090005000,1546304405000), (1,1,20190101090005500,1546304405500)]
    解释:
    窗口的开始时间:2019-01-01 09:00:04.000
    窗口开始时间由事件数据决定,即接收到第一条事件数据是:(1,1,20190101090004000,1546304404000),所以窗口开始时间为:20190101090004000。
    结论是:
      窗口的开始时间 = 大于或等于前一个窗口的数据。
    窗口的结束时间:2019-01-01 09:00:06.000
    窗口开始时间是2019-01-01 09:00:04.000,窗口长度是2秒,那窗口结束时间 = 窗口开始时间+2秒 = 2019-01-01 09:00:06.000。
    结论是:

      窗口的结束时间 = 窗口开始时间+2秒 = 2019-01-01 09:00:06.000。
    窗口的长度:2秒
    在1.1代码的72行:window(TumblingEventTimeWindows.of(Time.seconds(2)))。使用滚动窗口,且窗口长度为2秒。
    结论是:
      窗口长度 = window(TumblingEventTimeWindows.of(Time.seconds(2)))代码中设置的2秒。
    进入窗口的数据:[
              (1,1,20190101090004000,1546304404000),
              (1,1,20190101090005000,1546304405000),
              (1,1,20190101090005500,1546304405500)

            ]
    因为窗口属于左闭右开(包前不包后),所以这个窗口的时间范围是从2019-01-01 09:00:04.000 到 2019-01-01 09:00:06.000 - 1。只要数据的事件时间属于该区间就会落在这个窗口中。
    窗口结束条件的源码如下:

    结论是:
      当数据的事件时间 >= 窗口开始时间 && <=窗口结束时间-1时,都会落在窗口内。
       窗口的水印时间:因延迟时间为0,所以事件时间 = 水印时间。
    在1.1代码的67行,public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
    currentMaxTimestamp是事件数据Tuple3(id,count,time)的时间f3字段,也就是事件时间。
    maxOutOfOrderness是允许最大乱序延迟时间,该值=0。
    所以,currentMaxTimestamp - maxOutOfOrderness 即 currentMaxTimestamp - 0。
    结论是:
      如果最大允许的乱序时间是0, 则:水印时间 = 事件时间。
       窗口的触发时机:窗口结束时间-1
    源码如下(TriggerResult枚举类的FIRE状态表示将窗口求值并发出结果,不清除窗口数据,会保留所有元素)。
    触发窗口计算的源码:

    触发器的触发状态定义源码:

    结论是:
      1. 当事件时间 = 水印时间的情况下:
        1.1、水印时间(最后一条事件数据的事件时间) = 窗口结束时间-1,会触发窗口计算;
        1.2、水印时间(最后一条事件数据的事件时间 = 窗口结束时间), 会触发窗口计算;
        1.3、水印时间(最后一条事件数据的事件时间 > 窗口结束时间), 会触发窗口计算;【因为窗口3最后一条数据的事件时间是20190101090005500小于窗口结束时间无法触发,而下一条数据20190101090007000大于窗口结束时间,所以触发计算】
















      

    总结:
      先上图

    =================================================
    这3个窗口的规律总结为:
      1、水印时间:事件时间 - 允许的最大乱序时间0秒,即每个水印时间落后于事件时间0秒,代码:new Watermark(currentMaxTimestamp - maxOutOfOrderness)。
      2、窗口的开始时间:基于事件时间的滚动窗口下,是以(第一条数据的事件时间 or 事件时间大于等于前一个窗口的数据)作为窗口的开始时间,而不是算子所处节点的系统时钟。
      3、窗口的结束时间:基于事件时间的滚动窗口下,窗口结束时间 = 窗口开始时间 + 窗口的长度即TumblingEventTimeWindows.of(Time.seconds(2))。
      4、窗口的计算时机:数据的水印时间 >= 窗口结束时间 or 数据的水印时间 = 窗口结束时间-1毫秒时,就触发了窗口的计算。
      5、落入窗口内的数据:窗口属于左闭右开,即数据的事件时间 >= 窗口起始时间 并且 < 窗口的结束时间。

    二、使用并行度为4(多并行度)的本地模式测试

    2.1、Flink时间时间窗口代码,使用SocketSource:

    package com.mengyao.flink.stream.window;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import com.mengyao.flink.stream.utils.DateUtil;
    
    /**
     * 启动netcat:nc -L -p 9999 -v
     * 验证多并行度时的事件时间窗口。
     * Created by: mengyao
     * 2019年10月15日
     */
    public class MultipleParallelismSocketEventTimeWindowApp {
    
        private static String jobName = MultipleParallelismSocketEventTimeWindowApp.class.getSimpleName();
        
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
            conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4, conf);
            // 设置重启策略,5次尝试,每次尝试的间隔为30秒
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 30000));
            // 使用事件时间
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // 数据处理
            DataStream<Tuple4<String, String, String, Long>> inputDS = env.socketTextStream("localhost", 9999)
                .map(line -> {
                    String[] fields = line.split(",",3);
                    return Tuple4.of(fields[0], fields[1], fields[2], DateUtil.FMT05.get().parse((fields[2])).getTime());
                })
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG))
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple4<String, String, String, Long>>() {// 分配时间戳并定期生成水印
                    private static final long serialVersionUID = -4195773369390603522L;
                    private SimpleDateFormat formatter = DateUtil.FMT15.get();
                    long currentMaxTimestamp = 0L;
                    long maxOutOfOrderness = 000L;//允许最大的乱序时间是5秒
                    @Override
                    public long extractTimestamp(Tuple4<String, String, String, Long> ele, long prevEleTs) {
                        long timestamp = ele.f3;
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        long tId = Thread.currentThread().getId();
                        System.out.println("线程ID:"+tId+", 事件: "+ele+", 最大时间戳:"+DateUtil.ts2DateStr(currentMaxTimestamp, formatter)+", 水印时间戳:"+DateUtil.ts2DateStr(getCurrentWatermark().getTimestamp(), formatter));
                        return timestamp;
                    }
                    @Override
                    public Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}
                });
            
            inputDS
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))// 使用滚动窗口
                .apply(new WindowFunction<Tuple4<String,String,String,Long>, String, Tuple, TimeWindow>() {
                    private static final long serialVersionUID = -4990083905742822422L;
                    private SimpleDateFormat formatter = DateUtil.FMT15.get();
                    @Override
                    public void apply(Tuple key, TimeWindow window, Iterable<Tuple4<String, String, String, Long>> input,
                            Collector<String> out) throws Exception {
                        // 按照事件时间升序排序
                        List<Tuple4<String, String, String, Long>> list = new ArrayList<>();
                        input.forEach(t4->list.add(t4));
                        list.sort((e1,e2)->e1.f3.compareTo(e2.f3));
                        long tId = Thread.currentThread().getId();
                        System.out.println("==== 线程ID:"+tId+",key="+key+", 窗口开始:"+DateUtil.ts2DateStr(window.getStart(), formatter)+",窗口结束:"+DateUtil.ts2DateStr(window.getEnd(), formatter)+"; 窗口内的数据:"+list);
                    }
                })
                .print();
            
            env.execute(jobName);
        }
        
    }

    2.2、使用netcat启动SocketServer,发送数据到FlinkStreaming中

    listening on [127.0.0.1] 9999 ...
    connect to [127.0.0.1] from DESKTOP-H7J35OJ [127.0.0.1] 59356
    1,1,20190101090000000
    1,1,20190101090000500
    1,1,20190101090001000
    1,1,20190101090001999
    1,1,20190101090002000
    1,1,20190101090002500
    1,1,20190101090001999
    1,1,20190101090002300
    1,1,20190101090003000
    1,1,20190101090003999
    1,1,20190101090004000
    1,1,20190101090004500
    1,1,20190101090003999
    1,1,20190101090005999
    1,1,20190101090005999
    1,1,20190101090005999
    1,1,20190101090005999

    2.3、程序控制台输出:

    线程ID:63, 事件: (1,1,20190101090000000,1546304400000), 最大时间戳:2019-01-01 09:00:00.000, 水印时间戳:2019-01-01 09:00:00.000
    线程ID:64, 事件: (1,1,20190101090000500,1546304400500), 最大时间戳:2019-01-01 09:00:00.500, 水印时间戳:2019-01-01 09:00:00.500
    线程ID:65, 事件: (1,1,20190101090001000,1546304401000), 最大时间戳:2019-01-01 09:00:01.000, 水印时间戳:2019-01-01 09:00:01.000
    线程ID:66, 事件: (1,1,20190101090001999,1546304401999), 最大时间戳:2019-01-01 09:00:01.999, 水印时间戳:2019-01-01 09:00:01.999
    线程ID:63, 事件: (1,1,20190101090002000,1546304402000), 最大时间戳:2019-01-01 09:00:02.000, 水印时间戳:2019-01-01 09:00:02.000
    线程ID:64, 事件: (1,1,20190101090002500,1546304402500), 最大时间戳:2019-01-01 09:00:02.500, 水印时间戳:2019-01-01 09:00:02.500
    线程ID:65, 事件: (1,1,20190101090001999,1546304401999), 最大时间戳:2019-01-01 09:00:01.999, 水印时间戳:2019-01-01 09:00:01.999
    ==== 线程ID:72,key=(1), 窗口开始:2019-01-01 09:00:00.000,窗口结束:2019-01-01 09:00:02.000; 窗口内的数据:[(1,1,20190101090000000,1546304400000), (1,1,20190101090000500,1546304400500), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999), (1,1,20190101090001999,1546304401999)]
    线程ID:66, 事件: (1,1,20190101090002300,1546304402300), 最大时间戳:2019-01-01 09:00:02.300, 水印时间戳:2019-01-01 09:00:02.300
    线程ID:63, 事件: (1,1,20190101090003000,1546304403000), 最大时间戳:2019-01-01 09:00:03.000, 水印时间戳:2019-01-01 09:00:03.000
    线程ID:64, 事件: (1,1,20190101090003999,1546304403999), 最大时间戳:2019-01-01 09:00:03.999, 水印时间戳:2019-01-01 09:00:03.999
    线程ID:65, 事件: (1,1,20190101090004000,1546304404000), 最大时间戳:2019-01-01 09:00:04.000, 水印时间戳:2019-01-01 09:00:04.000
    线程ID:66, 事件: (1,1,20190101090004500,1546304404500), 最大时间戳:2019-01-01 09:00:04.500, 水印时间戳:2019-01-01 09:00:04.500
    线程ID:63, 事件: (1,1,20190101090003999,1546304403999), 最大时间戳:2019-01-01 09:00:03.999, 水印时间戳:2019-01-01 09:00:03.999
    ==== 线程ID:72,key=(1), 窗口开始:2019-01-01 09:00:02.000,窗口结束:2019-01-01 09:00:04.000; 窗口内的数据:[(1,1,20190101090002000,1546304402000), (1,1,20190101090002300,1546304402300), (1,1,20190101090002500,1546304402500), (1,1,20190101090003000,1546304403000), (1,1,20190101090003999,1546304403999), (1,1,20190101090003999,1546304403999)]
    线程ID:64, 事件: (1,1,20190101090005999,1546304405999), 最大时间戳:2019-01-01 09:00:05.999, 水印时间戳:2019-01-01 09:00:05.999
    线程ID:65, 事件: (1,1,20190101090005999,1546304405999), 最大时间戳:2019-01-01 09:00:05.999, 水印时间戳:2019-01-01 09:00:05.999
    线程ID:66, 事件: (1,1,20190101090005999,1546304405999), 最大时间戳:2019-01-01 09:00:05.999, 水印时间戳:2019-01-01 09:00:05.999
    线程ID:63, 事件: (1,1,20190101090005999,1546304405999), 最大时间戳:2019-01-01 09:00:05.999, 水印时间戳:2019-01-01 09:00:05.999
    ==== 线程ID:72,key=(1), 窗口开始:2019-01-01 09:00:04.000,窗口结束:2019-01-01 09:00:06.000; 窗口内的数据:[(1,1,20190101090004000,1546304404000), (1,1,20190101090004500,1546304404500), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999)]

    2.4、滚动窗口分析:

    第一次窗口触发(水印时间:2019-01-01 09:00:01.999)
      控制台打印如下:
        ==== 线程ID:72,key=(1), 窗口开始:2019-01-01 09:00:00.000,窗口结束:2019-01-01 09:00:02.000; 窗口内的数据:[(1,1,20190101090000000,1546304400000), (1,1,20190101090000500,1546304400500), (1,1,20190101090001000,1546304401000), (1,1,20190101090001999,1546304401999), (1,1,20190101090001999,1546304401999)]
      
    解释:
        当每一个线程中最新的水印时间都 > = 窗口结束时间 / 窗口结束时间-1毫秒时,就会触发窗口的计算。
    
    第二次窗口触发(水印时间:2019-01-01 09:00:03.999)
      控制台打印如下:
        
    ==== 线程ID:72,key=(1), 窗口开始:2019-01-01 09:00:02.000,窗口结束:2019-01-01 09:00:04.000; 窗口内的数据:[(1,1,20190101090002000,1546304402000), (1,1,20190101090002300,1546304402300), (1,1,20190101090002500,1546304402500), (1,1,20190101090003000,1546304403000), (1,1,20190101090003999,1546304403999), (1,1,20190101090003999,1546304403999)]
      解释:
        当每一个线程中最新的水印时间都 > = 窗口结束时间 / 窗口结束时间-1毫秒时,就会触发窗口的计算。
    第三次窗口触发(水印时间:2019-01-01 09:00:05.999)
      控制台打印如下:
        ==== 线程ID:72,key=(1), 窗口开始:2019-01-01 09:00:04.000,窗口结束:2019-01-01 09:00:06.000; 窗口内的数据:[(1,1,20190101090004000,1546304404000), (1,1,20190101090004500,1546304404500), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999), (1,1,20190101090005999,1546304405999)]
      解释:
        当每一个线程中最新的水印时间都 > = 窗口结束时间 / 窗口结束时间-1毫秒时,就会触发窗口的计算。

    多并行度下事件时间窗口总结:
      1、当所有线程内的最新水印时间 >= 窗口结束时间 / 窗口结束时间-1。 就会触发窗口的计算。
      

  • 相关阅读:
    StampedLock
    面试题:final关键字
    VTK 图像处理_显示(vtkImageViewer2 & vtkImageActor)
    VTK 图像处理_创建
    VTK 数据读写_图像数据的读写
    VTK 基本数据结构_如何把几何结构&拓扑结构加入到数据集
    VTK 基本数据结构_数据对象&数据集
    VTK 可视化管道的连接与执行
    VTK 坐标系统及空间变换(窗口-视图分割)
    VTK 三维场景基本要素:相机
  • 原文地址:https://www.cnblogs.com/mengyao/p/11687088.html
Copyright © 2011-2022 走看看