zoukankan      html  css  js  c++  java
  • Flink的Windows

    在讲解windows的众多操作之前,需要讲解一个概念:

    源源不断的数据流是无法进行统计工作的,因为数据流没有边界,就无法统计到底有多少数据经过了这个流。也无法统计数据流中的最大值,最小值,平均值,累加值等信息。

    如果在数据流上,截取固定大小的一部分,这部分是可以进行统计的。 截取方式主要有两种,

    关于window的理论+实践

    tumbling-time-window (无重叠数据)

    1.红绿灯路口会有汽车通过,一共会有多少汽车通过,无法计算。因为车流源源不断,计算没有边界。

    2.统计每15秒钟通过红路灯的汽车数量,第一个15秒为2辆,第二个15秒为3辆,第三个15秒为1辆。。。

    1.发送命令
    nc -lk 9999
    
    2.发送内容
    9,3
    9,2
    9,7
    4,9
    2,6
    1,5
    2,3
    5,7
    5,4

    代码:

    object Window {
      def main(args: Array[String]): Unit = {
        //TODO time-window
        //1.创建运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //2.定义数据流来源
        val text = env.socketTextStream("localhost", 9999)
    
        //3.转换数据格式,text->CarWc
        case class CarWc(sensorId: Int, carCnt: Int)
        val ds1: DataStream[CarWc] = text.map {
          line => {
            val tokens = line.split(",")
            CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
          }
        }
    
        //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5秒
        //也就是说,每5秒钟统计一次,在这过去的5秒钟内,各个路口通过红绿灯汽车的数量。
        val ds2: DataStream[CarWc] = ds1
          .keyBy("sensorId")
          .timeWindow(Time.seconds(5))
          .sum("carCnt")
    
        //5.显示统计结果
        ds2.print()
    
        //6.触发流计算
        env.execute(this.getClass.getName)
    
      }
    }

    sliding-time-window (有重叠数据)

    //TODO 2.tumbling-time-window(有重叠)
    //1.创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2.定义数据流来源
    val text = env.socketTextStream("localhost", 9999)
    
    //3.转换数据格式,text->CarWc
    case class CarWc(sensorId: Int, carCnt: Int)
    val ds1: DataStream[CarWc] = text.map {
      line => {
        val tokens = line.split(",")
        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
      }
    }
    //4.执行统计操作,每个sensorId一个sliding窗口,窗口时间10秒,滑动时间5秒
    //也就是说,每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。
    val ds2: DataStream[CarWc] = ds1
      .keyBy("sensorId")
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum("carCnt")
    
    //5.显示统计结果
    ds2.print()
    
    //6.触发流计算
    env.execute(this.getClass.getName)

    tumbling-count-window (无重叠数据)

    按照个数进行统计,比如:

    每个路口分别统计,收到关于它的5条消息时统计在最近5条消息中,各自路口通过的汽车数量

    1.发送命令
    nc -lk 9999
    2.发送内容
    9,3
    9,2
    9,7
    4,9
    2,6
    1,5
    2,3
    5,7
    5,4
    //TODO tumbling-count-window (无重叠数据)
    //1.创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2.定义数据流来源
    val text = env.socketTextStream("localhost", 9999)
    
    //3.转换数据格式,text->CarWc
    case class CarWc(sensorId: Int, carCnt: Int)
    val ds1: DataStream[CarWc] = text.map {
      (f) => {
        val tokens = f.split(",")
        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
      }
    }
    //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5
    //按照key进行收集,对应的key出现的次数达到5次作为一个结果
    val ds2: DataStream[CarWc] = ds1
      .keyBy("sensorId")
      .countWindow(5)
      .sum("carCnt")
    
    //5.显示统计结果
    ds2.print()
    
    //6.触发流计算
    env.execute(this.getClass.getName)

    sliding-count-window (有重叠数据)

    同样也是窗口长度和滑动窗口的操作:窗口长度是5,滑动长度是3

    //TODO sliding-count-window(有重叠)
    //1.创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    //2.定义数据流来源
    val text = env.socketTextStream("localhost", 9999)
    
    //3.转换数据格式,text->CarWc
    case class CarWc(sensorId: Int, carCnt: Int)
    val ds1: DataStream[CarWc] = text.map {
      (f) => {
        val tokens = f.split(",")
        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
      }
    }
    //4.执行统计操作,每个sensorId一个sliding窗口,窗口大小3条数据,窗口滑动为3条数据
    //也就是说,每个路口分别统计,收到关于它的3条消息时统计在最近5条消息中,各自路口通过的汽车数量
    val ds2: DataStream[CarWc] = ds1
      .keyBy("sensorId")
      .countWindow(5, 3)
      .sum("carCnt")
    
    //5.显示统计结果
    ds2.print()
    
    //6.触发流计算
    env.execute(this.getClass.getName)

    window总结

    1.flink支持两种划分窗口的方式(time和count)

    如果根据时间划分窗口,那么它就是一个time-window

    如果根据数据划分窗口,那么它就是一个count-window

    2.flink支持窗口的两个重要属性(size和interval)

    如果size=interval,那么就会形成tumbling-window(无重叠数据)

    如果size>interval,那么就会形成sliding-window(有重叠数据)

    如果size<interval,那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。

    3.通过组合可以得出四种基本窗口

    time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))

    time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))

    count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)

    count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)

  • 相关阅读:
    mybatis中的#和$的区别
    spring ioc三种注入方式
    JSP中动态INCLUDE与静态INCLUDE的区别
    j2ee部分
    面试 框架部分
    注册Jdbc驱动程序的三种方式
    union和union all有什么不同?
    面试 JavaWeb 部分
    Cordova插件开发(iOS/Android)--看这篇就够了
    程序员,请不要只看技术
  • 原文地址:https://www.cnblogs.com/niutao/p/10548500.html
Copyright © 2011-2022 走看看