zoukankan      html  css  js  c++  java
  • Flink window Function

    package window.non_keyed

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    import org.apache.flink.api.scala._

    import scala.collection.mutable

    /**
      * Word count over a non-keyed 10-second time window.
      *
      * A synthetic source emits the same sentence once per second. Each sentence is split
      * into words, every word is paired with a count of 1, and all pairs collected in one
      * window are summed per word by a `ProcessAllWindowFunction`. The per-window totals
      * are then flattened back into `(word, count)` tuples and printed.
      *
      * @author: create by maoxiangyi
      * @version: v1.0
      * @description: window
      * @date: 2019/6/4
      */
    object ProcessAllWindowWordCount {

      /**
        * Builds the streaming pipeline and runs it in a local Flink environment.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // Set up the execution environment.
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()

        // Set up the data source: one fixed sentence per second.
        env.addSource(new SourceFunction[String] {
          // Cleared by cancel() so the emit loop in run() can terminate.
          // @volatile because cancel() may be invoked from a different thread than run().
          @volatile private var isRunning = true

          override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
            while (isRunning) {
              ctx.collect("hello hadoop hello storm hello spark")
              Thread.sleep(1000)
            }
          }

          override def cancel(): Unit = {
            isRunning = false
          }
        })
          // Computation logic.
          .flatMap(_.split(" "))
          .map((_, 1))
          // Window size and slide are both 10 seconds, so consecutive windows do not overlap.
          .timeWindowAll(Time.seconds(10), Time.seconds(10))
          .process(new ProcessAllWindowFunction[(String, Int), mutable.Map[String, Int], TimeWindow] {
            // Sums the counts of every word seen in the window and emits a single map per window.
            override def process(context: Context, elements: Iterable[(String, Int)], out: Collector[mutable.Map[String, Int]]): Unit = {
              val wordCountMap = mutable.Map[String, Int]()
              elements.foreach { case (word, count) =>
                wordCountMap.put(word, wordCountMap.getOrElse(word, 0) + count)
              }
              out.collect(wordCountMap)
            }
          })
          // Unpack each per-window map into individual (word, count) records.
          .flatMap(new FlatMapFunction[mutable.Map[String, Int], (String, Int)] {
            override def flatMap(value: mutable.Map[String, Int], out: Collector[(String, Int)]): Unit = {
              value.foreach(out.collect(_))
            }
          })
          .print()

        // Submit the job.
        env.execute("word count")
      }
    }
  • 相关阅读:
    结对项目--四则运算“软件”之升级版
    个人项目--多元四则运算
    《构建之法》1-5章后感
    git bash的安装与配置
    随笔
    numpy数组及处理:效率对比
    完整的中英文词频统计
    组合数据类型,英文词频统计
    9.13作业2(完整温度转换、数字游戏、解析身份证号...)
    Mad Libs游戏,华氏与摄氏转换
  • 原文地址:https://www.cnblogs.com/maoxiangyi/p/10978243.html
Copyright © 2011-2022 走看看