zoukankan      html  css  js  c++  java
  • 05.RDD详解

    05.Spark--RDD详解

    RDD详解--groupByKey--reduceByKey

    [MapPartitionRDD单词统计]

    单词统计
    import org.apache.spark.{SparkConf,SparkContext}
    object WordCountScala{
      def main(args:Array[String]):Unit={
        //创建spark配置对象
        val conf=new SparkConf()
        conf.setAppName("WCScala")
        conf.setMaster("local")
        //创建上下文
        val sc=new SparkContext(conf)
        //加载文档,这个文件是文本文件,调的是hadoopFile
        val rdd1=sc.textFile("file:///d:/mr/word.txt")
        [textFile,hadoopFile]
        //K是longtegr  hadoop里面的  pair 
        hadoopFile(path,classOf[TextInputFormat],classOf[LognWritable],classOf[Test],
                   minPartitions).map(pair=>pair._2.toString).setName(path)//map做的版面
        //压扁
        val rdd2=rdd1.flatMap(_.split(" "))
        //标1成对
        val rdd3=rdd2.map(_,1)
        //聚合
        val rdd4=rdd3.reduceByKey(_+_)
        val arr=rdd4.collect()
        arr.foreach(println)
        //链式编程
        //sc.textFile("file:///d:/mr/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)
      }
    }
    RDD的依赖列表是如何呈现的?
    //[T:ClassTag]主构造
    abstract class RDD[T:ClassTag]{
      @transient private var _sc:SparContext,
      //体现出了依赖集合,RDD需要的依赖列表  什么时候创建的?
      @transient private var deps:Seq[Dependency[_]] //[Dependency[_]]泛型
    }extends Serialiizable with Logging{
      ...
    }
    //映射分区RDD
    MapPartitionsRDD(org.apache.spark.rdd)
    private[spark] class MapPartitionsRDD[U:ClassTag,T:ClassTag](
       var prev:RDD[T],
       f:(TaskCOntext,Int,Iterator[T])=>Iterator[U].
      preserversPartitioning:Boolean=false)
    
    //prev是上级的RDD
    extends RDD[U](prev){
      //构造一个rdd用one-to-one依赖
      ...此时RDD会调用  def this(@transoentoneParent:RDD[_])=this(oneParent.context,List(new OneToOneDependency(oneParent)))//一对一的依赖,OneToOneDependency
      总结:当它去调MapPartitionsRDD的时候,它继承了父的RDD,而父RDD它只传了一个上级RDD的prev这个属性,因为它走的是(def this(@transoent oneParent:RDD[_]))辅助构造。辅助构造它把这个RDD的上下文(oneParent)取出,放入这里面.这里面创建了一个List(new OneToOneDependency(oneParent),创建了OneToOneDependency依赖。oneParent上级的RDD。
    }
    )
      class OneToOneDependency[T](rdd:RDD[T])extends NarrowDependency[T](rdd){
        override def getParents(partitionId:Int):List[Int]=List(partitionId)
        //其实它是一个链条,RDD本身是依赖列表。每一个依赖于上级关联。所以不是MapPartitionRDD于preRDD之间直接关联。是通过依赖走了一圈。
      }
    如何判断是宽依赖还是窄依赖的?  MapPartitionsRDD就是窄依赖,在reduceByKey的时候就已经ShuffledRDD了。ShuffledRDD与依赖有啥关系?
    那是因为在创建RDD的时候,就已经把依赖关联进了去了。因为huffer依赖不是它划分边界的关键。它通过依赖,因为宽依赖就是Shuffer,窄依赖就不是Shuffer了。当它在创建RDD进来的时候,这个依赖就在这里面了。所以它是固定的。
      RDD它里面有一个分区列表,分区列表它是一个集合。可以理解为一个引用。集合里面放了一堆的依赖。其中RDD是一个抽象类,有一个是MapPartitionRDD,它是RDD的一个子类。它具备了RDD的特点。也得有RDD的分区列表。它创建了一对一的依赖。RDD中所传的prev是上一家RDD,也是在构造里面。上一个RDD存放哪?为了构造MapPartitionRDD它是通过其它的RDD变换。MapPartitionRDD是如何与preRDD关联起来的。是因为MapPartitionRDD它有依赖,而在这个依赖当中它有一个RDD的属性(deps)关联到preRDD的。
      从Hadoop到flatMap再到表一成对它们全都是窄依赖。到了reduceByKey它返回的是ShuffledRDD它用到的就是Shuffler依赖了。
    


    ShufflerdRDD:这个结果RDD,它是要通过Shuffle来产生的。参数是由上一个RDD还有分区类,K类.V类还有组合函数,ShuffledRDD也是继承了RDD的。RDD是抽象的,它有两个子类MapPartitionsRDD和ShuffleRDD.MapPartition和ShuffleRDD都继承于RDD。RDD它有分区列表,作为Dependecy(依赖)。一个RDD它可以由多个Dependecy(依赖)。这种关系叫做多重性关系。Dependecy(依赖)分为两种依赖,宽依赖(ShuffleDep)和窄依赖(NarrowDep)。宽依赖(NarrowDep)分为三种依赖,One2OneDep,RangeDep,PruneDep它们都继承窄依赖(NarrowDep)。每一个RDD都和上一个RDD是有关系的。它是直接关联上去 的吗?不它不是,它是通过依赖Dependency(依赖关联上去的)。所以1个RDD里面它会有多个依赖。那么每个依赖它有多少个RDD? asttract class Dependency[T]extebds Serializable{def rdd:RDD[T]}只有一个RDD。Dependecy(依赖)与RDD的关系是一对一的关系。对于每一RDD它是走依赖再找上一个RDD。ShuffleRDD是与ShuffleDep有关系的。ShuffledRDD它是重写get依赖的方法。getDependencies,它的依赖它的方法里面List(new ShuffleDependency(prev,part,seralizer,keyOrdering,aggregator,mapSideCombine),它返回的是ShuffleDependency依赖。prev还给了上级。part分区。seralizer串行化类,keyOrdering排序以及aggregator聚合器以及mapSideCombine合成函数。ShuffleRDD是依赖于ShuffleDep。MapPartitionsRDD是依赖于One2OneDep。什么时候创建依赖?是在创建RDD的时候,就已经产生了依赖。Spark给了那么多的RDD。它们都有对应的。RDD的依赖是在RDD的构造函数中出现的。看看filter(过滤)它用的也是MapPartitionsRDD.
    groupByKey和reduceByKey之间的区别?假如它们都能实现相同功能下优先使用?优先reduceByKey 为什么? 有一个合成过程,hadoop的合成链条是怎样的?map分为三个阶段,第一setup():做一些初始化的配置的。 第二 while() 找每一行,每一行都会经过while()循环。在调用map()函数的时候,第三cleanup()收尾工作的。Spark的分区和hadoop的分区一样吗?不一样,hadoop的分区是指在map端的分区过程,map之后有一个分区。分区分多少个区,就是Reduce的个数。hadoop的分区只能是Reduce的个数。是Map过程中对key进行分发的目的地。hadoop的MR是map阶段进行完后,它要经过hash。经过分发,分发到集合空间里面去。几个空间就是几个分区。这里的分区数和reduce的个数对应。reduce的个数是和程序来设置的。跟我们的切片没有关系。Spark的是分区,Spark的分区就是切片,map的个数。当加载文件的时候,这个文件被切成了多少片,每一片要一般要对应一个任务。所以Spark的分区就是切片的个数。而且每一个RDD都有自己的分区数。这是它们的不同。Spark的分区就是切片。分成多少片,当你变换之后。也是产生新的RDD,它又有分区。groupByKey在hadoop中,map产生的K,V是要经过分发。要进入到分区,当分区完的下一步就Combiner(合成)。合成必须有吗?不一定 合成的目的就是减少网络负载。单词统计中,hello统计了100万,如果不做Combiner它就要分发做100万遍了。但是如果它做了Combiner它只要做reduce个数了。因为每个分区里面都把数据先聚合起来了。假如有3个分区每个分区都有100万数据它是标1的,如果它不做Combiner。它就要把300万逗号1发走。所以这网络负载是很大的。那就没有必要了。Combiner是map端的聚合。Combiner是map端的Reduce,Combiner也叫做预聚合。这样一来,每个map端就编程了“hell 1百万“(数据格式)了,这样就只要发送这一条数据就行了。因为它已经聚合好了。

    groupByKey合reduceByKey : groupByKey是没有Combine过程的,reduceByKey是有Combiner过程。结果一定会变少,变少之后,再经过网络分发。那就是网络带宽就占少了,就不用分发那么多了。它有一种数据的压紧的工作。假如你用的分组是组成一个新的集合List[],这也是一个聚合过程。对于这样的结果来讲groupByKey和reduceByKey的结果相同吗?也不相同 为什么?因为groupbyKey的话它就分到一个组上了。groupByKeyList它没有Combiner所以它在Reduce

    在很多map中,可以在map内聚合,可以在map内聚合。在map端聚合完后.不管是groupByKey还是reduceByKey都是调用combineByKeyWithClassTag(按类标记符来合成Key,按k合成)方法。mapSideCombine默认值是true.reduceByKey没有传递这个参数,它就是默认值。groupByKey传递的值是false,所以它不进行map端聚合。groupByKey它可以改变V的类型。reduceByKey没有机会。reduceByKey是两V聚成一V,类型是相同的。如果想用reduceByKey来实现。 变换是没有机会指定的,但是Shuffer是有机会指定的。MapPartitionsRDD当你在分组的时候getPartitions。numPartitions:Int这个是分区数。在这里是可以指定分区数的。而且来可以带一个HashPartitioner(分区函数)默认的是Hash分区打散。

  • 相关阅读:
    TCP 和 UDP 的区别
    python--各种锁机制归纳整理
    七牛的配置
    全文检索的配置
    记一则css3计算
    C# 生成序号不足补0
    JavaScript Post提交数据并跳转到页面(模拟Form表单提交)
    Swift 修改UITextField.Placeholder颜色
    Swift 延迟运行代码
    Swift 动画片段
  • 原文地址:https://www.cnblogs.com/SteveDZC/p/9794325.html
Copyright © 2011-2022 走看看