05.Spark--RDD详解
RDD详解--groupByKey--reduceByKey
[MapPartitionRDD单词统计]
单词统计
import org.apache.spark.{SparkConf,SparkContext}
object WordCountScala{
def main(args:Array[String]):Unit={
//创建spark配置对象
val conf=new SparkConf()
conf.setAppName("WCScala")
conf.setMaster("local")
//创建上下文
val sc=new SparkContext(conf)
//加载文档,这个文件是文本文件,调的是hadoopFile
val rdd1=sc.textFile("file:///d:/mr/word.txt")
[textFile,hadoopFile]
//K是longtegr hadoop里面的 pair
hadoopFile(path,classOf[TextInputFormat],classOf[LognWritable],classOf[Test],
minPartitions).map(pair=>pair._2.toString).setName(path)//map做的版面
//压扁
val rdd2=rdd1.flatMap(_.split(" "))
//标1成对
val rdd3=rdd2.map(_,1)
//聚合
val rdd4=rdd3.reduceByKey(_+_)
val arr=rdd4.collect()
arr.foreach(println)
//链式编程
//sc.textFile("file:///d:/mr/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)
}
}
RDD的依赖列表是如何呈现的?
//[T:ClassTag]主构造
abstract class RDD[T:ClassTag]{
@transient private var _sc:SparContext,
//体现出了依赖集合,RDD需要的依赖列表 什么时候创建的?
@transient private var deps:Seq[Dependency[_]] //[Dependency[_]]泛型
}extends Serialiizable with Logging{
...
}
//映射分区RDD
MapPartitionsRDD(org.apache.spark.rdd)
private[spark] class MapPartitionsRDD[U:ClassTag,T:ClassTag](
var prev:RDD[T],
f:(TaskCOntext,Int,Iterator[T])=>Iterator[U].
preserversPartitioning:Boolean=false)
//prev是上级的RDD
extends RDD[U](prev){
//构造一个rdd用one-to-one依赖
...此时RDD会调用 def this(@transoentoneParent:RDD[_])=this(oneParent.context,List(new OneToOneDependency(oneParent)))//一对一的依赖,OneToOneDependency
总结:当它去调MapPartitionsRDD的时候,它继承了父的RDD,而父RDD它只传了一个上级RDD的prev这个属性,因为它走的是(def this(@transoent oneParent:RDD[_]))辅助构造。辅助构造它把这个RDD的上下文(oneParent)取出,放入这里面.这里面创建了一个List(new OneToOneDependency(oneParent),创建了OneToOneDependency依赖。oneParent上级的RDD。
}
)
class OneToOneDependency[T](rdd:RDD[T])extends NarrowDependency[T](rdd){
override def getParents(partitionId:Int):List[Int]=List(partitionId)
//其实它是一个链条,RDD本身是依赖列表。每一个依赖于上级关联。所以不是MapPartitionRDD于preRDD之间直接关联。是通过依赖走了一圈。
}
如何判断是宽依赖还是窄依赖的? MapPartitionsRDD就是窄依赖,在reduceByKey的时候就已经ShuffledRDD了。ShuffledRDD与依赖有啥关系?
那是因为在创建RDD的时候,就已经把依赖关联进了去了。因为huffer依赖不是它划分边界的关键。它通过依赖,因为宽依赖就是Shuffer,窄依赖就不是Shuffer了。当它在创建RDD进来的时候,这个依赖就在这里面了。所以它是固定的。
RDD它里面有一个分区列表,分区列表它是一个集合。可以理解为一个引用。集合里面放了一堆的依赖。其中RDD是一个抽象类,有一个是MapPartitionRDD,它是RDD的一个子类。它具备了RDD的特点。也得有RDD的分区列表。它创建了一对一的依赖。RDD中所传的prev是上一家RDD,也是在构造里面。上一个RDD存放哪?为了构造MapPartitionRDD它是通过其它的RDD变换。MapPartitionRDD是如何与preRDD关联起来的。是因为MapPartitionRDD它有依赖,而在这个依赖当中它有一个RDD的属性(deps)关联到preRDD的。
从Hadoop到flatMap再到表一成对它们全都是窄依赖。到了reduceByKey它返回的是ShuffledRDD它用到的就是Shuffler依赖了。
ShufflerdRDD:这个结果RDD,它是要通过Shuffle来产生的。参数是由上一个RDD还有分区类,K类.V类还有组合函数,ShuffledRDD也是继承了RDD的。RDD是抽象的,它有两个子类MapPartitionsRDD和ShuffleRDD.MapPartition和ShuffleRDD都继承于RDD。RDD它有分区列表,作为Dependecy(依赖)。一个RDD它可以由多个Dependecy(依赖)。这种关系叫做多重性关系。Dependecy(依赖)分为两种依赖,宽依赖(ShuffleDep)和窄依赖(NarrowDep)。宽依赖(NarrowDep)分为三种依赖,One2OneDep,RangeDep,PruneDep它们都继承窄依赖(NarrowDep)。每一个RDD都和上一个RDD是有关系的。它是直接关联上去 的吗?不它不是,它是通过依赖Dependency(依赖关联上去的)。所以1个RDD里面它会有多个依赖。那么每个依赖它有多少个RDD? asttract class Dependency[T]extebds Serializable{def rdd:RDD[T]}只有一个RDD。Dependecy(依赖)与RDD的关系是一对一的关系。对于每一RDD它是走依赖再找上一个RDD。ShuffleRDD是与ShuffleDep有关系的。ShuffledRDD它是重写get依赖的方法。getDependencies,它的依赖它的方法里面List(new ShuffleDependency(prev,part,seralizer,keyOrdering,aggregator,mapSideCombine),它返回的是ShuffleDependency依赖。prev还给了上级。part分区。seralizer串行化类,keyOrdering排序以及aggregator聚合器以及mapSideCombine合成函数。ShuffleRDD是依赖于ShuffleDep。MapPartitionsRDD是依赖于One2OneDep。什么时候创建依赖?是在创建RDD的时候,就已经产生了依赖。Spark给了那么多的RDD。它们都有对应的。RDD的依赖是在RDD的构造函数中出现的。看看filter(过滤)它用的也是MapPartitionsRDD.
groupByKey和reduceByKey之间的区别?假如它们都能实现相同功能下优先使用?优先reduceByKey 为什么? 有一个合成过程,hadoop的合成链条是怎样的?map分为三个阶段,第一setup():做一些初始化的配置的。 第二 while() 找每一行,每一行都会经过while()循环。在调用map()函数的时候,第三cleanup()收尾工作的。Spark的分区和hadoop的分区一样吗?不一样,hadoop的分区是指在map端的分区过程,map之后有一个分区。分区分多少个区,就是Reduce的个数。hadoop的分区只能是Reduce的个数。是Map过程中对key进行分发的目的地。hadoop的MR是map阶段进行完后,它要经过hash。经过分发,分发到集合空间里面去。几个空间就是几个分区。这里的分区数和reduce的个数对应。reduce的个数是和程序来设置的。跟我们的切片没有关系。Spark的是分区,Spark的分区就是切片,map的个数。当加载文件的时候,这个文件被切成了多少片,每一片要一般要对应一个任务。所以Spark的分区就是切片的个数。而且每一个RDD都有自己的分区数。这是它们的不同。Spark的分区就是切片。分成多少片,当你变换之后。也是产生新的RDD,它又有分区。groupByKey在hadoop中,map产生的K,V是要经过分发。要进入到分区,当分区完的下一步就Combiner(合成)。合成必须有吗?不一定 合成的目的就是减少网络负载。单词统计中,hello统计了100万,如果不做Combiner它就要分发做100万遍了。但是如果它做了Combiner它只要做reduce个数了。因为每个分区里面都把数据先聚合起来了。假如有3个分区每个分区都有100万数据它是标1的,如果它不做Combiner。它就要把300万逗号1发走。所以这网络负载是很大的。那就没有必要了。Combiner是map端的聚合。Combiner是map端的Reduce,Combiner也叫做预聚合。这样一来,每个map端就编程了“hell 1百万“(数据格式)了,这样就只要发送这一条数据就行了。因为它已经聚合好了。groupByKey合reduceByKey : groupByKey是没有Combine过程的,reduceByKey是有Combiner过程。结果一定会变少,变少之后,再经过网络分发。那就是网络带宽就占少了,就不用分发那么多了。它有一种数据的压紧的工作。假如你用的分组是组成一个新的集合List[],这也是一个聚合过程。对于这样的结果来讲groupByKey和reduceByKey的结果相同吗?也不相同 为什么?因为groupbyKey的话它就分到一个组上了。groupByKeyList它没有Combiner所以它在Reduce
在很多map中,可以在map内聚合,可以在map内聚合。在map端聚合完后.不管是groupByKey还是reduceByKey都是调用combineByKeyWithClassTag(按类标记符来合成Key,按k合成)方法。mapSideCombine默认值是true.reduceByKey没有传递这个参数,它就是默认值。groupByKey传递的值是false,所以它不进行map端聚合。groupByKey它可以改变V的类型。reduceByKey没有机会。reduceByKey是两V聚成一V,类型是相同的。如果想用reduceByKey来实现。 变换是没有机会指定的,但是Shuffer是有机会指定的。MapPartitionsRDD当你在分组的时候getPartitions。numPartitions:Int这个是分区数。在这里是可以指定分区数的。而且来可以带一个HashPartitioner(分区函数)默认的是Hash分区打散。