zoukankan      html  css  js  c++  java
  • Flink之ProcessFunction案例

    知识点:

    1、Flink 提供了 8 个 Process Function: 
        ProcessFunction  
        KeyedProcessFunction 
        CoProcessFunction 
        ProcessJoinFunction 
        BroadcastProcessFunction 
        KeyedBroadcastProcessFunction
        ProcessWindowFunction 
        ProcessAllWindowFunction

    2、
    KeyedProcessFunction重要方法:

      a)processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素 都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。 

      b)onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回 调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定 的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器 触发的时间信息(事件时间或者处理时间)。

     3、Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法

        currentProcessingTime(): Long 返回当前处理时间 

        currentWatermark(): Long 返回当前 watermark 的时间戳 
        registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的 processing time 的定时器。当 processing time 到达定时时间时,触发 timer。 

        registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
        deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定 时器。如果没有这个时间戳的定时器,则不执行
        deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时 器,如果没有此时间戳的定时器,则不执行。 当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在 keyed streams 上面使用。

    场景:

    10s钟温度都是上升,则报警

    1、处理代码案例1

    package processFunction
    
    import com.yangwj.api.SensorReading
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    /**
     * @author yangwj
     * @date 2021/1/10 21:25
     * @version 1.0
     */
    object ProcessFunctionTest {
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
        val input: DataStream[String] = env.readTextFile(inputFile)
    
        val dataStream = input.map(data => {
          val arr: Array[String] = data.split(",")
          SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        })
    //      .keyBy(_.id)
    //      .process(new MykeyedProcessFunction())
    
        val warningStream: DataStream[String] = dataStream.keyBy(_.id).process(new TempIncreWarning(10000L))
        warningStream.print()
        env.execute("KeyedProcessFunction Test")
    
      }
    }
    
    
    //10s钟温度都是上升,则报警
    class TempIncreWarning(inerval:Long) extends KeyedProcessFunction[String,SensorReading,String]{
    
      //定义状态,保存上一个温度值进行比较,保存注册定时器的时间戳用于删除
      lazy val lastTempState:ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double] ("last-temp",classOf[Double]))
      lazy val timerTsState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long] ("timer-ts",classOf[Long]))
    
      override def processElement(i: SensorReading, context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit = {
        //先取状态
        val lastTemp: Double = lastTempState.value()
        val timerTs: Long = timerTsState.value()
        //更新温度值
        lastTempState.update(i.temperature)
    
        //当前温度值和上次温度进行比较
        if(i.temperature > lastTemp && timerTs == 0){//初始化
          //如果温度上升,且没有定时器,那么注册当前时间10s之后的定时器
          val ts: Long = context.timerService().currentProcessingTime() + inerval
          context.timerService().registerProcessingTimeTimer(ts)
          timerTsState.update(ts)
    
        } else if(i.temperature < lastTemp){
          //如果温度下降,那么删除定时器
          context.timerService().deleteProcessingTimeTimer(timerTs)
          timerTsState.clear()
        }
    
      }
    
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
        out.collect("传感器" + ctx.getCurrentKey+"的温度连续"+inerval/1000 + "秒连续上升")
        timerTsState.clear()
      }
    }
    
    
    //KeyedProcessFunction 工能测试
    class MykeyedProcessFunction extends KeyedProcessFunction[String,SensorReading,String]{
    
      //定时器
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    
      }
    
      override def open(parameters: Configuration): Unit = {
        val valueState: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("mystate", classOf[Int]))
    
      }
    
      //context上下文
      override def processElement(i: SensorReading,
           context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit = {
        //注册触发,onTimer()
        context.timerService().registerEventTimeTimer(context.timestamp()+60000L)
    
        //删除定时器
        //    context.timerService().deleteEventTimeTimer()
      }
    }
    
    
  • 相关阅读:
    Intellij idea使用过程中遇到的一些问题
    微信小程序云函数Windows下安装wx-server-sdk
    计算文件MD5的方法
    IntelliJ IDEA取消自动import所有import *
    在IntelliJ IDEA中使用VIM
    STS启动时卡在loading加载 dashboard.ui
    NoSuchMethodError
    BeanUtils.copyProperties和PropertyUtils.copyProperties的使用区别
    一致性哈希算法介绍
    Maven修改本地仓库路径
  • 原文地址:https://www.cnblogs.com/ywjfx/p/14264923.html
Copyright © 2011-2022 走看看