zoukankan      html  css  js  c++  java
  • Spark Streaming HA

    Driver HA:

    1、在提交application的时候,添加 --supervise 选项,如果Driver挂掉,会自动启动一个Driver

    2、代码层面恢复Driver

    3、在恢复checkpoint中数据的时候,把旧的逻辑也一起给恢复了

    主要的作用就是当SparkStreaming 停机之后,下次启动的时候,让代码知道上一次停机的数据处理节点在什么地方,避免从头开始执行

    代码逻辑如下:

    object SparkStreamingDriverHA {
    
      /**
        * Driver HA :
        * 1、在提交application的时候, 添加 --supervise选项, 如果Driver挂掉,会自动启动一个Driver
        * 2、代码层面恢复Driver
        * 3、在恢复的同时,如果新添加了处理逻辑,会将旧的处理逻辑恢复
        * @param args
        */
    
      //设置checkpoint目录:
      val ckDir = "./data/streamingCheckpoint"
    
    
    
    
      def main(args: Array[String]): Unit = {
    
        /**
          *  StreamingContext.getorCreate(ckDir,CreateStreamingContext)
          * 这个方法首先会从CKDir目录中获取StreamingContext【因为StreamingContext是序列化存储在checkpoint中,回复时会尝试反序列化这些object】
          * 如果用修改过的class可能会导致错误,此时需要跟换checkpoibt目录或者删除checkpoint目录中的数据,程序才能跑起来
          *
          */
    
        val ssc = StreamingContext.getOrCreate(ckDir, CreateStreamingContext)
    
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    
      }
    
    
       def CreateStreamingContext(): StreamingContext = {
    
        println("=============create new StreamingContext===============")
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("DriverHA")
    
        val ssc = new StreamingContext(conf, Durations.seconds(5))
    
        ssc.sparkContext.setLogLevel("Error")
    
         /**
           * 默认checkpoint存储
           * 1、配置信息
           * 2、DStream操作逻辑
           * 3、job的执行进度
           * 4、offset
           */
    
         ssc.checkpoint(ckDir)
         val lines = ssc.textFileStream("./data/streamingCopyFile")
         val words = lines.flatMap(line=> {line.split(" ")})
         val pairWords = words.map(word=>{(word,1)})
         val result = pairWords.reduceByKey((v1:Int,v2:Int)=>{v1+v2})
    
       /*  result.print()*/
    
         /**
           * 更改逻辑
           *
           *
           */
    
          result.foreachRDD(pairRdd=>{
            pairRdd.filter(one=>{
              println("===========filter============")
              true
            }).foreach(println)
    
    
          })
    
    
         ssc
    
       }

    当重启之后,如果checkpoint记录了上一次的sparkContext,就会按照上一次的执行逻辑执行,否则就按照最新的执行逻辑去执行

    val ssc = StreamingContext.getOrCreate(ckDir, CreateStreamingContext)

    CKDir: 可以找到上一次停止时的执行位置

    CreateStreamingContext: 在checkpoint文件夹中没有数据的情况下,会按照最新的代码逻辑进行数据执行 ,CreateStreamingContext方法就代表最新的执行逻辑

  • 相关阅读:
    Linux设置系统时间并同步到硬件
    centos6.9安装mysql5.7.22并设置初始密码
    mysql修改数据库文件存储位置
    mysql1.7(mysql优化,mysql-mmm软件介绍,mysql高可用集群。)
    mysql1.6(主从同步,数据读写分离)
    Mysql1.5(binlog增量备份与恢复,innobackupex)
    Mysql1.4(用户授权,权限撤销;图形管理工具,数据备份-恢复)
    MYSQL1.3(存储引擎,数据导入导出,管理表记录)
    MYSQL1.2(字段管理,索引类型)
    mysql1.1(搭建,数据类型,基本使用)
  • 原文地址:https://www.cnblogs.com/wcgstudy/p/11090193.html
Copyright © 2011-2022 走看看