RDD
RDD是Resilient Distributed Dataset
的英文缩写,是spark的基本数据抽象,代表着一个不可变的、多分区的、可并行操作的元素集合。
RDD有5个主要属性:
- 分区列表 (partition list)
- 计算某个分区函数(compute)
- 依赖列表 (dependency list)
- kv类型RDD的分区器(可选的)
- 计算某个分区最优位置的函数(可选的)
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getPartitions: Array[Partition]
protected def getDependencies: Seq[Dependency[_]] = deps
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
@transient val partitioner: Option[Partitioner] = None
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
可见血统关系是通过deps
依赖列表来保存的,如果不指定依赖列表则默认创建一对一的依赖关系OneToOneDependency
函数注入
RDD类中定义了一些通用的转换函数如map``fliter``union
等同时RDD的伴生对象中通过隐式转换的方式定义了一些额外的转换函数,比如kv类型的RDD一些转换函数:groupByKey
cogroup
等
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
依赖关系
Dependency
抽象类来描述依赖关系,有两种子类:
- NarrowDependency 这也就是常说的窄依赖,子RDD的每一个分区依赖固定个父RDD的分区。这种依赖关系是固定的可以通过
def getParents(partitionId: Int): Seq[Int]
函数计算出子RDD的某个分区依赖的父RDD的分区。 - ShuffleDependency 也就是常说的宽依赖,这种依赖关系会触发shuffle,也是spark任务划分stage的标准。
具体实现
- MapPartitionsRDD 不创建新的分区列表,采用一对一的依赖关系,每个分区的计算就是在对应父分区上运用传入的转换函数。
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context)
- ParallelCollectionRDD 作为source类型的RDD,依赖列表为空,会根据传入的数据和并行度计算新的分区列表,用
ParallelCollectionPartition
对象保存每个分区的数据。
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
override def compute(s: Partition, context: TaskContext): Iterator[T] = {
new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}
override def getPreferredLocations(s: Partition): Seq[String] = {
locationPrefs.getOrElse(s.index, Nil)
}
- 把握好5个主要属性很容易实现自定义的RDD