zoukankan      html  css  js  c++  java
  • flink WaterMark之TumblingEventWindow

    1、WaterMark,翻译成水印或水位线,水印翻译更抽象,水位线翻译接地气。

    watermark是用于处理乱序事件的,通常用watermark机制结合window来实现。
    
    流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
    
    但是对于迟到或者乱序的元素,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window进行计算。这个特别的机制,就是watermark。触发时间遵循自然时间以及左闭右开原则。

    正常有序流:watermark实际上与event的时间戳重合

     乱序流:watermark用于触发窗口计算,也就是水印不到,即使流数据已经落入多个窗口也不会触发,如果水印到了,该窗口的数据即使没到也会触发计算,迟到的数据缺省将被抛弃。

    2、TumblingEventWindow 窗口结合WaterMark,用代码验证一下有序和乱序的流。

    从socket里接收文本,文本以对子(时间戳 +文本)出现,字段分隔符是空格,行分隔符是“
    ”,对收到的文本以10秒滚动窗口给文本计数。
    有序情况下:watermark是0,也就是不延时接收数据。
    乱序情况下:watermark是3s,延时3秒触发窗口计算。

    code:

    public class TumblingEventWindowExample {
        public static void main(String args[]) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //        env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            DataStream<String> socketStream = env.socketTextStream("192.168.31.10",9000);
            DataStream<Tuple2<String,Long>> resultStream = socketStream
                     //Time.seconds(3)有序的情况修改成0
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                        @Override
                        public long extractTimestamp(String element) {
                            long eventTime = Long.parseLong(element.split(" ")[0]);
                            System.out.println(eventTime);
                            return eventTime;
                        }
                    })
                    .map(new MapFunction<String, Tuple2<String,Long>>() {
                        @Override
                        public Tuple2<String,Long> map(String value) throws Exception {
                            return Tuple2.of(value.split(" ")[1],1L);
                        }
                    }).keyBy(0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .reduce(new ReduceFunction<Tuple2<String,Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                            return new Tuple2<>(value1.f0,value1.f1+value2.f1);
                        }
                    });
            resultStream.print();
    
            env.execute();
        }
    }

    2.1 有序的情况,watermark为0s

    第一个窗口:

    10000
    11000
    12000
    13000
    14000
    19888
    13000
    20000
    1> (b,2)
    3> (a,5)

    时间戳20000触发第一个窗口计算,实际上19999也会触发,因为左闭右开原则,20000这个时间戳并不会在第一个窗口计算,第一个窗口是[10000-20000),第二个窗口是[20000-30000),以此类推。

    第二个窗口:

    10000
    11000
    12000
    13000
    14000
    19888
    13000
    20000
    1> (b,2)
    3> (a,5)
    11000
    12000
    21000
    22000
    29999
    3> (a,3)
    1> (b,1)

    第一个窗口触发计算后,后续来的11000,12000这两条数据被抛弃,29999直接触发窗口计算,并且本身也属于第二个窗口,所以也参与计算了。

     2.2 watermark为3s的情况

    10000
    11000
    12000
    20000
    21000
    22000
    23000
    3> (a,2)
    1> (b,1)

    从数据中可以验证,第一个窗口在20000的时候没有触发计算,而是在23000的时候触发计算,计算内容是第一个窗口[10000,20000),所以20000,21000,22000,23000属于第二个窗口,没有参与计算。

    第二个窗口:

    10000
    11000
    12000
    20000
    21000
    22000
    23000
    3> (a,2)
    1> (b,1)
    24000
    29000
    30000
    22000
    23000
    33000
    3> (a,6)
    1> (b,2)

    第二个窗口[20000,30000),它是在33000触发计算,并且,迟到的数据22000,23000也被计算在内(如果这两个数据在水印33000后到达,则会被抛弃),30000和33000是第三个窗口的数据,没有计算在内。

     
  • 相关阅读:
    Silverlight 之 断点调试
    Silverlight 之 浅析
    Silverlight 之 新建项目解析
    Silverlight 之 创建
    有关TCP和UDP 粘包 消息保护边界
    计算机网络杂项
    RTP
    如何取消Linux下,vi中显示的^M符号
    Linux下实现定时器Timer的几种方法
    UNIX网络编程——套接字选项
  • 原文地址:https://www.cnblogs.com/asker009/p/11299848.html
Copyright © 2011-2022 走看看