I. Analysis Using WordCount as an Example
1. WordCount
// sc is an existing SparkContext; the input path is omitted here
val lines = sc.textFile()
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
counts.foreach(count => println(count._1 + ": " + count._2))
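To make the intermediate values of each step concrete, here is a minimal local sketch that mirrors the same pipeline with plain Scala collections instead of Spark RDDs. The names `LocalWordCount` and `count` are hypothetical, and `groupBy` followed by `sum` only stands in for `reduceByKey`; nothing here runs on a cluster.

```scala
// Local stand-in for the WordCount pipeline, using Scala collections (not Spark).
object LocalWordCount {
  def count(lines: Seq[String]): Map[String, Int] = {
    // flatMap: each line becomes several words, flattened into one sequence
    val words = lines.flatMap(line => line.split(" ").toSeq)
    // map: each word becomes a (word, 1) pair
    val pairs = words.map(word => (word, 1))
    // Stand-in for reduceByKey(_ + _): group pairs by word, then sum the 1s
    pairs.groupBy(_._1).map { case (word, ps) => (word, ps.map(_._2).sum) }
  }
}
```

Calling `LocalWordCount.count(Seq("a b a", "b c"))` yields a map from each word to the number of times it occurs.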
2. Source Code Analysis
###org.apache.spark/SparkContext.scala
###textFile()
/**
 * First, the call to hadoopFile() creates a HadoopRDD whose elements are (key, value) pairs:
 * the key is the offset of each line in the HDFS or text file, and the value is the line of text.
 * Then map() is called on the HadoopRDD to drop the key and keep only the value, which yields a MapPartitionsRDD.
 * The elements inside the MapPartitionsRDD are simply the lines of text, one line per element.
 * @param path path of the input file
 * @param minPartitions suggested minimum number of partitions for the resulting RDD
 * @return an RDD of the lines of the text file
 */
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

###org.apache.spark.rdd/RDD.scala
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

def map[U: ClassTag](f: T => U): RDD[U] = {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

RDD itself does not define reduceByKey, so calling reduceByKey() on an RDD triggers a Scala implicit conversion. The compiler searches the implicit scope, finds the rddToPairRDDFunctions() implicit conversion in the RDD companion object, and uses it to convert the RDD into a PairRDDFunctions.

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)
}

Next, the reduceByKey() method of PairRDDFunctions is called. The single-argument overload used in WordCount delegates to the overload below with a default partitioner:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

###org.apache.spark.rdd/RDD.scala
def foreach(f: T => Unit) {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

foreach calls runJob. Tracing the chain of runJob overloads step by step, the first one invoked is SparkContext's runJob:

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.size, false)
}

…

Finally:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (stopped) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies: " + rdd.toDebugString)
  }
  // Call runJob() on the dagScheduler that was created earlier, when the SparkContext was initialized
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
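The implicit-conversion step is the least obvious part of the walkthrough, so the following toy sketch reproduces only that mechanism. It is not Spark code: `MiniRDD`, `MiniPairFunctions`, `toPairFunctions`, and `ImplicitDemo` are hypothetical names, the data lives in a local `Seq`, and the `reduceByKey` here is a simple `groupBy` rather than Spark's `combineByKey`. The point it illustrates is that a method missing from the base class is still found when an implicit conversion sits in the companion object.

```scala
import scala.language.implicitConversions

// Toy stand-in for RDD (hypothetical): wraps a local Seq, has no reduceByKey.
class MiniRDD[T](val data: Seq[T]) {
  def map[U](f: T => U): MiniRDD[U] = new MiniRDD(data.map(f))
  def flatMap[U](f: T => Iterable[U]): MiniRDD[U] = new MiniRDD(data.flatMap(f))
  def collect(): Seq[T] = data
}

// Toy stand-in for PairRDDFunctions: only makes sense for (key, value) elements.
class MiniPairFunctions[K, V](self: MiniRDD[(K, V)]) {
  def reduceByKey(func: (V, V) => V): MiniRDD[(K, V)] = {
    val reduced = self.data
      .groupBy(_._1)                                          // gather values per key
      .map { case (k, kvs) => (k, kvs.map(_._2).reduce(func)) } // fold them with func
      .toSeq
    new MiniRDD(reduced)
  }
}

object MiniRDD {
  // Lives in the companion object, so it is in implicit scope for any MiniRDD
  // without an explicit import.
  implicit def toPairFunctions[K, V](rdd: MiniRDD[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions(rdd)
}

object ImplicitDemo {
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    val pairs = new MiniRDD(lines).flatMap(_.split(" ").toSeq).map(w => (w, 1))
    // MiniRDD has no reduceByKey; the compiler rewrites this call to
    // MiniRDD.toPairFunctions(pairs).reduceByKey(_ + _)
    pairs.reduceByKey(_ + _).collect().toMap
  }
}
```

Because the conversion only accepts `MiniRDD[(K, V)]`, calling `reduceByKey` on a `MiniRDD[String]` would not compile, which is the same restriction the real conversion imposes through its `RDD[(K, V)]` parameter type.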