zoukankan      html  css  js  c++  java
  • Flink 状态编程

      流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过 90 度时发出警告。有状态的计算则会基于多个事件输出结果。简单来说,有状态的计算不仅处理当前的数据,还要和以前接收到的数据进行比较、聚合等操作。所以需要一个状态来对之前的数据进行记录。
    方式一:  
      在如下的代码中,数据先进行keyBy,然后进行process,在处理中记录了上一次数据的温度状态。这种进行分区后维护的状态也叫键控状态(keyed state)。
    object StateTest {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        val socketStream = env.socketTextStream("hadoop102", 7777)
    
        val dataStream: DataStream[SensorReading] = socketStream.map(d => {
          val arr = d.split(",")
          SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
        })
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
            override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000
          })
    
        //温度变动超过10度报警
        val processStream = dataStream.keyBy(_.id)
          .process(new TempChangeAlert(10.0))
    
        dataStream.print("data stream")
    
        processStream.print("alert stream")
    
    
    
        env.execute("test")
      }
    
    }
    
    class TempChangeAlert(threshold: Double) extends KeyedProcessFunction[String, SensorReading, String] {
    
      //维护一个状态
      lazy val lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("tempState", classOf[Double]))
    
      override def processElement(value: SensorReading,
                                  ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
                                  out: Collector[String]): Unit = {
        //取出上一个温度
        val lastTemperature = lastTemp.value()
    
        val diff = (lastTemperature - value.temperature).abs
        if (diff > threshold) {
          out.collect(value.id + "," + lastTemperature + "," + value.temperature)
        }
        lastTemp.update(value.temperature)
      }
    }
    

    方式二:  

      如果用不到ProcessFunction中的时间服务等内容,可以简单使用富函数实现同样的功能。关键代码如下

    //温度变动超过10度报警
    val processStream = dataStream.keyBy(_.id)
      .flatMap(new TempChangeAlert2(10.0))
    

      自定义类继承富函数类

    class TempChangeAlert2(threshold:Double) extends RichFlatMapFunction[SensorReading,(String,Double,Double)]{
    
      private var lastTemp: ValueState[Double] = _
    
      override def open(parameters: Configuration): Unit = {
        lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("tempState2", classOf[Double]))
      }
    
      override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
        //取出上一个温度
        val lastTemperature = lastTemp.value()
    
        val diff = (lastTemperature - value.temperature).abs
        if (diff > threshold) {
          out.collect((value.id,lastTemperature,value.temperature))
        }
    
        lastTemp.update(value.temperature)
      }
      
    }
    

    方式三:

      直接使用带状态的flatMapWithState方法

    val alertStream3 = dataStream.keyBy(_.id)
      .flatMapWithState[(String,Double,Double),Double]{
        //入参1:stream中的数据
        //入参2:上一次的状态
        //出参1:输出的内容
        //出参2:更新后的状态
        case (input:SensorReading,None) => (List.empty,Some(input.temperature))
        case (input:SensorReading,lastTemp:Some[Double])=>{
          val diff = (input.temperature-lastTemp.get).abs
          if (diff>10.0){
            (List((input.id,lastTemp.get,input.temperature)),Some(input.temperature))
          }else{
            (List.empty,Some(input.temperature))
          }
        }
      }
    

      

      

  • 相关阅读:
    2013ACM多校联合(1)
    AcDream 1083 完美数 数位DP
    AcDream 1079 郭氏数
    AcDream 1084 同心树 几何
    AcDream 1078 递推数 嵌套循环节+矩阵快速幂
    AcDream 1081 平衡树 Tire树
    ZOJ1455 Schedule Problem 差分约束
    在程序中加载log4net配置,防止其他人看到配置文件
    sqlite错误 The database disk image is malformed database disk image is malformed 可解决
    由于这台计算机没有终端服务器客户端访问许可证,远程会话被中断。请与服务器管理员联系 解决
  • 原文地址:https://www.cnblogs.com/noyouth/p/12905327.html
Copyright © 2011-2022 走看看