zoukankan      html  css  js  c++  java
  • FLINK基础(98): DS算子与窗口(9)单流算子(7)KeyBy(3) KeyedProcessFunction(2)

    KeyedProcessFunction

      KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:

    • processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。Context还可以将结果输出到别的流(side outputs)。
    • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如firing trigger的时间信息(事件时间或者处理时间)。

    时间服务和定时器

    Context和OnTimerContext所持有的TimerService对象拥有以下方法:

    • currentProcessingTime(): Long 返回当前处理时间
    • currentWatermark(): Long 返回当前水位线的时间戳
    • registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的timer。当processing time到达定时时间时,触发timer。
    • registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time timer。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
    • deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
    • deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

      当定时器timer触发时,执行回调函数onTimer()。processElement()方法和onTimer()方法是同步(不是异步)方法,这样可以避免并发访问和操作状态。

      针对每一个key和timestamp,只能注册一个定期器。也就是说,每一个key可以注册多个定时器,但在每一个时间戳只能注册一个定时器。KeyedProcessFunction默认将所有定时器的时间戳放在一个优先队列中。在Flink做检查点操作时,定时器也会被保存到状态后端中。

    实例一

    举个例子说明KeyedProcessFunction如何操作KeyedStream。

    下面的程序展示了如何监控温度传感器的温度值,如果温度值在一秒钟之内(processing time)连续上升,报警。

    scala version

    val warnings = readings
      .keyBy(r => r.id)// 此处键的类型是String,与接下来一处标红处对应
      .process(new TempIncreaseAlertFunction)
    复制代码
      class TempIncrease extends KeyedProcessFunction[String, SensorReading, String] {
        // 懒加载;
        // 状态变量会在检查点操作时进行持久化,例如hdfs
        // 只会初始化一次,单例模式
        // 在当机重启程序时,首先去持久化设备寻找名为`last-temp`的状态变量,如果存在,则直接读取。不存在,则初始化。
        // 用来保存最近一次温度
        // 默认值是0.0
        lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
          new ValueStateDescriptor[Double]("last-temp", Types.of[Double])
        )
    
        // 默认值是0L
        lazy val timer: ValueState[Long] = getRuntimeContext.getState(
          new ValueStateDescriptor[Long]("timer", Types.of[Long])
        )
    
        override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
          // 使用`.value()`方法取出最近一次温度值,如果来的温度是第一条温度,则prevTemp为0.0
          val prevTemp = lastTemp.value()
          // 将到来的这条温度值存入状态变量中
          lastTemp.update(value.temperature)
    
          // 如果timer中有定时器的时间戳,则读取
          val ts = timer.value()
    
          if (prevTemp == 0.0 || value.temperature < prevTemp) {
            ctx.timerService().deleteProcessingTimeTimer(ts)
            timer.clear()
          } else if (value.temperature > prevTemp && ts == 0) {
            val oneSecondLater = ctx.timerService().currentProcessingTime() + 1000L
            ctx.timerService().registerProcessingTimeTimer(oneSecondLater)
            timer.update(oneSecondLater)
          }
        }
    
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
          out.collect("传感器ID是 " + ctx.getCurrentKey + " 的传感器的温度连续1s上升了!")
          timer.clear()
        }
      }
    复制代码

    实例二

      通过socketTextStream读取9999端口数据,统计在一定时间内不同类型商品的销售总额度,如果持续销售额度为0,则执行定时器通知老板,是不是卖某种类型商品的员工偷懒了(只做功能演示,根据个人业务来使用,比如统计UV等操作)

    mport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.api.scala.typeutils.Types
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
     
    object ProcessFuncationScala {
     
     
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
        val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1))).setParallelism(4)
        typeAndData
        .keyBy(
    0)//此处键的类型是Tuple,与接下来标红处相对应
        .process(new MyprocessFunction()).print("结果") env.execute() } /** * 实现: * 根据key分类,统计每个key进来的数据量,定期统计数量,如果数量为0则预警 */ class MyprocessFunction extends KeyedProcessFunction[Tuple,(String,String),String]{ //统计间隔时间 val delayTime : Long = 1000 * 10 lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]])) override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = { printf("定时器触发,时间为:%d,状态为:%s,key为:%s ",timestamp,state.value(),ctx.getCurrentKey) if(state.value()._2==0){ //该时间段数据为0,进行预警 printf("类型为:%s,数据为0,预警 ",state.value()._1) } //定期数据统计完成后,清零 state.update(state.value()._1,0) //再次注册定时器执行 val currentTime: Long = ctx.timerService().currentProcessingTime() ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) } override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = { printf("状态值:%s,state是否为空:%s ",state.value(),(state.value()==null)) if(state.value() == null){ //获取时间 val currentTime: Long = ctx.timerService().currentProcessingTime() //注册定时器十秒后触发 ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime) printf("定时器注册时间:%d ",currentTime+10000L) state.update(value._1,value._2.toInt) } else{ //统计数据 val key: String = state.value()._1 var count: Long = state.value()._2 count += value._2.toInt //更新state值 state.update((key,count)) } println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value) printf("状态值:%s ",state.value()) //返回处理后结果 out.collect("处理后返回数据->"+value) } } }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13782285.html

  • 相关阅读:
    jQuery 语法
    HTML DOM Document 对象
    JavaScript
    JavaScript Cookies
    JavaScript 计时事件
    九度OJ 1352 和为S的两个数字
    九度0J 1374 所有员工年龄排序
    九度OJ 1373 整数中1出现的次数(从1到n整数中1出现的次数)
    九度OJ 1370 数组中出现次数超过一半的数字
    九度OJ 1361 翻转单词顺序
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13782285.html
Copyright © 2011-2022 走看看