zoukankan      html  css  js  c++  java
  • spark streaming updateStateByKey 用法

    /**
     * Streaming word count that keeps a running total per word across batches.
     *
     * Reads lines of text from a TCP socket, splits them into words on single
     * spaces, and uses `updateStateByKey` to add each batch's counts to the
     * totals accumulated by earlier batches.
     *
     * Usage: NetworkWordCount <hostname> <port>
     */
    object NetworkWordCount {

      /**
       * Program entry point.
       *
       * @param args `args(0)` is the hostname and `args(1)` the integer port of
       *             the text socket to read from. The process exits with status 1
       *             if fewer than two arguments are given or the port is not an
       *             integer.
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        // Validate the port before any Spark resources are created, so a typo
        // yields a usage message instead of a raw NumberFormatException.
        val hostname = args(0)
        val port: Int = scala.util.Try(args(1).toInt).getOrElse {
          System.err.println(s"Invalid port '${args(1)}': expected an integer")
          System.err.println("Usage: NetworkWordCount <hostname> <port>")
          sys.exit(1)
        }

        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        // Batches are formed every 10 seconds.
        val ssc = new StreamingContext(sparkConf, Seconds(10))
        // A checkpoint directory must be set before using updateStateByKey.
        ssc.checkpoint("hdfs://master:8020/spark/checkpoint")

        // State update function applied per key:
        //   currValues     - the values seen for this key in the current batch
        //   prevValueState - the total accumulated so far (None for a new key)
        // Returns the new running total, wrapped in an Option as the API requires.
        val addFunc: (Seq[Int], Option[Int]) => Option[Int] =
          (currValues, prevValueState) => {
            // Sum of this key's occurrences within the current batch.
            val currentCount = currValues.sum
            // Previously accumulated total; starts from 0 when there is none.
            val previousCount = prevValueState.getOrElse(0)
            Some(currentCount + previousCount)
          }

        val lines = ssc.socketTextStream(hostname, port)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(word => (word, 1))

        // Uncomment to also print the per-batch (non-cumulative) counts:
        //val currWordCounts = pairs.reduceByKey(_ + _)
        //currWordCounts.print()

        val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
        totalWordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

    spark streaming updateStateByKey 用法

  • 相关阅读:
    函数柯里化
    常用正则
    校验table行内的form编辑
    前端代码 读取excel表格数据
    cocos2d-x 帧动画学习
    Linux 下vim配置
    Android开发笔记 二
    cocos2d-x CCDictionary类学习
    Android开发笔记
    Cocos2d-x移植到Windows下的问题
  • 原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7372168.html
Copyright © 2011-2022 走看看