zoukankan      html  css  js  c++  java
  • Spark Streaming updateStateByKey 用法

    /**
     * Stateful network word count.
     *
     * Reads lines of text from a TCP socket, splits them into space-separated
     * words, and on every 10-second batch prints the running total for each word
     * seen since the stream started (not just the count within that batch).
     *
     * Usage: NetworkWordCount <hostname> <port>
     */
    object NetworkWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(10))
        // updateStateByKey carries state across batches, so a checkpoint
        // directory must be configured before the stream is started.
        ssc.checkpoint("hdfs://master:8020/spark/checkpoint")

        // State update function, called once per key on each batch:
        //   currValues     - every value received for this key in the current batch
        //                    (each is 1 here: one entry per occurrence of the word)
        //   prevValueState - the running total accumulated so far, or None for a new key
        // Returns the new running total; returning Some keeps the key in the state.
        val addFunc: (Seq[Int], Option[Int]) => Option[Int] =
          (currValues, prevValueState) => Some(currValues.sum + prevValueState.getOrElse(0))

        val lines = ssc.socketTextStream(args(0), args(1).toInt)
        // split(" ") yields "" for leading or repeated spaces; drop those so an
        // empty string is never counted as a word.
        val words = lines.flatMap(_.split(" ").filter(_.nonEmpty))
        val pairs = words.map(word => (word, 1))

        val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
        totalWordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

    Spark Streaming updateStateByKey 用法

  • 相关阅读:
    柱状图最大的矩形
    单词搜索
    最小覆盖子串
    颜色分类
    编辑距离
    X的平方根
    二进制求和
    最大子序和
    N皇后
    java8-14-时间API
  • 原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7372168.html
Copyright © 2011-2022 走看看