zoukankan      html  css  js  c++  java
  • SparkStreaming之checkpoint检查点

    一.简介

      流应用程序必须保证7*24全天候运行,因此必须能够适应与程序逻辑无关的故障【例如:系统故障、JVM崩溃等】。为了实现这一点,SparkStreaming需要将足够的信息保存到容错存储系统中,以便它可以从故障中恢复。

      检查点有两种类型。

        1.元数据检查点

          将定义流式计算的信息保存到容错存储系统【如HDFS等】。这用于从运行流应用程序所在的节点的故障中恢复。

          元数据包括:

            1.配置

              用于创建流应用程序的配置。

            2.DStream操作

              定义流应用程序的DStream操作集。

            3.不完整的批次

              在任务队列中而尚未完成的批次。

        2.数据检查点

          将生成的RDD保存到可靠的存储系统。在一些跨多个批次组合数据的有状态转换中,这是必须的。在这种转换中,生成的RDD依赖于先前批次的RDD,这导致依赖关系链的长度随着时间而增加。为了避免恢复时间的这种无限增加【与依赖链成正比】,有状态变换的中间RDD周期性地检查以存储到可靠的存储系统中,以切断依赖链。

      总而言之,元数据检查点主要用于从节点故障中恢复,而如果使用状态转换,即使对于基本功能也需要数据或RDD检查点。

    二.需要设置检查点的情况

      1.有状态转换的使用,如果在应用程序中使用了updateStateByKey或reduceByKeyAndWindow,则必须提供检查点以缓存之前批次的中间结果。

      2.从运行应用程序的节点故障中恢复,元数据检查点用于使用进度信息进行恢复。

      备注:在没有上述状态转换的简单流应用程序中可以不使用检查点。在这种情况下,节点故障的恢复将是部分性的【某些以接收但未处理的数据可能会丢失】。

    三.配置检查点

      可以通过在容错,可靠的文件系统【例如:HDFS、S3或Windows文件系统】中设置目录来启用检查点,检查点信息将保存到该文件系统中。使用:streamingContext.checkpoint(checkpointDirectory)来设置的。这将允许使用上述状态转换。此外,如果要使应用程序从节点故障中恢复,则应重写流应用程序以使其具有以下行为。

      1.当程序首次启动时,它将创建一个新的StreamingContext,设置所有流后调用start()。

      2.当程序在失败后重新启动时,它将从检查点目录中的检查点数据重新创建StreamingContext。

    四.代码实现

     1 package big.data.analyse.streaming
     2 
     3 import org.apache.log4j.{Level, Logger}
     4 import org.apache.spark.SparkConf
     5 import org.apache.spark.streaming.{Seconds, StreamingContext}
     6 
     7 /**
     8   * Created by zhen on 2019/8/15.
     9   */
    10 object Checkpoint {
    11   def functionToCreateContext():StreamingContext = {
    12     val conf = new SparkConf().setMaster("local[2]").setAppName("StreaingTest")
    13     val ssc = new StreamingContext(conf, Seconds(10))
    14     val lines = ssc.socketTextStream("192.168.245.137", 9999)
    15 
    16     val words = lines.flatMap(_.split(" "))
    17     val pairs = words.map(word=>(word,1)).reduceByKey(_+_)
    18     pairs.foreachRDD(row => row.foreach(println))
    19     ssc.checkpoint("D:\checkpoint")
    20     ssc
    21   }
    22   def main(args: Array[String]) {
    23     /**
    24       * 设置日志级别
    25       */
    26     Logger.getLogger("org").setLevel(Level.WARN) // 设置日志级别
    27 
    28     /**
    29       * 获取入口及设置checkpoint检查点
    30       */
    31     val ssc = StreamingContext.getOrCreate("D:\checkpoint", functionToCreateContext _)
    32 
    33     ssc.start()
    34     ssc.awaitTermination()
    35     ssc.stop()
    36   }
    37 }

    五.结果

      入参:

        

      结果:

        

    六.总结

      1.需要确保节点进程在失败时会自动重启,这只能通过部署基础结构来完成。

      2.检查点的默认间隔是批处理间隔的倍数,且至少为10秒。通常DStream的5~10个滑动间隔为检查点间隔是一个很好的设置。

  • 相关阅读:
    02_5if switch分支与循环语句
    02_4运算符
    02_3程序格式
    Shell脚本调用ftp上传文件
    02_2数据类型转换
    02_1标识符_关键字_数据类型
    01_3Java Application初步
    01_2Java开发环境的下载 安装 配置
    Mac 安装MySQL
    用 Homebrew 带飞你的 Mac
  • 原文地址:https://www.cnblogs.com/yszd/p/11358535.html
Copyright © 2011-2022 走看看