zoukankan      html  css  js  c++  java
  • Flink之Window的使用(3):WindowFunction的使用

    相关文章链接

    Flink之Window的使用(1):计数窗口

    Flink之Window的使用(2):时间窗口

    Flink之Window的使用(3):WindowFunction的使用

    具体实现代码如下所示:

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val sensorStream: WindowedStream[SensorReading, String, TimeWindow] = env
        .socketTextStream("localhost", 9999)
        .map(new MyMapToSensorReading)
        .keyBy(_.id)
        .timeWindow(Time.seconds(5))
    
    // 1、incremental aggregation functions(增量聚合函数)(来一条数据,计算一次)
    // 1.1、ReduceFunction 增量集合函数(使用匿名内部类)
    val reduceResult: DataStream[SensorReading] = sensorStream.reduce(new ReduceFunction[SensorReading] {
        override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
            SensorReading(value2.id, value2.timestamp, value2.temperature + value2.temperature)
        }
    })
    // 1.2、AggregateFunction(相比reduce,优势是可以指定累加值类型,输入类型和输出类型也可以不一样)
    val aggregateResult: DataStream[Long] = sensorStream.aggregate(new AggregateFunction[SensorReading, Long, Long] {
        // 初始化累加值
        override def createAccumulator(): Long = 0L
    
        // 累加方法
        override def add(value: SensorReading, accumulator: Long): Long = accumulator + 1
    
        // 获取结果
        override def getResult(accumulator: Long): Long = accumulator
    
        // 分区的归并操作
        override def merge(a: Long, b: Long): Long = a + b
    })
    
    // 2、full window functions(全窗口函数)
    /**
     * 知识点:
     *  1、apply方法中,可以添加WindowFunction对象,会将该窗口中所有的数据先缓存,当时间到了一次性计算
     *  2、需要设置4个类型,分别是:输入类型,输出类型,keyBy时key的类型(如果用字符串来划分key类型为Tuple,窗口类型
     *  3、所有的计算都在apply中进行,可以通过window获取窗口的信息,比如开始时间,结束时间
     */
    val applyResult: DataStream[(Long, Int)] = sensorStream.apply(new WindowFunction[SensorReading, (Long, Int), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[SensorReading], out: Collector[(Long, Int)]): Unit = {
            out.collect((window.getStart, input.size))
        }
    })
    
    // 3、窗口函数中其他API
    val otherResult: DataStream[SensorReading] = sensorStream
        .allowedLateness(Time.seconds(1))                       // 允许处理迟到的数据
        .sideOutputLateData(new OutputTag[SensorReading]("late"))    // 将迟到的数据放入侧输出流
        .reduce((x, y) => SensorReading(y.id, y.timestamp, x.temperature + y.temperature))
    // 获取侧输出流(侧输出流为迟到很久的数据,当allowedLateness和watermark之后还是没到的数据会放入侧输出流,可以在最后统一处理)
    val sideOutputStream: DataStream[SensorReading] = otherResult.getSideOutput(new OutputTag[SensorReading]("late"))
    
    
    // 打印输出
    applyResult.print()
    
    env.execute("WindowFunctionDemo")
  • 相关阅读:
    进程通信之信号通信
    分数化小数
    台湾大学公开课《概率》第五周一道不会作的作业题 ,一种看不懂的解法
    网络子系统53_ip协议分片重组_内存阈值
    Centos 6.3 Realtek Audio Driver Compile
    I.MX6 PHY fixup 调用流程 hacking
    I.MX6 AR8031 寄存器操作
    I.MX6 ethtool 移植
    I.MX6 U-Boot ping网络
    Android tcpdump 使用
  • 原文地址:https://www.cnblogs.com/yangshibiao/p/14133638.html
Copyright © 2011-2022 走看看