window.apply 的用法和 window 的其他操作类似,只不过更加灵活:具体的处理逻辑需要在匿名内部类的方法中自行实现。当需求比较复杂时,可以使用它。
/**
 * Demonstrates `WindowedStream.apply` with a custom `WindowFunction`.
 *
 * Reads comma-separated words from a text socket and keys them by word. For each window
 * (size 1 s, sliding every 100 ms) it emits ONE record per key. The record holds a string
 * listing the window's elements and the number of elements in that window.
 *
 * Optional command-line arguments:
 *   - `args(0)`: socket host (default `"localhost"`)
 *   - `args(1)`: socket port (default `9999`)
 */
object WindowApply {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Host/port may be overridden from the command line; defaults match the original demo.
    val host: String = args.headOption.getOrElse("localhost")
    val port: Int = args.lift(1).map(_.trim.toInt).getOrElse(9999)

    val textStream: DataStream[String] = env.socketTextStream(host, port)
    val flatmapdata: DataStream[String] = textStream.flatMap(x => x.split(","))
    val mapdata: DataStream[(String, Int)] = flatmapdata.map(line => (line, 1))
    val keybyStream: KeyedStream[(String, Int), String] = mapdata.keyBy(line => line._1)

    // Sliding window: size 1 s, slide 100 ms.
    // NOTE(review): timeWindow is deprecated in newer Flink releases in favour of
    // window(SlidingProcessingTimeWindows.of(...)); kept here to avoid new imports.
    val window: WindowedStream[(String, Int), String, TimeWindow] =
      keybyStream.timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))

    // window (keyed) => WindowFunction; windowAll (non-keyed) would need AllWindowFunction.
    val data: DataStream[(String, Int)] = window.apply(
      new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
        override def apply(
            key: String,
            window: TimeWindow,
            input: Iterable[(String, Int)],
            out: Collector[(String, Int)]): Unit = {
          // Emit once per window, after all elements are seen. Calling collect inside the
          // loop would emit a growing prefix string for every single element.
          if (input.nonEmpty) {
            val output = input.map(in => s"key :${in._1} value:${in._2}").mkString(", ")
            // Explicit tuple instead of deprecated argument auto-tupling.
            out.collect((output, input.size))
          }
        }
      })

    data.print()
    env.execute("WindowApply")
  }
}
注意:例子中使用的是 window,所以对应的匿名内部类是 WindowFunction;
如果使用的是 windowAll,则需要使用的匿名内部类是 AllWindowFunction。