zoukankan      html  css  js  c++  java
  • Spark-Streaming 常用流式计算算子

    UpdateStateByKey

    使用说明:维护key的状态。

    使用注意:使用该算子需要设置checkpoint

    使用示例:

    object UpdateStateByKeyTest {
      def main(args: Array[String]): Unit = {
       
        val conf=new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyTest")
        
        val  ssc=new StreamingContext(conf,Seconds(2));
        
        /**
         * 数据源
         */
        val fileDS=ssc.socketTextStream("hadoop1", 9999)
        /**
         * 需要设置一个checkpoint的目录
         * 因为我们的计算结果有中间状态,这些中间状态需要存储
         */
        ssc.checkpoint(".")
        val wordDS=fileDS.flatMap { line => line.split("	") }
        .map { word => (word,1) }
       /**
        * updateFunc: (Seq[Int], Option[S]) => Option[S]
        * updateFunc 这是一个匿名函数
        *  (Seq[Int], Option[S]) 两个参数
    * * 参数一:Seq[Int] Seq代表的是一个集合,int代表的是V的数据类型 * ---分组的操作,key相同的为一组 (hadoop,{1,1,1,1}) * 参数二:Option[S] S代表的是中间状态State的数据类型,S对于我们的这个wordcount例子来讲,应该是 * int类型。中间状态存储的是单词出现的次数。 hadoop -> 4 * * Option[S] 返回值 * */ val wordcountDS=wordDS.updateStateByKey((values:Seq[Int],state:Option[Int]) =>{ val currentCount= values.sum; //获取此次本单词出现的次数 val count=state.getOrElse(0);//获取上一次的结果 也就是中间状态 Some(currentCount+count); }) wordcountDS.print() ssc.start() ssc.awaitTermination() } }

     

    源码描述:

    def updateStateByKey[S: ClassTag](
          updateFunc: (Seq[V], Option[S]) => Option[S]
        ): DStream[(K, S)] = ssc.withScope {
        updateStateByKey(updateFunc, defaultPartitioner())
      }

    mapWithStage

    使用说明:维护key的状态。updateStateByKey的升级

    使用注意:使用该算子需要设置checkpoint

    使用示例:

    object MapWithStateTest {
      def main(args: Array[String]): Unit = {
        
        val conf=new SparkConf().setMaster("local[2]").setAppName("MapWithStateDemo")
        
        val  ssc=new StreamingContext(conf,Seconds(2));
        ssc.checkpoint(".")
        
        val fileDS=ssc.socketTextStream("hadoop1", 9999)
        val wordDstream =fileDS.flatMap { line => line.split("	") }
        .map { word => (word,1) }
        
        /**
         * word: String, one: Option[Int], state: State[Int]
         * 这个函数里面有三个参数
         * 第一个参数:word: String  代表的就是key
         * 第二个参数:one: Option[Int] 代表的就是value
         * 第三个参数:state: State[Int] 代表的就是状态(历史状态,也就是上次的结果)
         * 
         * hello,4
         * 
         * hello,1
         * 
         * hello,5
         */
          val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
          val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
          val output = (word, sum)
          state.update(sum)
          output
         }
          
        val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
         
        val stateDstream = wordDstream.mapWithState(
          StateSpec.function(mappingFunc).initialState(initialRDD))
      
          stateDstream.print();
        ssc.start()
        ssc.awaitTermination()
      }
    }
  • 相关阅读:
    简单的购物车
    分页显示
    登录验证码的实现
    简单遗传算法代码
    jQ
    2.servlet的会话机制session
    1.servlet的会话机制cookie
    基本数据类型和引用数据类型的区别
    struts2-第一章-基础用法2
    struts2第一章-基本用法
  • 原文地址:https://www.cnblogs.com/holos/p/9075519.html
Copyright © 2011-2022 走看看