zoukankan      html  css  js  c++  java
  • 流处理 —— Spark Streaming中的操作(状态管理函数 updateStateByKey和mapWithState)

    状态管理函数
      Spark Streaming中状态管理函数包括updateStateByKey和mapWithState,都是用来统计全局key的状态的变化的。它们以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加,在有新的数据信息进入或更新时。能够让用户保持想要的不论任何状状。

    1. updateStateByKey

    概念

    updateStateByKey会统计全局的key的状态,不管又没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateByKey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数操作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会被删除(state可以是任意类型的数据的结构)。

    适用场景

    updateStateByKey可以用来统计历史数据,每次输出所有的key值。例如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的访问量等指标

    使用实例

    条件

    1)首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加 。

    2)updateStateBykey要求必须要设置checkpoint点

    3)updateStateByKey 方法中 updateFunc就要传入的参数,。Seq[V]表示当前key对应的所有值,Option[S] 是当前key的历史状态,返回的是新的封装的数据。

    代码

    object SparkStreamingUpdateStateByKey {
    
      Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    
      def main(args: Array[String]) {
    
        val spark = SparkSession.builder()
          .appName("UpdateStateByKey")
          .master("local[3]")
          .getOrCreate()
    
        val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    
        // 必须设置checkpoint
        ssc.checkpoint("file:\D:\workspace\idea\silent\src\main\resources\checkpoint")
    
        val wordCount: DStream[(String, Int)] = ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
          .flatMap(_.split(" "))
          .map((_, 1))
          .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
            /** state.getOrElse(0) 得到历史可以值,如果key值不存在,则为0 */
            Option(values.foldLeft(state.getOrElse(0))(_ + _))
          })
    
        wordCount.print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    }

    2.  mapWithState

    概念

    mapWithState也会统计全局的key的状态,但是如果没有数据输入,便不会返回之前的key的状态,只会返回batch中存在的key值统计,类似于增量的感觉。

    适用场景

    mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里的余额信息。

    使用实例

    条件

    1)如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值

    2)还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在func中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。

    3) checkpoint 不是必须的

    object SparkStreamingMapWithState {
    
      Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    
      def main(args: Array[String]) {
    
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("UpdateStateByKeyDemo")
          .getOrCreate()
    
        val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    
        val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
    
    
        // 可以不设置checkpoint
        ssc.checkpoint("file:\D:\workspace\idea\silent\src\main\resources\checkpoint")
    
        val wordCount: MapWithStateDStream[String, Int, Int, Any] =
          ssc.textFileStream("file:\D:\workspace\idea\silent\src\main\resources\stream")
            .map((_, 1))
            .mapWithState(StateSpec.function(func).initialState(initialRDD).timeout(Seconds(30)))
    
        wordCount.print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    
      /**
       * word : 代表统计的单词
       * option:代表的是历史数据(使用option是因为历史数据可能有,也可能没有,如第一次进来的数据就没有历史记录)
       * state:代表的是返回的状态
       */
      val func = (word: String, option: Option[Int], state: State[Int]) => {
    
        if (state.isTimingOut()) {
          println(word + "is timeout")
        } else {
          // getOrElse(0)不存在赋初始值为零
          val sum = option.getOrElse(0) + state.getOption().getOrElse(0)
          // 单词和该单词出现的频率/ 获取历史数据,当前值加上上一个批次的该状态的值
          val wordFreq = (word, sum)
          state.update(sum)
          wordFreq
        }
      }
    
    }

    updateStateByKey和mapWithState的区别

      updateStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。

      mapWithState只返回变化后的key的值,这样做的好处是,我们可以只是关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key的数据。这样的话,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。

  • 相关阅读:
    一个简单的knockout.js 和easyui的绑定
    knockoutjs + easyui.treegrid 可编辑的自定义绑定插件
    Knockout自定义绑定my97datepicker
    去除小数后多余的0
    Windows Azure Web Site (15) 取消Azure Web Site默认的IIS ARR
    Azure ARM (1) UI初探
    Azure Redis Cache (3) 创建和使用P级别的Redis Cache
    Windows Azure HandBook (7) 基于Azure Web App的企业官网改造
    Windows Azure Storage (23) 计算Azure VHD实际使用容量
    Windows Azure Virtual Network (11) 创建VNet-to-VNet的连接
  • 原文地址:https://www.cnblogs.com/yyy-blog/p/12674166.html
Copyright © 2011-2022 走看看