zoukankan      html  css  js  c++  java
  • Spark Streaming概念学习系列之Spark Streaming容错

    Spark Streaming容错

    检查点机制-checkpoint

      

      什么是检查点机制?

        Spark Streaming 周期性地把应用数据存储到诸如HDFS 或Amazon S3 这样的可靠存储系统中以供恢复时使用的机制叫做检查点机制

     

      检查点机制的作用

        控制发生失败时需要重算的状态数

        Spark Streaming通过lineage重算,检查点机制则可以控制需要在lineage中回溯多远

        提供驱动器程序容错

        如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序,并让驱动器程序从检查点恢复,这样SparkStreaming就可以读取之前运行的程序处理数据的进度,并从那里继续。

     

      checkpoint 两种类型的数据:

      Metadata(元数据) checkpointing - 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统。用来恢复 driver,元数据包括:

        1.配置 - 用于创建该 streaming application 的所有配置

        2.DStream 操作 - DStream 一些列的操作

        3.未完成的 batches - 那些提交了 job 但尚未执行或未完成的 batches。

      RDD Data checkpointing - 保存已生成的RDDs至可靠的存储。

     

      metadata checkpointing 主要用来恢复 driver;而 RDD数据的 checkpointing 对于stateful 转换操作是必要的。

      对于window和stateful操作必须checkpoint(Spark Streaming会检查并给出提示)。

      通过StreamingContext的checkpoint来指定目录,默认按照batch Duration来做checkpoint。

      通过DStream的checkpoint指定当前DStream的间隔时间。

      间隔必须是slide interval的倍数。

     

     

     

     

     

    检查点机制-checkpoint的形式

      checkpoint 的形式是将类 Checkpoint的实例序列化后写入外部存储,值得一提的是,有专门的一个线程来做将序列化后的 checkpoint 写入外部存储的操作。类 Checkpoint 包含以下数据:

      除了 Checkpoint 类,还有 CheckpointWriter 类用来导出 checkpoint,CheckpointReader 用来导入 checkpoint。

    检查点机制-checkpoint的局限

      Spark Streaming 的 checkpoint 机制看起来很美好,却有一个硬伤。

      前面提到最终刷到外部存储的是类 Checkpoint 对象序列化后的数据。那么在 Spark Streaming application 重新编译后,再去反序列化 checkpoint 数据就会失败。这个时候就必须新建 StreamingContext。

      针对这种情况,在我们结合 Spark Streaming + kafka 的应用中,需要自行维护了消费的 offsets,这样一来即使重新编译 application,还是可以从需要的 offsets 来消费数据。对于其他的情况需要大家根据实际需求自行处理。

     

     

     

     

    Driver节点容错

      如果你想让你的application能从driver失败中恢复,你的application要满足:

      若application为首次重启,将创建一个新的 StreamContext 实例

      如果application是从失败中重启,将会从 checkpoint 目录导入 checkpoint 数据来重新创建 StreamingContext 实例

    def createStreamingContext() = {
    ...
    val sparkConf = new SparkConf().setAppName(“xxx")
    val ssc = new StreamingContext(sparkConf , Seconds(1))
    ssc.checkpoint(checkpointDir)
    ssc
    }
    ...
    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

      除调用 getOrCreate 外, 你还需要编写在驱动器程序崩溃时重启驱动器进程的代码。

      在 yarn 模式下,driver 是运行在 ApplicationMaster 中,若 ApplicationMaster 挂掉,yarn 会自动在另一个节点上启动一个新的 ApplicationMaster。

      Spark standalone模式下:

      ./bin/spark-submit --deploy-mode cluster --supervise --master spark://... App.jar

     

     

     

     

     

    Worker节点容错

      为了应对工作节点失败的问题,Spark Streaming 使用与Spark 的容错机制相同的方法,根据不同的输入源,分两种情况:

      使用可靠输入源—如HDFS

      由于输入数据是可靠的,所有数据都可以重新计算,因此不会丢失数据

      使用基于网络接收的输入源—例如Kafka、Flume等

      接收到的数据会在集群的不同节点间复制(默认复本数为2)

      一个工作节点失效,在恢复时可以从另一个工作节点的数据中重新计算

      如果是接收数据的工作节点失效,那就可能丢失数据(数据已经收到但是还未复制到其他节点,也没有处理完是失效了)

     

     

     

     

     

      处理保证

      所有转换操作为精确一次保证(Spark Streaming 工作节点的容错保障)

      输出操作在把结果保存到外部存储时,写结果的任务可能因故障而执行多次,一些数据可能也就被写了多次

      可以使用事务操作来写入外部系统(即原子化地将一个RDD分区一次写入),或者设计幂等的更新操作(即多次运行同一个更新操作仍生成相同的结果)。比如Spark Streaming 的saveAs...File 操作会在一个文件写完时自动将其原子化地移动到最终位置上,以此确保每个输出文件只存在一份。

     

     

     

     

  • 相关阅读:
    代码重构(转)
    Apache负载均衡 配置
    恒久的忍耐
    setInterval全面的介绍
    引用 110个Oracle 常用函数的总结
    ssl和tls
    JSTL
    java异常处理的陋习(转载)
    Java 6 JVM参数选项大全(中文版)
    liunx基础常用命令
  • 原文地址:https://www.cnblogs.com/zlslch/p/6945160.html
Copyright © 2011-2022 走看看