zoukankan      html  css  js  c++  java
  • Flink--Window apply

    和window的操作类似,只不过操作更加灵活,具体的操作需要在匿名内部类的方法中实现;当有比较复杂的需求时候,可以使用;

    object WindowApply {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val textStream: DataStream[String] = env.socketTextStream("localhost" , 9999)
        val flatmapdata: DataStream[String] = textStream.flatMap(x => x.split(","))
        val mapdata: DataStream[(String, Int)] = flatmapdata.map(line => (line,1))
        val keybyStream: KeyedStream[(String, Int), String] = mapdata.keyBy(line => line._1)
        val window: WindowedStream[(String, Int), String, TimeWindow] = keybyStream.timeWindow(Time.of(1 , TimeUnit.SECONDS) ,Time.of(100,TimeUnit.MILLISECONDS))
        val data = window.apply(new WindowFunction[(String, Int) , (String, Int) , String , TimeWindow] {
          override def apply(key: String,
                             window: TimeWindow,
                             input: Iterable[(String, Int)],
                             out: Collector[(String, Int)]): Unit = {
            var output = ""
            var index = 0
            for(in <- input){
                output += "key :" + in._1 + "   value:"+in._2
                index = index + 1
              out.collect(output , index)
              }
          }
        })
        data.print()
        env.execute()
    
      }
    }

    注意,例子中使用的是window,所以对应的匿名内部类是:WindowFunction

    如果使用的是windowAll,则需要使用的内部类是:AllWindowFunction

  • 相关阅读:
    2.2阶乘末尾0的个数,最低位1的位置
    samba服务器使用
    全排列的非递归算法
    2.1二进制数中1的个数
    2.3发帖水王
    C #与##的使用
    NEU1141the unique number
    【转】4习惯让你越休息越累
    二叉树由先序遍历和中序遍历输出后序遍历
    UVA100
  • 原文地址:https://www.cnblogs.com/niutao/p/10548586.html
Copyright © 2011-2022 走看看