Persistence
Streaming does nothing special for persistence: a DStream is ultimately scheduled as jobs over its constituent RDDs, so persistence simply happens at the RDD level, the same way as in ordinary Spark. The difference is that a stream is unbounded, so old data eventually has to be cleared.
During clearMetadata, expired RDDs are deleted and, at the same time, unpersisted.
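A minimal usage sketch (the socket source and storage level are illustrative): persist() is called once on the DStream, and every RDD it generates inherits the setting.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("persist-sketch")
val ssc = new StreamingContext(conf, Seconds(1))

// persist() marks the DStream; each RDD it generates is persisted with
// this storage level, and later unpersisted during clearMetadata.
val words = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .persist(StorageLevel.MEMORY_ONLY_SER)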
NetworkInputDStream is a special case that always persists: the stream data is first turned into persisted blocks, and NetworkInputDStream then reads its data directly from those blocks.
The design says NetworkInputDStream stores the source data twice to guard against loss, but I could not find that logic in the code; it appears to write only one copy to the BlockManager.
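One possible explanation (my assumption, not something this walkthrough confirms): the second copy may come from the StorageLevel's replication factor rather than from explicit copy logic, since the receiver input streams default to a two-replica level.

import org.apache.spark.storage.StorageLevel

// The "_2" suffix means replication = 2: each received block is written to
// the BlockManager on two nodes. ssc is the StreamingContext from the
// sketch above.
val stream = ssc.socketTextStream(
  "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)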
Checkpoint
Checkpointing has a special meaning in Streaming.
For ordinary Spark, having no checkpoint does not affect correctness, because any data can be replayed from the source, and the source data usually sits on HDFS; there, checkpointing is just an optimization.
Spark also only does failover at the worker level: if a worker dies, its tasks are simply replayed on another worker. There is no driver failover; if the driver dies, the job fails.
That is because Spark sees itself as a query engine: a failed query costs nothing, just run it again.
For Spark Streaming the problem is not so simple: if the driver dies without any preparation, where should processing resume after recovery?
First of all, data will certainly be lost, breaking correctness, because stream data is unbounded; you cannot replay everything the way Spark does, even when the source supports replay, as Kafka does.
So a Streaming checkpoint consists of two parts: checkpointing the RDDs and checkpointing the DStreamGraph.
RDD checkpointing is identical to ordinary Spark; there is no difference.
Now for the DStreamGraph checkpoint: the goal is that after the StreamingContext is restarted, the execution state of the previous graph can be recovered from the checkpoint.
a. The Graph object is serialized to a file in its entirety, and its key field is outputStreams. It looks as if only the final outputStreams get persisted, but in fact every DStream in the graph does,
because def dependencies: List[DStream[_]] contains all the parent DStreams; recursing through it reaches every DStream object in the graph, as the sketch below illustrates.
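An illustrative sketch of that recursion (collectAll is a hypothetical helper, not Spark code); Java serialization of outputStreams effectively performs the same walk through object references.

import org.apache.spark.streaming.dstream.DStream

// Walking dependencies from the output streams transitively reaches every
// DStream in the graph; serializing outputStreams therefore captures them all.
def collectAll(ds: DStream[_]): Seq[DStream[_]] =
  ds +: ds.dependencies.flatMap(collectAll)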
Once the DStream objects have been restored, how is the RDD state of that moment recovered? Note that generatedRDDs is @transient and is not persisted.
The answer lies in DStream.DStreamCheckpointData: at checkpoint time, currentCheckpointFiles records the (time, cpFilename) of every RDD in generatedRDDs whose checkpoint has completed.
So at recovery time it suffices to read each RDD back from its cpFile and add it to generatedRDDs.
The cpFiles themselves need cleaning up: each time a DStreamGraph checkpoint completes, every cpFile older than the oldest RDD in that graph can be deleted, since those older RDDs can never be used again; a worked sketch follows.
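A worked sketch of the cleanup rule with made-up times: suppose the latest graph checkpoint covers RDDs for times 3000 and 4000, so the oldest covered time is 3000; everything strictly older is deletable.

import org.apache.spark.streaming.Time

val timeToCheckpointFile = Map(
  Time(1000) -> "cp-1000", Time(2000) -> "cp-2000",
  Time(3000) -> "cp-3000", Time(4000) -> "cp-4000")
val lastCheckpointFileTime = Time(3000) // oldest RDD in the latest graph checkpoint

// The same filter cleanup() applies: files older than the oldest
// still-referenced RDD can never be needed for recovery again.
val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
// filesToDelete == Map(Time(1000) -> "cp-1000", Time(2000) -> "cp-2000")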
b. Besides the graph object, the other important field in the Checkpoint object is pendingTimes, which records which jobs had not yet been submitted at checkpoint time.
When the JobScheduler restarts, it resubmits these jobs. This is at-least-once logic: there is no way to know how long after the checkpoint the crash occurred, so some of those jobs may already have executed successfully (see the sketch after this point).
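Because a replayed job may repeat work that already succeeded before the crash, the output operation should be idempotent. A sketch (the path scheme is an assumption, not Spark API; stream is any DStream being written out):

// Keying output by batch time makes a replayed batch target the same
// location instead of silently duplicating results; in practice any partial
// output under that path would be removed before rewriting.
stream.foreachRDD { (rdd, time) =>
  rdd.saveAsTextFile("hdfs:///output/batch-" + time.milliseconds)
}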
The checkpoint creation process:
1. In JobGenerator, each time a set of jobs has been submitted to Spark, DoCheckpoint is executed to serialize the Checkpoint object to a file (the Checkpoint object contains the graph object, among other state).
2. After DoCheckpoint completes, ClearCheckpointData is called to remove the checkpoint files of expired RDDs.
The checkpoint recovery process (a driver-side usage sketch follows the steps):
1. StreamingContext.getOrCreate is called; it uses CheckpointReader.read to deserialize the Checkpoint object from the file and initializes the StreamingContext with that Checkpoint object.
2. The StreamingContext calls cp_.graph.restoreCheckpointData to restore each DStream's generatedRDDs.
3. JobGenerator's restart is called to resubmit the jobs that had not been submitted at checkpoint time.
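Putting the recovery path together, the canonical driver-side usage looks roughly like this (app name, path, and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("my-app")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir) // enable DStreamGraph checkpointing
  // ... build the DStream graph here ...
  ssc
}

// First start: no cp files exist, so createContext() runs. After a driver
// restart: the Checkpoint object is deserialized and the graph restored.
val context = StreamingContext.getOrCreate(checkpointDir, createContext _)
context.start()
context.awaitTermination()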
DStreamGraph
final private[streaming] class DStreamGraph extends Serializable with Logging {
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  var rememberDuration: Duration = null
  var checkpointInProgress = false
  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null

  def updateCheckpointData(time: Time) {
    logInfo("Updating checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.updateCheckpointData(time))
    }
    logInfo("Updated checkpoint data for time " + time)
  }

  def clearCheckpointData(time: Time) {
    logInfo("Clearing checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.clearCheckpointData(time))
    }
    logInfo("Cleared checkpoint data for time " + time)
  }

  def restoreCheckpointData() {
    logInfo("Restoring checkpoint data")
    this.synchronized {
      outputStreams.foreach(_.restoreCheckpointData())
    }
    logInfo("Restored checkpoint data")
  }
}
DStreamCheckpointData
private[streaming] class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
  extends Serializable with Logging {
  protected val data = new HashMap[Time, AnyRef]()

  // Mapping of the batch time to the checkpointed RDD file of that time
  @transient private var timeToCheckpointFile = new HashMap[Time, String] // (time, cpFilename) of every RDD ever checkpointed

  // Mapping of the batch time to the time of the oldest checkpointed RDD
  // in that batch's checkpoint data
  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] // maps each checkpoint time to the time of the oldest RDD it covers
  @transient private var fileSystem: FileSystem = null

  protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] // (time, cpFilename) of the RDDs covered by this checkpoint

  /**
   * Updates the checkpoint data of the DStream. This gets called every time
   * the graph checkpoint is initiated. Default implementation records the
   * checkpoint files to which the generate RDDs of the DStream has been saved.
   */
  def update(time: Time) {
    // Get the checkpointed RDDs from the generated RDDs
    val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) // keep only the RDDs in generatedRDDs whose checkpoint has completed
      .map(x => (x._1, x._2.getCheckpointFile.get))

    // Add the checkpoint files to the data to be serialized
    if (!checkpointFiles.isEmpty) {
      currentCheckpointFiles.clear()
      currentCheckpointFiles ++= checkpointFiles // refresh currentCheckpointFiles
      // Add the current checkpoint files to the map of all checkpoint files
      // This will be used to delete old checkpoint files
      timeToCheckpointFile ++= currentCheckpointFiles
      // Remember the time of the oldest checkpoint RDD in current state
      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) // the time of the oldest RDD covered by this checkpoint
    }
  }

  /**
   * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
   * written to the checkpoint directory.
   */
  def cleanup(time: Time) {
    // Get the time of the oldest checkpointed RDD that was written as part of the
    // checkpoint of `time`
    timeToOldestCheckpointFileTime.remove(time) match { // lastCheckpointFileTime is the time of the oldest RDD covered by the checkpoint at `time`
      case Some(lastCheckpointFileTime) =>
        // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`
        // This is because checkpointed RDDs older than this are not going to be needed
        // even after master fails, as the checkpoint data of `time` does not refer to those files
        val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) // delete every cpFile older than lastCheckpointFileTime
        logDebug("Files to delete: " + filesToDelete.mkString(","))
        filesToDelete.foreach {
          case (time, file) =>
            try {
              val path = new Path(file)
              if (fileSystem == null) {
                fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
              }
              fileSystem.delete(path, true)
              timeToCheckpointFile -= time
              logInfo("Deleted checkpoint file '" + file + "' for time " + time)
            } catch {
              case e: Exception => // (error handling elided in the original excerpt)
                logWarning("Error deleting old checkpoint file '" + file + "'", e)
            }
        }
      case None => logDebug("Nothing to delete")
    }
  }

  /**
   * Restore the checkpoint data. This gets called once when the DStream graph
   * (along with its DStreams) are being restored from a graph checkpoint file.
   * Default implementation restores the RDDs from their checkpoint files.
   */
  def restore() {
    // Create RDDs from the checkpoint data
    currentCheckpointFiles.foreach {
      case (time, file) => // recovery: deserialize each RDD from its cpFile and add it back into dstream.generatedRDDs
        dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
    }
  }
}
DStream
//DStream
// Checkpoint details
private[streaming] val mustCheckpoint = false
private[streaming] var checkpointDuration: Duration = null
private[streaming] val checkpointData = new DStreamCheckpointData(this)

/**
 * Enable periodic checkpointing of RDDs of this DStream
 * @param interval Time interval after which generated RDD will be checkpointed
 */
def checkpoint(interval: Duration): DStream[T] = {
  if (isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started")
  }
  persist()
  checkpointDuration = interval
  this
}

/**
 * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
 * this stream. This is an internal method that should not be called directly. This is
 * a default implementation that saves only the file names of the checkpointed RDDs to
 * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
 * this method to save custom checkpoint data.
 */
private[streaming] def updateCheckpointData(currentTime: Time) {
  checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
}

private[streaming] def clearCheckpointData(time: Time) {
  checkpointData.cleanup(time)
  dependencies.foreach(_.clearCheckpointData(time))
}

/**
 * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
 * that should not be called directly. This is a default implementation that recreates RDDs
 * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
 * override the updateCheckpointData() method would also need to override this method.
 */
private[streaming] def restoreCheckpointData() {
  // Create RDDs from the checkpoint data
  checkpointData.restore()
  dependencies.foreach(_.restoreCheckpointData())
}
JobGenerator
1. Each time runJobs finishes, i.e. each time a new set of jobs has been submitted, DoCheckpoint is executed to write the Checkpoint object to a file.
2. On restart, the jobs for pendingTimes + downTimes are rerun, which guarantees at-least-once semantics.
//JobGenerator
private lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
  new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
  null
}

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  Try(graph.generateJobs(time)) match {
    case Success(jobs) => jobScheduler.runJobs(time, jobs)
    case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventActor ! DoCheckpoint(time) // after runJobs completes, checkpoint the DStreamGraph
}

/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time) = synchronized {
  if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    ssc.graph.updateCheckpointData(time) // first refresh currentCheckpointFiles on every DStream in the graph
    checkpointWriter.write(new Checkpoint(ssc, time)) // then serialize the Checkpoint object to a file via checkpointWriter
  }
}

def onCheckpointCompletion(time: Time) {
  eventActor ! ClearCheckpointData(time) // once the DStreamGraph checkpoint completes, clear the cp files of RDDs that are no longer needed
}

/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
  ssc.graph.clearCheckpointData(time)
}

/** Restarts the generator based on the information in checkpoint */
private def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }

  val batchDuration = ssc.graph.batchDuration

  // Batches when the master was down, that is,
  // between the checkpoint and current restart time
  val checkpointTime = ssc.initialCheckpoint.checkpointTime
  val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
  val downTimes = checkpointTime.until(restartTime, batchDuration) // batch times between the last checkpoint and the restart
  logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", "))

  // Batches that were unprocessed before failure
  val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) // jobsets that had not been submitted when the graph was checkpointed
  logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", "))

  // Reschedule jobs for these times
  val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) // reschedule pendingTimes + downTimes
  logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", "))
  timesToReschedule.foreach(time =>
    jobScheduler.runJobs(time, graph.generateJobs(time))
  )

  // Restart the timer
  timer.start(restartTime.milliseconds)
  logInfo("JobGenerator restarted at " + restartTime)
}
StreamingContext
When a checkpoint file is present, the Checkpoint object must first be read out and used to initialize the StreamingContext,
which then uses the Checkpoint to restore all the DStreams in the graph.
//StreamingContext
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {

  private[streaming] val isCheckpointPresent = (cp_ != null)

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

  /**
   * Set the context to periodically checkpoint the DStream operations for driver
   * fault-tolerance.
   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
   *                  Note that this must be a fault-tolerant file system like HDFS.
   */
  def checkpoint(directory: String) { // merely creates checkpointDir; the method name is misleading
    if (directory != null) {
      val path = new Path(directory)
      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
      fs.mkdirs(path)
      val fullPath = fs.getFileStatus(path).getPath().toString
      sc.setCheckpointDir(fullPath)
      checkpointDir = fullPath
    } else {
      checkpointDir = null
    }
  }

  private[streaming] def initialCheckpoint: Checkpoint = {
    if (isCheckpointPresent) cp_ else null
  }
}

object StreamingContext extends Logging {
  /**
   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
   * will be created by calling the provided `creatingFunc`.
   *
   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
   * @param creatingFunc   Function to create a new StreamingContext
   * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
   *                       file system
   * @param createOnError  Optional, whether to create a new StreamingContext if there is an
   *                       error in reading checkpoint data. By default, an exception will be
   *                       thrown on error.
   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration = new Configuration(),
      createOnError: Boolean = false
    ): StreamingContext = {
    val checkpointOption = try { // deserialize the Checkpoint object from the cp file
      CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf)
    } catch {
      case e: Exception =>
        if (createOnError) {
          None
        } else {
          throw e
        }
    }
    checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) // initialize the StreamingContext from the Checkpoint object if one was found
  }
}
Checkpoint (org.apache.spark.streaming)
Checkpoint exists mainly to checkpoint the DStreamGraph object; CheckpointWriter serializes the Checkpoint to a file.
private[streaming] class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
  val jars = ssc.sc.jars
  val graph = ssc.graph // the crucial piece of state to checkpoint: the graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  // the times of the jobsets in JobScheduler.jobSets that have not been run yet
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
  val sparkConfPairs = ssc.conf.getAll
}
CheckpointWriter, which writes the Checkpoint object to a file.
/**
 * Convenience class to handle the writing of graph checkpoint to file
 */
private[streaming] class CheckpointWriter(
    jobGenerator: JobGenerator,
    conf: SparkConf,
    checkpointDir: String,
    hadoopConf: Configuration
  ) extends Logging {
  val MAX_ATTEMPTS = 3
  val executor = Executors.newFixedThreadPool(1)
  val compressionCodec = CompressionCodec.createCodec(conf)
  private var stopped = false
  private var fs_ : FileSystem = _

  class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
    def run() {
      var attempts = 0
      val startTime = System.currentTimeMillis()
      val tempFile = new Path(checkpointDir, "temp") // temporary file
      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime) // the official cp file
      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime) // backup file
      while (attempts < MAX_ATTEMPTS && !stopped) {
        attempts += 1
        try {
          logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")

          // Write checkpoint to temp file first
          fs.delete(tempFile, true) // just in case it exists
          val fos = fs.create(tempFile)
          fos.write(bytes)
          fos.close()

          // If the checkpoint file exists, back it up
          // If the backup exists as well, just delete it, otherwise rename will fail
          if (fs.exists(checkpointFile)) {
            fs.delete(backupFile, true) // just in case it exists
            if (!fs.rename(checkpointFile, backupFile)) { // rename the current cp file to the backup file
              logWarning("Could not rename " + checkpointFile + " to " + backupFile)
            }
          }

          // Rename temp file to the final checkpoint file
          if (!fs.rename(tempFile, checkpointFile)) {
            logWarning("Could not rename " + tempFile + " to " + checkpointFile)
          }

          // Delete old checkpoint files, keeping only the 10 most recent
          val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
          if (allCheckpointFiles.size > 10) {
            allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
              logInfo("Deleting " + file)
              fs.delete(file, true)
            })
          }

          // All done, print success
          val finishTime = System.currentTimeMillis()
          jobGenerator.onCheckpointCompletion(checkpointTime) // on completion, trigger jobGenerator.onCheckpointCompletion
          return
        } catch {
          case ioe: IOException => // (error handling elided in the original excerpt)
            logWarning("Error in attempt " + attempts + " of writing checkpoint to " + checkpointFile, ioe)
        }
      }
    }
  }

  def write(checkpoint: Checkpoint) {
    val bos = new ByteArrayOutputStream()
    val zos = compressionCodec.compressedOutputStream(bos)
    val oos = new ObjectOutputStream(zos)
    oos.writeObject(checkpoint) // serialize the Checkpoint object
    oos.close()
    bos.close()
    try {
      executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) // hand off to a background thread running CheckpointWriteHandler to write the bytes to file
    } catch {
      case rej: RejectedExecutionException => // (error handling elided in the original excerpt)
        logError("Could not submit checkpoint task to the thread pool executor", rej)
    }
  }
}
CheckpointReader
private[streaming] object CheckpointReader extends Logging {
  def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
    val checkpointPath = new Path(checkpointDir)
    def fs = checkpointPath.getFileSystem(hadoopConf)

    // Try to find the checkpoint files
    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
    if (checkpointFiles.isEmpty) {
      return None
    }

    // Try to read the checkpoint files in the order
    logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
    val compressionCodec = CompressionCodec.createCodec(conf)
    checkpointFiles.foreach(file => {
      logInfo("Attempting to load checkpoint from file " + file)
      try {
        val fis = fs.open(file)
        // ObjectInputStream uses the last defined user-defined class loader in the stack
        // to find classes, which maybe the wrong class loader. Hence, a inherited version
        // of ObjectInputStream is used to explicitly use the current thread's default class
        // loader to find and load classes. This is a well known Java issue and has popped up
        // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
        val zis = compressionCodec.compressedInputStream(fis)
        val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
        val cp = ois.readObject.asInstanceOf[Checkpoint] // deserialize the file contents into a Checkpoint object
        ois.close()
        fs.close()
        cp.validate()
        return Some(cp)
      } catch {
        case e: Exception => // (error handling elided in the original excerpt)
          logWarning("Error reading checkpoint from file " + file, e)
      }
    })

    // None of the checkpoint files could be read (elided in the original excerpt)
    throw new SparkException("Failed to read checkpoint from directory " + checkpointPath)
  }
}