zoukankan      html  css  js  c++  java
  • Flink window Function

    package window

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    /**
    * @author: created by maoxiangyi
    * @version: v1.0
    * @description: word count over keyed 10-second time windows, aggregated with a ProcessWindowFunction
    * @date: 2019/6/4
    */
    object ProcessWordCount {

      /**
       * Builds and runs a local Flink streaming job that counts words per time window.
       *
       * The job emits a fixed sentence once per second, splits it into words,
       * keys the stream by word, groups elements into 10-second windows and
       * prints one `(word, count)` pair per key and window.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        // Set up the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()

        // Set up the data source
        env.addSource(new SourceFunction[String] {
          // Cleared by cancel(), which may be called from a different thread than run(),
          // hence @volatile so the emit loop observes the change.
          @volatile private var running = true

          override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
            while (running) {
              ctx.collect("hello hadoop hello storm hello spark")
              Thread.sleep(1000)
            }
          }

          // Ask the emit loop in run() to terminate so the job can shut down.
          override def cancel(): Unit = {
            running = false
          }
        })
          // Computation logic
          .flatMap(_.split(" "))
          .map((_, 1))
          .keyBy(_._1)
          // Window size and slide are both 10 seconds, so consecutive windows do not overlap
          .timeWindow(Time.seconds(10), Time.seconds(10))
          .process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
            /**
             * Sums the counts of all elements collected for one key in one window.
             *
             * @param key      the word this window invocation belongs to
             * @param context  window context (not used)
             * @param elements all `(word, 1)` pairs for `key` in the window
             * @param out      collector receiving a single `(word, total)` pair
             */
            override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
              val total = elements.iterator.map(_._2).sum
              // Pass an explicit tuple instead of relying on argument auto-tupling
              out.collect((key, total))
            }
          })
          .print().setParallelism(1)

        env.execute("word count")
      }
    }
  • 相关阅读:
    8-4:Mysql数据库编程基础知识
    adb——Android的ADB工具使用
    BroadcastReceiver--Android广播机制
    怎样投篮更准
    《算法七》(深度寻路算法)
    《算法六》(有序二叉树)
    《算法五》(N叉树定义+增删改查)
    《算法四》(二分排序+汉诺塔问题)
    《算法三》(归并排序)
    《算法二》(希尔排序+基数排序+桶排序)
  • 原文地址:https://www.cnblogs.com/maoxiangyi/p/10977967.html
Copyright © 2011-2022 走看看