Persistence
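Streaming does nothing special for persistence. A DStream is ultimately scheduled as jobs over each of its RDDs, so persistence works per RDD in the usual Spark way. The difference is that a stream is unbounded, so cleanup has to be considered.
When clearMetadata removes expired RDDs, it also unpersists them.
NetworkInputDStream is a special case: it always persists, because incoming stream data is first turned into persisted blocks, and NetworkInputDStream then reads its data directly from those blocks.
The design notes say NetworkInputDStream stores the source data twice to guard against loss, but I could not find that logic in the code; I only saw a single write to the blockManager. (Possibly the second copy comes from the storage level rather than from explicit code: network input streams default to a replicated level such as StorageLevel.MEMORY_AND_DISK_SER_2, and replication would then happen inside the BlockManager.)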
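To make the cleanup rule concrete, here is a minimal sketch in plain Scala, with no Spark dependency. FakeRdd and FakeDStream are hypothetical stand-ins, batch times are plain milliseconds, and the `<=` cutoff is an assumption about how expiry is decided rather than a quote from the source.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an RDD: it only tracks whether it is still cached.
final class FakeRdd(val id: Int) {
  var persisted: Boolean = true
  def unpersist(): Unit = { persisted = false }
}

// Hypothetical stand-in for the per-DStream bookkeeping of generated RDDs.
final class FakeDStream(rememberDurationMs: Long) {
  val generatedRDDs = mutable.HashMap.empty[Long, FakeRdd]

  // Forget every RDD whose batch time is at or before (time - rememberDuration),
  // unpersisting it on the way out. Returns the expired batch times, sorted.
  def clearMetadata(timeMs: Long): Seq[Long] = {
    val expired = generatedRDDs.keys.filter(_ <= timeMs - rememberDurationMs).toSeq.sorted
    expired.foreach { t =>
      generatedRDDs.remove(t).foreach(_.unpersist())
    }
    expired
  }
}
```

With a remember duration of 2 seconds and batches at 1 s to 5 s, calling clearMetadata(5000L) drops and unpersists the batches at 1 s, 2 s and 3 s and keeps the two newest.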
Checkpoint
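Checkpointing has a special significance in Streaming.
For plain Spark, the absence of a checkpoint does not affect correctness, because any data can be replayed from the source, and the source data usually lives on HDFS. Checkpointing is therefore only an optimization.
Spark also handles failover only at the worker level: if a worker dies, its tasks are simply recomputed on another worker. There is no driver failover; if the driver dies, the job fails.
This is because Spark sees itself as a query engine: a failed query costs little, and running it again is enough.
For Spark Streaming the problem is not that simple. If the driver dies and nothing is done about it, where should processing resume after recovery?
Data would certainly be lost and correctness affected, because a stream is unbounded: you cannot replay all of the data as in Spark, even when the source supports replay, as Kafka does.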
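Checkpointing in Streaming therefore has two parts: the RDD checkpoint and the DStreamGraph checkpoint.
The RDD checkpoint is the same as in Spark.
The rest of this section covers the DStreamGraph checkpoint, whose purpose is to restore the graph's runtime state after the StreamingContext is restarted.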
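a. The whole graph object is serialized to a file. The key field is outputStreams: it looks as if only the final outputStreams are persisted, but in fact every DStream in the graph is.
This is because def dependencies: List[DStream[_]] holds all DStreams of the previous level, and following it recursively reaches every DStream object.
Once the DStream objects are restored, how is the RDD state of that moment restored? generatedRDDs is @transient, so it is not persisted.
The answer is in DStream.DStreamCheckpointData: at checkpoint time, currentCheckpointFiles records a (time, checkpoint file name) pair for every RDD in generatedRDDs that has finished checkpointing.
Recovery then only needs to read each RDD back from its checkpoint file and add it to generatedRDDs.
Checkpoint files also need cleaning up: each time a DStreamGraph checkpoint completes, the checkpoint files of all RDDs older than the oldest RDD in that graph can be deleted, because those RDDs can no longer be used.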
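b. Besides the graph, the other important field of the Checkpoint object is pendingTimes, which records the batch times whose jobs were still unprocessed when the checkpoint was taken.
When the JobScheduler restarts, these jobs are resubmitted. This is at-least-once semantics: the crash happens some unknown time after the checkpoint, so some of these jobs may already have run successfully.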
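Creating a checkpoint:
1. In JobGenerator, each time a batch of jobs is submitted to Spark, a DoCheckpoint event is posted, which serializes the Checkpoint object to a file (the Checkpoint object contains the graph, among other things).
2. After DoCheckpoint completes, ClearCheckpointData is posted to remove the checkpoint files of expired RDDs.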
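Using a checkpoint:
1. Call StreamingContext.getOrCreate, which uses CheckpointReader.read to deserialize the Checkpoint object from a file and then initializes the StreamingContext with it.
2. StreamingContext calls cp_.graph.restoreCheckpointData to restore generatedRDDs for every DStream.
3. JobGenerator calls restart to resubmit the jobs that had not been submitted as of the checkpoint.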
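The recovery steps above boil down to "use the checkpoint if one can be read, otherwise build a fresh context". The sketch below models only that decision in plain Scala; FakeCheckpoint, FakeContext and RecoverySketch are hypothetical names, and the real classes carry far more state.

```scala
// Hypothetical, simplified stand-ins for Checkpoint and StreamingContext.
final case class FakeCheckpoint(checkpointTime: Long, pendingTimes: Seq[Long])
final case class FakeContext(restoredFrom: Option[FakeCheckpoint])

object RecoverySketch {
  // Same shape as getOrCreate: prefer the checkpoint, else call creatingFunc.
  // A read failure is rethrown unless createOnError is set.
  def getOrCreate(
      readCheckpoint: () => Option[FakeCheckpoint],
      creatingFunc: () => FakeContext,
      createOnError: Boolean = false): FakeContext = {
    val checkpointOption =
      try readCheckpoint()
      catch {
        case e: Exception => if (createOnError) None else throw e
      }
    checkpointOption.map(cp => FakeContext(Some(cp))).getOrElse(creatingFunc())
  }
}
```

Note that creatingFunc is only invoked when no checkpoint is available, which is why all DStream setup in a real application belongs inside that function.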
DStreamGraph
final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
var rememberDuration: Duration = null
var checkpointInProgress = false
var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
def updateCheckpointData(time: Time) {
logInfo("Updating checkpoint data for time " + time)
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))
}
logInfo("Updated checkpoint data for time " + time)
}
def clearCheckpointData(time: Time) {
logInfo("Clearing checkpoint data for time " + time)
this.synchronized {
outputStreams.foreach(_.clearCheckpointData(time))
}
logInfo("Cleared checkpoint data for time " + time)
}
def restoreCheckpointData() {
logInfo("Restoring checkpoint data")
this.synchronized {
outputStreams.foreach(_.restoreCheckpointData())
}
logInfo("Restored checkpoint data")
}
}
DStreamCheckpointData
private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()
// Mapping of the batch time to the checkpointed RDD file of that time
@transient private var timeToCheckpointFile = new HashMap[Time, String] // (time, checkpoint file) of every RDD checkpointed so far
// Mapping of the batch time to the time of the oldest checkpointed RDD
// in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] // maps each checkpoint time to the time of the oldest RDD in that checkpoint
@transient private var fileSystem : FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] // (time, checkpoint file) of the RDDs in the current checkpoint
/**
* Updates the checkpoint data of the DStream. This gets called every time
* the graph checkpoint is initiated. Default implementation records the
* checkpoint files to which the generate RDDs of the DStream has been saved.
*/
def update(time: Time) {
// Get the checkpointed RDDs from the generated RDDs
val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) // keep only the generated RDDs that have finished checkpointing
.map(x => (x._1, x._2.getCheckpointFile.get))
// Add the checkpoint files to the data to be serialized
if (!checkpointFiles.isEmpty) {
currentCheckpointFiles.clear()
currentCheckpointFiles ++= checkpointFiles // refresh currentCheckpointFiles
// Add the current checkpoint files to the map of all checkpoint files
// This will be used to delete old checkpoint files
timeToCheckpointFile ++= currentCheckpointFiles
// Remember the time of the oldest checkpoint RDD in current state
timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) // time of the oldest RDD in this checkpoint
}
}
/**
* Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
* written to the checkpoint directory.
*/
def cleanup(time: Time) {
// Get the time of the oldest checkpointed RDD that was written as part of the
// checkpoint of `time`
timeToOldestCheckpointFileTime.remove(time) match { // lastCheckpointFileTime is the oldest RDD time recorded for the checkpoint taken at `time`
case Some(lastCheckpointFileTime) =>
// Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`
// This is because checkpointed RDDs older than this are not going to be needed
// even after master fails, as the checkpoint data of `time` does not refer to those files
val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) // every checkpoint file older than lastCheckpointFileTime
logDebug("Files to delete:\n" + filesToDelete.mkString(","))
filesToDelete.foreach {
case (time, file) =>
try {
val path = new Path(file)
if (fileSystem == null) {
fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
}
fileSystem.delete(path, true)
timeToCheckpointFile -= time
logInfo("Deleted checkpoint file '" + file + "' for time " + time)
} catch {
case e: Exception => // handler omitted in this excerpt
}
}
case None =>
logDebug("Nothing to delete")
}
}
/**
* Restore the checkpoint data. This gets called once when the DStream graph
* (along with its DStreams) are being restored from a graph checkpoint file.
* Default implementation restores the RDDs from their checkpoint files.
*/
def restore() {
// Create RDDs from the checkpoint data
currentCheckpointFiles.foreach {
case(time, file) => {
// Restore: rebuild the RDD from its checkpoint file and add it to dstream.generatedRDDs
dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
}
}
}
}
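The interplay of the three maps in update and cleanup is easier to follow in isolation. The sketch below is a hypothetical plain-Scala model (CheckpointFileTracker is not a Spark class): batch times are Longs, and "deleting" a file only records its name.

```scala
import scala.collection.mutable

// Hypothetical model of the bookkeeping in DStreamCheckpointData.
final class CheckpointFileTracker {
  val currentCheckpointFiles = mutable.HashMap.empty[Long, String]
  private val timeToCheckpointFile = mutable.HashMap.empty[Long, String]
  private val timeToOldestCheckpointFileTime = mutable.HashMap.empty[Long, Long]
  val deleted = mutable.ArrayBuffer.empty[String]

  // Called when a graph checkpoint starts at `time`; `checkpointedRdds` plays
  // the role of the generated RDDs that already have a checkpoint file.
  def update(time: Long, checkpointedRdds: Map[Long, String]): Unit = {
    if (checkpointedRdds.nonEmpty) {
      currentCheckpointFiles.clear()
      currentCheckpointFiles ++= checkpointedRdds
      timeToCheckpointFile ++= checkpointedRdds
      timeToOldestCheckpointFileTime(time) = checkpointedRdds.keys.min
    }
  }

  // Called after the graph checkpoint of `time` has been written: files older
  // than the oldest RDD referenced by that checkpoint are no longer needed.
  def cleanup(time: Long): Unit = {
    timeToOldestCheckpointFileTime.remove(time).foreach { oldest =>
      val stale = timeToCheckpointFile.filter(_._1 < oldest).toSeq.sortBy(_._1)
      stale.foreach { case (t, file) =>
        deleted += file
        timeToCheckpointFile -= t
      }
    }
  }
}
```

For example, after a checkpoint at time 10 referencing RDDs 8 and 10, cleanup(10) deletes nothing. After a second checkpoint at time 20 referencing RDDs 18 and 20, cleanup(20) deletes the files for 8 and 10, since the newest graph checkpoint no longer refers to them.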
DStream
//DStream
// Checkpoint details
private[streaming] val mustCheckpoint = false
private[streaming] var checkpointDuration: Duration = null
private[streaming] val checkpointData = new DStreamCheckpointData(this)
/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration): DStream[T] = {
if (isInitialized) {
throw new UnsupportedOperationException(
"Cannot change checkpoint interval of an DStream after streaming context has started")
}
persist()
checkpointDuration = interval
this
}
/**
* Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
* a default implementation that saves only the file names of the checkpointed RDDs to
* checkpointData. Subclasses of DStream (especially those of InputDStream) may override
* this method to save custom checkpoint data.
*/
private[streaming] def updateCheckpointData(currentTime: Time) {
checkpointData.update(currentTime)
dependencies.foreach(_.updateCheckpointData(currentTime))
}
private[streaming] def clearCheckpointData(time: Time) {
checkpointData.cleanup(time)
dependencies.foreach(_.clearCheckpointData(time))
}
/**
* Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
* that should not be called directly. This is a default implementation that recreates RDDs
* from the checkpoint file names stored in checkpointData. Subclasses of DStream that
* override the updateCheckpointData() method would also need to override this method.
*/
private[streaming] def restoreCheckpointData() {
// Create RDDs from the checkpoint data
checkpointData.restore()
dependencies.foreach(_.restoreCheckpointData())
}
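All three methods above share one traversal pattern: handle this DStream, then recurse into dependencies. Starting from the output streams, that walk reaches the whole graph. A minimal sketch, where Node and GraphWalk are hypothetical stand-ins for DStream and DStreamGraph:

```scala
import scala.collection.mutable

// Hypothetical miniature of a DStream: a name plus its parent streams.
final class Node(val name: String, val dependencies: List[Node]) {
  // Same shape as updateCheckpointData: handle this node, then recurse upward.
  def visit(seen: mutable.ArrayBuffer[String]): Unit = {
    seen += name
    dependencies.foreach(_.visit(seen))
  }
}

object GraphWalk {
  // Start from the output streams, as DStreamGraph does.
  def reachableFrom(outputStreams: Seq[Node]): List[String] = {
    val seen = mutable.ArrayBuffer.empty[String]
    outputStreams.foreach(_.visit(seen))
    seen.toList
  }
}
```

As in the original recursion, nothing here deduplicates: a stream that is a parent of two downstream streams would be visited once per path.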
JobGenerator
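1. After each runJobs call, i.e. each time a new batch of jobs is submitted, a DoCheckpoint event is posted to write the Checkpoint object to a file.
2. On restart, the jobs for pendingTimes + downTimes are run again, which gives at-least-once semantics.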
//JobGenerator
private lazy val checkpointWriter =
if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) => jobScheduler.runJobs(time, jobs)
case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time) // after runJobs, checkpoint the DStreamGraph
}
/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time) = synchronized {
if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
ssc.graph.updateCheckpointData(time) // first refresh currentCheckpointFiles of every DStream in the graph
checkpointWriter.write(new Checkpoint(ssc, time)) // then write the Checkpoint object to a file via checkpointWriter
}
}
def onCheckpointCompletion(time: Time) {
eventActor ! ClearCheckpointData(time) // once the graph checkpoint is written, remove the checkpoint files of older RDDs
}
/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
ssc.graph.clearCheckpointData(time)
}
/** Restarts the generator based on the information in checkpoint */
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
val batchDuration = ssc.graph.batchDuration
// Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration) // batch times between the last checkpoint and the restart
logInfo("Batches during down time (" + downTimes.size + " batches): "
+ downTimes.mkString(", "))
// Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) // job sets still unprocessed when the graph was checkpointed
logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
pendingTimes.mkString(", "))
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) // reschedule pendingTimes + downTimes
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
jobScheduler.runJobs(time, graph.generateJobs(time))
)
// Restart the timer
timer.start(restartTime.milliseconds)
logInfo("JobGenerator restarted at " + restartTime)
}
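The set of batches that restart re-runs can be worked through with plain numbers. In this sketch RescheduleSketch is a hypothetical helper, times are milliseconds, and the downtime range is assumed to include the checkpoint time and exclude the restart time.

```scala
object RescheduleSketch {
  // Batch times in [checkpointTime, restartTime), one per batchDuration.
  def downTimes(checkpointTime: Long, restartTime: Long, batchDuration: Long): List[Long] =
    (checkpointTime until restartTime by batchDuration).toList

  // Union of pending and downtime batches, deduplicated and in time order.
  def timesToReschedule(pending: Seq[Long], down: Seq[Long]): Seq[Long] =
    (pending ++ down).distinct.sorted
}
```

With a checkpoint at 3000, a restart at 6000 and 1-second batches, the downtime batches are 3000, 4000 and 5000. If the checkpoint recorded 2000 and 3000 as pending, the batches to reschedule are 2000, 3000, 4000 and 5000. Batch 3000 appears in both lists but is run once, and batch 2000 may be run a second time if it had actually finished before the crash, which is the at-least-once behaviour.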
StreamingContext
When a checkpoint file exists, the Checkpoint object is read first and then used to initialize the StreamingContext,
which in turn uses the Checkpoint to restore every DStream in the graph.
//StreamingContext
class StreamingContext private[streaming] (
sc_ : SparkContext,
cp_ : Checkpoint,
batchDur_ : Duration
) extends Logging {
private[streaming] val isCheckpointPresent = (cp_ != null)
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
}
}
/**
* Set the context to periodically checkpoint the DStream operations for driver
* fault-tolerance.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
* Note that this must be a fault-tolerant file system like HDFS for
*/
def checkpoint(directory: String) { // merely sets up checkpointDir; the method name is poorly chosen
if (directory != null) {
val path = new Path(directory)
val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
fs.mkdirs(path)
val fullPath = fs.getFileStatus(path).getPath().toString
sc.setCheckpointDir(fullPath)
checkpointDir = fullPath
} else {
checkpointDir = null
}
}
private[streaming] def initialCheckpoint: Checkpoint = {
if (isCheckpointPresent) cp_ else null
}
}
object StreamingContext extends Logging {
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the StreamingContext
* will be created by calling the provided `creatingFunc`.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param creatingFunc Function to create a new StreamingContext
* @param hadoopConf Optional Hadoop configuration if necessary for reading from the
* file system
* @param createOnError Optional, whether to create a new StreamingContext if there is an
* error in reading checkpoint data. By default, an exception will be
* thrown on error.
*/
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = new Configuration(),
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = try { // read the Checkpoint object from the checkpoint file
CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf)
} catch {
case e: Exception =>
if (createOnError) {
None
} else {
throw e
}
}
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) // initialize the StreamingContext from the Checkpoint object
}
}
Checkpoint (org.apache.spark.streaming)
Checkpoint exists mainly to checkpoint the DStreamGraph object; CheckpointWriter serializes the Checkpoint to a file.
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
val jars = ssc.sc.jars
val graph = ssc.graph // the graph is the key piece of state to checkpoint
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
// times of the job sets in JobScheduler.jobSets that have not been run yet
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll
}
CheckpointWriter writes the Checkpoint object to a file
/**
* Convenience class to handle the writing of graph checkpoint to file
*/
private[streaming]
class CheckpointWriter(
jobGenerator: JobGenerator,
conf: SparkConf,
checkpointDir: String,
hadoopConf: Configuration
) extends Logging {
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
private var stopped = false
private var fs_ : FileSystem = _
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
val tempFile = new Path(checkpointDir, "temp") // temporary file
val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime) // the actual checkpoint file
val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime) // backup file
while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile
+ "'")
// Write checkpoint to temp file first
fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
fos.write(bytes)
fos.close()
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
if (fs.exists(checkpointFile)) {
fs.delete(backupFile, true) // just in case it exists
if (!fs.rename(checkpointFile, backupFile)) { // rename the current checkpoint file to the backup file
logWarning("Could not rename " + checkpointFile + " to " + backupFile)
}
}
// Then rename the temp file to the final checkpoint file
if (!fs.rename(tempFile, checkpointFile)) {
logWarning("Could not rename " + tempFile + " to " + checkpointFile)
}
// Delete old checkpoint files
val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
if (allCheckpointFiles.size > 4) {
allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
logInfo("Deleting " + file)
fs.delete(file, true)
})
}
// All done, print success
val finishTime = System.currentTimeMillis()
jobGenerator.onCheckpointCompletion(checkpointTime) // when the checkpoint completes, notify jobGenerator.onCheckpointCompletion
return
} catch {
case e: Exception => // handler omitted in this excerpt
}
}
}
}
def write(checkpoint: Checkpoint) {
val bos = new ByteArrayOutputStream()
val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
oos.writeObject(checkpoint) // serialize the Checkpoint object
oos.close()
bos.close()
try {
executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) // run CheckpointWriteHandler on the executor thread to write the bytes to a file
} catch {
case e: Exception => // handler omitted in this excerpt
}
}
}
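The write sequence in CheckpointWriteHandler (write to a temp file, move the existing file to a backup, rename the temp file into place) is a general pattern for never exposing a half-written file. The sketch below reproduces it on the local file system with java.nio.file instead of the Hadoop FileSystem API; AtomicCheckpointWrite and the ".bk" suffix are illustrative assumptions, not names taken from Spark.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object AtomicCheckpointWrite {
  // Writes `bytes` to dir/name through a temp file, keeping the previous
  // version as name.bk. Returns the path of the published file.
  def write(dir: Path, name: String, bytes: Array[Byte]): Path = {
    val temp = dir.resolve("temp")
    val target = dir.resolve(name)
    val backup = dir.resolve(name + ".bk")

    Files.deleteIfExists(temp)   // leftover from an earlier failed attempt
    Files.write(temp, bytes)     // 1. write the complete payload first

    if (Files.exists(target)) {  // 2. keep the old checkpoint as a backup
      Files.move(target, backup, StandardCopyOption.REPLACE_EXISTING)
    }
    // 3. publish: a reader finds the backup or the complete new file,
    //    never a partially written one
    Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING)
    target
  }
}
```

Between steps 2 and 3 only the backup exists, which is why a reader should be prepared to fall back to it.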
CheckpointReader
private[streaming]
object CheckpointReader extends Logging {
def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
{
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
// Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
if (checkpointFiles.isEmpty) {
return None
}
// Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
val compressionCodec = CompressionCodec.createCodec(conf)
checkpointFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file " + file)
try {
val fis = fs.open(file)
// ObjectInputStream uses the last defined user-defined class loader in the stack
// to find classes, which maybe the wrong class loader. Hence, a inherited version
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
val zis = compressionCodec.compressedInputStream(fis)
val ois = new ObjectInputStreamWithLoader(zis,
Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint] // 将文件内容反序列化成Checkpoint对象
ois.close()
fs.close()
cp.validate()
return Some(cp)
} catch {
case e: Exception => // handler omitted in this excerpt
}
})
}
}
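The reader's strategy is to try the candidate files from newest to oldest and return the first one that loads, so a corrupt latest file does not block recovery. A minimal sketch of that strategy: NewestReadableSketch is a hypothetical helper, `load` stands in for opening and deserializing a file, and unlike the excerpt this version returns None rather than failing when nothing is readable.

```scala
import scala.util.{Success, Try}

object NewestReadableSketch {
  // `files` is ordered oldest to newest; `load` may throw on a corrupt file.
  // The iterator is lazy, so older files are only touched after a failure.
  def readLatest[A](files: Seq[String])(load: String => A): Option[A] =
    files.reverseIterator
      .map(f => Try(load(f)))
      .collectFirst { case Success(value) => value }
}
```

Given files cp-1000, cp-2000 and cp-3000 where only cp-3000 fails to load, the result is the contents of cp-2000, and cp-1000 is never opened.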