zoukankan      html  css  js  c++  java
  • SparkStreaming updateStateByKey 保存记录信息

    /**
     * Stateful word count over a socket text stream using `updateStateByKey`.
     *
     * Every 10-second batch, the words received on the socket are counted and added to the
     * running per-word totals kept in Spark's state store; the cumulative totals are printed.
     *
     * Optional command-line arguments (defaults reproduce the original hard-coded values):
     *   args(0) - host of the text socket   (default "218.193.154.79")
     *   args(1) - port of the text socket   (default 12345)
     *   args(2) - checkpoint directory      (default ".")
     *
     * `updateStateByKey` requires checkpointing. On a cluster the checkpoint directory must be
     * on storage visible to every node (e.g. an `hdfs://` path); a local directory such as "."
     * only works in local mode.
     */
    object SparkStreaming_StateFul {

      private val DefaultHost = "218.193.154.79"
      private val DefaultPort = 12345
      private val DefaultCheckpointDir = "."

      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

        val host = args.lift(0).getOrElse(DefaultHost)
        // A non-numeric port fails fast with NumberFormatException.
        val port = args.lift(1).map(_.toInt).getOrElse(DefaultPort)
        val checkpointDir = args.lift(2).getOrElse(DefaultCheckpointDir)

        val conf = new SparkConf().setMaster("local[2]")
          .setAppName(this.getClass.getSimpleName)
          .set("spark.executor.memory", "2g")
          .set("spark.cores.max", "8")
          // Backslashes must be escaped: "\S", "\o", "\a" are not valid Scala string escapes.
          .setJars(Array("E:\\ScalaSpace\\Spark_Streaming\\out\\artifacts\\Spark_Streaming.jar"))
        val context = new SparkContext(conf)

        // Adds this batch's occurrences of a word to its previously stored total.
        // If there is no prior state for the word, the previous total is taken as 0.
        val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
          (values, state) => Some(values.sum + state.getOrElse(0))

        // step1: create the streaming context with 10-second batches.
        val ssc = new StreamingContext(context, Seconds(10))
        // updateStateByKey requires a checkpoint directory; use a shared path (e.g. HDFS) on a cluster.
        ssc.checkpoint(checkpointDir)

        // step2: read lines from host:port and split them into (word, 1) pairs.
        val lines = ssc.socketTextStream(host, port)
        val wordDstream = lines.flatMap(_.split(" ")).map(word => (word, 1))

        // Use updateStateByKey to maintain the running count per word.
        val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

        stateDstream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    ssc.checkpoint(".") 这种写法如果在集群上运行，会报出如下错误：
    org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD[9] at print at SparkStreaming_StateFul.scala:43(0) has different number of partitions from original RDD MapPartitionsRDD[8] at updateStateByKey at SparkStreaming_StateFul.scala:41(2)
    	at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:73)
    	at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74)

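    这是因为检查点目录 "." 位于各节点的本地文件系统上，而不是 HDFS 等所有节点都能访问的共享存储中，导致各节点写出的检查点数据不一致。在集群上应改为使用共享路径，例如 ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint")。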








  • 相关阅读:
    快速排序中的partition函数的枢纽元选择,代码细节,以及其标准实现
    并发包的线程池第二篇--Executors的构造
    并发包的线程池第一篇--ThreadPoolExecutor执行逻辑
    Servlet学习笔记
    npm + webpack +react
    取消eclipse启动时的subclipse Usage弹窗
    关于webpack最好的文档
    WebStorm2016.1 破解 激活
    微信web调试工具
    webstorm下设置sass
  • 原文地址:https://www.cnblogs.com/zDanica/p/5471611.html
Copyright © 2011-2022 走看看