1、对于读入的数据,做好清洗、转换、分区工作
rdd1 = sc.textFile("hdfs://text.txt", 15).map(_.split("|"))
.filter{//尽量严格过滤}
.map(id, money)//提取必要字段,减少数据量
.coalesce(10, True)//分片 有可能有一些partition过滤完了后只有10条,另一个剩900条 造成数据倾斜,会发生很严重问题
可以通过从源头用命令行的形式查看,数据的大小,决定申请资源的大小,也可以从监控的stage里面的Input查看每步的rdd大小,比如300个G 那你分成300个Partition那一个partition就是1个G,然后你想想你下面有几个executor,如果1个executor运行2个partition的话,相当于一个executor要运行1G再运行1G,那你的内存有多少
2、RDD高效使用
相同数据不要多次IO读取
提高对同一RDD使用次数,对于多次使用的RDD考虑是否持久化(考虑要不要存在内存和磁盘中,有一个 spill size,如果有东西了,说明内存溢出了 )
重新计算的时间和缓存到磁盘重新读取那个快,如果本身重新计算不是那种机器学习复杂的模型,不需要去考虑持久化的问题
presist()
3、PairRDD状态维护
对于多次需要join的RDD提前repartition
使用PairRDD特有API,不要破坏分区信息。如果是PairRDD,并且只需要计算value相关信息不改变key,就用mapValue代替map 这样才能保留分区的信息; reduceBykey替代reduce;flatMapValues替代flatMap
val rawClassRDD = sc.makeRDD(Array( "spark", "hadoop", "hive","yarn","hbase"), 4).zipWithIndex()
val classRDD = rawClassRDD.mapValue(1=>1._1, 1._2+1))
4、join
大RDD和小RDD做join可以考虑用广播变量
对于需要多次Join的RDD提前repartition
val rdd1 = rdd.repartition(100)
rdd1.join(rdd2)
rdd1.join(rdd3)
5、使用预聚合功能
使用reduceByKey替代groupByKey
rawClassRDD.reduceByKey(_+_)
6、竞争资源批量处理
使用mapPartition替代map
使用foreachPartition替代foreach
7、数据倾斜
存在一些热点数据,导致某些节点数据量特别大,有些节点处理的数据特别小。热点数据过大,内存不够会发生OOM现象,程序不断的恢复又不停的OOM,最后崩溃退出。
数据倾斜往往伴随shuffle过程,相关API:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等
查看倾斜的key
rdd.sample(false, 0.1).countByKey().foreach(println(_))
解决方法:
1.让数仓人员从源头解决倾斜问题,掉更多内存解决它
2.根据业务决定是否可以直接过滤(大部分机器学习在特征工程中需要去噪点数据、裁剪边(关系网络,根据边的权重交互的次数进行过滤))
3.增加partition, 提高并行度,这个方法比较简单,就调下参数
4.利用广播变量调优
5.拆解热点key(找出热点key,给key加上前缀或后缀,最后再合并)
8、参数调优
资源相关申请
spark.driver.cores
spark.driver.memory
spark.executor.cores 与你设置的partition相关,比如申请了1000个core,那你的partition个数就不要是1000,也不要是100了,你的partition个数肯定是你core个数的整数倍,你可以设3000个partition;设置成1000个就是1000个任务并行
spark.executor.memory 与你设置的partition相关,1000G 的rrd,你切成1000份的partition,一个partition就是1G,那你的memory最少是2G以上;这个参数也会影响你做persist
压缩相关
spark.shuffle.compress
spark.shuffle.spill.compress rdd在内存中计算不够的话,会溢出多少磁盘,等里面数据处理完了后再读入
spark.broadcast.compress
序列化相关
spark.serializer
shuffle相关
spark.shuffle.manager = sort
spark.shuffle.consolidateFiles
9、SparkSql调优
spark.sql.codegen 当设置为true时,spark sql会把每条查询的语句在运行是编写为java二进制代码,当查询的数据量大时,可以这么设置,小的时候设置为false
spark.sql.inMemoryColumnStorage.compressed自动对内存压缩
压缩数据,可以将存储在内存中的数据压缩,也是数据量比较大(看Storage中通过persist功能存储在内存的数量和总的内存是多少)
如果任务失败会有日志