zoukankan      html  css  js  c++  java
  • Flink Window窗口机制

    1. 总览

      • Window 是flink处理无限流的核心,Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。

      • Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。

      • 而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

      • Flink 提供了非常完善的窗口机制。

      • 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。

      • 当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。

      • 在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

      • 窗口可以是基于时间驱动的(Time Window,例如:每30秒钟)

      • 也可以是基于数据驱动的(Count Window,例如:每一百个元素)

      • 同时基于不同事件驱动的窗口又可以分成以下几类:

        • 翻滚窗口 (Tumbling Window, 无重叠)
        • 滑动窗口 (Sliding Window, 有重叠)
        • 会话窗口 (Session Window, 活动间隙)
        • 全局窗口 (略)
      • Flink要操作窗口,先得将StreamSource 转成WindowedStream

        Window操作 其作用
        Window Keyed Streaming → WindowedStream 可以在已经分区的KeyedStream上定义Windows,即K,V格式的数据。
        WindowAll DataStream → AllWindowedStream 对常规的DataStream上定义Window,即非K,V格式的数据
        Window Apply WindowedStream → AllWindowedStream AllWindowedStream → DataStream 将函数应用于整个窗口中的数据
        Window Reduce WindowedStream → DataStream 对窗口里的数据进行”reduce”减少聚合统计
        Aggregations on windows WindowedStream → DataStream 对窗口里的数据进行聚合操作: sum(), max(), min()
    2. Tumbling Window(翻滚窗口)

      • 翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口

      • 翻滚窗具有固定的尺寸,不重叠。

      • 例图:

        image-20191113092146338

        • 代码

          package com.ronnie.flink.stream.window;
          
          import org.apache.flink.api.common.functions.MapFunction;
          import org.apache.flink.api.java.tuple.Tuple;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.datastream.DataStreamSource;
          import org.apache.flink.streaming.api.datastream.KeyedStream;
          import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
          import org.apache.flink.streaming.api.datastream.WindowedStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.windowing.time.Time;
          import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
          import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          
          import java.text.SimpleDateFormat;
          import java.util.Random;
          
          /**
           * 翻滚窗口:窗口不可重叠
           * 1、基于时间驱动
           * 2、基于事件驱动
           */
          public class TumblingWindow {
          
              public static void main(String[] args) {
              //设置执行环境,类似spark中初始化sparkContext
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  
                  env.setParallelism(1);
          
                  DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
          
                  SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                      @Override
                      public Tuple2<String, Integer> map(String value) throws Exception {
          
                          SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
          
                          long timeMillis = System.currentTimeMillis();
          
                          int random = new Random().nextInt(10);
          
                          System.out.println("value: " + value + " random: " + random + "timestamp: " + timeMillis + "|" + format.format(timeMillis));
          
                          return new Tuple2<String, Integer>(value, random);
                      }
                  });
          
                  KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
          
          
                  // 基于时间驱动,每隔10s划分一个窗口
                  WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10));
          
                  // 基于事件驱动, 每相隔3个事件(即三个相同key的数据), 划分一个窗口进行计算
                  // WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3);
          
                  // apply是窗口的应用函数,即apply里的函数将应用在此窗口的数据上。
                  timeWindow.apply(new MyTimeWindowFunction()).print();
                  // countWindow.apply(new MyCountWindowFunction()).print();
          
                  try {
                      // 转换算子都是lazy init的, 最后要显式调用 执行程序
                      env.execute();
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
          
              }
          }
          
          
      • 基于时间驱动

        • 场景1:我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)。

          package com.shsxt.flink.stream.window;
          
          import org.apache.flink.api.java.tuple.Tuple;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
          import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
          import org.apache.flink.util.Collector;
          
          import java.text.SimpleDateFormat;
          
          public class MyTimeWindowFunction implements WindowFunction<Tuple2<String,Integer>, String, Tuple, TimeWindow> {
          
              @Override
              public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                  SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
          
                  int sum = 0;
          
                  for(Tuple2<String,Integer> tuple2 : input){
                      sum +=tuple2.f1;
                  }
          
                  long start = window.getStart();
                  long end = window.getEnd();
          
                  out.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :"
                          + format.format(start) + "  window_end :" + format.format(end)
                  );
          
              }
          }
          
          
      • 基于事件驱动

        • 场景2:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满100个”相同”元素了,就会对窗口进行计算。

          package com.ronnie.flink.stream.window;
          
          import org.apache.flink.api.java.tuple.Tuple;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
          import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
          import org.apache.flink.util.Collector;
          
          import java.text.SimpleDateFormat;
          
          public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {
          
              @Override
              public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                  SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
          
                  int sum = 0;
          
                  for (Tuple2<String, Integer> tuple2 : input){
                      sum += tuple2.f1;
                  }
                  //无用的时间戳,默认值为: Long.MAX_VALUE,因为基于事件计数的情况下,不关心时间。
                  long maxTimestamp = window.maxTimestamp();
          
                  out.collect("key:" + tuple.getField(0) + " value: " + sum + "| maxTimeStamp :"
                          + maxTimestamp + "," + format.format(maxTimestamp)
                  );
              }
          }
          
    3. Sliding Window(滑动窗口)

      • 滑动窗口和翻滚窗口类似,区别在于:滑动窗口可以有重叠的部分。

      • 在滑窗中,一个元素可以对应多个窗口。

      • 例图:

        image-20191113102254911

      • 基于时间的滑动窗口

        • 场景: 我们可以每30秒计算一次最近一分钟用户购买的商品总数。
      • 基于事件的滑动窗口

        • 场景: 每10个 “相同”元素计算一次最近100个元素的总和.
      • 代码:

        package com.ronnie.flink.stream.window;
        
        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.datastream.KeyedStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.datastream.WindowedStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
        import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
        
        import java.text.SimpleDateFormat;
        import java.util.Random;
        
        /**
         * 滑动窗口:窗口可重叠
         * 1、基于时间驱动
         * 2、基于事件驱动
         */
        public class SlidingWindow {
        
            public static void main(String[] args) {
                // 设置执行环境, 类似spark中初始化SparkContext
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                
                env.setParallelism(1);
        
                DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
        
                SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        long timeMillis = System.currentTimeMillis();
        
                        int random = new Random().nextInt(10);
                        System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
        
                        return new Tuple2<String, Integer>(value, random);
                    }
                });
                KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
        
                //基于时间驱动,每隔5s计算一下最近10s的数据
             //   WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
                //基于事件驱动,每隔2个事件,触发一次计算,本次窗口的大小为3,代表窗口里的每种事件最多为3个
                WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3, 2);
        
             //   timeWindow.sum(1).print();
        
                countWindow.sum(1).print();
        
             //   timeWindow.apply(new MyTimeWindowFunction()).print();
        
                try {
                    env.execute();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
        
    4. Session Window(会话窗口)

      • 会话窗口不重叠,没有固定的开始和结束时间

      • 与翻滚窗口和滑动窗口相反, 当会话窗口在一段时间内没有接收到元素时会关闭会话窗口。

      • 后续的元素将会被分配给新的会话窗口

      • 例图:

        image-20191113102605969

      • 举例:

        • 计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开。
      • 代码:

        package com.ronnie.flink.stream.window;
        
        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.datastream.KeyedStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.datastream.WindowedStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
        
        import java.text.SimpleDateFormat;
        import java.util.Random;
        
        public class SessionWindow {
        
            public static void main(String[] args) {
        
                // 设置执行环境, 类似spark中初始化sparkContext
        
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
                env.setParallelism(1);
        
                DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
        
                SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        long timeMillis = System.currentTimeMillis();
        
                        int random = new Random().nextInt(10);
        
                        System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
        
                        return new Tuple2<String, Integer>(value, random);
                    }
                });
                KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
        
                //如果连续10s内,没有数据进来,则会话窗口断开。
                WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
        
                // window.sum(1).print();
                
                window.apply(new MyTimeWindowFunction()).print();
        
                try {
                    env.execute();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
        
  • 相关阅读:
    FusionSphere产品功能
    openstack各组件详解和通信流程
    素数、杨辉三角、封装结构和集合操作(16)——集合及操作
    素数、杨辉三角、封装结构和集合操作(15)——IPython使用和封装解构
    素数、杨辉三角、封装结构和集合操作(14)——杨辉三角单行覆盖解法
    线性数据结构(13)——求质数
    线性数据结构(12)——菱形、三角形、闪电打印和斐波那契数列计算
    线性数据结构(11)——九九乘法表详解
    Python元组与字符串操作(10)——冒泡法
    lvm调整卷大小
  • 原文地址:https://www.cnblogs.com/ronnieyuan/p/11847568.html
Copyright © 2011-2022 走看看