zoukankan      html  css  js  c++  java
  • Spark Streaming带状态更新

    带状态的更新使用 updateStateByKey 方法:需要传入一个自定义的状态更新函数(参数为当前批次中该 key 的所有值和该 key 之前的状态,返回新的状态)。注意:使用前必须通过 StreamingContext 的 checkpoint 方法设置 checkpoint 目录,否则程序启动时会报错。

    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Empty companion class of the `UpdataByKey` object; it declares no members.
      *
      * A checkpoint needs to be set.
      * Stateful computation.
      */
    class UpdataByKey {
    
    }
    /**
      * Spark Streaming word count over a Kafka topic that keeps a running total
      * per word across batches by means of `updateStateByKey`.
      */
    object UpdataByKey {

      /**
        * User-defined state update function passed to `updateStateByKey`.
        *
        * @param currValue the values seen for one key in the current batch
        * @param point     the state previously stored for that key, `None` if there is none yet
        * @return the new state: the sum of the current values plus the previous state (0 if absent)
        */
      def addFunc(currValue: Seq[Int], point: Option[Int]): Option[Int] =
        Some(currValue.sum + point.getOrElse(0))

      /**
        * Starts the streaming job and blocks until it terminates.
        *
        * @param args optional; `args(0)` is the checkpoint directory (defaults to "./checkpoint")
        */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("UpdataByKey").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(10))

        // Stateful operations such as updateStateByKey require a checkpoint directory;
        // without one the streaming context refuses to start.
        val checkpointDir = args.headOption.getOrElse("./checkpoint")
        ssc.checkpoint(checkpointDir)

        val topics = "xiaopeng"
        // Map every comma-separated topic name to the number 2, as expected by createStream.
        val topicMap = topics.split(",").map((_, 2)).toMap
        val lines = KafkaUtils.createStream(ssc, "192.168.10.219:2181", "han", topicMap)
        // Each record is a (key, message) pair; split the message into words and pair each with 1.
        val words = lines.flatMap(line => line._2.split(" ")).map(word => (word, 1))

        // updateStateByKey returns a NEW DStream holding the running totals; print that
        // stream rather than the per-batch (word, 1) pairs.
        val runningCounts = words.updateStateByKey[Int](addFunc _)
        runningCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
  • 相关阅读:
    RaisedButton
    Icon
    RichText
    GridView
    HTML常用标签
    HTML语法
    HTML简史
    17_继承
    16_Math
    16_ArrayList
  • 原文地址:https://www.cnblogs.com/itboys/p/6860778.html
Copyright © 2011-2022 走看看