  • RDD source code

    RDD is an abstract class that defines the properties and methods common to all RDDs. The main ones are described below.

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {

    An RDD has five main properties:

     *  - A list of partitions
     *  - A function for computing each split
     *  - A list of dependencies on other RDDs
     *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
     *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
     *    an HDFS file)
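
    The five properties can be pictured as members of an abstract class. The following is only a toy sketch in plain Scala, not Spark's code: every Mini* name is hypothetical, the partitioner is reduced to an optional function, and the round-robin split is an assumption chosen for brevity.

```scala
// Toy model only: the Mini* names are hypothetical and are not Spark classes.
object MiniRddSketch {
  final case class MiniPartition(index: Int)

  abstract class MiniRDD[T](val parents: Seq[MiniRDD[_]]) {
    // 1. A list of partitions
    def partitions: Seq[MiniPartition]
    // 2. A function for computing each split
    def compute(split: MiniPartition): Iterator[T]
    // 3. A list of dependencies on other RDDs
    def dependencies: Seq[MiniRDD[_]] = parents
    // 4. Optionally, a partitioner (here simply key => partition index)
    def partitioner: Option[Any => Int] = None
    // 5. Optionally, preferred locations (host names) for a split
    def preferredLocations(split: MiniPartition): Seq[String] = Nil
  }

  // A leaf dataset backed by an in-memory collection.
  // Elements are assigned to partitions round-robin (an assumption of this sketch).
  class MiniCollectionRDD[T](data: Seq[T], numSlices: Int) extends MiniRDD[T](Nil) {
    def partitions: Seq[MiniPartition] = (0 until numSlices).map(MiniPartition(_))
    def compute(split: MiniPartition): Iterator[T] =
      data.iterator.zipWithIndex.collect {
        case (x, i) if i % numSlices == split.index => x
      }
  }

  def main(args: Array[String]): Unit = {
    val rdd = new MiniCollectionRDD(Seq(1, 2, 3, 4, 5), numSlices = 2)
    rdd.partitions.foreach { p =>
      println(s"partition ${p.index}: ${rdd.compute(p).toList}")
    }
  }
}
```

    Only the first two members are abstract in this sketch; the other three have defaults, which mirrors the "optionally" wording in the list above.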

    (1) Subclasses

    CoGroupedRDD, EdgeRDD, EdgeRDDImpl, HadoopRDD, JdbcRDD, NewHadoopRDD, PartitionPruningRDD, ShuffledRDD, UnionRDD, VertexRDD, VertexRDDImpl

    (2) Properties

    1. SparkContext

    @transient private var _sc: SparkContext

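    Defined in the primary constructor, it represents the runtime environment the RDD belongs to and can be used to read configuration, clean up the environment, and so on.

    2. Seq[Dependency[_]]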

    @transient private var deps: Seq[Dependency[_]]

    Defines this RDD's dependencies on its parent RDDs.
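
    Because each RDD records its parents, the chain of dependencies can be walked back to the original data. A minimal sketch of that idea in plain Scala follows; MiniNode and MiniDependency are hypothetical stand-ins, not Spark's RDD and Dependency classes.

```scala
// Toy model only: MiniNode and MiniDependency are hypothetical stand-ins.
object MiniLineageSketch {
  // Stand-in for a dependency: it only records the parent it points to.
  final case class MiniDependency(parent: MiniNode)
  // Stand-in for an RDD: a name plus its dependencies on parents.
  final case class MiniNode(name: String, deps: Seq[MiniDependency] = Nil)

  // Walk the dependency graph depth-first, listing each node before its parents.
  def lineage(node: MiniNode): List[String] =
    node.name :: node.deps.toList.flatMap(d => lineage(d.parent))

  def main(args: Array[String]): Unit = {
    val source   = MiniNode("source")
    val mapped   = MiniNode("map", Seq(MiniDependency(source)))
    val filtered = MiniNode("filter", Seq(MiniDependency(mapped)))
    println(lineage(filtered).mkString(" <- ")) // prints "filter <- map <- source"
  }
}
```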

    (3) Methods

    1. Transformations and actions

    RDD defines the transformations and actions shared by all RDDs, such as map, filter, reduce, and first. Taking filter as an example:

      def filter(f: T => Boolean): RDD[T] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[T, T](
          this,
          (context, pid, iter) => iter.filter(cleanF),
          preservesPartitioning = true)
      }
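
    The key point of the snippet above is that filter does no work itself: it wraps the parent in a new RDD whose per-partition function filters the parent's iterator. The toy sketch below imitates that shape in plain Scala. All Mini* names are hypothetical, and closure cleaning, the task context, and withScope are left out.

```scala
// Toy model only: the Mini* names are hypothetical and are not Spark classes.
object MiniFilterSketch {
  abstract class MiniRDD[T](val numPartitions: Int) {
    // Produce the elements of one partition on demand.
    def compute(pid: Int): Iterator[T]

    // Transformation: returns a new dataset and computes nothing yet.
    def filter(pred: T => Boolean): MiniRDD[T] =
      new MiniMapPartitionsRDD[T, T](this, (_, iter) => iter.filter(pred))

    // Action: forces computation of every partition.
    def collect(): List[T] =
      (0 until numPartitions).toList.flatMap(pid => compute(pid).toList)
  }

  // Applies f to the parent's iterator for the same partition index.
  class MiniMapPartitionsRDD[T, U](
      prev: MiniRDD[T],
      f: (Int, Iterator[T]) => Iterator[U])
    extends MiniRDD[U](prev.numPartitions) {
    def compute(pid: Int): Iterator[U] = f(pid, prev.compute(pid))
  }

  // Leaf dataset whose partitions are given explicitly.
  class MiniSeqRDD[T](parts: Vector[Vector[T]]) extends MiniRDD[T](parts.length) {
    def compute(pid: Int): Iterator[T] = parts(pid).iterator
  }

  def main(args: Array[String]): Unit = {
    val rdd = new MiniSeqRDD(Vector(Vector(1, 2, 3), Vector(4, 5, 6)))
    println(rdd.filter(_ % 2 == 0).collect()) // prints "List(2, 4, 6)"
  }
}
```

    Since the predicate is applied to an iterator, elements are tested one at a time as the partition is consumed, and the number of partitions stays the same as in the parent.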

    2. Caching

    This covers the several overloads of persist as well as cache. For example:

      /**
       * Mark this RDD for persisting using the specified level.
       *
       * @param newLevel the target storage level
       * @param allowOverride whether to override any existing level with the new one
       */
      private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
        // TODO: Handle changes of StorageLevel
        if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
          throw new UnsupportedOperationException(
            "Cannot change storage level of an RDD after it was already assigned a level")
        }
        // If this is the first time this RDD is marked for persisting, register it
        // with the SparkContext for cleanups and accounting. Do this only once.
        if (storageLevel == StorageLevel.NONE) {
          sc.cleaner.foreach(_.registerRDDForCleanup(this))
          sc.persistRDD(this)
        }
        storageLevel = newLevel
        this
      }
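
    The two checks in persist can be tried out in isolation. The sketch below reproduces only that control flow in plain Scala: the Level objects, MiniRDD, and the registrations counter are hypothetical stand-ins for StorageLevel, RDD, and the one-time registration with the SparkContext.

```scala
// Toy model only: Level, MiniRDD and registrations are hypothetical stand-ins.
object MiniPersistSketch {
  sealed trait Level
  case object NoLevel extends Level    // plays the role of StorageLevel.NONE
  case object MemoryOnly extends Level
  case object DiskOnly extends Level

  final class MiniRDD {
    private var level: Level = NoLevel
    // Counts how many times the "register once" branch has run.
    var registrations: Int = 0

    def storageLevel: Level = level

    def persist(newLevel: Level, allowOverride: Boolean = false): this.type = {
      // Refuse to switch to a different level unless the caller allows it.
      if (level != NoLevel && newLevel != level && !allowOverride) {
        throw new UnsupportedOperationException(
          "Cannot change storage level of an RDD after it was already assigned a level")
      }
      // Only the first call registers the dataset.
      if (level == NoLevel) {
        registrations += 1
      }
      level = newLevel
      this
    }
  }

  def main(args: Array[String]): Unit = {
    val rdd = new MiniRDD
    rdd.persist(MemoryOnly).persist(MemoryOnly) // same level twice is accepted
    println(scala.util.Try(rdd.persist(DiskOnly)).isFailure) // prints "true"
    rdd.persist(DiskOnly, allowOverride = true)
    println(rdd.storageLevel)  // prints "DiskOnly"
    println(rdd.registrations) // prints "1"
  }
}
```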
  • Original article: https://www.cnblogs.com/itboys/p/6673208.html