  • Narrow dependencies vs. wide dependencies & the basis for stage division

    An RDD's dependency on its parent RDD falls into one of two kinds: narrow or wide.
    The usual distinction is how many child-RDD partitions depend on a given partition of the parent RDD: exactly one means narrow, more than one means wide. A better definition is:
    a dependency is narrow when each partition of the child RDD depends on only one or a small number of partitions of the parent RDD (never on all of them).

    The dependency-related classes are the following five:

    Dependency
    <--NarrowDependency
        <--OneToOneDependency
        <--RangeDependency
    <--ShuffleDependency

    They are all defined in the same Scala file. Dependency is an abstract class; NarrowDependency (also an abstract class) and ShuffleDependency inherit directly from it, while OneToOneDependency and RangeDependency inherit from NarrowDependency, roughly as the hierarchy above shows.

    So Dependency has three concrete implementations: two narrow dependencies, OneToOneDependency and RangeDependency, and one wide dependency, ShuffleDependency.
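
    As a quick check (a minimal sketch, assuming a SparkContext named sc, e.g. in spark-shell), you can see which of these classes a transformation produces by inspecting RDD.dependencies:

    val nums    = sc.parallelize(1 to 100, 4)
    val mapped  = nums.map(_ * 2)                          // narrow transformation
    val grouped = nums.map(n => (n % 10, n)).groupByKey()  // wide transformation

    println(mapped.dependencies.head)   // an OneToOneDependency
    println(grouped.dependencies.head)  // a ShuffleDependency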

    (1) Dependency

    Dependency is an abstract class; every dependency-related class inherits from it. It has a single member, rdd, which returns the parent RDD.

    /**
     * :: DeveloperApi ::
     * Base class for dependencies.
     */
    @DeveloperApi
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]
    }

    (2) Narrow dependencies

    1. NarrowDependency

    Here is the source's doc comment for NarrowDependency:

    "Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution."
    That is, a narrow dependency means each partition of the child RDD depends on one or a small number of partitions of the parent RDD (not on all of them).

    /**
     * :: DeveloperApi ::
     * Base class for dependencies where each partition of the child RDD depends on a small number
     * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
     */
    @DeveloperApi
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
      /**
       * Get the parent partitions for a child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
    
      override def rdd: RDD[T] = _rdd
    }

    getParents takes a partition ID of the child RDD and returns the IDs of the parent-RDD partitions that the child partition depends on.

    The rdd in the primary constructor is the parent RDD; the same holds for the classes below.

    2. OneToOneDependency

    A one-to-one dependency: each partition of the child RDD corresponds to exactly one partition of the parent RDD.

    /**
     * :: DeveloperApi ::
     * Represents a one-to-one dependency between partitions of the parent and child RDDs.
     */
    @DeveloperApi
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }

    It overrides NarrowDependency's getParents to return a List with a single element equal to the child RDD's partition ID. That is, child and parent partition IDs correspond one to one and are equal.
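
    A tiny illustration of the contract (a sketch, assuming a SparkContext sc; the dependency is constructed directly here only to show getParents):

    import org.apache.spark.OneToOneDependency

    val parent = sc.parallelize(1 to 40, 4)
    val dep    = new OneToOneDependency(parent)
    println(dep.getParents(2))  // List(2): child partition 2 reads only parent partition 2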

    3. RangeDependency

    A one-to-one dependency between a range of partitions in the child RDD and a range of partitions in the parent RDD: each child partition in the range depends on exactly one parent partition (shifted by a fixed offset), and each parent partition is depended on by exactly one child partition. It is used only by UnionRDD.

    /**
     * :: DeveloperApi ::
     * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
     * @param rdd the parent RDD
     * @param inStart the start of the range in the parent RDD
     * @param outStart the start of the range in the child RDD
     * @param length the length of the range
     */
    @DeveloperApi
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
      extends NarrowDependency[T](rdd) {
    
      override def getParents(partitionId: Int): List[Int] = {
        if (partitionId >= outStart && partitionId < outStart + length) {
          List(partitionId - outStart + inStart)
        } else {
          Nil
        }
      }
    }
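
    A worked example (a sketch, assuming a SparkContext sc): union-ing a 3-partition RDD with a 2-partition RDD yields a child RDD with 5 partitions and two RangeDependencies.

    val a = sc.parallelize(1 to 30, 3)  // parent partitions 0..2
    val b = sc.parallelize(1 to 20, 2)  // parent partitions 0..1
    val u = a.union(b)                  // child partitions 0..4

    // UnionRDD builds RangeDependency(a, inStart = 0, outStart = 0, length = 3)
    // and RangeDependency(b, inStart = 0, outStart = 3, length = 2).
    u.dependencies.foreach(println)

    // Child partition 3 falls in the second range, so it maps to
    // parent partition 3 - outStart(3) + inStart(0) = 0 of b.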

    (3) Wide dependency

    There is only one wide dependency: ShuffleDependency. The child RDD depends on all partitions of the parent RDD, and each partition of the parent RDD can be depended on by many (potentially all) partitions of the child RDD.

    /**
     * :: DeveloperApi ::
     * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
     * the RDD is transient since we don't need it on the executor side.
     *
     * @param _rdd the parent RDD
     * @param partitioner partitioner used to partition the shuffle output
     * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
     *                   explicitly then the default serializer, as specified by `spark.serializer`
     *                   config option, will be used.
     * @param keyOrdering key ordering for RDD's shuffles
     * @param aggregator map/reduce-side aggregator for RDD's shuffle
     * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
     */
    @DeveloperApi
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
        @transient private val _rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializer: Serializer = SparkEnv.get.serializer,
        val keyOrdering: Option[Ordering[K]] = None,
        val aggregator: Option[Aggregator[K, V, C]] = None,
        val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
    
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
      private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
      private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
      // Note: It's possible that the combiner class tag is null, if the combineByKey
      // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
      private[spark] val combinerClassName: Option[String] =
        Option(reflect.classTag[C]).map(_.runtimeClass.getName)
    
      val shuffleId: Int = _rdd.context.newShuffleId()
    
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
        shuffleId, _rdd.partitions.length, this)
    
      _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }
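
    A sketch (again assuming a SparkContext sc): reduceByKey produces a ShuffleDependency whose partitioner, map-side-combine flag, and shuffle ID can be inspected directly.

    import org.apache.spark.ShuffleDependency

    val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    val reduced = pairs.reduceByKey(_ + _, 4)

    val dep = reduced.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]
    println(dep.partitioner)     // HashPartitioner with 4 partitions
    println(dep.mapSideCombine)  // true: reduceByKey combines on the map side
    println(dep.shuffleId)       // unique ID registered with the ShuffleManager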

    (4) Stage division

    The DAG is divided into stages according to wide dependencies: each wide dependency (shuffle) marks a stage boundary. The operations inside one stage are executed together in a single task. Because each child partition depends on only one (or a few) partitions of the parent RDD, these steps can be pipelined.
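
    For example (a sketch, assuming a SparkContext sc), the word count below contains exactly one ShuffleDependency, so it runs as two stages; toDebugString prints the lineage with an indentation change at the shuffle boundary.

    val counts = sc.parallelize(Seq("a b", "b c", "a a"), 2)
      .flatMap(_.split(" "))            // narrow: pipelined within the first stage
      .map((_, 1))                      // narrow: same stage, same task
      .reduceByKey(_ + _)               // wide: shuffle => new stage
      .map { case (w, n) => s"$w=$n" }  // narrow: pipelined within the second stage

    println(counts.toDebugString)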

  • Original post: https://www.cnblogs.com/itboys/p/6673046.html