zoukankan      html  css  js  c++  java
  • Spark面试常见问题(一)--RDD基础

    1 RDD基础知识

    1.1 Spark的RDD五大特性

    1.1.1 A list of partitions

      RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的List;将数据加载为RDD时,一般一个hdfs里的block会加载为一个partition。
       对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目

    1.1.2 A function for computing each split

      RDD的每个partition上面都会有function,也就是函数应用,其作用是实现RDD之间partition的转换。
       Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

    1.1.3 A list of dependencies on other RDDs

       RDD会记录它的依赖 ,为了容错,也就是说在内存中的RDD操作时出错或丢失会进行重算,依赖又分为宽依赖和窄依赖,但不是所有的RDD都有依赖。
      RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算

    1.1.4 Optionally,a Partitioner for Key-value RDDs

      可选项,如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区,例如这里自定义的Partitioner是基于key进行分区,那则会将不同RDD里面的相同key的数据放到同一个partition里面
      Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量

    1.1.5 Optionally, a list of preferred locations to compute each split on

      最优的位置去计算,也就是数据的本地性,比如HDFS的block的所在位置应该是优先计算的位置。
      对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

      对应着上面这几点,我们在RDD里面能找到这4个方法和1个属性

    //只计算一次  
    protected def getPartitions: Array[Partition]  
    //对一个分片进行计算,得出一个可遍历的结果
    def compute(split: Partition, context: TaskContext): Iterator[T]
    //只计算一次,计算RDD对父RDD的依赖
    protected def getDependencies: Seq[Dependency[_]] = deps
    //可选的,分区的方法,针对第4点,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce
    @transient val partitioner: Option[Partitioner] = None
    //可选的,指定优先位置,输入参数是split分片,输出结果是一组优先的节点位置
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1.2 宽窄依赖

    1.2.1 宽依赖

      多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生

    1.2.2 窄依赖

      指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生,可能多个父RDD的Partition被一个子RDD使用,即多对一。
      区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖

    1.3 spark划分stage

    1.3.1 划分规则

      Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分依据就是宽窄依赖,遇到宽依赖就划分stage,每个stage包含一个或多个task,然后将这些task以taskSet的形式提交给TaskScheduler运行,stage是由一组并行的task组成
      spark程序中可以因为不同的action触发众多的job,一个程序中可以有很多的job,每一个job是由一个或者多个stage构成的,后面的stage依赖于前面的stage,也就是说只有前面依赖的stage计算完毕后,后面的stage才会运行
       stage 的划分标准就是宽依赖:何时产生宽依赖就会产生一个新的stage,例如reduceByKey,groupByKey,join的算子,会导致宽依赖的产生
      切割规则:从后往前,遇到宽依赖就切割stage;

    1.3.2 计算格式

       计算格式:pipeline管道计算模式,piepeline只是一种计算思想,一种模式
      spark的pipeline管道计算模式相当于执行了一个高阶函数,也就是说来一条数据然后计算一条数据,会把所有的逻辑走完,然后落地,而MapReduce是1+1=2,2+1=3这样的计算模式,也就是计算完落地,然后再计算,然后再落地到磁盘或者内存,最后数据是落在计算节点上,按reduce的hash分区落地。管道计算模式完全基于内存计算,所以比MapReduce快的原因。
       管道中的RDD何时落地:shuffle write的时候,对RDD进行持久化的时候。
      stage的task的并行度是由stage的最后一个RDD的分区数来决定的,一般来说,一个partition对应一个task,但最后reduce的时候可以手动改变reduce的个数,也就是改变最后一个RDD的分区数,也就改变了并行度。例如:reduceByKey(+,3)
      优化:提高stage的并行度:reduceByKey(+,patition的个数) ,join(+,patition的个数)

    1.3.3 DAGScheduler分析

      概述:是一个面向stage的调度器
      主要入参有:dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)

    rdd: final RDD;
    cleanedFunc: 计算每个分区的函数;
    resultHander: 结果侦听器;
    
    • 1
    • 2
    • 3

       主要功能:

    1.接受用户提交的job;
    2.将job根据类型划分为不同的stage,记录那些RDD,stage被物化,并在每一个stage内产生一系列的task,并封装成taskset;
    3.决定每个task的最佳位置,任务在数据所在节点上运行,并结合当前的缓存情况,将taskSet提交给TaskScheduler;
    4.重新提交shuffle输出丢失的stage给taskScheduler;
    注:一个stage内部的错误不是由shuffle输出丢失造成的,DAGScheduler是不管的,由TaskScheduler负责尝试重新提交task执行。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1.3.4 Job生成

      一旦driver程序中出现action,就会生成一个job,比如count等,向DAGScheduler提交job,如果driver程序后面还有action,那么其他action也会对应生成相应的job,所以,driver端有多少action就会提交多少job,这可能就是为什么spark将driver程序称为application而不是job 的原因。每一个job可能会包含一个或者多个stage,最后一个stage生成result,在提交job 的过程中,DAGScheduler会首先从后往前划分stage,划分的标准就是宽依赖,一旦遇到宽依赖就划分,然后先提交没有父阶段的stage们,并在提交过程中,计算该stage的task数目以及类型,并提交具体的task,在这些无父阶段的stage提交完之后,依赖该stage 的stage才会提交。

    1.3.5 有向无环图

      DAG,有向无环图,简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任意一条路径会将其带回到出发点的顶点位置,为每个spark job计算具有依赖关系的多个stage任务阶段,通常根据shuffle来划分stage,如reduceByKey,groupByKey等涉及到shuffle的transformation就会产生新的stage ,然后将每个stage划分为具体的一组任务,以TaskSets的形式提交给底层的任务调度模块来执行,其中不同stage之前的RDD为宽依赖关系,TaskScheduler任务调度模块负责具体启动任务,监控和汇报任务运行情况

    1.4编程API

    1.4.1 Transformation

      RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

    转换含义
    map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
    filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
    flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
    mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
    mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
    sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
    union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
    intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
    distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
    groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
    reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])  
    sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
    sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
    join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
    cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
    cartesian(otherDataset)  
    pipe(command, [envVars])  
    coalesce(numPartitions)  
    repartition(numPartitions)  
    repartitionAndSortWithinPartitions(partitioner)  

    1.4.2 Action

    动作含义
    reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
    collect() 在驱动程序中,以数组的形式返回数据集的所有元素
    count() 返回RDD的元素个数
    first() 返回RDD的第一个元素(类似于take(1))
    take(n) 返回一个由数据集的前n个元素组成的数组
    takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
    takeOrdered(n, [ordering])  
    saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
    saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
    saveAsObjectFile(path)  
    countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
    foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
  • 相关阅读:
    Photoshop 基础七 位图 矢量图 栅格化
    Photoshop 基础六 图层
    Warfare And Logistics UVALive
    Walk Through the Forest UVA
    Airport Express UVA
    Guess UVALive
    Play on Words UVA
    The Necklace UVA
    Food Delivery ZOJ
    Brackets Sequence POJ
  • 原文地址:https://www.cnblogs.com/ExMan/p/14318504.html
Copyright © 2011-2022 走看看