zoukankan      html  css  js  c++  java
  • Flink:Window Api

    基本概念

    窗口 window

    image-20210902165907439

    一般真实的流都是无界的,怎样处理无界的数据?

    可以把无限的数据流进行切分,得到有限的数据集进行处理 —— 也就是得到有界流

    窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析

    window类型:

    • 时间窗口:按照时间生成 Window。
      • 滚动时间窗口
      • 滑动时间窗口
      • 会话窗口
    • 计数窗口:窗口(window)就是将无限流切割为有限流的一种方式,它会将流 数据分发到有限大小的桶(bucket)中进行分析
      • 滚动计数窗口
      • 滑动计数窗口

    滚动窗口-Tumbling Windows

    将数据依据固定的窗口长度对数据进行切片。

    特点:时间对齐,窗口长度固定,没有重叠。

    滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗 口,窗口的创建如下图所示:

    image-20210902171055226

    适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。

    滑动窗口-Sliding Windows

    滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

    特点:时间对齐,窗口长度固定,可以有重叠。

    滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中

    例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包 含着上个 10 分钟产生的数据,如下图所示:

    image-20210902171934857

    适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。

    会话窗口-Session Windows

    由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的 session,也就是一段时间没有接收到新数据就会生成新的窗口。

    特点:时间无对齐。

    session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗 口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。

    image-20210902172951558

    窗口分配器

    窗口分配器 —— window() 方法

    我们可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用。

    Flink 提供了更加简单的 .timeWindow 和 .countWindow 方法,用于定义时间窗口和计数窗口。

    • window() 方法接收的输入参数是一个 WindowAssigner
    • WindowAssigner 负责将每条输入的数据分发到正确的 window 中
    • Flink 提供了通用的 WindowAssigner
      • 滚动窗口--tumbling window
      • 滑动窗口--sliding window
      • 会话窗口--session window
      • 全局窗口--global window

    创建不同类型的窗口

    • 滚动时间窗口(tumbling time window)

    image-20210903100111842

    • 滑动时间窗口(sliding time window)

    image-20210903100138936

    • 会话窗口(session window)

    image-20210903100219683

    • 滚动计数窗口(tumbling count window)

    image-20210903100250516

    • 滑动计数窗口(sliding count window)

    image-20210903100355230

    窗口函数

    window function 定义了要对窗口中收集的数据做的计算操作

    可以分为两类

    • 增量聚合函数(incremental aggregation functions)
      • 增量聚合函数(incremental aggregation functions)
      • ReduceFunction, AggregateFunction
    • 全窗口函数(full window functions)
      • 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
      • ProcessWindowFunction,WindowFunction

    时间窗口增量聚合

    下面计算每三秒中数据的个数:

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism(1);
    
            DataStream<String> inputStream = env.socketTextStream("192.168.1.77", 7777);
    
            DataStream<SensorReading> mapStream = inputStream.map((str) -> {
                String[] split = str.split(" ");
                return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
            });
    
            DataStream<Integer> resultStream = mapStream.keyBy("id")
                	//开一个时间窗口
                    .timeWindow(Time.seconds(3))
                	//聚合
                    .aggregate(new MyAggregateFun());
    
            resultStream.print();
    
            env.execute();
        }
    
        private static class MyAggregateFun implements AggregateFunction<SensorReading, Integer, Integer>{
    
            //创建一个累加器
            @Override
            public Integer createAccumulator() {
                return 0;
            }
    
            @Override
            public Integer add(SensorReading value, Integer accumulator) {
                //累加操作
                return accumulator + 1;
            }
    
            @Override
            public Integer getResult(Integer accumulator) {
                return accumulator;
            }
    
            @Override
            public Integer merge(Integer a, Integer b) {
                return  a + b;
            }
        }
    

    测试效果:

    动画

    全窗口聚合

    代码:

            //id 结束时间 个数
            DataStream<Tuple3<String, Long, Integer>> resultStream = mapStream.keyBy("id")
                    .timeWindow(Time.seconds(3))
                    .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) {
                            String id = tuple.getField(0);
                            long windowEnd = window.getEnd();
                            int count = IteratorUtils.toList(input.iterator()).size();
                            out.collect(new Tuple3<>(id, windowEnd, count));
                        }
                    });
    

    效果:动画2

    计数窗口测试

    滑动计数窗口测试:

            DataStream<Double> resultStream = mapStream.keyBy("id")
                    .countWindow(10, 2)
                    .aggregate(new MyAvgFunc());
            resultStream.print();
    

    MyAvgFunc.java

        /**
         * @author wen.jie
         * @date 2021/9/3 11:00
         * 求平均温度
         */
        public static class MyAvgFunc implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double>{
    
            @Override
            public Tuple2<Double, Integer> createAccumulator() {
                return new Tuple2<>(0.0, 0);
            }
    
            @Override
            public Tuple2<Double, Integer> add(SensorReading value, Tuple2<Double, Integer> accumulator) {
                return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);
            }
    
            @Override
            public Double getResult(Tuple2<Double, Integer> accumulator) {
                return accumulator.f0 / accumulator.f1;
            }
    
            @Override
            public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                return new Tuple2<>(a.f0+b.f0, a.f1+b.f1);
            }
        }
    

    效果:每两条数据滑动一次

    动画

    其他可选API

    • .trigger() —— 触发器
      • 定义 window 什么时候关闭,触发计算并输出结果
    • .evictor() —— 移除器
      • 定义移除某些数据的逻辑
    • .allowedLateness() —— 允许处理迟到的数据
    • .sideOutputLateData() —— 将迟到的数据放入侧输出流(旁路输出)
    • .getSideOutput() —— 获取侧输出流(旁路输出)

    同过allowedLateness可以处理迟到数据。

    在使用“事件时间”窗口时,可能会发生元素迟到的情况,具体表现是,Flink用于跟踪“事件时间”进度的水位线已经超过了元素所属窗口的结束时间戳。

    在默认情况下,当水位线超过窗口末端时将删除迟到的元素。但是,Flink允许为窗口算子指定最大允许延迟--在删除指定元素之前可以延迟的时间,默认值为0。

    Flink保持窗口的状态,直到允许的延迟过期为止。一旦发生这种情况,Flink将删除该窗口并删除其状态。

    侧输出流(旁路输出流的使用方法):

    //标记旁路输出
    final OutputTag<T> tag = new OutputTag<>("later-data");
    //创建源数据
    DataStream<T> input = .......;
    SingleOutputStreamOperator<T> sumStream = input
        //keyBy:键控流转换算子
        .keyBy("id")
        //窗口转换算子
        .timeWindow(Time.seconds(15))
        //运行延迟时间
        .allowedLateness(Time.minutes(1))
        //将迟到的数据发送到用OutputTag标识的旁路输出流中
        .sideOutputLateData(tag)
        .sum("temperature");
    
    //加载旁路输出数据
    sumStream.getSideOutput(tag).print();
    
  • 相关阅读:
    Ocelot简易教程(一)之Ocelot是什么
    如何测量并报告ASP.NET Core Web API请求的响应时间
    ASP.NET Core 2.1中基于角色的授权
    Net Core平台灵活简单的日志记录框架NLog+SqlServer初体验
    Net Core平台灵活简单的日志记录框架NLog+Mysql组合初体验
    Net Core集成Exceptionless分布式日志功能以及全局异常过滤
    CentOS7 PostgreSQL安装
    Python 的 List 要印出 中文 編碼
    如何让你的Python代码更加pythonic ?
    4种格式化字符串方式的时间效率比较
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/15223023.html
Copyright © 2011-2022 走看看