zoukankan      html  css  js  c++  java
  • Spark-RDD-基本介绍

    1.定义

    RDD是只读的记录分区的集合,是一种基于工作集的应用抽象

    创建RDD的方式有两种:

    从驱动程序中的集合中并行创建

    从外部数据集创建

    2.底层存储原理

    每个RDD的数据以Block的形式存储在多个机器上,对于每个Executor都会启动一个BlockManagerSlave,并且管理一部分Block,Driver节点上通过BlockManagerMaster保存Block的元数据,BlockManagerSlave生成Block之后会向BlockManagerMaster注册,然后通过它来管理RDD和Block的关系,如果RDD不需要存储,BlockManagerMasterBlockManagerSlave发送删除指令删除相应的Block

    BlockManager来管理RDD的物理分区,每个Block对应节点上一个数据块,存储位置可以是磁盘或内存,RDD中的Partition是一个逻辑数据块,对应相应的Block,在代码中一个RDD相当于数据的元数据结构,保存数据的分区以及逻辑结构映射关系还有RDD之前的依赖关系

    在每个节点上都是通过BlockManager来管理Block,部分源码如下:

    /**
     * Manager running on every node (driver and executors) which provides interfaces for putting and
     * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
     * BlockManager在每个节点上运行管理Block(Driver和Executors),它提供一个接口检索本地和远程的存储变量,如memory、disk、off-	  heap
     * Note that [[initialize()]] must be called before the BlockManager is usable.
     * 使用BlockManager前必须先初始化
     */
    private[spark] class BlockManager(
        executorId: String,
        rpcEnv: RpcEnv,
        val master: BlockManagerMaster,
        val serializerManager: SerializerManager,
        val conf: SparkConf,
        memoryManager: MemoryManager,
        mapOutputTracker: MapOutputTracker,
        shuffleManager: ShuffleManager,
        val blockTransferService: BlockTransferService,
        securityManager: SecurityManager,
        numUsableCores: Int)
      extends BlockDataManager with BlockEvictionHandler with Logging {
    

    RDD.scala源码如下:

    /**
     * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
     * partitioned collection of elements that can be operated on in parallel. This class contains the
     * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
     * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
     * pairs, such as `groupByKey` and `join`;
     * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
     * Doubles; and
     * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
     * can be saved as SequenceFiles.
     * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
     * through implicit.
     *
     * Internally, each RDD is characterized by five main properties:
     * 5个主要特性
     *  - A list of partitions 一个分区的列表
     *  - A function for computing each split 每个分片有一个计算函数
     *  - A list of dependencies on other RDDs 一个依赖于其他RDD的列表
     *  可选的,一个key-value类型的RDD分区器
     *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
     *  每个分区都有一个分区位置列表
     *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
     *    an HDFS file)
     *
     * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
     * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
     * reading data from a new storage system) by overriding these functions. Please refer to the
     * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
     * for more details on RDD internals.
     */
    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext, //入口
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {
    

    3.RDD五大特性

    1.A list of partitions

    RDD可以有多个分区,每个分区被一个Task处理,可以在创建RDD时指定分区个数,如果不指定,则为分配到的CPU核数

    2.A function for computing each split

    每个分区有一个compute函数,对具体的分片进行计算,分片是并行的

    3.A list of dependencies on other RDDs

    一个依赖于其他RDD的列表,RDD之间的依赖有两种:Narrow Dependency和Wide Dependency

    Narrow Dependency指每一个父RDD的Partition最多被子RDD的一个Partition使用

    Wide Dependency指多个子RDD的Partition依赖于同一个父RDD的Partition

    4.Optionally, a Partitioner for key-value RDDs

    每个key-value形式的RDD都有Partitioner属性,决定RDD如何分区,Partition的个数决定每个Stage的Task个数

    5.Optionally, a list of preferred locations to compute each split on

    每个分区都有一个优先位置列表,Spark在进行任务调度的时候,会优先将任务分配到处理数据的数据块所在的位置,符合数据本地性

    RDD的源码文件中通过4个方法和一个属性对应了这5大特性:

    /**
       * :: DeveloperApi ::
       * Implemented by subclasses to compute a given partition.
       * 通过子类实现给定分区的计算
       */
      @DeveloperApi
      def compute(split: Partition, context: TaskContext): Iterator[T]
    
      /**
       * Implemented by subclasses to return the set of partitions in this RDD. This method will only
       * be called once, so it is safe to implement a time-consuming computation in it.
       * 通过子类实现,返回一个RDD的分区列表,该方法仅仅被调用一次,因此在其中执行耗时的操作是安全的
       *
       * The partitions in this array must satisfy the following property:
       * 在这个数组中的分区必须符合以下属性
       *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
       */
      protected def getPartitions: Array[Partition]
    
      /**
       * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
       * be called once, so it is safe to implement a time-consuming computation in it.
       * 通过子类实现该RDD是如何依赖父RDDs的,该方法仅仅被调用一次,因此在其中执行耗时的操作是安全的
       */
      protected def getDependencies: Seq[Dependency[_]] = deps
    
      /**
       * Optionally overridden by subclasses to specify placement preferences.
       * 可选的,通过子类覆盖指定优先位置
       */
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    
      /** Optionally overridden by subclasses to specify how they are partitioned.
       * 可选的,通过子类覆盖指定分区
       * */
      @transient val partitioner: Option[Partitioner] = None
    

    TaskContext是读取执行任务的环境,可以调用内部的函数访问正在运行任务的环境信息,Partitioner定义了如何在key-value类型的RDD元素中用key分区,Partition时一个RDD的分区标识符,源码如下:

    /**
     * An identifier for a partition in an RDD.
     */
    trait Partition extends Serializable {
      /**
       * Get the partition's index within its parent RDD
       * 获取父RDD的分区索引
       */
      def index: Int
    
      // A better default implementation of HashCode
      // 最好默认实现HashCode
      override def hashCode(): Int = index
    
      override def equals(other: Any): Boolean = super.equals(other)
    }
    
  • 相关阅读:
    开学测试感想
    动手动脑1(00JAVA语言基础)
    9.29 java web注释方式以及servlet映射
    三十道随机算法
    9.30 servlet学习
    C#验证控件的使用方法
    SqlHelper详解
    C#字符串的几种常用方法
    存储过程事务处理
    js url编码
  • 原文地址:https://www.cnblogs.com/jordan95225/p/13455993.html
Copyright © 2011-2022 走看看