  • RDD source code

    RDD is an abstract class that defines the properties and methods shared by all RDDs; the main ones are introduced below.

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {

    An RDD has five main properties (a sketch of how a custom RDD supplies them follows this list):

     *  - A list of partitions
     *  - A function for computing each split
     *  - A list of dependencies on other RDDs
     *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
     *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
     *    an HDFS file)
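    Below is a minimal sketch, not taken from the Spark source, of how a custom RDD subclass would supply these five properties. The class and field names (RangeSketchRDD, RangePartition, sparkCtx, numSlices) are made up for illustration; only the overridden RDD members are real API.

    import org.apache.spark.{Partition, Partitioner, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical partition type: one partition per index.
    class RangePartition(override val index: Int) extends Partition

    // Hypothetical RDD producing one Int per partition; it passes Nil as its dependency list.
    class RangeSketchRDD(sparkCtx: SparkContext, numSlices: Int)
      extends RDD[Int](sparkCtx, Nil) {

      // 1. A list of partitions
      override protected def getPartitions: Array[Partition] =
        Array.tabulate[Partition](numSlices)(i => new RangePartition(i))

      // 2. A function for computing each split
      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator.single(split.index)

      // 3. Dependencies on other RDDs: the Nil passed to the superclass constructor (none here)

      // 4. Optionally, a Partitioner: None, since this is not a key-value RDD
      override val partitioner: Option[Partitioner] = None

      // 5. Optionally, preferred locations: empty, no locality information available
      override protected def getPreferredLocations(split: Partition): Seq[String] = Seq.empty
    }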

    (1) Subclasses

    CoGroupedRDD, EdgeRDD, EdgeRDDImpl, HadoopRDD, JdbcRDD, NewHadoopRDD, PartitionPruningRDD, ShuffledRDD, UnionRDD, VertexRDD, VertexRDDImpl
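    As a quick check (assuming an existing SparkContext named sc), common operations hand back some of these subclasses directly:

    // groupByKey on an un-partitioned pair RDD is backed by a ShuffledRDD,
    // and union of two RDDs without partitioners yields a UnionRDD.
    val pairs    = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val grouped  = pairs.groupByKey()
    val combined = sc.parallelize(1 to 3).union(sc.parallelize(4 to 6))

    println(grouped.getClass.getSimpleName)   // ShuffledRDD
    println(combined.getClass.getSimpleName)  // UnionRDD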

    (2) Properties

    1. SparkContext

    @transient private var _sc: SparkContext

    Defined in the primary constructor, it represents the environment the RDD runs in and can be used to read configuration, register cleanup, and so on.
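    A small usage sketch (assuming an existing SparkContext named sc): the private _sc field is exposed through the public sparkContext accessor.

    val rdd = sc.parallelize(1 to 100)
    // Read configuration through the context that created the RDD.
    println(rdd.sparkContext.getConf.get("spark.master"))
    assert(rdd.sparkContext eq sc)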

    2. Seq[Dependency[_]]

    @transient private var deps: Seq[Dependency[_]]

    Describes this RDD's dependencies on its parent RDDs.
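    A short sketch (assuming sc) of how deps surfaces through the public dependencies method: a map keeps a narrow OneToOneDependency, while reduceByKey introduces a ShuffleDependency.

    import org.apache.spark.{OneToOneDependency, ShuffleDependency}

    val base    = sc.parallelize(Seq(("a", 1), ("b", 2)))
    val mapped  = base.map { case (k, v) => (k, v * 2) }
    val reduced = mapped.reduceByKey(_ + _)

    println(mapped.dependencies.head.isInstanceOf[OneToOneDependency[_]])        // true
    println(reduced.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])  // true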

    (3) Methods

    1. Transformations and actions

    RDD defines the transformations and actions shared by all RDDs, such as map, filter, reduce, and first. Take filter as an example:

     def filter(f: T => Boolean): RDD[T] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[T, T](
          this,
          (context, pid, iter) => iter.filter(cleanF),
          preservesPartitioning = true)
      }
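    A quick usage sketch (assuming sc): filter keeps only the elements for which the predicate returns true, and as the source above shows it preserves the partitioning of its parent.

    val evens = sc.parallelize(1 to 10).filter(_ % 2 == 0)
    println(evens.collect().mkString(", "))   // 2, 4, 6, 8, 10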

    2. Caching

    This covers the several overloads of persist as well as cache. For example:

     /**
       * Mark this RDD for persisting using the specified level.
       *
       * @param newLevel the target storage level
       * @param allowOverride whether to override any existing level with the new one
       */
      private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
        // TODO: Handle changes of StorageLevel
        if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
          throw new UnsupportedOperationException(
            "Cannot change storage level of an RDD after it was already assigned a level")
        }
        // If this is the first time this RDD is marked for persisting, register it
        // with the SparkContext for cleanups and accounting. Do this only once.
        if (storageLevel == StorageLevel.NONE) {
          sc.cleaner.foreach(_.registerRDDForCleanup(this))
          sc.persistRDD(this)
        }
        storageLevel = newLevel
        this
      }
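    A usage sketch (assuming sc): cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), and an explicit storage level can be requested through the public persist overloads.

    import org.apache.spark.storage.StorageLevel

    val logs   = sc.parallelize(Seq("INFO ok", "ERROR boom", "INFO fine"))
    val errors = logs.filter(_.startsWith("ERROR")).persist(StorageLevel.MEMORY_AND_DISK)

    println(errors.count())   // first action computes and persists the partitions
    println(errors.count())   // subsequent actions read from the persisted data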