zoukankan      html  css  js  c++  java
  • Spark面试常见问题(一)--RDD基础

    1 RDD基础知识

    1.1 Spark的RDD五大特性

    1.1.1 A list of partitions

      RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的List;将数据加载为RDD时,一般一个hdfs里的block会加载为一个partition。
       对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目

    1.1.2 A function for computing each split

      RDD的每个partition上面都会有function,也就是函数应用,其作用是实现RDD之间partition的转换。
       Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

    1.1.3 A list of dependencies on other RDDs

       RDD会记录它的依赖 ,为了容错,也就是说在内存中的RDD操作时出错或丢失会进行重算,依赖又分为宽依赖和窄依赖,但不是所有的RDD都有依赖。
      RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算

    1.1.4 Optionally,a Partitioner for Key-value RDDs

      可选项,如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区,例如这里自定义的Partitioner是基于key进行分区,那则会将不同RDD里面的相同key的数据放到同一个partition里面
      Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量

    1.1.5 Optionally, a list of preferred locations to compute each split on

      最优的位置去计算,也就是数据的本地性,比如HDFS的block的所在位置应该是优先计算的位置。
      对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

      对应着上面这几点,我们在RDD里面能找到这4个方法和1个属性

    //只计算一次  
    protected def getPartitions: Array[Partition]  
    //对一个分片进行计算,得出一个可遍历的结果
    def compute(split: Partition, context: TaskContext): Iterator[T]
    //只计算一次,计算RDD对父RDD的依赖
    protected def getDependencies: Seq[Dependency[_]] = deps
    //可选的,分区的方法,针对第4点,类似于mapreduce当中的Paritioner接口,控制key分到哪个reduce
    @transient val partitioner: Option[Partitioner] = None
    //可选的,指定优先位置,输入参数是split分片,输出结果是一组优先的节点位置
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1.2 宽窄依赖

    1.2.1 宽依赖

      多个子RDD的Partition会依赖同一个父RDD的Partition,关系是一对多,父RDD的一个分区的数据去到子RDD的不同分区里面,会有shuffle的产生

    1.2.2 窄依赖

      指的是每一个父RDD的Partition最多被子RDD的一个partition使用,是一对一的,也就是父RDD的一个分区去到了子RDD的一个分区中,这个过程没有shuffle产生,可能多个父RDD的Partition被一个子RDD使用,即多对一。
      区分的标准就是看父RDD的一个分区的数据的流向,要是流向一个partition的话就是窄依赖,否则就是宽依赖

    1.3 spark划分stage

    1.3.1 划分规则

      Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分依据就是宽窄依赖,遇到宽依赖就划分stage,每个stage包含一个或多个task,然后将这些task以taskSet的形式提交给TaskScheduler运行,stage是由一组并行的task组成
      spark程序中可以因为不同的action触发众多的job,一个程序中可以有很多的job,每一个job是由一个或者多个stage构成的,后面的stage依赖于前面的stage,也就是说只有前面依赖的stage计算完毕后,后面的stage才会运行
       stage 的划分标准就是宽依赖:何时产生宽依赖就会产生一个新的stage,例如reduceByKey,groupByKey,join的算子,会导致宽依赖的产生
      切割规则:从后往前,遇到宽依赖就切割stage;

    1.3.2 计算格式

       计算格式:pipeline管道计算模式,piepeline只是一种计算思想,一种模式
      spark的pipeline管道计算模式相当于执行了一个高阶函数,也就是说来一条数据然后计算一条数据,会把所有的逻辑走完,然后落地,而MapReduce是1+1=2,2+1=3这样的计算模式,也就是计算完落地,然后再计算,然后再落地到磁盘或者内存,最后数据是落在计算节点上,按reduce的hash分区落地。管道计算模式完全基于内存计算,所以比MapReduce快的原因。
       管道中的RDD何时落地:shuffle write的时候,对RDD进行持久化的时候。
      stage的task的并行度是由stage的最后一个RDD的分区数来决定的,一般来说,一个partition对应一个task,但最后reduce的时候可以手动改变reduce的个数,也就是改变最后一个RDD的分区数,也就改变了并行度。例如:reduceByKey(+,3)
      优化:提高stage的并行度:reduceByKey(+,patition的个数) ,join(+,patition的个数)

    1.3.3 DAGScheduler分析

      概述:是一个面向stage的调度器
      主要入参有:dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)

    rdd: final RDD;
    cleanedFunc: 计算每个分区的函数;
    resultHander: 结果侦听器;
    
    • 1
    • 2
    • 3

       主要功能:

    1.接受用户提交的job;
    2.将job根据类型划分为不同的stage,记录那些RDD,stage被物化,并在每一个stage内产生一系列的task,并封装成taskset;
    3.决定每个task的最佳位置,任务在数据所在节点上运行,并结合当前的缓存情况,将taskSet提交给TaskScheduler;
    4.重新提交shuffle输出丢失的stage给taskScheduler;
    注:一个stage内部的错误不是由shuffle输出丢失造成的,DAGScheduler是不管的,由TaskScheduler负责尝试重新提交task执行。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1.3.4 Job生成

      一旦driver程序中出现action,就会生成一个job,比如count等,向DAGScheduler提交job,如果driver程序后面还有action,那么其他action也会对应生成相应的job,所以,driver端有多少action就会提交多少job,这可能就是为什么spark将driver程序称为application而不是job 的原因。每一个job可能会包含一个或者多个stage,最后一个stage生成result,在提交job 的过程中,DAGScheduler会首先从后往前划分stage,划分的标准就是宽依赖,一旦遇到宽依赖就划分,然后先提交没有父阶段的stage们,并在提交过程中,计算该stage的task数目以及类型,并提交具体的task,在这些无父阶段的stage提交完之后,依赖该stage 的stage才会提交。

    1.3.5 有向无环图

      DAG,有向无环图,简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任意一条路径会将其带回到出发点的顶点位置,为每个spark job计算具有依赖关系的多个stage任务阶段,通常根据shuffle来划分stage,如reduceByKey,groupByKey等涉及到shuffle的transformation就会产生新的stage ,然后将每个stage划分为具体的一组任务,以TaskSets的形式提交给底层的任务调度模块来执行,其中不同stage之前的RDD为宽依赖关系,TaskScheduler任务调度模块负责具体启动任务,监控和汇报任务运行情况

    1.4编程API

    1.4.1 Transformation

      RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

    转换含义
    map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
    filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
    flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
    mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
    mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
    sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
    union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
    intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
    distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
    groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
    reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])  
    sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
    sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
    join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
    cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
    cartesian(otherDataset)  
    pipe(command, [envVars])  
    coalesce(numPartitions)  
    repartition(numPartitions)  
    repartitionAndSortWithinPartitions(partitioner)  

    1.4.2 Action

    动作含义
    reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
    collect() 在驱动程序中,以数组的形式返回数据集的所有元素
    count() 返回RDD的元素个数
    first() 返回RDD的第一个元素(类似于take(1))
    take(n) 返回一个由数据集的前n个元素组成的数组
    takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
    takeOrdered(n, [ordering])  
    saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
    saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
    saveAsObjectFile(path)  
    countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
    foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
  • 相关阅读:
    Mapreduce实例-Top Key
    Mapreduce实例-分组排重(group by distinct)
    storm0.9.0.1升级安装
    mysql配置文件my.cnf详解
    MYSQL管理之主从同步管理
    一个经典实用的iptables shell脚本
    sed实例精解--例说sed完整版
    常用的主机监控Shell脚本
    Python(九)Tornado web 框架
    缓存、队列(Memcached、redis、RabbitMQ)
  • 原文地址:https://www.cnblogs.com/ExMan/p/14318504.html
Copyright © 2011-2022 走看看