zoukankan      html  css  js  c++  java
  • Flink窗口介绍及应用

    Windows是Flink流计算的核心,本文将概括的介绍几种窗口的概念,重点只放在窗口的应用上。

    本实验的数据采用自拟电影评分数据(userId, movieId, rating, timestamp),userId和movieId范围分别为1-100和1-200的随机数,rating范围为[0:0.5:5.0]一共10个档位,timestamp为10000-20000之间的随机数,且数据顺序采用timestamp的升序排列。(2.1-2.6节的数据是乱序)

    一、窗口(window)的类型

    对于窗口的操作主要分为两种,分别对于Keyedstream和Datastream。他们的主要区别也仅仅在于建立窗口的时候一个为.window(...),一个为.windowAll(...)。对于Keyedstream的窗口来说,他可以使得多任务并行计算,每一个logical key stream将会被独立的进行处理。

    stream
           .keyBy(...)               <-  keyed versus non-keyed windows
           .window(...)/.windowAll(...)  <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    

    按照窗口的Assigner来分,窗口可以分为

    Tumbling window, sliding window,session window,global window,custom window

    每种窗口又可分别基于processing time和event time,这样的话,窗口的类型严格来说就有很多。

    还有一种window叫做count window,依据元素到达的数量进行分配,之后也会提到。

    窗口的生命周期开始在第一个属于这个窗口的元素到达的时候,结束于第一个不属于这个窗口的元素到达的时候。

    二、窗口的操作

    2.1 Tumbling window

    固定相同间隔分配窗口,每个窗口之间没有重叠看图一眼明白。

    下面的例子定义了每隔3毫秒一个窗口的流:

    WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(3)));
    

    2.2 Sliding Windows

    跟上面一样,固定相同间隔分配窗口,只不过每个窗口之间有重叠。窗口重叠的部分如果比窗口小,窗口将会有多个重叠,即一个元素可能被分配到多个窗口里去。

    下面的例子给出窗口大小为10毫秒,重叠为5毫秒的流:

    WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
                    .keyBy(MovieRate::getUserId)
                    .window(SlidingEventTimeWindows.of(Time.milliseconds(10), Time.milliseconds(5)));
    

    2.3 Session window

    这种窗口主要是根据活动的事件进行窗口化,他们通常不重叠,也没有一个固定的开始和结束时间。一个session window关闭通常是由于一段时间没有收到元素。在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。

    // 静态间隔时间
    WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
                    .keyBy(MovieRate::getUserId)
                    .window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));
    // 动态时间
    WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
                    .keyBy(MovieRate::getUserId)
                    .window(EventTimeSessionWindows.withDynamicGap(()));
    

    2.4 Global window

    将所有相同keyed的元素分配到一个窗口里。好吧,就这样:

    WindowedStream<MovieRate, Integer, GlobalWindow> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(GlobalWindows.create());
    

    三、窗口函数

    窗口函数就是这四个:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction。前两个执行得更有效,因为Flink可以增量地聚合每个到达窗口的元素。

    Flink必须在调用函数之前在内部缓冲窗口中的所有元素,所以使用ProcessWindowFunction进行操作效率不高。不过ProcessWindowFunction可以跟其他的窗口函数结合使用,其他函数接受增量信息,ProcessWindowFunction接受窗口的元数据。

    举一个AggregateFunction的例子吧,下面代码为MovieRate按user分组,且分配5毫秒的Tumbling窗口,返回每个user在窗口内评分的所有分数的平均值。

    DataStream<Tuple2<Integer,Double>> Rates = rates
                    .keyBy(MovieRate::getUserId)
                    .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
                    .aggregate(new AggregateFunction<MovieRate, AverageAccumulator, Tuple2<Integer,Double>>() {
                        @Override
                        public AverageAccumulator createAccumulator() {
                            return new AverageAccumulator();
                        }
    
                        @Override
                        public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
                            acc.userId = movieRate.userId;
                            acc.sum += movieRate.rate;
                            acc.count++;
                            return acc;
                        }
    
                        @Override
                        public Tuple2<Integer,Double> getResult(AverageAccumulator acc) {
                            return  Tuple2.of(acc.userId, acc.sum/(double)acc.count);
                        }
    
                        @Override
                        public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
                            acc0.count += acc1.count;
                            acc0.sum += acc1.sum;
                            return acc0;
                        }
                    });
    
    public static class AverageAccumulator{
            int userId;
            int count;
            double sum;
        }
    

    以下是部分输出:

    ...
    1> (44,3.0)
    4> (96,0.5)
    2> (51,0.5)
    3> (90,2.75)
    ...
    

    看上面的代码,会发现add()函数特别生硬,因为我们想返回Tuple2<Integer, Double>类型,即Integer为key,但AggregateFunction似乎没有提供这个机制可以让AverageAccumulator的构造函数提供参数。所以,这里引入ProcessWindowFunction与AggregateFunction的结合版,AggregateFunction进行增量叠加,当窗口关闭时,ProcessWindowFunction将会被提供AggregateFunction返回的结果,进行Tuple封装:

    DataStream<Tuple2<Integer,Double>> Rates = rates
        .keyBy(MovieRate::getUserId)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
        .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());
    
    
    public static class MyAggregateFunction implements AggregateFunction<MovieRate, AverageAccumulator, Double> {
        @Override
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }
    
        @Override
        public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {
            acc.sum += movieRate.rate;
            acc.count++;
            return acc;
        }
    
        @Override
        public Double getResult(AverageAccumulator acc) {
            return  acc.sum/(double)acc.count;
        }
    
        @Override
        public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {
            acc0.count += acc1.count;
            acc0.sum += acc1.sum;
            return acc0;
        }
    }
    
    public static class MyProcessWindowFunction extends
        ProcessWindowFunction<Double, Tuple2<Integer, Double>, Integer, TimeWindow> {
    
        @Override
        public void process(Integer key,
                            Context context,
                            Iterable<Double> results,
                            Collector<Tuple2<Integer, Double>> out) throws Exception {
            Double result = results.iterator().next();
            out.collect(new Tuple2<>(key, result));
        }
    }
    
    public static class AverageAccumulator{
        int count;
        double sum;
    }
    

    可以得到,结果与上面一样,但代码好看了很多。

    四、其他操作

    4.1 Triggers(触发器)

    触发器定义了窗口何时准备好被窗口处理。每个窗口分配器默认都有一个触发器,如果默认的触发器不符合你的要求,就可以使用trigger(...)自定义触发器。

    通常来说,默认的触发器适用于多种场景。例如,多有的event-time窗口分配器都有一个EventTimeTrigger作为默认触发器。该触发器在watermark通过窗口末尾时出发。

    PS:GlobalWindow默认的触发器时NeverTrigger,该触发器从不出发,所以在使用GlobalWindow时必须自定义触发器。

    4.2 Evictors(驱逐器)

    Evictors可以在触发器触发之后以及窗口函数被应用之前和/或之后可选择的移除元素。使用Evictor可以防止预聚合,因为窗口的所有元素都必须在应用计算逻辑之前先传给Evictor进行处理

    4.3 Allowed Lateness

    当使用event-time窗口时,元素可能会晚到,例如Flink用于跟踪event-time进度的watermark已经超过了窗口的结束时间戳。

    默认来说,当watermark超过窗口的末尾时,晚到的元素会被丢弃。但是flink也允许为窗口operator指定最大的allowed lateness,以至于可以容忍在彻底删除元素之前依然接收晚到的元素,其默认值是0。

    为了支持该功能,Flink会保持窗口的状态,知道allowed lateness到期。一旦到期,flink会删除窗口并删除其状态。

    把晚到的元素当作side output。

    SingleOutputStreamOperator<T> result = input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .sideOutputLateData(lateOutputTag)
        .<windowed transformation>(<window function>);
    
  • 相关阅读:
    python-判断
    python-文件读写
    python-数据类型
    python简介
    Charles--简单使用
    【模拟赛】BYVoid魔兽世界模拟赛 解题报告
    【最短路】埃雷萨拉斯寻宝(eldrethalas) 解题报告
    【递推】地铁重组(subway) 解题报告
    【背包型动态规划】灵魂分流药剂(soultap) 解题报告
    【最短路】血色先锋军(scarlet) 解题报告
  • 原文地址:https://www.cnblogs.com/bjwu/p/10393146.html
Copyright © 2011-2022 走看看