zoukankan      html  css  js  c++  java
  • Flink window Function

    package window

    import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, WindowedStream}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow

    /**
      * Streaming word count that demonstrates `fold` on a keyed time window.
      *
      * A custom source emits the same sentence once per second. The sentence is
      * split into words, each word is paired with a count of 1, the stream is
      * keyed by word, and the per-word counts are folded inside a time window
      * before being printed.
      *
      * @author maoxiangyi
      * @version v1.0
      * @note created 2019/6/4
      */
    object FoldWordCount {
      def main(args: Array[String]): Unit = {
        // Set up a local execution environment.
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()

        // Data source and windowing: emit a fixed sentence every second, tokenize
        // it, and group the (word, 1) pairs by word into time windows.
        val window: WindowedStream[(String, Int), String, TimeWindow] = env
          .addSource(new SourceFunction[String] {
            // Cleared by cancel() so that run() can leave its loop. It is volatile
            // because cancel() is expected to be invoked from a different thread
            // than the one executing run().
            @volatile private var isRunning = true

            override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
              while (isRunning) {
                ctx.collect("hello hadoop hello storm hello spark")
                Thread.sleep(1000)
              }
            }

            // Ask the emit loop to stop; it exits after the current iteration.
            override def cancel(): Unit = {
              isRunning = false
            }
          })
          // Processing logic: sentence -> words -> (word, 1) -> keyed by word.
          .flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(_._1)
          // Window size and slide are both 10 seconds, so windows do not overlap.
          .timeWindow(Time.seconds(10), Time.seconds(10))

        // Fold every element of a window into a single (word, count) pair. The
        // accumulator starts as ("", 0); each step takes the word from the
        // incoming element and adds its count to the running total.
        // NOTE(review): `fold` on windowed streams is reported as deprecated in
        // favour of `aggregate` in newer Flink releases - confirm against the
        // Flink version this project builds with before upgrading.
        window.fold(("", 0), new FoldFunction[(String, Int), (String, Int)] {
          override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = {
            (value._1, value._2 + accumulator._2)
          }
        }).print()

        env.execute("word count")
      }
    }
  • 相关阅读:
    函数的对象
    函数的调用
    函数的参数
    函数的返回值
    定义函数的三种方式
    网络的瓶颈效应
    编程语言分类
    计算机操作系统
    【建议收藏】2020最全阿里,腾讯,美团面试题总结(附答案整理)
    建议收藏!2020阿里面试题(JVM+Spring Cloud+微服务)上
  • 原文地址:https://www.cnblogs.com/maoxiangyi/p/10977910.html
Copyright © 2011-2022 走看看