zoukankan      html  css  js  c++  java
  • Flink--Window apply

    和window的操作类似,只不过操作更加灵活,具体的操作需要在匿名内部类的方法中实现;当有比较复杂的需求时候,可以使用;

    object WindowApply {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val textStream: DataStream[String] = env.socketTextStream("localhost" , 9999)
        val flatmapdata: DataStream[String] = textStream.flatMap(x => x.split(","))
        val mapdata: DataStream[(String, Int)] = flatmapdata.map(line => (line,1))
        val keybyStream: KeyedStream[(String, Int), String] = mapdata.keyBy(line => line._1)
        val window: WindowedStream[(String, Int), String, TimeWindow] = keybyStream.timeWindow(Time.of(1 , TimeUnit.SECONDS) ,Time.of(100,TimeUnit.MILLISECONDS))
        val data = window.apply(new WindowFunction[(String, Int) , (String, Int) , String , TimeWindow] {
          override def apply(key: String,
                             window: TimeWindow,
                             input: Iterable[(String, Int)],
                             out: Collector[(String, Int)]): Unit = {
            var output = ""
            var index = 0
            for(in <- input){
                output += "key :" + in._1 + "   value:"+in._2
                index = index + 1
              out.collect(output , index)
              }
          }
        })
        data.print()
        env.execute()
    
      }
    }

    注意,例子中使用的是window,所以对应的匿名内部类是:WindowFunction

    如果使用的是windowAll,则需要使用的内部类是:AllWindowFunction

  • 相关阅读:
    Linux基础知识[1]【ACL权限】
    docker 入门学习篇【基本命令与操作】
    centos7.1下 Docker环境搭建
    RHEL6.5下更新python至2.7版本
    Github初学者探索
    vmware下linux虚拟机传文件解决方案之 xftp
    mysql 常用操作命令
    常用DNS记录
    常见网络协议端口号整理
    DNS原理及其解析过程 精彩剖析
  • 原文地址:https://www.cnblogs.com/niutao/p/10548586.html
Copyright © 2011-2022 走看看