zoukankan      html  css  js  c++  java
  • Flink之Window的使用(2):时间窗口

    相关文章链接

    Flink之Window的使用(1):计数窗口

    Flink之Window的使用(2):时间窗口

    Flink之Window的使用(3):WindowFunction的使用

    具体实现代码如下所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val sensorStream: DataStream[SensorReading] = env
        .socketTextStream("localhost", 9999)
        .map(new MyMapToSensorReading)
    
    // 1、使用window方法进行开窗设置
    // 1.1、滚动窗口
    /**
     * 知识点:
     * 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类,分别是创建处理时间窗口 和 事件时间窗口(事件时间窗口需要设置时间特性)
     * 2、滚动窗口中,of方法可以设置2个参数,第一个是窗口的大小,第二个是时间偏移量(不设置时默认使用伦敦时间,当设置为-8时,为使用北京时间),偏移量设置时需要小于窗口大小
     */
    val windowStream_1: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(TumblingProcessingTimeWindows.of(Time.days(5), Time.hours(-8)))   // 偏移量设置时需要小于窗口大小
        //            .window(TumblingEventTimeWindows.of(Time.seconds(5)))                     // 事件时间窗口
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 1.2、滑动窗口
    /**
     * 知识点:
     * 1、在该方法中,可以使用 TumblingProcessingTimeWindows 和 TumblingEventTimeWindows 类
     * 2、滑动窗口中,of方法可以设置3个参数,第一个是窗口大小,第二个是滑动步长,第三个是偏移量
     */
    val windowStream_2: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(SlidingProcessingTimeWindows.of(Time.days(7), Time.days(1), Time.hours(-8)))  // 偏移量设置时需要小于窗口大小
        //            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))                // 事件时间窗口
        .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(5)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 1.3、会话窗口
    val windowStream_3: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        //            .window(EventTimeSessionWindows.withGap(Time.minutes(10)))            // 事件时间会话窗口
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    // 2、使用timeWindow方法进行开窗
    // 2.1、滚动窗口
    val timeWindowStream_1: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        .timeWindow(Time.seconds(5))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    // 2.2、滑动窗口
    val timeWindowStream_2: DataStream[SensorReading] = sensorStream
        .keyBy(_.id)
        .timeWindow(Time.seconds(15), Time.seconds(5))
        .reduce((x, y) => SensorReading(x.id, y.timestamp, x.temperature + y.temperature))
    
    windowStream_1.print()
    
    env.execute("TimeWindowDemo")
  • 相关阅读:
    SharePoint 2013 代码实现自定义的站点模版创建Site Collection
    today's learning of english 2
    拉普拉斯变换
    today's learning of english 1
    Kubernetes集群(RKE)安装ArgoCD排坑
    Jenkins Maven镜像Surefire插件运行失败
    Jenkins加载Spring扩展库出错排查
    Elasticsearch BM25相关度评分算法超详细解释
    简单方便的堡垒机自动登录脚本
    完美的Vim学习体验:VimReference
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133628.html
Copyright © 2011-2022 走看看