zoukankan      html  css  js  c++  java
  • Flink之Window的使用(2):时间窗口

    相关文章链接

    Flink之Window的使用(1):计数窗口

    Flink之Window的使用(2):时间窗口

    Flink之Window的使用(3):WindowFunction的使用

    具体实现代码如下所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val sensorStream: DataStream[SensorReading] = env
        .socketTextStream("localhost", 9999)
        .map(new MyMapToSensorReading)
    
    // 1、使用window方法进行开窗设置
    // 1.1、滚动窗口
    /**
     * 知识点:
     * 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类,分别是创建处理时间窗口 和 事件时间窗口(事件时间窗口需要设置时间特性)
     * 2、滚动窗口中,of方法可以设置2个参数,第一个是窗口的大小,第二个是时间偏移量(不设置时默认使用伦敦时间,当设置为-8时,为使用北京时间),偏移量设置时需要小于窗口大小
     */
    val windowStream_1: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(TumblingProcessingTimeWindows.of(Time.days(5), Time.hours(-8)))   // 偏移量设置时需要小于窗口大小
        //            .window(TumblingEventTimeWindows.of(Time.seconds(5)))                     // 事件时间窗口
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 1.2、滑动窗口
    /**
     * 知识点:
     * 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类
     * 2、滑动窗口中,of方法可以设置3个参数,第一个是窗口大小,第二个是滑动步长,第三个是偏移量
     */
    val windowStream_2: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(SlidingProcessingTimeWindows.of(Time.days(7), Time.days(1), Time.hours(-8)))  // 偏移量设置时需要小于窗口大小
        //            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))                // 事件时间窗口
        .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 1.3、会话窗口
    val windowStream_3: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(EventTimeSessionWindows.withGap(Time.minutes(10)))            // 事件时间会话窗口
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 2、使用timeWindow方法进行开窗
    // 2.1、滚动窗口
    val timeWindowStream_1: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        .timeWindow(Time.seconds(5))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    // 2.2、滑动窗口
    val timeWindowStream_2: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        .timeWindow(Time.seconds(15), Time.seconds(5))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    windowStream_1.print()
    
    env.execute("TimeWindowDemo")
  • 相关阅读:
    document.all还是document.getElementsByName
    B/S架构下软件开发技术参考
    "未将对象引用设置到对象的实例"异常的原因,请大家接下去 1、ViewState 对象为Unll。
    爱晚红枫的博客配色绿野仙踪
    适用于.text系统的博客皮肤Nature和purple
    XML数据岛,数据绑定
    【收藏①】17种正则表达式
    如何去除字符串中的多余空格?
    在NTFS分区中复制文件的同时如何复制权限
    我的后大学时代
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133628.html
Copyright © 2011-2022 走看看