zoukankan      html  css  js  c++  java
  • Spark Streaming源码分析 – Checkpoint

    Persistence
    Streaming没有做特别的事情,DStream最终还是以其中的每个RDD作为job进行调度的,所以persistence就以RDD为单位按照原先Spark的方式去做就可以了,不同的是Streaming是无限,需要考虑Clear的问题
    在clearMetadata时,在删除过期的RDD的同时,也会做相应的unpersist
    比较特别的是,NetworkInputDStream,是一定会做persistence的,因为会事先将流数据转化为persist block,然后NetworkInputDStream直接从block中读到数据
    在design中看到NetworkInputDStream会将source data存两份,防止丢失,但在代码中没有找到这段逻辑,只看到往blockManager写入一份

    Checkpoint
    在Streaming中Checkpoint有特殊的意义
    对于普通的Spark,没有cp不会影响正确性,因为任何数据都是可以从source replay出来的,而source data往往在HDFS上,所以cp只是一种优化。
    并且Spark也只在worker级别做了failover,worker挂了,没事把上面的tasks换个worker重新replay出来即可, 但是并没有做driver的failover,driver挂了就失败了
    因为Spark本身就看成是个query engine,query失败了没什么损失,again就ok

    但是对于SparkStreaming,这个问题就没有那么简单了,如果driver挂掉,不做任何处理,恢复以后到底从哪里开始做?
    首先一定会丢数据,影响正确性,因为流数据是无限的,你不可能像Spark一样把所有数据replay一遍,即使source支持replay,比如kafka

    所以对于Streaming的checkpoint分为两部分,RDD的cp和DStreamGraph的cp
    对于RDD的cp和Spark是一致的,没有区别
    下面谈谈对于DStreamGraph的cp,目的就是在StreamingContext被重启后,可以从cp中恢复出之前Graph的执行时状况
    a. Graph对象是会整个被序列化到文件,而其中最关键的是outputStreams,看似这里只会persist最终的outputStreams,其实会persist整个graph上所有的DStream
    因为在def dependencies: List[DStream[_]]会包含所有的上一层DStream,依次递归,就会包含所有的DStream对象
    在恢复出DStream对象后,如何恢复当时的RDD状况,可以看到generatedRDDs是@transient的,并不会被persist
    答案在DStream.DStreamCheckpointData中,通过currentCheckpointFiles可以记录下cp时,generatedRDDs中所有完成cp的RDD的(times,cpfilename)
    所以在恢复时只需要将RDD从cpfile中读出来,并加入到generatedRDDs即可
    并且cpfile是需要清理的,当每次完成DStreamGraph的cp时,在该graph中的最老的RDD之前的所有RDD的cpfile都可以删掉,因为这些老的RDD不可能再被用到
    b. 在Checkpoint对象中除了graph对象,还有该比较重要的是pendingTimes,这个记录在cp时,有多少的jobs没有被提交
    这样当JobScheduler重新启动的时候会重新提交这些jobs,这里是at-least once逻辑,因为不知道在cp完多久后crash,所以其中某些job有可能已经被成功执行

    创建cp的过程,
    1. 在JobGenerator中,每次提交一组jobs到Spark后,会执行对DoCheckpoint将Checkpoint对象序列化写入文件(其中Checkpoint对象包含graph对象等信息)
    2. 在完成DoCheckpoint后,会调用ClearCheckpointData清除过期的RDD的checkpoint文件

    使用cp的过程,
    1. 调用StreamingContext.getOrCreate,使用CheckpointReader.read从文件中反序列化出Checkpoint对象, 并使用Checkpoint对象去初始化StreamingContext对象
    2. 在StreamingContext中调用cp_.graph.restoreCheckpointData来恢复每个DStream.generatedRDDs
    3. 在JobGenerator中调用Restart,重新提交哪些在cp中未被提交的jobs

    DStreamGraph

    DStreamCheckpointData

    DStream

    JobGenerator
    1. 在每次runJobs结束,即每次新提交一组jobs后,会执行对DoCheckpoint将Checkpoint对象写入文件
    2. 在restart的时候,会重新run pendingTimes + downTimes的jobs,保证at-least once逻辑

    StreamingContext
    在有checkpoint文件时,需要先读出Checkpoint对象,然后去初始化StreamingContext
    从而使用Checkpoint去恢复graph中所有的DStream

    Checkpoint (org.apache.spark.streaming)
    Checkpoint主要是为了cp DStreamGraph对象,通过CheckpointWriter将Checkpoint序列化到文件

    CheckpointWriter,用于将CP对象写入文件

    CheckpointReader



     

     

  • 相关阅读:
    PHP之项目环境变量设置
    nginx相关服务实践
    模拟器的基本使用
    Redis常见问题汇总
    用OpenResty搭建高性能服务端
    Lua代码规范
    Lua之基础篇
    如何设计一个高性能短链系统?
    通过双 key 来解决缓存并发问题
    Golang常见问题汇总
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3596451.html
Copyright © 2011-2022 走看看