zoukankan      html  css  js  c++  java
  • 【杂谈】RDD-依赖

    RDD的依赖构成了它的血统(linage)--叫族谱更容易理解

    有2种依赖关系:窄依赖和宽依赖

    构成窄依赖关系的RDD,每一个分区对应一个task(一个线程),所有task可以并行运行;
    宽依赖关系的RDD,要等到父RDD所有分区计算结束后,进行Shuffle,然后才能开始计算。

    一个RDD可以依赖多个父RDD,在RDD中,依赖是一个集合

    protected def getDependencies: Seq[Dependency[_]] = deps
    

    针对每一个父RDD的依赖,是这样子定义的

    // 依赖基类,抽象类
    @DeveloperApi
    abstract class Dependency[T] extends Serializable {
      def rdd: RDD[T]       // 父RDD
    }
    

    窄依赖

    // 它是一个抽象类
    @DeveloperApi
    abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
    
      // 获得和分区(id)相对应的父RDD分区(id),不可能多个,这里应该是为了扩展,不会存在多个父RDD分区对应一个子RDD分区的场景
      def getParents(partitionId: Int): Seq[Int]
    
      override def rdd: RDD[T] = _rdd
    }
    
    // 1:1的窄依赖, 子RDD分区和父RDD分区是一一对应的,e.g. map()
    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
      override def getParents(partitionId: Int): List[Int] = List(partitionId)
    }
    
    // 范围依赖,是一种1:1的依赖,它是把RDD分区叠加起来,e.g. union()
    class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
       extends NarrowDependency[T](rdd) {
    
      override def getParents(partitionId: Int): List[Int] = {
        // 这个计算基于这样的模型:2个父RDD的分区,叠加到一起,当然,不改变相对顺序
        // e.g. PRDD1[p11, p12, p13] union PRDD2[p21, p22, p23] -> UnionRDD[p11, p12, p13, p21, p22, p23]
        // 当前,UnionRDD, inStart 一直为0
        // outStart,父RDD在UnionRDD中的起始位置,对于我们的例子,PRDD1在UnionRDD对应的outStart = 0; PRDD2在UnionRDD对应的outStart = 2
        if (partitionId >= outStart && partitionId < outStart + length) {
          List(partitionId - outStart + inStart)
        } else {
          Nil
        }
      }
    

    宽依赖

    宽依赖中,RDD分区是没有什么对应关系的,就是对应全部的分区

    @DeveloperApi
    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    	@transient private val _rdd: RDD[_ <: Product2[K, V]],
    	val partitioner: Partitioner,
    	val serializer: Serializer = SparkEnv.get.serializer,
    	val keyOrdering: Option[Ordering[K]] = None,
    	val aggregator: Option[Aggregator[K, V, C]] = None,
    	val mapSideCombine: Boolean = false)
      extends Dependency[Product2[K, V]] {
    
      override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
    
    
          // 注册shuffle manager,shuffle manager管理着shuffle的整个过程:和mapreduce shuffle相似;父RDD分区的数据按分区号在内存中排序,如果数据量很大,就会溢写到磁盘;多个溢写磁盘的文件会需要进行一个归并排序;所有RDD的分区完成上面“写”的过程以后,数据会被shuffle到Reducer中的Executor
      val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    	shuffleId, _rdd.partitions.length, this)
    }
  • 相关阅读:
    asp.net mvc上传图片案例
    kafka 常用参数
    play framework 笔记
    调试 kafka manager 源码
    kafka AdminClient 闲时关闭连接
    kafka 心跳和 rebalance
    kafka producer batch 发送消息
    kafka producer 发送消息简介
    zk 的配额
    kafka consumer 指定 offset,进行消息回溯
  • 原文地址:https://www.cnblogs.com/ivanny/p/spark_rdd_dependency.html
Copyright © 2011-2022 走看看