zoukankan      html  css  js  c++  java
  • Spark Streaming 检查点使用

    import org.apache.spark._
    import org.apache.spark.streaming._
    
    
    /**
      * Spark Streaming demo: a running word count over a TCP text stream, with
      * checkpointing so the job can be recovered after a restart.
      *
      * `main` calls `StreamingContext.getOrCreate` with `CheckpointDir`.
      *  - If checkpoint data exists there, the context is recreated from it.
      *  - Otherwise `functionToCreateContext` builds a fresh context.
      *
      * Start a text source first, e.g. `nc -lk 9997`.
      *
      * Originally created by code-pc on 16/3/14.
      */
    object Pi {

      // Single source of truth for the checkpoint location. The context factory
      // (which writes checkpoints) and getOrCreate (which reads them) must use
      // the same path, or recovery never finds the saved state. A dedicated
      // sub-directory is used instead of "/tmp" itself so that unrelated files
      // or other jobs' checkpoints in /tmp cannot be picked up as this job's state.
      private val CheckpointDir = "/tmp/pi-streaming-checkpoint"

      // TCP endpoint the word stream is read from (see `nc -lk 9997`).
      private val SourceHost = "127.0.0.1"
      private val SourcePort = 9997

      /**
        * Builds a brand-new StreamingContext with a 4-second batch interval.
        *
        * The context reads lines from `SourceHost:SourcePort`, splits them on
        * single spaces and keeps a running count per word via `updateStateByKey`.
        * It prints the totals each batch.
        *
        * This is only invoked by `getOrCreate` when no checkpoint is found.
        *
        * @return a configured, not-yet-started StreamingContext
        */
      def functionToCreateContext(): StreamingContext = {

        // Adds this batch's occurrences of a word to its running total; a word
        // seen for the first time starts from 0.
        def updateStateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
          Some(runningCount.getOrElse(0) + newValues.sum)

        val conf = new SparkConf().setMaster("local[5]").setAppName("AndrzejApp")
        val ssc = new StreamingContext(conf, Seconds(4))
        // Stateful updateStateByKey (below) relies on checkpointing being enabled.
        ssc.checkpoint(CheckpointDir)

        val lines = ssc.socketTextStream(SourceHost, SourcePort)
        val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))
        val runningCounts = pairs.updateStateByKey[Int](updateStateFunction _)

        // Optional: also persist each batch's totals as text files.
        // runningCounts.repartition(1).saveAsTextFiles("/tmp/out")
        runningCounts.print()

        ssc
      }

      /**
        * Recovers the context from `CheckpointDir` if possible (or creates a new
        * one), then runs it until it is stopped or fails.
        *
        * @param args unused
        */
      def main(args: Array[String]): Unit = {
        // Explicit Function0 instead of relying on deprecated zero-arg eta-expansion.
        val context = StreamingContext.getOrCreate(CheckpointDir, () => functionToCreateContext())

        context.start()
        context.awaitTermination()
      }
    }

    Before starting the job, run a TCP text source for it to read from: nc -lk 9997

    If the checkpoint directory exists, StreamingContext.getOrCreate recreates the context from the checkpoint data; otherwise it calls functionToCreateContext to build a new one.

  • 相关阅读:
    操作系统
    Typora
    C++
    linux sftp 和scp 运用
    python GIL锁与多cpu
    django model 高级进阶
    django template 模板
    django view 视图控制之数据返回的视图函数
    django 创建管理员用户
    django 模型管理数据model入门
  • 原文地址:https://www.cnblogs.com/ggzone/p/10121131.html
Copyright © 2011-2022 走看看