zoukankan      html  css  js  c++  java
  • Flink-Window

    Flink的高级API

    Flink的基石

    Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。

    image-20210909165727879

    Checkpoint

    这是Flink最重要的一个特性。

    Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。

    Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。

    Spark最近在实现Continue streaming,Continue streaming的目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定。

    State

    提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括ValueState、ListState、MapState,BroadcastState。

    Time

    除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。

    Window

    另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

    为什么需要Window

    在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。
    在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。

    Window的分类

    按照time和count分类

    • time-window:时间窗口:根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据
    • count-window:数量窗口:根据数量划分窗口,如:每xx个数据统计最近xx个数据

    image-20210909170711596

    按照slide和size分类

    窗口有两个重要的属性: 窗口大小size和滑动间隔slide,根据它们的大小关系可分为:

    • tumbling-window:滚动窗口:size=slide,如:每隔10s统计最近10s的数据

    image-20210909170742524

    • sliding-window:滑动窗口:size>slide,如:每隔5s统计最近10s的数据

    image-20210909170802505

    总结:

    按照上面窗口的分类方式进行组合,可以得出如下的窗口:

    1.基于时间的滚动窗口tumbling-time-window--用的较多

    2.基于时间的滑动窗口sliding-time-window--用的较多

    3.基于数量的滚动窗口tumbling-count-window--用的较少

    4.基于数量的滑动窗口sliding-count-window--用的较少

    注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算

    Window的API

    window和windowAll

    image-20210910101524084

    使用keyby的流,应该使用window方法

    未使用keyby的流,应该调用windowAll方法

    WindowAssigner

    window/windowAll 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中,

    Flink提供了很多各种场景用的WindowAssigner:

    image-20210910101923593

    如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。

    evictor

    evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行

    用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter两个方法。

    Flink 提供了如下三种通用的 evictor:

    • CountEvictor 保留指定数量的元素

    • TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值。

    • DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删除一个元素。

    trigger

    trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,

    如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

    • onElement() 每次往 window 增加一个元素的时候都会触发

    • onEventTime() 当 event-time timer 被触发的时候会调用

    • onProcessingTime() 当 processing-time timer 被触发的时候会调用

    • onMerge() 对两个 `rigger 的 state 进行 merge 操作

    • clear() window 销毁的时候被调用

    上面的接口中前三个会返回一个 TriggerResult, TriggerResult 有如下几种可能的选择:

    • CONTINUE 不做任何事情

    • FIRE 触发 window

    • PURGE 清空整个 window 的元素并销毁窗口

    • FIRE_AND_PURGE 触发窗口,然后销毁窗口

    基于时间的滚动和滑动窗口

    时间的滚动窗口

    /**
     * @author WGR
     * @create 2021/9/10 -- 9:24
     */
    public class WindowDemo01_TimeWindow {
    
        public static void main(String[] args) throws Exception {
            
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);
    
            SingleOutputStreamOperator<CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, CartInfo>() {
    
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new CartInfo(arr[0], Integer.parseInt(arr[1]));
                }
            });
    
            // 需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口
            SingleOutputStreamOperator<CartInfo> result1  = cartInfoDS.keyBy(CartInfo::getSensorId)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .sum("count");
    
            result1.print();
            env.execute();
    
        }
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;//信号灯id
            private Integer count;//通过该信号灯的车的数量
        }
    }
    
    

    结果:

    7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=1)
    7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=9)
    7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=18)
    7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=17)
    7> WindowDemo01_TimeWindow.CartInfo(sensorId=1, count=10)
    
    image-20210910094045857

    时间的滑动窗口

    /**
     * @author WGR
     * @create 2021/9/10 -- 9:42
     */
    public class WindowDemo02_TimeWindow {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);
    
            SingleOutputStreamOperator<CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, CartInfo>() {
    
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new CartInfo(arr[0], Integer.parseInt(arr[1]));
                }
            });
    
            // 需求2:每3秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口
            SingleOutputStreamOperator<CartInfo> result1  = cartInfoDS.keyBy(CartInfo::getSensorId)
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3)))
                    .sum("count");
    
            result1.print();
            env.execute();
    
        }
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;//信号灯id
            private Integer count;//通过该信号灯的车的数量
        }
    }
    

    结果:

    7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=3)   1+2
    7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=10) 1+2+3+4
    7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=15) 4+5+6
    7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=21) 6+7+8
    7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=27)  8+9+10
    7> WindowDemo02_TimeWindow.CartInfo(sensorId=1, count=10) 10
    
    image-20210910094820856

    基于数量的滚动和滑动窗口

    数量的滚动窗口

    /**
     * @author WGR
     * @create 2021/9/10 -- 9:53
     */
    public class WindowDemo03_CountWindow {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);
    
            SingleOutputStreamOperator<CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, CartInfo>() {
    
                @Override
                public CartInfo map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new CartInfo(arr[0], Integer.parseInt(arr[1]));
                }
            });
    
            // 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
            SingleOutputStreamOperator<CartInfo> result1  = cartInfoDS.keyBy(CartInfo::getSensorId)
                    .countWindow(5L)
                    .sum("count");
    
            result1.print();
            env.execute();
        }
    
    
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class CartInfo {
            private String sensorId;//信号灯id
            private Integer count;//通过该信号灯的车的数量
        }
    }
    

    结果:需要等到id为1的数量达到5个的时候才会触发。

    7> WindowDemo03_CountWindow.CartInfo(sensorId=1, count=6)
    1,1
    1,1
    1,1
    1,1
    2,1
    1,2
    

    数量的滑动窗口

    /**
     * @author WGR
     * @create 2021/9/10 -- 9:53
     */
    public class WindowDemo04_CountWindow {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);
    
            SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, WindowDemo03_CountWindow.CartInfo>() {
    
                @Override
                public WindowDemo03_CountWindow.CartInfo map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new WindowDemo03_CountWindow.CartInfo(arr[0], Integer.parseInt(arr[1]));
                }
            });
    
            // 统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
            SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> result1  = cartInfoDS.keyBy(WindowDemo03_CountWindow.CartInfo::getSensorId)
                    .countWindow(5L,3L)
                    .sum("count");
    
            result1.print();
            env.execute();
        }
    
    }
    
    

    结果:

    7> WindowDemo03_CountWindow.CartInfo(sensorId=1, count=3)
    1,1
    1,1
    2,1
    1,1
    2,1
    3,1
    4,1
    

    基于会话窗口

    设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

    /**
     * @author WGR
     * @create 2021/9/10 -- 10:09
     */
    public class WindowDemo05_SessionWindow {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> socketDS  = env.socketTextStream("192.168.1.180", 9998);
    
            SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> cartInfoDS  = socketDS.map(new MapFunction<String, WindowDemo03_CountWindow.CartInfo>() {
    
                @Override
                public WindowDemo03_CountWindow.CartInfo map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new WindowDemo03_CountWindow.CartInfo(arr[0], Integer.parseInt(arr[1]));
                }
            });
    
            // 需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!)
            SingleOutputStreamOperator<WindowDemo03_CountWindow.CartInfo> result1  = cartInfoDS.keyBy(WindowDemo03_CountWindow.CartInfo::getSensorId)
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                    .sum("count");
    
            result1.print();
            env.execute();
        }
    }
    
    
  • 相关阅读:
    JDK1.8中对hashmap的优化
    ShutdownHook作用
    【工作相关】常用Shell命令
    [技术学习]收藏技术博客
    【技术学习】Mongo Administration
    【技术学习】saltstack 笔记(一) --匹配Minion
    【工作相关】个人常用脚本及代码
    【工作相关】常用工具
    【现场问题】add trust cert into JAVA_HOME
    【工作相关】替换Rancher证书
  • 原文地址:https://www.cnblogs.com/dalianpai/p/15250323.html
Copyright © 2011-2022 走看看