作用：Process Function 可以访问事件的时间戳和 watermark，并能注册定时器（timer）；还可以输出一些特定事件，例如超时事件。Process Function 用来构建事件驱动的应用，以及实现前面介绍的窗口函数和转换算子无法实现的自定义业务逻辑。例如，Flink SQL 的很多算子就是基于 Process Function 实现的。
完整代码（示例：当同一传感器的温度在 30 度以上继续上升时，注册一个 3 秒后触发的处理时间定时器；温度下降则删除该定时器；定时器触发时输出报警信息）：
/**
 * Demo job: reads comma-separated "id, timestamp, temperature" lines from a socket,
 * assigns event-time timestamps and watermarks, and runs a keyed ProcessFunction
 * ([[TempIncreAlert]]) that emits a temperature alert per sensor.
 */
object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val socketStream = env.socketTextStream("hadoop102", 7777)

    val dataStream: DataStream[SensorReading] = socketStream
      .map { line =>
        val fields = line.split(",")
        // Trim every field: the sample input has a space after each comma.
        SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
      }
      // Event time is the reading's timestamp * 1000 (presumably epoch seconds -> ms);
      // watermarks tolerate 1 second of out-of-order data.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
        }
      )

    // Custom temperature-alert processing, keyed by sensor id.
    val processStream = dataStream.keyBy(_.id).process(new TempIncreAlert)

    // Output of the custom business logic.
    processStream.print("process stream")
    // Raw input stream, printed for comparison.
    dataStream.print("data stream")

    env.execute("window test")
  }
}

/** One sensor reading; `timestamp` is multiplied by 1000 when used as event time. */
case class SensorReading(id: String, timestamp: Long, temperature: Double)

/**
 * Temperature alert, evaluated per sensor id.
 *
 * When a reading is hotter than the previous one, the previous one was already above
 * 30 degrees, and no timer is pending, a processing-time timer is registered
 * `alertDelayMs` in the future. A drop in temperature (or the key's first reading)
 * cancels any pending timer. If the timer fires, an alert string is emitted for the key.
 */
class TempIncreAlert extends KeyedProcessFunction[String, SensorReading, String] {

  /** Processing-time delay, in milliseconds, between the triggering rise and the alert. */
  private val alertDelayMs = 3000L

  // Temperature of the previous reading for this key. The code below treats 0 as
  // "no previous reading" (the state is unset before the first element).
  lazy val savedTemper: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))

  // Exact timestamp of the currently registered timer for this key; 0 means none.
  lazy val savedTimer: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer", classOf[Long]))

  override def processElement(
      i: SensorReading,
      context: KeyedProcessFunction[String, SensorReading, String]#Context,
      collector: Collector[String]): Unit = {
    val preTemp = savedTemper.value()
    savedTemper.update(i.temperature)
    val preTimerTs = savedTimer.value()

    if (i.temperature > preTemp && preTemp > 30 && preTimerTs == 0) {
      // Rising above 30 degrees with no pending timer: schedule the alert.
      val timerTs = context.timerService().currentProcessingTime() + alertDelayMs
      context.timerService().registerProcessingTimeTimer(timerTs)
      // Store the timer's own timestamp: deleteProcessingTimeTimer must be given the
      // exact value the timer was registered with, otherwise it deletes nothing.
      savedTimer.update(timerTs)
    } else if (i.temperature < preTemp || preTemp == 0) {
      // Temperature dropped (or this is the first reading): cancel any pending alert.
      if (preTimerTs != 0) context.timerService().deleteProcessingTimeTimer(preTimerTs)
      savedTimer.clear()
    }
  }

  /** Timer callback: emits the alert and forgets the fired timer. */
  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
      out: Collector[String]): Unit = {
    out.collect(ctx.getCurrentKey + "温度超过30度且连续上升!")
    // Without this, savedTimer stays non-zero and no new alert could ever be registered.
    savedTimer.clear()
  }
}
在 hadoop102 的 7777 端口输入数据：
[atguigu@hadoop102 ~]$ nc -lk 7777
sensor_1, 1547718200, 30
sensor_1, 1547718200, 29
sensor_1, 1547718200, 31
sensor_1, 1547718200, 33
控制台输出（报警信息在输入最后一条数据约 3 秒后打印）：
data stream> SensorReading(sensor_1,1547718200,30.0)
data stream> SensorReading(sensor_1,1547718200,29.0)
data stream> SensorReading(sensor_1,1547718200,31.0)
data stream> SensorReading(sensor_1,1547718200,33.0)
process stream> sensor_1温度超过30度且连续上升!