zoukankan      html  css  js  c++  java
  • Spark源码分析 – Dependency

    Dependency

    依赖, 用于表示RDD之间的因果关系, 一个dependency表示一个parent rdd, 所以在RDD中使用Seq[Dependency[_]]来表示所有的依赖关系

    image
    Dependency的base class
    可见Dependency唯一的成员就是rdd, 即所依赖的rdd, 或parent rdd

    /**
     * Base class for dependencies.
     */
    abstract class Dependency[T](val rdd: RDD[T]) extends Serializable

    Dependency分为两种, narrow和shuffle

    NarrowDependency

    先看看比较简单的narrow
    定义, parent RDD中的每个partition最多被child RDD中的一个partition使用, 即不需要shuffle
    更直白点, 就是Narrow只有map, partition本身范围不会改变, 一个parititon经过transform还是一个partition, 虽然内容发生了变化, 所以可以在local完成
    而wide就是, partition需要打乱从新划分, 存在shuffle的过程, partition的数目和范围都发生了变化

    唯一的接口getParents, 即给定任一个partition-id, 得到所有依赖的parent partitions的id的seq

    /**
     * Base class for dependencies where each partition of the parent RDD is used by at most one
     * partition of the child RDD.  Narrow dependencies allow for pipelined execution.
     */
    abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
      /**
       * Get the parent partitions for a child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
    }

    NarrowDependency又分为两种,

    OneToOneDependency
    最简单的依赖关系, 即parent和child里面的partitions是一一对应的, 典型的操作就是map, filter…

    其实partitionId就是partition在RDD中的序号, 所以如果是一一对应, 那么parent和child中的partition的序号应该是一样的

    /**
     * Represents a one-to-one dependency between partitions of the parent and child RDDs.
     */
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int) = List(partitionId) //序号一致
    }

     

    RangeDependency

    虽然仍然是一一对应, 但是是parent RDD中的某个区间的partitions对应到child RDD中的某个区间的partitions
    典型的操作是union, 多个parent RDD合并到一个child RDD, 故每个parent RDD都对应到child RDD中的一个区间
    需要注意的是, 这里的union不会把多个partition合并成一个partition, 而是的简单的把多个RDD中的partitions放到一个RDD里面, partition不会发生变化, 可以参考Spark 源码分析 – RDD 中UnionRDD的实现

    由于是range, 所以直接记录起点和length就可以了, 没有必要加入每个中间rdd, 所以RangeDependency优化了空间效率

    /**
     * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
     * @param rdd the parent RDD
     * @param inStart the start of the range in the parent RDD, parent RDD中区间的起始点
     * @param outStart the start of the range in the child RDD, child RDD中区间的起始点 
     * @param length the length of the range
     */
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
      extends NarrowDependency[T](rdd) {
    
      override def getParents(partitionId: Int) = {
        if (partitionId >= outStart && partitionId < outStart + length) { //判断partitionId的合理性,必须在child RDD的合理partition范围内
          List(partitionId - outStart + inStart) //算出parent RDD中对应的partition id
        } else {
          Nil
        }
      }
    }

     

    WideDependency

    WideDependency, 也称为ShuffleDependency
    首先需要基于PairRDD, 因为一般需要依据key进行shuffle, 所以数据结构往往是kv
    即RDD中的数据是kv pair, [_ <: Product2[K, V]],

    trait Product2[+T1, +T2] extends Product  // Product2 is a cartesian product of 2 components

    Product2是trait, 这里实现了Product2可以用于表示kv pair? 不是很理解

    其次, 由于需要shuffle, 所以当然需要给出partitioner, 如何完成shuffle

    然后, shuffle不象map可以在local进行, 往往需要网络传输或存储, 所以需要serializerClass

    最后, 每个shuffle需要分配一个全局的id, context.newShuffleId()的实现就是把全局id累加

     

    /**
     * Represents a dependency on the output of a shuffle stage.
     * @param rdd the parent RDD
     * @param partitioner partitioner used to partition the shuffle output
     * @param serializerClass class name of the serializer to use
     */
    class ShuffleDependency[K, V](
        @transient rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner,
        val serializerClass: String = null)
      extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
    
      val shuffleId: Int = rdd.context.newShuffleId()
    }
  • 相关阅读:
    vue.js多选列表(绑定到一个数组)
    vue.js选择列表
    vue.js单选按钮
    Vue.js逆转消息
    TTL
    [转]为何需要调用“super viewDidLoad
    Tomcat 发布war包提示war包超出大小修改
    【转】 MySQL与PostgreSQL:该选择哪个开源数据库?哪一个更好?
    【转】 #1451
    【转】互联网网页响应速度测试标准
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3489110.html
Copyright © 2011-2022 走看看