zoukankan      html  css  js  c++  java
  • updataStateByKey算子的使用

    updataStateByKeyApp.scala

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object updataStateByKeyApp extends App {
    
      //配置入口点
      val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
      val ssc= new StreamingContext(conf, Seconds(1))
    
      //设置checkpoint的目录
      ssc.checkpoint(".")
    
      //输入数据流(DStream)
      val lines = ssc.socketTextStream("localhost", 9999)
    
      //todo...
      val pairs = lines.flatMap(_.split(" ")).map((_,1))
      val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
      //输出打印到控制台
      runningCounts.print()
    
      //启动StreamingContext,接收数据,然后处理数据
      ssc.start()
      ssc.awaitTermination()
    
    
      /**
        * 把当前的数据去更新已有的或者是老的数据
        * @param currentValues  当前的
        * @param preValues 老的
        * @return
        */
      def updateFunction(currentValues: Seq[Int], preValues : Option[Int]): Option[Int] = {
        val current =  currentValues.sum
        val pre = preValues.getOrElse(0)
    
        Some(current + pre)
      }
    }
    
  • 相关阅读:
    ubuntu后台运行命令
    jquery获取焦点位于的元素
    thymeleaf 处理模板为字符串
    以字符串形式获取excel单元格中的内容
    ajax 上传文件
    springboot logback
    javaService
    Assistant For Chess Cards
    E生活
    易兼职-找工作兼职平台
  • 原文地址:https://www.cnblogs.com/suixingc/p/updatastatebykey-suan-zi-de-shi-yong.html
Copyright © 2011-2022 走看看