zoukankan      html  css  js  c++  java
  • Spark的checkpoint源码讲解

    一、Checkpoint相关源码分为四个部分

    1、Checkpoint的基本使用:spark_core   &   spark_streaming

    2、初始化的源码

    3、Checkpoint的job生成及执行的过程

    4、读Checkpoint的过程

    二、Checkpoint的基本使用

    Checkpoint可以是还原药水。辅助Spark应用从故障中恢复。SparkStreaming宕机恢复,适合调度器有自动重试功能的。对于 SparkCore 则适合那些计算链条超级长或者计算耗时的
    关键点进行 Checkpoint, 便于故障恢复 。


    Checkpoint和persist从根本上是不一样的:

      1、Cache or persist:

        Cache or persist保存了RDD的血统关系,假如有部分cache的数据丢失可以根据血缘关系重新生成。

      2、Checkpoint

        会将RDD数据写到hdfs这种安全的文件系统里面,并且抛弃了RDD血缘关系的记录。即使persist存储到了磁盘里面,在driver停掉之后会被删除,而checkpoint可以被下次启动使用。

    Checkpoint基本使用

      对于spark_streaming的checkpoint:

        spark streaming有一个单独的线程CheckpointWriteHandler,每generate一个batch interval的RDD数据都会触发checkpoint操作。对于kafka的DirectKafkaInputDStreamCheckpointData,实质是重写DStreamCheckpointData的update和restore方法,这样checkpoint的数据就是topic,partition,fromOffset和untilOffset。更多请参考源码例子RecoverableNetworkWordCount

      对于spark_core的checkpoint: 

      docheckpoint: 

         

           recover:

      

    二、Checkpoint的初始化源码

    1、设置Checkpoint目录

    2、调用Checkpoint方法,构建checkpointData

    三、DoCheckpoint源码

    在SparkContext的runjob方法中

    进入之后

    RDDCheckpointData中真正做Checkpoint返回一个新的RDD并清除掉依赖关系

    ReliableRDDCheckpointData中真正进行Checkpoint操作

    在该方法中

    1、获取sc

    2、创建输出目录

    3、以Job的方式进行Checkpoint操作

    4、将分区策略写入Checkpoint目录

    四、读取Checkpoint数据

    三个方法:

    1、同一个Spark任务,共有了Checkpoint的RDD,在该RDD的iterator方法中

    进入 computeOrReadCheckpoint

    如果进行了 Checkpoint, 条件为真firstParent[T].iterator(split, context)其中, firstParent

    /** Returns the first parent RDD */

    接着是获取依赖

    假如进行了Checkpoint,那么CheckpointRDD就是存在

    在初始化Checkpoint的时候,我们已经初始化了CheckpointData了。

    2、RDD的计算链条失败,主动去读Checkpoint文件的过程

    这个要求我们的入口类在下面这个包

    3、SparkStreaming的故障恢复

    首先,看一下SteamingContext的需要

    然后去读取Checkpoint

    分两个步骤:

    A、获取最新的Checkpoint目录

    B、迭代找到最新的Checkpoint就返回

    最后就是使用获取的Checkpoint去构建ssc

    主要是做了一下动作

  • 相关阅读:
    keepalived安装
    Nginx学习笔记
    使用xhprof分析php性能
    使用 .bash_profile与.bashrc修改字符集
    Mysql分区简述
    c语言多线程队列读写
    setsockopt 设置 SO_LINGER 选项
    nginx配置rewrite
    使用PHP+ajax打造聊天室应用
    UDP/TCP通信小记
  • 原文地址:https://www.cnblogs.com/yfb918/p/10971932.html
Copyright © 2011-2022 走看看