zoukankan      html  css  js  c++  java
  • Flink ProcessFunction API自定义事件处理

    作用:可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window 函数和转换算子无法实现)。例如,Flink SQL 就是使用 Process Function 实现的。

    完整代码:

    object WindowTest {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val socketStream = env.socketTextStream("hadoop102", 7777)
    
        val dataStream: DataStream[SensorReading] = socketStream.map(d => {
          val arr = d.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
        })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
            override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
          })
    
        //自定义温度报警处理
        val processStream = dataStream.keyBy(_.id).process(new TempIncreAlert)
    
        //输出自定义业务逻辑
        processStream.print("process stream")
    
        //打印原始的dataStream
        dataStream.print("data stream")
    
    
        env.execute("window test")
      }
    
    }
    
    case class SensorReading(id: String, timestamp: Long, temperature: Double)
    
    //温度报警处理类
    class TempIncreAlert extends KeyedProcessFunction[String, SensorReading, String] {
    
      //定义一个状态,保存上一条数据的温度
      lazy val savedTemper: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
    
      //定义一个状态,保存定时器的时间戳
      lazy val savedTimer: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer", classOf[Long]))
    
    
      override def processElement(i: SensorReading, context: KeyedProcessFunction[String, SensorReading, String]#Context, collector: Collector[String]): Unit = {
    
        //取出上一个温度值
        val preTemp = savedTemper.value()
    
        //更新温度值
        savedTemper.update(i.temperature)
    
        //取出上一个定时器的时间戳
        val preTimerTs = savedTimer.value()
    
        //如果温度在30度以上上升,则注册定时器
        if (i.temperature > preTemp && preTemp > 30 && preTimerTs == 0) {
          //注册3秒后触发的定时器
          val ts = context.timerService().currentProcessingTime()
          context.timerService().registerProcessingTimeTimer(ts + 3000)
    
          //保存刚注册的定时器时间戳
          savedTimer.update(ts)
        } else if (i.temperature < preTemp || preTemp == 0) {
          //删除定时器
          context.timerService().deleteProcessingTimeTimer(preTimerTs)
          //清空状态
          savedTimer.clear()
        }
    
      }
    
      //定时器回调函数
      override def onTimer(timestamp: Long,
                           ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
                           out: Collector[String]): Unit = {
        //输出报警信息
        out.collect(ctx.getCurrentKey + "温度超过30度且连续上升!")
      }
    }
    

    端口输入数据

    [atguigu@hadoop102 ~]$ nc -lk 7777
    sensor_1, 1547718200, 30
    sensor_1, 1547718200, 29    
    sensor_1, 1547718200, 31
    sensor_1, 1547718200, 33

    控制台打印

    data stream> SensorReading(sensor_1,1547718200,30.0)
    data stream> SensorReading(sensor_1,1547718200,29.0)
    data stream> SensorReading(sensor_1,1547718200,31.0)
    data stream> SensorReading(sensor_1,1547718200,33.0)
    process stream> sensor_1温度超过30度且连续上升!

      

  • 相关阅读:
    spark streaming 概述
    spark sql 的性能调优
    LeetCode 106. Construct Binary Tree from Inorder and Postorder Traversal (用中序和后序树遍历来建立二叉树)
    LeetCode 105. Construct Binary Tree from Preorder and Inorder Traversal (用先序和中序树遍历来建立二叉树)
    LeetCode 90. Subsets II (子集合之二)
    LeetCode 88. Merge Sorted Array(合并有序数组)
    LeetCode 81. Search in Rotated Sorted Array II(在旋转有序序列中搜索之二)
    LeetCode 80. Remove Duplicates from Sorted Array II (从有序序列里移除重复项之二)
    LeetCode 79. Word Search(单词搜索)
    LeetCode 78. Subsets(子集合)
  • 原文地址:https://www.cnblogs.com/noyouth/p/12891844.html
Copyright © 2011-2022 走看看