一:资源优化
对于数据处理的分组
数据有的上报的多一天1T,有的上报的少一天不到1G,但是需要统一去处理,这时候就可以使用数据分组的方法。将大小类似的数据放到一组内进行统一的处理,例子:将1G以下的分成一个组,将1G到10G的分成一个组,10G到100G的分为一个组。具体的需要根据数据具体的分布来确定。
优点:数据处理均匀,对于文件个数的生成的控制强,统一管理。
缺点:数据分组是根据数据大小来确定的,数据出现增长的时候如果不及时发现,会出现倾斜问题。
广播
一个运算中如果可以使用广播,那就尽量不使用别的shuffle,因为广播除了对主节点有压力之外,别的方面都是最好的shuffle。
--conf spark.sql.broadcastTimeout=-1 //广播变量永不超时
--conf spark.sql.autoBroadcastJoinThreshold=104857600 //广播大小设置为1G
sparksql运行数据的时候默认第一次都不是广播的,因为它在开始的时候不知道数据的大小,所以没办法广播,第一次运行完成之后知道了数据大小,第二次才会广播
如果确定一个文件可以广播的话,建议使用显式broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
val sparkSession= SparkSession.builder().master("local").getOrCreate()
val data = sparkSession.read.textFile("/software/java/idea/data")
broadcast(data)
并发度
sparksql中难免会涉及到文件IO,网络IO,这时候并发度就显得很重要,并发度可以简单理解为线程数。
--conf spark.default.parallelism=60 如果代码中没有shuffle操作或者repartation生成的文件就是并发的个数
--conf spark.sql.shuffle.partitions=800
输出文件压缩
HDFS中文本存储是最浪费空间做法,所以强烈建议开启压缩
--conf spark.hadoop.mapred.output.compress=true
--conf spark.hadoop.mapred.output.compression.codec=true
--conf spark.hadoop.mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
--conf spark.hadoop.mapred.output.compression.type=BLOCK
慢任务检测
sparksql任务里面会同时起来多台机器进行作业,这时候如果一台机器有问题,运行的满会拉低整个人物执行的时间。把慢任务检测开启的话,如果有慢任务,就会再次起来一个相同的任务,谁现执行完成就会把另一个杀掉。从而节约执行的时间。
--conf spark.speculation=true
--conf spark.speculation.interval=30000
--conf spark.speculation.quantile=0.8
--conf spark.speculation.multiplier=1.5
repartation
sparksql读取文件的时候是根据文件个数来决定task个数的,但是如果出现很多小文件的话,就会严重影响任务的执行时间,再读取之后显示使用repartation,或者在使用newAPIHadoopFile来合并文件大小
val newData = sparkContext.newAPIHadoopFile("/software/java/idea/data",classOf[CombineTextInputFormat],classOf[LongWritable],classOf[Text])
.saveAsTextFile("/software/java/idea/end")
hive中也有对应的参数就是:
set mapreduce.input.fileinputformat.split.minsize = 1024000000;
set mapreduce.input.fileinputformat.split.maxsize = 1024000000;(默认256M)
set mapreduce.input.fileinputformat.split.minsize.per.node= 1024000000;
set mapreduce.input.fileinputformat.split.maxsize.per.node= 1024000000;(默认1b)
set mapreduce.input.fileinputformat.split.minsize.per.rack= 1024000000;
set mapreduce.input.fileinputformat.split.maxsize.per.rack= 1024000000;(默认1b)
写mysql
sparksql里面可以直接写数据库,但是真正写数据库的时候是根据partation的个数来确定数据库连接数的,所以在写数据库之前最好根据数据条数去reparttation一下,达到最快的写入速度
hive入库
set hive.msck.path.validation=ignore;MSCK REPAIR TABLE tablename //适合分区数较少的load,分区数较多的时候反而时间会加长
ALTER TABLE externaltable_test ADD PARTITION(ddate=20190920) LOCATION '/hive/table/table_test/dt=20190920'; //当分区数到达一定的数量之后就可以使用这个来load数据,也是只修改元数据的操作。