RDD.scala
/** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, * partitioned collection of elements that can be operated on in parallel. This class contains the * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition, * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value * pairs, such as `groupByKey` and `join`; * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of * Doubles; and * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that * can be saved as SequenceFiles. * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] * through implicit. * * Internally, each RDD is characterized by five main properties: * * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) * * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD * to implement its own way of computing itself. */
/** * RDD, Spark中的基本抽象.表示一个不可变的,分区化的元素集合,可并行操作.该类含所有RDD类型的基本 *操作,如`map`, `filter`,`persist`. *另外,[org.apache.spark.rdd.PairRDDFunctions]仅含k-v对偶类型的RDD的操作,如`groupByKey` and `join` *[org.apache.spark.rdd.DoubleRDDFunctions]仅含Doubles类型RDD的操作 *[org.apache.spark.rdd.SequenceFileRDDFunctions]含可被保存为SequenceFiles文件类型的RDD的操作. *形如RDD[(Int,Int)]类型的RDD通过隐式转换可自动实现PairRddRunctions的操作。 * *每个RDD都包含5个主要属性: * * - 一个分区列表 * - 一个计算每个分片的函数 * - 一个当前RDD和其他RDD的依赖列表 * - 可选, 一个k-v型RDD的Partitioner(例如,RDD是哈希分区的) * - 可选, 一个计算每个切片首选位置的列表(例如,一个HDFS文件的块位置) * * spark中所有调度和执行都基于这些方法完成,所有的调度和执行允许每个RDD实现它自己的计算方式. * 用户可实现自定义的RDDs (例如,对于从一个新的存储系统读数据)通过重写这些函数. */
弹性的理解:如某个RDD的分区,若内存能够装得下数据,则数据在内存中;若装不下,则部分数据存储在磁盘中。数据在内存还是磁盘对用户透明,不需要关注,这就是弹性。
容错的理解:如rdd1,rdd2是由rdd1计算得到的如map、filter等;若rdd2某分区数据丢失,则spark会通过rdd1的数据及相应的算子计算得到目标数据。
创建RDD有两种方式:
1、在驱动程序中并行化一个已经存在的集合
scala> val rdd1 = sc.parallelize(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:36
//调用parallelize
scala> sc.makeRDD(1 to 10) res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
2、关联一个外部存储系统中数据集
scala> val rdd1 = sc.textFile("myspark/temptags.txt") //从HDFS rdd1: org.apache.spark.rdd.RDD[String] = myspark/temptags.txt MapPartitionsRDD[22] at textFile at <console>:36
RDD的操作有两种类型:
1、transforms 变换
从已经存在的数据集创建一个新的数据集;spark中所有的变换都是lazy的,该变换并不马上计算,只在遇到aciton才计算,这样使spark更高效
2、actions 动作
在数据集上执行一个计算后返回给驱动程序一个值
RDD特点:虚的,逻辑上的集合,等价List
轻量级的(不含实际数据,只有分区列表;如果是并行化的创建RDD则含有数据)
一个RDD含多个分区,在一个阶段中,child RDD的分区总数 <= parent RDD的分区总数是,shuffle型算子可重新指定分区
spark中的分区是切片,Hadoop中的分区是map阶段key的目的地
RDD的依赖:
Dependency:child RDD的每个分区和parent RDD的每个分区数量上的对应关系
NarrowDependency:窄依赖,child RDD的每个分区不完全依赖parent RDD的所有分区
OneToOneDependency
RangeDependency
PruneDependency
ShuffleDependency:宽依赖,child RDD的每个分区依赖parent RDD的所有分区
划分stage
sc.textFile("/1.txt")内部先创建HadoopRdd再经过map变换为MapPartitionRdd并返回