  • RDD source code

    RDD is an abstract class that defines the properties and methods shared by all RDDs. The main ones are described below.

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {

    An RDD has five main properties:

     *  - A list of partitions
     *  - A function for computing each split
     *  - A list of dependencies on other RDDs
     *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
     *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
     *    an HDFS file)
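To make the five properties concrete, here is a minimal plain-Scala sketch that mirrors them in a toy class. It does not use Spark: `MiniRDD`, `MiniPartition`, and `MiniParallelRDD` are hypothetical names invented for illustration, and the round-robin slicing is an assumption, not how Spark splits a collection.

```scala
// Toy model only: these are NOT Spark classes, all names are hypothetical.
final case class MiniPartition(index: Int)

abstract class MiniRDD[T](val deps: Seq[MiniRDD[_]]) { // 3. dependencies on other RDDs
  // 1. A list of partitions
  def partitions: Seq[MiniPartition]

  // 2. A function for computing each split
  def compute(split: MiniPartition): Iterator[T]

  // 4. Optionally, a partitioner (modelled here as key => partition index)
  def partitioner: Option[Any => Int] = None

  // 5. Optionally, preferred locations for each split (host names)
  def preferredLocations(split: MiniPartition): Seq[String] = Nil
}

// A source RDD with no parents that deals an in-memory sequence
// round-robin into numSlices partitions.
class MiniParallelRDD[T](data: Seq[T], numSlices: Int) extends MiniRDD[T](Nil) {
  def partitions: Seq[MiniPartition] =
    (0 until numSlices).map(i => MiniPartition(i))

  def compute(split: MiniPartition): Iterator[T] =
    data.iterator.zipWithIndex.collect {
      case (x, i) if i % numSlices == split.index => x
    }
}

object MiniRDDDemo {
  def main(args: Array[String]): Unit = {
    val rdd = new MiniParallelRDD(Seq(1, 2, 3, 4, 5), numSlices = 2)
    rdd.partitions.foreach { p =>
      println(s"partition ${p.index}: ${rdd.compute(p).mkString(", ")}")
    }
  }
}
```

In Spark itself the corresponding members on `RDD` are `getPartitions`, `compute`, `getDependencies`, `partitioner`, and `getPreferredLocations`; subclasses override the ones they need.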

    (1) Subclasses

    CoGroupedRDD, EdgeRDD, EdgeRDDImpl, HadoopRDD, JdbcRDD, NewHadoopRDD, PartitionPruningRDD, ShuffledRDD, UnionRDD, VertexRDD, VertexRDDImpl

    (2) Properties

    1. SparkContext

    @transient private var _sc: SparkContext

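    Defined in the primary constructor, it represents the runtime environment the RDD belongs to. It can be used to read configuration, clean up resources, and so on.

    2. Seq[Dependency[_]]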

    @transient private var deps: Seq[Dependency[_]]

    Defines this RDD's dependencies on its parent RDDs.
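Because every RDD records its parents, the whole lineage can be recovered by walking the dependency list recursively. The following is a small sketch of that idea in plain Scala; `Node` is a hypothetical stand-in for an RDD and carries only a name and its parents.

```scala
// Hypothetical minimal lineage graph: each node only knows its parents.
final case class Node(name: String, deps: Seq[Node] = Nil) {
  // Depth-first walk that indents each level of ancestry by two spaces.
  def lineage(indent: Int = 0): List[String] =
    (" " * indent + name) :: deps.toList.flatMap(_.lineage(indent + 2))
}

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val source = Node("HadoopRDD")
    val mapped = Node("MapPartitionsRDD", Seq(source))
    val other  = Node("ParallelCollectionRDD")
    val union  = Node("UnionRDD", Seq(mapped, other))
    union.lineage().foreach(println)
  }
}
```

On a real RDD, `dependencies` returns the `Seq[Dependency[_]]` and `toDebugString` prints a similar tree.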

    (3) Methods

    1. Transformations and actions

    RDD defines the transformations and actions shared by all RDDs, such as map, filter, reduce, and first. Take filter as an example:

     def filter(f: T => Boolean): RDD[T] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[T, T](
          this,
          (context, pid, iter) => iter.filter(cleanF),
          preservesPartitioning = true)
      }
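The key point in the filter source above is that it evaluates nothing: it only wraps the parent in a new node holding a per-partition iterator function, and the predicate runs when an action pulls data through. Here is a plain-Scala sketch of that pattern, under the assumption that an RDD can be reduced to "a number of partitions plus a compute function". `LazyRDD`, `SeqLazyRDD`, and `MapPartitionsLazyRDD` are hypothetical names, and the closure cleaning done by `sc.clean` is left out.

```scala
// Hypothetical stand-in for an RDD: partitions are computed lazily on demand.
abstract class LazyRDD[T] {
  def numPartitions: Int
  def compute(pid: Int): Iterator[T]

  // Transformation: builds a new node, evaluates nothing.
  def filter(f: T => Boolean): LazyRDD[T] =
    new MapPartitionsLazyRDD[T, T](this, (pid, iter) => iter.filter(f))

  // Action: walks every partition and materializes the results.
  def collect(): List[T] =
    (0 until numPartitions).toList.flatMap(pid => compute(pid).toList)
}

// Source node backed by in-memory partitions.
class SeqLazyRDD[T](parts: Vector[Seq[T]]) extends LazyRDD[T] {
  def numPartitions: Int = parts.length
  def compute(pid: Int): Iterator[T] = parts(pid).iterator
}

// Applies f to the parent's iterator for each partition (U = output, T = input).
class MapPartitionsLazyRDD[U, T](
    parent: LazyRDD[T],
    f: (Int, Iterator[T]) => Iterator[U]) extends LazyRDD[U] {
  def numPartitions: Int = parent.numPartitions
  def compute(pid: Int): Iterator[U] = f(pid, parent.compute(pid))
}

object FilterDemo {
  def main(args: Array[String]): Unit = {
    var calls = 0 // counts how often the predicate actually runs
    val source = new SeqLazyRDD(Vector(Seq(1, 2, 3), Seq(4, 5, 6)))
    val evens = source.filter { x => calls += 1; x % 2 == 0 }
    println(s"predicate calls after filter:  $calls")
    println(evens.collect())
    println(s"predicate calls after collect: $calls")
  }
}
```

The counter stays at zero until `collect()` is called, which is the transformation/action split in miniature.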

    2. Caching

    This covers the several persist overloads as well as cache. For example:

     /**
       * Mark this RDD for persisting using the specified level.
       *
       * @param newLevel the target storage level
       * @param allowOverride whether to override any existing level with the new one
       */
      private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
        // TODO: Handle changes of StorageLevel
        if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
          throw new UnsupportedOperationException(
            "Cannot change storage level of an RDD after it was already assigned a level")
        }
        // If this is the first time this RDD is marked for persisting, register it
        // with the SparkContext for cleanups and accounting. Do this only once.
        if (storageLevel == StorageLevel.NONE) {
          sc.cleaner.foreach(_.registerRDDForCleanup(this))
          sc.persistRDD(this)
        }
        storageLevel = newLevel
        this
      }
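The guard logic in persist can be tried out without a cluster. This is a minimal sketch that reproduces the two checks from the source above (refuse to change an assigned level unless overriding is allowed, and register only on the first call). `Cacheable`, the `Level` hierarchy, and the `registrations` counter are hypothetical stand-ins for the RDD, `StorageLevel`, and the SparkContext registration.

```scala
import scala.util.Try

// Hypothetical stand-ins for StorageLevel values.
sealed trait Level
case object NoneLevel extends Level
case object MemoryOnly extends Level
case object MemoryAndDisk extends Level

class Cacheable {
  private var level: Level = NoneLevel
  var registrations = 0 // how many times the "register for cleanup" step ran

  def storageLevel: Level = level

  def persist(newLevel: Level, allowOverride: Boolean = false): this.type = {
    // Changing an already assigned level is rejected unless explicitly allowed.
    if (level != NoneLevel && newLevel != level && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level after it was already assigned a level")
    }
    // First-time marking: register exactly once.
    if (level == NoneLevel) registrations += 1
    level = newLevel
    this
  }
}

object PersistDemo {
  def main(args: Array[String]): Unit = {
    val c = new Cacheable
    c.persist(MemoryOnly)
    println(Try(c.persist(MemoryAndDisk)).isFailure) // level change is rejected
    c.persist(MemoryAndDisk, allowOverride = true)
    println(s"${c.storageLevel}, registered ${c.registrations} time(s)")
  }
}
```

In Spark this two-argument persist is private; the public `persist(newLevel)` and `cache()` go through it, and `cache()` is shorthand for persisting with `StorageLevel.MEMORY_ONLY`.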
  • Original address: https://www.cnblogs.com/itboys/p/6673208.html