  • Spark-Dependency

    1. Spark uses dependencies (Dependency) to represent the lineage relationships between RDDs; with them Spark can recompute RDDs whose partitions have been lost. Every RDD keeps a list of its dependencies:

      private var dependencies_ : Seq[Dependency[_]] = null

    which records, for each partition of the RDD, its parent partitions.
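
    The dependency list is exposed through the public dependencies method on RDD. The snippet below is only a minimal sketch, run in the spark-shell (sc is provided by the shell; the variable names are purely illustrative):

    val base   = sc.parallelize(1 to 10, 4)
    val mapped = base.map(_ * 2)

    // every RDD exposes its dependency list; for a map() there is a single
    // entry pointing back to `base`
    mapped.dependencies.foreach(dep => println(dep.getClass.getSimpleName))
    // expected output: OneToOneDependency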

    2. Spark has two kinds of Dependency:


    1) NarrowDependency: each partition of the parent RDD is used by at most one partition of the child RDD, so this kind of dependency needs no shuffle. For a narrow dependency, getParents returns the parent partitions of a given child partition:

    /**
     * :: DeveloperApi ::
     * Base class for dependencies where each partition of the parent RDD is used by at most one
     * partition of the child RDD.  Narrow dependencies allow for pipelined execution.
     */
    @DeveloperApi
    abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
      /**
       * The only method defined here: returns all parent partitions of the given partition.
       * Get the parent partitions for a child partition.
       * @param partitionId a partition of the child RDD
       * @return the partitions of the parent RDD that the child partition depends upon
       */
      def getParents(partitionId: Int): Seq[Int]
    }


    Narrow dependencies are further divided into:

    a. OneToOneDependency: a one-to-one dependency in which the parent and child partition ids are identical, so getParents simply returns the id unchanged. Transformations such as map and filter produce this dependency (see the sketch after the class below).

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      /**
       * partitionId is just the partition's index within the RDD, so for a
       * one-to-one mapping the parent and child partitions share the same index.
       */
      override def getParents(partitionId: Int) = List(partitionId) // return the same id
    }
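
    As a concrete sketch (spark-shell assumed, so sc is available), filter keeps a OneToOneDependency, and getParents maps a child partition id straight back to the same parent id:

    import org.apache.spark.OneToOneDependency

    val nums     = sc.parallelize(1 to 100, 4)
    val filtered = nums.filter(_ % 2 == 0)

    filtered.dependencies.head match {
      case d: OneToOneDependency[_] => println(d.getParents(2)) // List(2)
      case other                    => println(s"unexpected dependency: $other")
    }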


    b. PruneDependency (used by org.apache.spark.rdd.PartitionPruningRDD): the child RDD keeps only a subset of the parent's partitions, selected by a partition filter function:

    /**
     * Represents a dependency between the PartitionPruningRDD and its parent. In this
     * case, the child RDD contains a subset of partitions of the parents'.
     */
    private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
      extends NarrowDependency[T](rdd) {
    
      @transient
      val partitions: Array[Partition] = rdd.partitions
        .filter(s => partitionFilterFunc(s.index)).zipWithIndex
        .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
    
      override def getParents(partitionId: Int) = {
        List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
      }
    }
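
    PartitionPruningRDD.create is the developer API that produces this dependency. The sketch below (spark-shell assumed, variable names illustrative) keeps only the even-numbered parent partitions:

    import org.apache.spark.rdd.PartitionPruningRDD

    val wide   = sc.parallelize(1 to 100, 10)
    // keep parent partitions 0, 2, 4, 6, 8
    val pruned = PartitionPruningRDD.create(wide, index => index % 2 == 0)

    println(pruned.partitions.length)                         // 5
    println(pruned.dependencies.head.getClass.getSimpleName)  // PruneDependency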
    


    c. RangeDependency: a contiguous range of partitions in the parent RDD corresponds to a contiguous range of partitions in the child RDD. The union transformation produces this dependency (see the sketch after the class below).

    /**
     * :: DeveloperApi ::
     * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
     * @param rdd the parent RDD
     * @param inStart the start of the range in the parent RDD
     * @param outStart the start of the range in the child RDD
     * @param length the length of the range
     */
    @DeveloperApi
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
      extends NarrowDependency[T](rdd) {
    
      override def getParents(partitionId: Int) = {
        if (partitionId >= outStart && partitionId < outStart + length) { // partitionId must fall within the child RDD's partition range
          List(partitionId - outStart + inStart) // compute the corresponding partition id in the parent RDD
        } else {
          Nil
        }
      }
    }
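
    For example (spark-shell assumed), a union of two RDDs yields one RangeDependency per input; child partition 3 below lies in b's range and maps back to b's partition 1:

    import org.apache.spark.RangeDependency

    val a = sc.parallelize(1 to 10, 2)    // becomes partitions 0-1 of the union
    val b = sc.parallelize(11 to 20, 3)   // becomes partitions 2-4 of the union
    val u = a.union(b)

    u.dependencies.foreach {
      case r: RangeDependency[_] => println(r.getParents(3))
      case _                     =>
    }
    // prints List() for the dependency on a, and List(1) for the dependency on b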

    2) ShuffleDependency (a wide dependency): one partition of the parent RDD may be used by multiple partitions of the child RDD. Because the parent partition has to be split up, a shuffle is required, and shuffles normally operate on key-value pairs.

    Each shuffle is assigned a globally unique shuffleId.

    To perform the shuffle, Spark needs to know how the data should be repartitioned, which is what the partitioner parameter specifies. Because a shuffle involves network transfer, a Serializer must also be provided. Unlike a narrow dependency, a wide dependency has no getParents method, so the parent partitions of a given child partition cannot be obtained from the dependency itself (an example follows the class below):


    /**
     * :: DeveloperApi ::
     * Represents a dependency on the output of a shuffle stage.
     * @param rdd the parent RDD
     * @param partitioner partitioner used to partition the shuffle output
     * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
     *                   the default serializer, as specified by `spark.serializer` config option, will
     *                   be used.
     */
    @DeveloperApi
    class ShuffleDependency[K, V](
        @transient rdd: RDD[_ <: Product2[K, V]],
        val partitioner: Partitioner, // the partitioner decides how the shuffle output is partitioned
        val serializer: Serializer = null) // unlike map, a shuffle cannot run locally; data must be transferred over the network or stored, so a serializer is needed
      extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
    
      val shuffleId: Int = rdd.context.newShuffleId() // every shuffle gets a globally unique id; newShuffleId() simply increments a global counter
    
      rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    }
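
    A simple way to see a ShuffleDependency in action (spark-shell assumed, matching the two-type-parameter signature shown above) is reduceByKey, which allocates a new shuffleId and records the partitioner:

    import org.apache.spark.{HashPartitioner, ShuffleDependency}

    val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    val reduced = pairs.reduceByKey(new HashPartitioner(4), _ + _)

    reduced.dependencies.head match {
      case d: ShuffleDependency[_, _] =>
        println(d.shuffleId)                 // e.g. 0 for the first shuffle in the application
        println(d.partitioner.numPartitions) // 4
      case _ =>
    }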
    

