知识点:
1、Flink 提供了 8 个 Process Function: ProcessFunction KeyedProcessFunction CoProcessFunction ProcessJoinFunction BroadcastProcessFunction KeyedBroadcastProcessFunction ProcessWindowFunction ProcessAllWindowFunction
2、KeyedProcessFunction重要方法:
a)processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素 都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。
b)onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回 调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定 的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器 触发的时间信息(事件时间或者处理时间)。
3、Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前 watermark 的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的 processing time 的定时器。当 processing time 到达定时时间时,触发 timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定 时器。如果没有这个时间戳的定时器,则不执行
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时 器,如果没有此时间戳的定时器,则不执行。 当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在 keyed streams 上面使用。
场景:
10s钟温度都是上升,则报警
1、处理代码案例1
package processFunction import com.yangwj.api.SensorReading import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * @author yangwj * @date 2021/1/10 21:25 * @version 1.0 */ object ProcessFunctionTest { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt" val input: DataStream[String] = env.readTextFile(inputFile) val dataStream = input.map(data => { val arr: Array[String] = data.split(",") SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) }) // .keyBy(_.id) // .process(new MykeyedProcessFunction()) val warningStream: DataStream[String] = dataStream.keyBy(_.id).process(new TempIncreWarning(10000L)) warningStream.print() env.execute("KeyedProcessFunction Test") } } //10s钟温度都是上升,则报警 class TempIncreWarning(inerval:Long) extends KeyedProcessFunction[String,SensorReading,String]{ //定义状态,保存上一个温度值进行比较,保存注册定时器的时间戳用于删除 lazy val lastTempState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double] ("last-temp",classOf[Double])) lazy val timerTsState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long] ("timer-ts",classOf[Long])) override def processElement(i: SensorReading, context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit = { //先取状态 val lastTemp: Double = lastTempState.value() val timerTs: Long = timerTsState.value() //更新温度值 lastTempState.update(i.temperature) //当前温度值和上次温度进行比较 if(i.temperature > lastTemp && timerTs == 0){//初始化 //如果温度上升,且没有定时器,那么注册当前时间10s之后的定时器 val ts: Long = context.timerService().currentProcessingTime() + inerval context.timerService().registerProcessingTimeTimer(ts) timerTsState.update(ts) } else if(i.temperature < lastTemp){ //如果温度下降,那么删除定时器 context.timerService().deleteProcessingTimeTimer(timerTs) timerTsState.clear() } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = { out.collect("传感器" + ctx.getCurrentKey+"的温度连续"+inerval/1000 + "秒连续上升") timerTsState.clear() } } //KeyedProcessFunction 工能测试 class MykeyedProcessFunction extends KeyedProcessFunction[String,SensorReading,String]{ //定时器 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = { } override def open(parameters: Configuration): Unit = { val valueState: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("mystate", classOf[Int])) } //context上下文 override def processElement(i: SensorReading, context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit = { //注册触发,onTimer() context.timerService().registerEventTimeTimer(context.timestamp()+60000L) //删除定时器 // context.timerService().deleteEventTimeTimer() } }