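Basic concepts: Job, Stage, Task, DAGScheduler, TaskScheduler, ...
RDD operations fall into two groups: Transformations and Actions. Transformations are lazy and do not execute immediately, whereas an Action triggers the submission and execution of a job. Take the foreach used in this example: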
def foreach(f: T => Unit) {
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
}
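In short, Actions call sc.runJob to trigger a job.
SparkContext provides several overloaded versions of runJob.
The version that foreach calls takes rdd and func as parameters and returns the results of the execution: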
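The lazy/eager split can be sketched without Spark at all. The block below is a minimal, hypothetical model: MiniRDD, its private runJob, and the jobsRun counter are invented for illustration and are not Spark's classes. It only shows the shape of the idea: map wraps a computation without running it, and foreach hands a function over the whole iterator to runJob, mirroring the foreach definition above.

```scala
object LazyDemo {
  // Counts how many jobs have actually run (illustration only).
  var jobsRun = 0

  // Hypothetical miniature RDD: `compute` produces the elements on demand.
  class MiniRDD[T](compute: () => Iterator[T]) {

    // Transformation: only wraps the parent computation; nothing runs yet.
    def map[U](f: T => U): MiniRDD[U] =
      new MiniRDD[U](() => compute().map(f))

    // Action: delegates to runJob, which is what triggers execution.
    def foreach(f: T => Unit): Unit =
      runJob((iter: Iterator[T]) => iter.foreach(f))

    // Stand-in for sc.runJob: evaluates the computation and applies func to it.
    private def runJob[U](func: Iterator[T] => U): U = {
      jobsRun += 1
      func(compute())
    }
  }
}
```

With this sketch, building `new LazyDemo.MiniRDD[Int](() => Iterator(1, 2, 3)).map(_ * 10)` leaves jobsRun at 0; only a subsequent foreach call runs the mapping function.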
/**
 * Run a job on all partitions in an RDD and return the results in an array.
 */
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.size, false)
}
That call then enters the next runJob, which adds the partitions and allowLocal parameters:
/**
 * Run a job on a given set of partitions of an RDD, but take a function of type
 * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int],
    allowLocal: Boolean
    ): Array[U] = {
  runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
}
This in turn calls the next runJob, which collects the results into the results array and returns it:
/**
 * Run a function on a given set of partitions in an RDD and return the results as an array. The
 * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
 * than shipping it out to the cluster, for short actions like first().
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean
    ): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
  results
}
The last overload in the chain adds a resultHandler callback to the parameters:
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark. The allowLocal
 * flag specifies whether the scheduler can run the computation on the driver rather than
 * shipping it out to the cluster, for short actions like first().
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (dagScheduler == null) {
    throw new SparkException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite)
  val start = System.nanoTime
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
  rdd.doCheckpoint()
}
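The delegation pattern in this chain of overloads can be tried in isolation. The following is a rough sketch under stated assumptions, not Spark code: MiniRDD and MiniTaskContext are hypothetical stand-ins, the allowLocal flag is left out, and the "scheduler" is just a loop on the calling thread. What it keeps is the structure: the convenience overload adapts `Iterator[T] => U` to the context-taking form and selects all partitions, the array-returning overload supplies a handler that writes into a results array, and the innermost overload only knows about the handler.

```scala
import scala.reflect.ClassTag

object RunJobDemo {
  // Hypothetical stand-in for TaskContext: carries only the partition id.
  final case class MiniTaskContext(partitionId: Int)

  // Hypothetical stand-in for an RDD: data already split into partitions.
  final case class MiniRDD[T](partitions: Vector[Vector[T]])

  // Innermost form: run func on each requested partition and pass
  // (position in `partitions`, result) to the handler.
  def runJob[T, U](
      rdd: MiniRDD[T],
      func: (MiniTaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit =
    partitions.zipWithIndex.foreach { case (partitionId, index) =>
      resultHandler(index, func(MiniTaskContext(partitionId), rdd.partitions(partitionId).iterator))
    }

  // Array-returning form: the handler stores each result at its index.
  def runJob[T, U: ClassTag](
      rdd: MiniRDD[T],
      func: (MiniTaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index: Int, res: U) => results(index) = res)
    results
  }

  // Convenience form: all partitions, function that ignores the context.
  def runJob[T, U: ClassTag](rdd: MiniRDD[T], func: Iterator[T] => U): Array[U] =
    runJob(rdd, (context: MiniTaskContext, iter: Iterator[T]) => func(iter),
      0 until rdd.partitions.size)
}
```

For example, with `MiniRDD(Vector(Vector(1, 2), Vector(3), Vector(4, 5, 6)))`, calling `RunJobDemo.runJob(rdd, (iter: Iterator[Int]) => iter.sum)` walks through all three overloads and yields one sum per partition.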
In the end, sc.runJob calls dagScheduler.runJob.
One point worth mentioning is the line
val cleanedFunc = clean(func)
Its purpose is explained in the comment on clean:
/**
 * Clean a closure to make it ready to serialized and send to tasks
 * (removes unreferenced variables in $outer's, updates REPL variables)
 */
private[spark] def clean[F <: AnyRef](f: F): F = {
  ClosureCleaner.clean(f)
  f
}
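Why a closure needs to be "ready to serialize" can be illustrated with plain Java serialization. The sketch below does not reproduce what ClosureCleaner does internally; Holder and the serializes helper are hypothetical names made up for this example. It only shows the failure mode that makes outer references matter: a closure that reaches a field through `this` drags the whole enclosing object into the serialized form, while a closure that captures only a local copy does not.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureDemo {
  // Hypothetical driver-side class; deliberately NOT Serializable.
  class Holder(val factor: Int) {

    // Reads the field through `this`, so the closure captures the Holder.
    def capturesThis: Int => Int = x => x * factor

    // Copies the field into a local val first, so only an Int is captured.
    def capturesLocal: Int => Int = {
      val localFactor = factor
      x => x * localFactor
    }
  }

  // Returns true if Java serialization accepts the object, false if it
  // rejects it with NotSerializableException.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Here `serializes(new ClosureDemo.Holder(2).capturesThis)` is expected to be false and the capturesLocal variant true. Note that the cleaner's comment speaks of unreferenced variables in $outer; a field that the closure really uses, as in capturesThis, still has to be copied to a local value by hand.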
END