场景
将Stu1(name:String,location:String,money:Double,time:Long)按名字（name）分组，在2秒的事件时间滚动窗口内转为Stu2(name:String,location_time:String)
使用fromElements作为数据源时可以正常输出结果（有界数据源读完后会发出一个值为Long.MaxValue的水位线，从而触发所有窗口）：
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.{TimeCharacteristic, watermark}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
 * Bounded demo: groups [[Stu1]] records by `name` into 2-second tumbling
 * event-time windows and emits one [[Stu2]] per key and window.
 *
 * Records come from `fromElements`, so the job finishes after the last element.
 */
object Test {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Declare event time explicitly. Without it (processing time is the default before
    // Flink 1.12) the auto-watermark interval is 0, so the periodic assigner below is
    // never polled and windows fire only on the final watermark emitted when this
    // bounded source finishes. An unbounded source would then produce no output at all.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val ds: DataStream[Stu1] = env.fromElements(
      Stu1("aaa", "beijing", 100.0, 1603764498486L),
      Stu1("aaa", "hangzhou", 100.0, 1603764499486L),
      Stu1("bbb", "wuhan", 100.0, 1603764500486L),
      Stu1("bbb", "tianjing", 100.0, 1603764501486L),
      Stu1("ccc", "shanghai", 100.0, 1603764502486L),
      Stu1("eee", "changsha", 100.0, 1603764503486L),
      Stu1("eee", "yiyang", 100.0, 1603764504486L),
      Stu1("fff", "changde", 100.0, 1603764505486L),
      Stu1("fff", "haebing", 100.0, 1603764506486L),
      Stu1("fff", "wuchang", 100.0, 1603764507486L),
      Stu1("iii", "xian", 100.0, 1603764508486L),
      Stu1("jjj", "huaihua", 100.0, 1603764509486L),
      Stu1("kkk", "huarong", 100.0, 1603764510486L),
      Stu1("lll", "jinmeng", 100.0, 1603764511486L),
      Stu1("mmm", "huahehaote", 100.0, 1603764512486L),
      Stu1("mmm", "xinjiang", 100.0, 1603764513486L),
      Stu1("nn", "niemenggu", 100.0, 1603764514486L),
      Stu1("ll", "eluosi", 100.0, 1603764515486L),
      Stu1("kk", "meiguo", 100.0, 1603764516486L),
      Stu1("aaa", "yinbgguuo", 100.0, 1603764517486L),
      Stu1("aaa", "aaa", 100.0, 1603764518486L),
      Stu1("ccc", "bbbb", 100.0, 1603764519486L)
    )
    // Event timestamps come from Stu1.time; the watermark trails the largest seen
    // timestamp by 1 (millisecond).
    val data: DataStream[Stu1] = ds.assignTimestampsAndWatermarks(new MyPeriodicAssigner(1))
    data.print("data--->")
    val value: DataStream[Stu2] = data
      .keyBy(_.name)
      .window(TumblingEventTimeWindows.of(Time.seconds(2)))
      .process(new MyProcessFunction())
    value.print("结果------->")
    env.execute()
  }
}
/**
 * One input record.
 *
 * `name` is the grouping key in the demo jobs, and `time` is used as the event-time
 * timestamp by `MyPeriodicAssigner` (presumably epoch milliseconds — confirm).
 */
case class Stu1(
  name: String,
  location: String,
  money: Double,
  time: Long
)
/**
 * One output record per key and window.
 *
 * `name` is the window key, and `location_time` is the rendered `location -> time`
 * map built by `MyProcessFunction`.
 */
case class Stu2(
  name: String,
  location_time: String
)
/**
 * Collapses all [[Stu1]] records of one key and window into a single [[Stu2]].
 *
 * The records are folded into a `location -> time` map, whose `toString` becomes
 * `Stu2.location_time`. A location seen more than once keeps the time of the last
 * record iterated. The key and the map size are also printed to stdout for debugging.
 */
class MyProcessFunction extends ProcessWindowFunction[Stu1, Stu2, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[Stu1], out: Collector[Stu2]): Unit = {
    // mutable.Map() is hash-based, so the rendered entry order is not the arrival order.
    val timeByLocation = mutable.Map[String, Long]()
    elements.foreach(record => timeByLocation += (record.location -> record.time))
    println(s"key--->$key map的大小----->${timeByLocation.size}")
    out.collect(Stu2(key, timeByLocation.toString()))
  }
}
/**
 * Periodic timestamp/watermark assigner for [[Stu1]].
 *
 * Uses `Stu1.time` as the event timestamp. Each polled watermark trails the largest
 * timestamp seen so far by `maxLateless`. Before any element arrives, the watermark is
 * `0 - maxLateless`, and an instance that never receives elements keeps reporting that
 * value.
 *
 * @param maxLateless how far the watermark lags behind the largest seen timestamp
 *                    (same unit as `Stu1.time`)
 */
class MyPeriodicAssigner(maxLateless: Long) extends AssignerWithPeriodicWatermarks[Stu1] {
  /** Largest timestamp extracted so far; starts at 0. */
  var maxTimeStamp: Long = 0L

  override def getCurrentWatermark: Watermark = {
    val watermarkTime = maxTimeStamp - maxLateless
    new Watermark(watermarkTime)
  }

  override def extractTimestamp(element: Stu1, previousElementTimestamp: Long): Long = {
    val eventTime = element.time
    if (eventTime > maxTimeStamp) maxTimeStamp = eventTime
    eventTime
  }
}
改用socketTextStream作为数据源后却没有结果输出，原因是既没有通过setStreamTimeCharacteristic指定事件时间，也没有设置并行度。
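窗口的触发依赖事件时间水位线。未指定EventTime时（Flink 1.12之前默认是ProcessingTime），自动水位线间隔为0，周期性水位线根本不会生成。未指定并行度时，本地环境的默认并行度等于CPU核数（这里是8），map与水位线分配器会有8个并行实例。没有收到数据的实例水位线停在初始值，而窗口算子取所有上游水位线的最小值，因此窗口永远不会触发，也就没有数据输出。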
/**
 * Socket demo: reads lines of the form `name,location,money,time` from `test01:9999`,
 * assigns event-time timestamps/watermarks, groups by `name` into 2-second tumbling
 * event-time windows and prints one [[Stu2]] per key and window.
 */
object Test {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Event time must be declared explicitly (processing time is the default before
    // Flink 1.12). Declaring it also sets a non-zero auto-watermark interval. Without it,
    // the periodic assigner is never polled, and this unbounded source never emits a
    // final watermark, so no window would ever fire.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // With the default parallelism (one slot per CPU core locally), the parser and the
    // watermark assigner run as several subtasks. Subtasks that receive no lines keep
    // their initial watermark, and the window operator only advances to the minimum
    // watermark of its inputs, so windows would never fire.
    env.setParallelism(1)
    val ds: DataStream[String] = env.socketTextStream("test01", 9999)
    // Malformed lines are skipped instead of throwing and failing the whole job.
    val data: DataStream[Stu1] = ds
      .flatMap(line => parseLine(line).toList)
      .assignTimestampsAndWatermarks(new MyPeriodicAssigner(1))
    data.print("data--->")
    val value: DataStream[Stu2] = data
      .keyBy(_.name)
      .window(TumblingEventTimeWindows.of(Time.seconds(2)))
      .process(new MyProcessFunction())
    value.print("结果------->")
    env.execute()
  }

  /**
   * Parses one socket line of the form `name,location,money,time`.
   *
   * Fields after the fourth are ignored. The numeric fields are trimmed, so a trailing
   * `\r` or stray spaces do not break parsing; `name` and `location` are kept as-is.
   *
   * @return the parsed record, or `None` if the line has fewer than four fields or a
   *         numeric field does not parse
   */
  private def parseLine(line: String): Option[Stu1] =
    line.split(",") match {
      case Array(name, location, money, time, _*) =>
        scala.util.Try(Stu1(name, location, money.trim.toDouble, time.trim.toLong)).toOption
      case _ => None
    }
}