Spark core面试篇03
1.Spark使用parquet文件存储格式能带来哪些好处?
- 如果说HDFS 是大数据时代分布式文件系统首选标准,那么parquet则是整个大数据时代文件存储格式实时首选标准
- 速度更快:从使用spark sql操作普通文件CSV和parquet文件速度对比上看,绝大多数情况
会比使用csv等普通文件速度提升10倍左右,在一些普通文件系统无法在spark上成功运行的情况
下,使用parquet很多时候可以成功运行 - parquet的压缩技术非常稳定出色,在spark sql中对压缩技术的处理可能无法正常的完成工作
(例如会导致lost task,lost executor)但是此时如果使用parquet就可以正常的完成 - 极大的减少磁盘I/o,通常情况下能够减少75%的存储空间,由此可以极大的减少spark sql处理
数据的时候的数据输入内容,尤其是在spark1.6x中有个下推过滤器在一些情况下可以极大的
减少磁盘的IO和内存的占用,(下推过滤器) - spark 1.6x parquet方式极大的提升了扫描的吞吐量,极大提高了数据的查找速度spark1.6和spark1.5x相比而言,提升了大约1倍的速度,在spark1.6X中,操作parquet时候cpu也进行了极大的优化,有效的降低了cpu
- 采用parquet可以极大的优化spark的调度和执行。我们测试spark如果用parquet可以有效的减少stage的执行消耗,同时可以优化执行路径
2.Executor之间如何共享数据?
答:基于hdfs或者基于tachyon
3.Spark累加器有哪些特点?
1)累加器在全局唯一的,只增不减,记录全局集群的唯一状态
2)在exe中修改它,在driver读取
3)executor级别共享的,广播变量是task级别的共享
两个application不可以共享累加器,但是同一个app不同的job可以共享
4.如何在一个不确定的数据规模的范围内进行排序?
为了提高效率,要划分划分,划分的范围并且是有序的
要么有序,要么降序?
水塘抽样:目的是从一个集合中选取,集合非常答,适合内存
无法容纳数据的时候使用
从N中抽取出K个,N是随机数
5.spark hashParitioner的弊端是什么?
答:HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID;弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据
6.RangePartitioner分区的原理?
答:RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样。可以参考这篇博文
https://www.iteblog.com/archives/1522.html
7.介绍parition和block有什么关联关系?
答:1)hdfs中的block是分布式存储的最小单元,等分,可设置冗余,这样设计有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容;2)Spark中的partion是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partion组成的。partion是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partion大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定;3)block位于存储空间、partion位于计算空间,block的大小是固定的、partion大小是不固定的,是从2个不同的角度去看数据。
8.Spark应用程序的执行过程是什么?
1)构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2).资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3).SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor。
4).Task在Executor上运行,运行完毕释放所有资源。
9.hbase预分区个数和spark过程中的reduce个数相同么
答:和spark的map个数相同,reduce个数如果没有设置和reduce前的map数相同。
10.如何理解Standalone模式下,Spark资源分配是粗粒度的?
答:spark默认情况下资源分配是粗粒度的,也就是说程序在提交时就分配好资源,后面执行的时候
使用分配好的资源,除非资源出现了故障才会重新分配。比如Spark shell启动,已提交,一注册,哪怕没有任务,worker都会分配资源给executor。
11.Spark如何自定义partitioner分区器?
答:1)spark默认实现了HashPartitioner和RangePartitioner两种分区策略,我们也可以自己扩展分区策略,自定义分区器的时候继承org.apache.spark.Partitioner类,实现类中的三个方法
def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;
equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
2)使用,调用parttionBy方法中传入自定义分区对象
参考:http://blog.csdn.net/high2011/article/details/68491115
12.spark中task有几种类型?
答:2种类型:1)result task类型,最后一个task,2是shuffleMapTask类型,除了最后一个task都是
13.union操作是产生宽依赖还是窄依赖?
答:窄依赖
14.rangePartioner分区器特点?
答:rangePartioner尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。RangePartitioner作用:将一定范围内的数映射到某一个分区内,在实现中,分界的算法尤为重要。算法对应的函数是rangeBounds
15.什么是二次排序,你是如何用spark实现二次排序的?(互联网公司常面)
答:就是考虑2个维度的排序,key相同的情况下如何排序,参考博文:http://blog.csdn.net/sundujing/article/details/51399606
16.如何使用Spark解决TopN问题?(互联网公司常面)
答:常见的面试题,参考博文:http://www.cnblogs.com/yurunmiao/p/4898672.html
17.如何使用Spark解决分组排序问题?(互联网公司常面)
组织数据形式:
aa 11
bb 11
cc 34
aa 22
bb 67
cc 29
aa 36
bb 33
cc 30
aa 42
bb 44
cc 49
需求:
1、对上述数据按key值进行分组
2、对分组后的值进行排序
3、截取分组后值得top 3位以key-value形式返回结果
答案:如下
val groupTopNRdd = sc.textFile("hdfs://db02:8020/user/hadoop/groupsorttop/groupsorttop.data")
groupTopNRdd.map(_.split(" ")).map(x => (x(0),x(1))).groupByKey().map(
x => {
val xx = x._1
val yy = x._2
(xx,yy.toList.sorted.reverse.take(3))
}
).collect
18.窄依赖父RDD的partition和子RDD的parition是不是都是一对一的关系?
答:不一定,除了一对一的窄依赖,还包含一对固定个数的窄依赖(就是对父RDD的依赖的Partition的数量不会随着RDD数量规模的改变而改变),比如join操作的每个partiion仅仅和已知的partition进行join,这个join操作是窄依赖,依赖固定数量的父rdd,因为是确定的partition关系
19.Hadoop中,Mapreduce操作的mapper和reducer阶段相当于spark中的哪几个算子?
答:相当于spark中的map算子和reduceByKey算子,当然还是有点区别的,MR会自动进行排序的,spark要看你用的是什么partitioner
20.什么是shuffle,以及为什么需要shuffle?
shuffle中文翻译为洗牌,需要shuffle的原因是:某种具有共同特征的数据汇聚到一个计算节点上进行计算
21.不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?
答:不一定!!当数据规模小,Hash shuffle快于Sorted Shuffle数据规模大的时候;当数据量大,sorted Shuffle会比Hash shuffle快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x之前spark使用hash,适合处理中小规模,1.x之后,增加了Sorted shuffle,Spark更能胜任大规模处理了。
22.Spark中的HashShufle的有哪些不足?
答:1)shuffle产生海量的小文件在磁盘上,此时会产生大量耗时的、低效的IO操作;2).容易导致内存不够用,由于内存需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较大的化,容易出现OOM;3)容易出现数据倾斜,导致OOM
23.conslidate是如何优化Hash shuffle时在map端产生的小文件?
答:1)conslidate为了解决Hash Shuffle同时打开过多文件导致Writer handler内存使用过大以及产生过多文件导致大量的随机读写带来的低效磁盘IO;2)conslidate根据CPU的个数来决定每个task shuffle map端产生多少个文件,假设原来有10个task,100个reduce,每个CPU有10个CPU
那么使用hash shuffle会产生10100=1000个文件,conslidate产生1010=100个文件
备注:conslidate部分减少了文件和文件句柄,并行读很高的情况下(task很多时)还是会很多文件
24.Sort-basesd shuffle产生多少个临时文件
答:2*Map阶段所有的task数量,Mapper阶段中并行的Partition的总数量,其实就是Mapper端task
25.Sort-based shuffle的缺陷?
- 如果mapper中task的数量过大,依旧会产生很多小文件,此时在shuffle传递数据的过程中reducer段,reduce会需要同时大量的记录进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃
2)如果需要在分片内也进行排序,此时需要进行mapper段和reducer段的两次排序
26.Spark shell启动时会启动derby?
答: spark shell启动会启动spark sql,spark sql默认使用derby保存元数据,但是尽量不要用derby,它是单实例,不利于开发。会在本地生成一个文件metastore_db,如果启动报错,就把那个文件给删了 ,derby数据库是单实例,不能支持多个用户同时操作,尽量避免使用
27.spark.default.parallelism这个参数有什么意义,实际生产中如何设置?
答:1)参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能;2)很多人都不会设置这个参数,会使得集群非常低效,你的cpu,内存再多,如果task始终为1,那也是浪费,spark官网建议task个数为CPU的核数executor的个数的2~3倍。
28.spark.storage.memoryFraction参数的含义,实际生产中如何调优?
答:1)用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6,,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。2)如果持久化操作比较多,可以提高spark.storage.memoryFraction参数,使得更多的持久化数据保存在内存中,提高数据的读取性能,如果shuffle的操作比较多,有很多的数据读写操作到JVM中,那么应该调小一点,节约出更多的内存给JVM,避免过多的JVM gc发生。在web ui中观察如果发现gc时间很长,可以设置spark.storage.memoryFraction更小一点。
29.spark.shuffle.memoryFraction参数的含义,以及优化经验?
答:1)spark.shuffle.memoryFraction是shuffle调优中 重要参数,shuffle从上一个task拉去数据过来,要在Executor进行聚合操作,聚合操作时使用Executor内存的比例由该参数决定,默认是20%
如果聚合时数据超过了该大小,那么就会spill到磁盘,极大降低性能;2)如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值
30.介绍一下你对Unified Memory Management内存管理模型的理解?
答:Spark中的内存使用分为两部分:执行(execution)与存储(storage)。执行内存主要用于shuffles、joins、sorts和aggregations,存储内存则用于缓存或者跨节点的内部数据传输。1.6之前,对于一个Executor,内存都有哪些部分构成:
1)ExecutionMemory。这片内存区域是为了解决 shuffles,joins, sorts and aggregations 过程中为了避免频繁IO需要的buffer。 通过spark.shuffle.memoryFraction(默认 0.2) 配置。
2)StorageMemory。这片内存区域是为了解决 block cache(就是你显示调用dd.cache, rdd.persist等方法), 还有就是broadcasts,以及task results的存储。可以通过参数 spark.storage.memoryFraction(默认0.6)。设置
3)OtherMemory。给系统预留的,因为程序本身运行也是需要内存的。 (默认为0.2).
传统内存管理的不足:
1).Shuffle占用内存0.20.8,内存分配这么少,可能会将数据spill到磁盘,频繁的磁盘IO是很大的负担,Storage内存占用0.6,主要是为了迭代处理。传统的Spark内存分配对操作人的要求非常高。(Shuffle分配内存:ShuffleMemoryManager, TaskMemoryManager,ExecutorMemoryManager)一个Task获得全部的Execution的Memory,其他Task过来就没有内存了,只能等待。
2).默认情况下,Task在线程中可能会占满整个内存,分片数据特别大的情况下就会出现这种情况,其他Task没有内存了,剩下的cores就空闲了,这是巨大的浪费。这也是人为操作的不当造成的。
3).MEMORY_AND_DISK_SER的storage方式,获得RDD的数据是一条条获取,iterator的方式。如果内存不够(spark.storage.unrollFraction),unroll的读取数据过程,就是看内存是否足够,如果足够,就下一条。unroll的space是从Storage的内存空间中获得的。unroll的方式失败,就会直接放磁盘。
4). 默认情况下,Task在spill到磁盘之前,会将部分数据存放到内存上,如果获取不到内存,就不会执行。永无止境的等待,消耗CPU和内存。
在此基础上,Spark提出了UnifiedMemoryManager,不再分ExecutionMemory和Storage Memory,实际上还是分的,只不过是Execution Memory访问Storage Memory,Storage Memory也可以访问Execution Memory,如果内存不够,就会去借。