zoukankan      html  css  js  c++  java
  • Flink之Window的使用(2):时间窗口

    相关文章链接

    Flink之Window的使用(1):计数窗口

    Flink之Window的使用(2):时间窗口

    Flink之Window的使用(3):WindowFunction的使用

    具体实现代码如下所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val sensorStream: DataStream[SensorReading] = env
        .socketTextStream("localhost", 9999)
        .map(new MyMapToSensorReading)
    
    // 1、使用window方法进行开窗设置
    // 1.1、滚动窗口
    /**
     * 知识点:
     * 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类,分别是创建处理时间窗口 和 事件时间窗口(事件时间窗口需要设置时间特性)
     * 2、滚动窗口中,of方法可以设置2个参数,第一个是窗口的大小,第二个是时间偏移量(不设置时默认使用伦敦时间,当设置为-8时,为使用北京时间),偏移量设置时需要小于窗口大小
     */
    val windowStream_1: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(TumblingProcessingTimeWindows.of(Time.days(5), Time.hours(-8)))   // 偏移量设置时需要小于窗口大小
        //            .window(TumblingEventTimeWindows.of(Time.seconds(5)))                     // 事件时间窗口
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 1.2、滑动窗口
    /**
     * 知识点:
     * 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类
     * 2、滑动窗口中,of方法可以设置3个参数,第一个是窗口大小,第二个是滑动步长,第三个是偏移量
     */
    val windowStream_2: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(SlidingProcessingTimeWindows.of(Time.days(7), Time.days(1), Time.hours(-8)))  // 偏移量设置时需要小于窗口大小
        //            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))                // 事件时间窗口
        .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 1.3、会话窗口
    val windowStream_3: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(EventTimeSessionWindows.withGap(Time.minutes(10)))            // 事件时间会话窗口
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 2、使用timeWindow方法进行开窗
    // 2.1、滚动窗口
    val timeWindowStream_1: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        .timeWindow(Time.seconds(5))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    // 2.2、滑动窗口
    val timeWindowStream_2: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        .timeWindow(Time.seconds(15), Time.seconds(5))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    windowStream_1.print()
    
    env.execute("TimeWindowDemo")
  • 相关阅读:
    C语言|博客作业08
    C语言|博客作业04
    C语言|博客作业02
    C语言|博客作业06
    C语言|博客作业03
    第一周作业
    C语言|博客作业05
    C语言|博客作业07
    C语言|博客作业09
    为什么get比post更快
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133628.html
Copyright © 2011-2022 走看看