zoukankan      html  css  js  c++  java
  • Spark Streaming 检查点使用

    import org.apache.spark._
    import org.apache.spark.streaming._
    
    
    /**
      * Stateful word count over a socket text stream, used to demonstrate
      * Spark Streaming checkpointing with `StreamingContext.getOrCreate`.
      *
      * Created by code-pc on 16/3/14.
      */
    object Pi {

      /**
        * Checkpoint directory. The same path has to be handed to both
        * `ssc.checkpoint` and `StreamingContext.getOrCreate`, so it is defined
        * once here instead of being repeated as a literal.
        */
      private val CheckpointDir = "/tmp"

      /**
        * Builds a fresh [[StreamingContext]] and wires up the whole DStream
        * pipeline: read lines from 127.0.0.1:9997, split them on single spaces,
        * and keep a running count per word across batches.
        *
        * The pipeline is defined inside this factory (not in `main`) because
        * `main` passes it to `StreamingContext.getOrCreate` as the function that
        * creates the context.
        *
        * @return the configured, not yet started, streaming context
        */
      def functionToCreateContext(): StreamingContext = {

        // State update for updateStateByKey: add the occurrences seen in the
        // current batch to the previous total (0 when the key has no state yet).
        def updateStateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
          Some(runningCount.getOrElse(0) + newValues.sum)

        val conf = new SparkConf().setMaster("local[5]").setAppName("AndrzejApp")
        val ssc = new StreamingContext(conf, Seconds(4))
        // The DStream built below uses updateStateByKey, so a checkpoint
        // directory is configured on the context first.
        ssc.checkpoint(CheckpointDir)

        val lines = ssc.socketTextStream("127.0.0.1", 9997)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map((_, 1))

        val runningCounts = pairs.updateStateByKey[Int](updateStateFunction _)

        //runningCounts.repartition(1).saveAsTextFiles("/tmp/out")
        runningCounts.print()

        ssc
      }

      /**
        * Entry point: obtains the streaming context through
        * `StreamingContext.getOrCreate` (checkpoint directory plus
        * [[functionToCreateContext]] as the factory), starts it and blocks
        * until the computation terminates.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // Explicit function literal: does not rely on implicit eta-expansion of
        // an empty-paren method, which newer Scala 2 compilers warn about.
        val context = StreamingContext.getOrCreate(CheckpointDir, () => functionToCreateContext())

        context.start()
        context.awaitTermination()
      }
    }

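    To feed the job with test input, start a TCP text server on the port the stream reads from before launching it: nc -lk 9997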

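    If the checkpoint directory (`/tmp` in the code above) already contains checkpoint data, then the context will be recreated from that data. If it does not (i.e. the program is running for the first time), then `functionToCreateContext` is called to create a new context and set up the DStreams.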

  • 相关阅读:
    修改spring boot 启动logo
    查看jvm常用命令
    intellij IDEA破解
    hdu 新生晚会
    How many prime numbers(素数)
    2077 汉诺塔IV
    Factorial
    双人黑白块
    EasyX
    七夕情人节
  • 原文地址:https://www.cnblogs.com/ggzone/p/7512881.html
Copyright © 2011-2022 走看看