zoukankan      html  css  js  c++  java
  • Flink学习(十八) 状态管理与状态编程

    Flink中的状态

    由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态;可以认为状态就是一个本地变量,可以被任务的业务逻辑访问;Flink会进行状态管理,包括状态一致性,故障处理以及高效存储和访问,以使开发人员可以专注于应用程序的逻辑。

    在Flink中,状态始终与特定算子相关联,为了使运行的Flink了解算子的状态,算子需要预先注册其状态。

    总的来说,有两种类型的状态:

    算子状态(Operator State): 算子状态的作用范围限定为算子任务。

    键控状态(Keyed State): 根据输入数据流中定义的键(key)来维护和访问。

    算子状态(Operator State)

    算子状态的作用范围限定为算子任务,由同一个并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言使共享的,算子状态不能由相同或不同算子的另一个任务访问。

     

    算子状态数据结构

    列表状态(List State): 将状态表示为一组数据的列表。

    联合列表状态(Union list state): 也将状态表示为数据的列表,它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

    广播状态(Broadcast state): 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

    键控状态(Keyed State)

    键控状态时根据输入数据流中定义的键(key)来维护和访问的,Flink为每隔key维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态,当文物处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。

    键控状态数据结构

    值状态(Value state): 将状态表示为单个的值。

    列表状态(list state): 将状态表示为一组数据的列表。

    映射状态(Map state): 将状态表示为一组key-value对。

    聚合状态(Reducing state & aggregating State): 将状态表示为一个用于聚合操作的列表。

    键控状态的使用(值状态(Value state)举例:)

    声明一个键控状态

    //定义一个状态,用来保存上一个数据的温度值
    //用懒加载的方式,一开始定义的时候我们还不执行,等到调用的时候去执行
    //所有的状态都这么定义 当成一个变量直接用
    lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))

    读取状态

    //每来一条数据的时候,从状态中取出上一次的温度值

    val preTemp = lastTemp.value()

    对状态赋值

    //更新温度值

     lastTemp.update(value.temperature)

    状态后端(State Backends)

    每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每隔并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储,访问以及维护,由一个可插入的组件决定,这个组件就叫做状态呀

    后端(state backend),状态后端主要负责两件事:一是本地的状态管理,以及将检查点(checkpoint)状态写入远程存储。

        //开启checkpoint 传入参数代表每隔多久进行checkpoint
        env.enableCheckpointing(60000)
    
        //状态后端
        //    env.setStateBackend(new MemoryStateBackend())
        //    env.setStateBackend(new RocksDBStateBackend(""))

    状态编程小应用(三种方式实现)

    /**
    * 需求:
    * 检测两次温度超过一定的范围的话 就报警
    * 传感器的温度 跳变太大
    */

    方式一:继承 KeyedProcessFunction 抽象类
    class TempChangeAlert(thresHold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] {
      //定义一个状态变量,保存上次的温度值
      lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
    
    
      override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
    
        //获取上一次的温度值
        val lastTemp = lastTempState.value()
    
        //用当前的温度值和上次的温度值计算一个差值,如果大于预设值,就报警‘
        val diff = (value.temperature - lastTemp).abs
        if(lastTemp==0.0){
          println(s"方法一,第一条数据进来,$value")
        }else if(diff > thresHold) {
          out.collect((value.id, lastTemp, value.temperature))
        }
    
        //做完一次处理之后将状态进行更新
        lastTempState.update(value.temperature)
      }
    }

     

    方式二:继承 RichFlatMapFunction 抽象类
    class TempChangeAlertFlatMap(thresHold: Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{
    
    
      //定义
      private var lastTempState:ValueState[Double] = _
    
    
      override def open(parameters: Configuration): Unit = {
        //初始化的时候声明变量
        lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",classOf[Double]))
      }
    
      override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    
        //获取上一次的温度值
        val lastTemp = lastTempState.value()
    
        //用当前的温度值和上次的温度值计算一个差值,如果大于预设值,就报警‘
        val diff = (value.temperature - lastTemp).abs
        if(lastTemp==0.0){
          println(s"方法二,第一条数据进来,$value")
        }else if(diff > thresHold) {
          out.collect((value.id, lastTemp, value.temperature))
        }
    
        //做完一次处理之后将状态进行更新
        lastTempState.update(value.temperature)
      }
    }

    方式三:keyBy后调用 flatMapWithState 方法

     //实现方式3
        val processedStream4 = dataStream.keyBy(_.id)
            .flatMapWithState[(String,Double,Double),Double]{
          //没有状态的话,也就是没有数据来过,那么就将当前数据温度值存入状态
          case (input:SensorReading,None) => {
            println(s"方法三,第一条数据进来,$input")
    
            (List.empty,Some(input.temperature))
          }
            //如果有状态的话,就与上一次的温度值比较差值,如果大于预设值,就报警
          case (input:SensorReading,lastTemp:Some[Double]) => {
            val diff = (input.temperature - lastTemp.get).abs
            if(diff>10.0){
              (List((input.id,lastTemp.get,input.temperature)),Some(input.temperature))
            }else{
              (List.empty,Some(input.temperature))
            }
          }
        }

    完整代码:

    package com.wyh.statebackendApi
    
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    
    /**
      * 需求:
      * 检测两次温度超过一定的范围的话 就报警
      * 传感器的温度 跳变太大
      */
    
    //温度传感器读数样例类
    case class SensorReading(id: String, timestamp: Long, temperature: Double)
    
    object Demo1 {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        //开启checkpoint 传入参数代表每隔多久进行checkpoint
        env.enableCheckpointing(60000)
    
        //状态后端
        //    env.setStateBackend(new MemoryStateBackend())
        //    env.setStateBackend(new RocksDBStateBackend(""))
    
    
        val stream = env.socketTextStream("localhost", 7777)
    
        //Transform操作
        val dataStream: DataStream[SensorReading] = stream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })
          //===到来的数据是升序的,准时发车,用assignAscendingTimestamps
          //指定哪个字段是时间戳 需要的是毫秒 * 1000
          //      .assignAscendingTimestamps(_.timestamp * 1000)
          //===处理乱序数据
          //      .assignTimestampsAndWatermarks(new MyAssignerPeriodic())
          //==底层也是周期性生成的一个方法 处理乱序数据 延迟1秒种生成水位 同时分配水位和时间戳 括号里传的是等待延迟的时间
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          override def extractTimestamp(t: SensorReading): Long = {
            t.timestamp * 1000
          }
        })
    
    //    val processedStream = dataStream.keyBy(_.id)
    //      .process(new TempIncreAlert())
    
        //实现方式1
        val processedStream2 = dataStream.keyBy(_.id)
          .process(new TempChangeAlert(10.0))
    
    
        //实现方式2
        val processedStream3 = dataStream.keyBy(_.id)
            .flatMap(new TempChangeAlertFlatMap(10.0))
    
        //实现方式3
        val processedStream4 = dataStream.keyBy(_.id)
            .flatMapWithState[(String,Double,Double),Double]{
          //没有状态的话,也就是没有数据来过,那么就将当前数据温度值存入状态
          case (input:SensorReading,None) => {
            println(s"方法三,第一条数据进来,$input")
    
            (List.empty,Some(input.temperature))
          }
            //如果有状态的话,就与上一次的温度值比较差值,如果大于预设值,就报警
          case (input:SensorReading,lastTemp:Some[Double]) => {
            val diff = (input.temperature - lastTemp.get).abs
            if(diff>10.0){
              (List((input.id,lastTemp.get,input.temperature)),Some(input.temperature))
            }else{
              (List.empty,Some(input.temperature))
            }
          }
        }
    
        processedStream2.printToErr("processedStream2 data")
    
        processedStream3.printToErr("processedStream3 data")
    
        processedStream4.printToErr("processedStream4 data")
    
        dataStream.print("input data")
    
    
        env.execute("StateBackend Test")
    
      }
    
    }
    
    class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String] {
      //定义一个状态,用来保存上一个数据的温度值
      //用懒加载的方式,一开始定义的时候我们还不执行,等到调用的时候去执行
      //所有的状态都这么定义 当成一个变量直接用
      lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
      //定义一个状态用来保存定时器的时间戳
      lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer", classOf[Long]))
    
      //判断温度连续上升
      //跟上一次数据进行比较 如果比较一直大 10秒种内进行报警
      //注册一个定时器 把上一次的数据保存成当前的状态
      override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
        //每来一条数据的时候,从状态中取出上一次的温度值
        val preTemp = lastTemp.value()
        var curTimerTs = currentTimer.value()
    
        //更新温度值
        lastTemp.update(value.temperature)
    
        //加个if判断最开始的温度是否为0来判断是否是第一条数据 温度上升且没有设置过定时器,则注册定时器
        if (preTemp == 0.0) {
          println("这是第一条数据进来")
        } else if ((value.temperature > preTemp) && (curTimerTs == 0L)) {
    
          val timerTs = ctx.timerService().currentProcessingTime() + 10000L
    
          //传入当前时间加1  是时间戳
          ctx.timerService().registerProcessingTimeTimer(timerTs)
          currentTimer.update(timerTs)
        } else if (value.temperature <= preTemp) {
          //如果温度下降 或者是第一条数据 删除定时器
          ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
          //删除定时器之后将状态清空
          currentTimer.clear()
        }
      }
    
      //在回调函数中执行定时器到的逻辑
      //当前的时间 ctx上下文 out输出信息
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
        //直接输出报警信息
        out.collect(ctx.getCurrentKey + "温度连续上升")
        //考虑真实情况,将状态都清空
        currentTimer.clear()
      }
    }
    
    class TempChangeAlert(thresHold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)] {
      //定义一个状态变量,保存上次的温度值
      lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
    
    
      override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
    
        //获取上一次的温度值
        val lastTemp = lastTempState.value()
    
        //用当前的温度值和上次的温度值计算一个差值,如果大于预设值,就报警‘
        val diff = (value.temperature - lastTemp).abs
        if(lastTemp==0.0){
          println(s"方法一,第一条数据进来,$value")
        }else if(diff > thresHold) {
          out.collect((value.id, lastTemp, value.temperature))
        }
    
        //做完一次处理之后将状态进行更新
        lastTempState.update(value.temperature)
      }
    }
    
    
    class TempChangeAlertFlatMap(thresHold: Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{
    
    
      //定义
      private var lastTempState:ValueState[Double] = _
    
    
      override def open(parameters: Configuration): Unit = {
        //初始化的时候声明变量
        lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",classOf[Double]))
      }
    
      override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    
        //获取上一次的温度值
        val lastTemp = lastTempState.value()
    
        //用当前的温度值和上次的温度值计算一个差值,如果大于预设值,就报警‘
        val diff = (value.temperature - lastTemp).abs
        if(lastTemp==0.0){
          println(s"方法二,第一条数据进来,$value")
        }else if(diff > thresHold) {
          out.collect((value.id, lastTemp, value.temperature))
        }
    
        //做完一次处理之后将状态进行更新
        lastTempState.update(value.temperature)
      }
    }

    运行测试:

    输入数据:

    结果:

  • 相关阅读:
    Virtuabox 虚拟机克隆方法
    CentOS 7 防火墙 出现Failed to start iptables.service: Unit iptables.service failed to load
    Linux系统下安装rz/sz命令及使用说明
    os、sys模块
    collections、random、hashlib、configparser、logging模块
    time、datatime模块
    正则表达式、re模块
    递归、二分查找法
    内置函数、匿名函数
    生成器进阶、生成器表达式
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12995128.html
Copyright © 2011-2022 走看看