zoukankan      html  css  js  c++  java
  • SparkStreaming之WordCount(UpdateStateByKey)

    代码:

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object UpdateStateByKeyWordCount {
    def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("checkpointdirectory")
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap( _.split(" ")).map(word => (word, 1))
    val result = words.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
    var newValue = state.getOrElse(0)
    for(value <- values) {
    newValue += value
    }
    Option(newValue)
    })
    result.print()

    ssc.start()
    ssc.awaitTermination()
    }
    }

    结果:

     

     

    有帮助的欢迎评论打赏哈,谢谢!

  • 相关阅读:
    第七次作业-话费充值
    第七次作业-qq登录&跳转
    第六次作业
    第五次作业
    第四次作业
    jsp第二次作业
    第一次jsp作业
    第九次作业
    第八次作业
    第七次2作业
  • 原文地址:https://www.cnblogs.com/wddqy/p/12024343.html
Copyright © 2011-2022 走看看