zoukankan      html  css  js  c++  java
  • 【原】1.1RDD源码解读(一)

    1.RDDResilient Distributed DataSet)是Spark生态系统中最基本的抽象,代表不可变的、可并行操作的分区元素集合。RDD这个类有RDD系列所有基本的操作,比如mapfilterpersist.另外,org.apache.spark.rdd.PairRDDFunctions含有key-value类型的RDD的基本操作,比如groupbyjoinorg.apache.spark.rdd.DoubleRDDFunctions含有Double类型的RDD的基本操作;org.apache.spark.rdd.SequenceFileRDDFunctions含有可以将RDD保存SequenceFiles的基本操作。所有的操作会通过有隐式转换适用于任何RDD

           每个RDD5个主要属性:

    - A list of partitions
    - A function for computing each split
    - A list of dependencies on other RDDs
    - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

    2.重要方法解读

    (1)//注册一个新的RDD,并根据当前值加1返回它的RDD的ID
    private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

    (2)缓存相关

                 a)persist、cache

    /**
     * 指定RDD缓存的Level,详见StorageLevel object
     *
     * @param newLevel 缓存Level

     * @param allowOverride 是否重写缓存
     */
    private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
      if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
        throw new UnsupportedOperationException(
          "Cannot change storage level of an RDD after it was already assigned a level")
      }
      if (storageLevel == StorageLevel.NONE) {
        sc.cleaner.foreach(_.registerRDDForCleanup(this))
        sc.persistRDD(this)
      }
      storageLevel = newLevel
      this
    }

    //可见cache其实是调用的persist方法,RDD默认的缓存策略是MEMORY_ONLY

    def cache(): this.type = persist()

    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

                b)unpersist

    //RDD设置为不缓存,并且把内存或磁盘上的blocks都删除

    def unpersist(blocking: Boolean = true): this.type = {
      logInfo("Removing RDD " + id + " from persistence list")
      sc.unpersistRDD(id, blocking)

      //将缓存Level设置为NONE
      storageLevel = StorageLevel.NONE
      this
    }

    unpersistRDDidblocking)的源码如下所示:

    /**
     * 将内存或磁盘中缓存的RDD删除

     */
    private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
      env.blockManager.master.removeRdd(rddId, blocking)

      //persistentRdds是一个弱引用得HashMap,key为rddId,value为对应的RDD
      persistentRdds.remove(rddId)
      listenerBus.post(SparkListenerUnpersistRDD(rddId))
    }

    (3)分区partitions

    //得到RDD的所有分区,并以数组形式返回

    final def partitions: Array[Partition] = {
      checkpointRDD.map(_.partitions).getOrElse {
        if (partitions_ == null) {
          partitions_ = getPartitions
        }
        partitions_
      }
    }

    4

    //得到分区预先存放的位置

    final def preferredLocations(split: Partition): Seq[String] = {
      checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
        getPreferredLocations(split)
      }
    }

    (5)依赖

    //得到窄依赖的祖先节点

    private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
      val ancestors = new mutable.HashSet[RDD[_]]

      def visit(rdd: RDD[_]) {
        val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
        val narrowParents = narrowDependencies.map(_.rdd)
        val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
        narrowParentsNotVisited.foreach { parent =>
          ancestors.add(parent)
          visit(parent)
        }
      }

  • 相关阅读:
    IDEA安装及破解永久版教程————鹏鹏
    Hive(2)-Hive的安装,使用Mysql替换derby,以及一丢丢基本的HQL
    Hive(1)-基本概念
    ZooKeeper(3)-内部原理
    Hadoop(25)-高可用集群配置,HDFS-HA和YARN-HA
    ZooKeeper(2)-安装和配置
    ZooKeeper(1)-入门
    Hadoop(24)-Hadoop优化
    Hadoop(23)-Yarn资源调度器
    Hadoop(22)-Hadoop数据压缩
  • 原文地址:https://www.cnblogs.com/yourarebest/p/5260027.html
Copyright © 2011-2022 走看看