zoukankan      html  css  js  c++  java
  • 流处理 —— Spark Streaming中的操作(状态管理函数 updateStateByKey和mapWithState)

    状态管理函数
      Spark Streaming中状态管理函数包括updateStateByKey和mapWithState,都是用来统计全局key的状态的变化的。它们以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加,在有新的数据信息进入或更新时。能够让用户保持想要的不论任何状状。

    1. updateStateByKey

    概念

    updateStateByKey会统计全局的key的状态,不管又没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateByKey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数操作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会被删除(state可以是任意类型的数据的结构)。

    适用场景

    updateStateByKey可以用来统计历史数据,每次输出所有的key值。例如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的访问量等指标

    使用实例

    条件

    1)首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加 。

    2)updateStateBykey要求必须要设置checkpoint点

    3)updateStateByKey 方法中 updateFunc就要传入的参数,。Seq[V]表示当前key对应的所有值,Option[S] 是当前key的历史状态,返回的是新的封装的数据。

    代码

    object SparkStreamingUpdateStateByKey {
    
      Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    
      def main(args: Array[String]) {
    
        val spark = SparkSession.builder()
          .appName("UpdateStateByKey")
          .master("local[3]")
          .getOrCreate()
    
        val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    
        // 必须设置checkpoint
        ssc.checkpoint("file:\D:\workspace\idea\silent\src\main\resources\checkpoint")
    
        val wordCount: DStream[(String, Int)] = ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
          .flatMap(_.split(" "))
          .map((_, 1))
          .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
            /** state.getOrElse(0) 得到历史可以值,如果key值不存在,则为0 */
            Option(values.foldLeft(state.getOrElse(0))(_ + _))
          })
    
        wordCount.print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    }

    2.  mapWithState

    概念

    mapWithState也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,只会返回batch中存在的key值统计,类似于增量的感觉。

    适用场景

    mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里的余额信息。

    使用实例

    条件

    1)如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值

    2)还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在func中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。

    3) checkpoint 不是必须的

    object SparkStreamingMapWithState {
    
      Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    
      def main(args: Array[String]) {
    
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("UpdateStateByKeyDemo")
          .getOrCreate()
    
        val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    
        val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
    
    
        // 可以不设置checkpoint
        ssc.checkpoint("file:\D:\workspace\idea\silent\src\main\resources\checkpoint")
    
        val wordCount: MapWithStateDStream[String, Int, Int, Any] =
          ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
            .map((_, 1))
            .mapWithState(StateSpec.function(func).initialState(initialRDD).timeout(Seconds(30)))
    
        wordCount.print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    
      /**
       * word : 代表统计的单词
       * option:代表的是历史数据(使用option是因为历史数据可能有,也可能没有,如第一次进来的数据就没有历史记录)
       * state:代表的是返回的状态
       */
      val func = (word: String, option: Option[Int], state: State[Int]) => {
    
        if (state.isTimingOut()) {
          println(word + "is timeout")
        } else {
          // getOrElse(0)不存在赋初始值为零
          val sum = option.getOrElse(0) + state.getOption().getOrElse(0)
          // 单词和该单词出现的频率/ 获取历史数据,当前值加上上一个批次的该状态的值
          val wordFreq = (word, sum)
          state.update(sum)
          wordFreq
        }
      }
    
    }

    updateStateByKey和mapWithState的区别

      updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。

      mapWithState只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。

  • 相关阅读:
    1.5寻找倒数第k个元素
    MySQL基础之分组函数
    MySQL基础之单行函数
    MySQL基础查询(一)
    gem install redis Fetching: redis-4.1.3.gem (100%) ERROR: Error installing redis: redis requires Ruby version >= 2.3.0.
    SQL语句
    使用kill无法杀死mysql进程
    Ansible学习笔记
    rsync报错:rsync: chgrp ".hejian.txt.D1juHb" (in backup) failed: Operation not permitted (1)
    Linux磁盘管理
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/12674166.html
Copyright © 2011-2022 走看看