先给出一个job从被generate到被执行的整个过程
在JobGenerator中,需要定时的发起GenerateJobs事件,而每个job其实就是针对DStream中的一个RDD,发起一个SparkContext.runJob,通过对DStream中每个RDD都runJob来模拟流处理
//StreamingContext.scala
private[streaming] val scheduler = new JobScheduler(this)
//JobScheduler.scala
private val jobGenerator = new JobGenerator(this)
//JobGenerator.scala,SparkStreaming最核心的一句,用离散化来模拟流处理
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime))) //以batchDuration为间隔,不断发起jobs,这是整个SparkStreaming的发动机
//JobGenerator.eventActor对GenerateJobs event处理逻辑
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) => jobScheduler.runJobs(time, jobs)
case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}
//这里两步,首先是调用DStreamGraph.generateJobs生成jobs, 然后使用JobScheduler.runJobs去执行job
//DStreamGraph.generateJobs
def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
//Spark都是反向执行,只需要对outputStream执行generateJob就ok
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
jobs
}
//DStream.generateJob
//DStream中的实现,可以看到stream处理最终仍然是转化为context.sparkContext.runJob(rdd, emptyFunc)来执行
//但这里处理逻辑是emptyFunc,真正的outputStream需要重写generateJob给出真正的output逻辑
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
//JobScheduler.runJobs
def runJobs(time: Time, jobs: Seq[Job]) {
if (jobs.isEmpty) {
logInfo("No jobs added for time " + time)
} else {
val jobSet = new JobSet(time, jobs) //创建JobSet
jobSets.put(time, jobSet)
jobSet.jobs.foreach(job => executor.execute(new JobHandler(job))) //启动executor线程执行JobHandler
logInfo("Added jobs for time " + time)
}
}
//JobScheduler.JobHandler
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run() //这里job.run只是向Spark提交一个job,具体的事情Spark会去做
eventActor ! JobCompleted(job)
}
}
//Job.run
private[streaming]
class Job(val time: Time, func: () => _) {
var id: String = _
var result: Try[_] = null
def run() {
result = Try(func())
}
}
再看个outputStream的具体实现
saveAsTextFiles
//DStream
/**
* Save each RDD in this DStream as at text file, using string representation
* of elements. The file name at each batch interval is generated based on
* `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
this.foreachRDD(saveFunc)
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
//封装output函数, 并使用DStream.register将outputStream注册到DStreamGragh中去
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}
//org.apache.spark.streaming.dstream
//重写了generateJob以调用相应的output逻辑
private[streaming]
class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {
override def dependencies = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[Unit]] = None
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
}
最后,再强调一下RDD中执行中如果从InputDStream取到数据的,就全打通了
就再看看NetworkInputDStream.compute是如何最终获取数据的
//NetworkInputDStream.compute
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime) //从networkInputTracker获取InputDStream已经产生的blockids
Some(new BlockRDD[T](ssc.sc, blockIds)) //封装成BlockRDD
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
//NetworkInputTracker.getBlockIds
/** Return all the blocks received from a receiver. */
def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized { //虽然时间作为参数,其实是返回所有该InputDStream的所有blockids
val queue = receivedBlockIds.synchronized {
receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
}
val result = queue.synchronized {
queue.dequeueAll(x => true) //dequeue所有,和时间产生无关,不会check RDD的时间和block之间的关系
}
logInfo("Stream " + receiverId + " received " + result.size + " blocks")
result.toArray
}
JobScheduler
SparkStreaming的主控线程,用于初始化和启动,JobGenerator和NetworkInputTracker
分别用于,产生并定时提交job,和从InputDStream不断读取数据
/**
* This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
* the jobs and runs them using a thread pool.
*/
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
private val jobSets = new ConcurrentHashMap[Time, JobSet] // 缓存没有被run的jobset,即pending,当job run成功后会从jobSets中删除
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) // 决定StreamingJob的并发度
private val executor = Executors.newFixedThreadPool(numConcurrentJobs) // 创建executor线程池
private val jobGenerator = new JobGenerator(this)
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()
// These two are created only when scheduler starts.
// eventActor not being null means the scheduler has been started and not stopped
var networkInputTracker: NetworkInputTracker = null
private var eventActor: ActorRef = null
def start() = synchronized {
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor { // 创建eventActor来后台处理JobSchedulerEvent
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")
listenerBus.start()
networkInputTracker = new NetworkInputTracker(ssc) // 初始化和启动NetworkInputTracker,开始从input读取数据
networkInputTracker.start()
Thread.sleep(1000)
jobGenerator.start() // 启动JobGenerator,开始提交job
logInfo("JobScheduler started")
}
def runJobs(time: Time, jobs: Seq[Job]) {
if (jobs.isEmpty) {
logInfo("No jobs added for time " + time)
} else {
val jobSet = new JobSet(time, jobs)
jobSets.put(time, jobSet)
jobSet.jobs.foreach(job => executor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + time)
}
}
private def processEvent(event: JobSchedulerEvent) {
try {
event match {
case JobStarted(job) => handleJobStart(job)
case JobCompleted(job) => handleJobCompletion(job)
case ErrorReported(m, e) => handleError(m, e)
}
} catch {
}
}
private def handleJobStart(job: Job) {
val jobSet = jobSets.get(job.time)
if (!jobSet.hasStarted) {
listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
}
jobSet.handleJobStart(job)
SparkEnv.set(ssc.env)
}
private def handleJobCompletion(job: Job) {
job.result match {
case Success(_) =>
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
case Failure(e) =>
}
}
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run()
eventActor ! JobCompleted(job)
}
}
}
JobGenerator
/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
* up DStream metadata.
*/
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private val ssc = jobScheduler.ssc
private val graph = ssc.graph
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, // 不断的GenerateJobs,SparkStreaming的心脏
longTime => eventActor ! GenerateJobs(new Time(longTime)))
private lazy val checkpointWriter =
if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
// eventActor is created when generator starts.
// This not being null means the scheduler has been started and not stopped
private var eventActor: ActorRef = null
/** Start generation of jobs */
def start() = synchronized {
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor { // 创建actor用于后台处理JobGeneratorEvent
def receive = {
case event: JobGeneratorEvent =>
processEvent(event)
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
/** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo("JobGenerator started at " + startTime)
}
/** Restarts the generator based on the information in checkpoint */
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
val batchDuration = ssc.graph.batchDuration
// Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration)
// Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
// Restart the timer
timer.start(restartTime.milliseconds)
}
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) { // generateJobs
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) => jobScheduler.runJobs(time, jobs)
case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}
/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
ssc.graph.clearMetadata(time)
eventActor ! DoCheckpoint(time)
}
/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
ssc.graph.clearCheckpointData(time)
}
/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time) = synchronized {
if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time))
}
}
}
DStreamGraph
用于track job中的inputStreams和outputStreams,并做为DStream workflow对外的接口
最关键的接口是generateJobs
final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
var rememberDuration: Duration = null
var checkpointInProgress = false
var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
def start(time: Time) {
this.synchronized {
if (zeroTime != null) {
throw new Exception("DStream graph computation already started")
}
zeroTime = time
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
inputStreams.par.foreach(_.start())
}
}
def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
inputStreams += inputStream
}
}
def addOutputStream(outputStream: DStream[_]) {
this.synchronized {
outputStream.setGraph(this)
outputStreams += outputStream
}
}
def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time)) //只是对outputStream调用
}
jobs
}
def clearMetadata(time: Time) {
logDebug("Clearing metadata for time " + time)
this.synchronized {
outputStreams.foreach(_.clearMetadata(time))
}
logDebug("Cleared old metadata for time " + time)
}
def updateCheckpointData(time: Time) {
logInfo("Updating checkpoint data for time " + time)
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))
}
logInfo("Updated checkpoint data for time " + time)
}
def clearCheckpointData(time: Time) {
logInfo("Clearing checkpoint data for time " + time)
this.synchronized {
outputStreams.foreach(_.clearCheckpointData(time))
}
logInfo("Cleared checkpoint data for time " + time)
}
def restoreCheckpointData() {
logInfo("Restoring checkpoint data")
this.synchronized {
outputStreams.foreach(_.restoreCheckpointData())
}
logInfo("Restored checkpoint data")
}
}