zoukankan      html  css  js  c++  java
  • sparksql系列(八) sparksql优化

            公司数仓迁移完成了,现在所有的数据一天6T的用户行为数据全部由一个spark脚本,关联用户属性数据生成最终想要的数据。里面让我感触最深的是资源的使用spark优化,再此记录一篇关于sparksql优化的文章,专门总结以下现在使用的资源优化及以前使用的资源优化。

    一:资源优化

    对于数据处理的分组

            数据有的上报的多一天1T,有的上报的少一天不到1G,但是需要统一去处理,这时候就可以使用数据分组的方法。将大小类似的数据放到一组内进行统一的处理,例子:将1G以下的分成一个组,将1G到10G的分成一个组,10G到100G的分为一个组。具体的需要根据数据具体的分布来确定。

      优点:数据处理均匀,对于文件个数的生成的控制强,统一管理。

      缺点:数据分组是根据数据大小来确定的,数据出现增长的时候如果不及时发现,会出现倾斜问题。

    广播

            一个运算中如果可以使用广播,那就尽量不使用别的shuffle,因为广播除了对主节点有压力之外,别的方面都是最好的shuffle。

            --conf spark.sql.broadcastTimeout=-1                                             //广播变量永不超时
            --conf spark.sql.autoBroadcastJoinThreshold=104857600             //广播大小设置为1G

       sparksql运行数据的时候默认第一次都不是广播的,因为它在开始的时候不知道数据的大小,所以没办法广播,第一次运行完成之后知道了数据大小,第二次才会广播

           如果确定一个文件可以广播的话,建议使用显式broadcast

      import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast
    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val data = sparkSession.read.textFile("/software/java/idea/data")
    broadcast(data)

    并发度

            sparksql中难免会涉及到文件IO,网络IO,这时候并发度就显得很重要,并发度可以简单理解为线程数。

            --conf spark.default.parallelism=60          如果代码中没有shuffle操作或者repartation生成的文件就是并发的个数
            --conf spark.sql.shuffle.partitions=800     

    输出文件压缩

            HDFS中文本存储是最浪费空间做法,所以强烈建议开启压缩

            --conf spark.hadoop.mapred.output.compress=true
            --conf spark.hadoop.mapred.output.compression.codec=true
            --conf spark.hadoop.mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
            --conf spark.hadoop.mapred.output.compression.type=BLOCK

     慢任务检测

            sparksql任务里面会同时起来多台机器进行作业,这时候如果一台机器有问题,运行的满会拉低整个人物执行的时间。把慢任务检测开启的话,如果有慢任务,就会再次起来一个相同的任务,谁现执行完成就会把另一个杀掉。从而节约执行的时间。

            --conf spark.speculation=true
            --conf spark.speculation.interval=30000
            --conf spark.speculation.quantile=0.8
            --conf spark.speculation.multiplier=1.5

     repartation

            sparksql读取文件的时候是根据文件个数来决定task个数的,但是如果出现很多小文件的话,就会严重影响任务的执行时间,再读取之后显示使用repartation,或者在使用newAPIHadoopFile来合并文件大小

            val newData = sparkContext.newAPIHadoopFile("/software/java/idea/data",classOf[CombineTextInputFormat],classOf[LongWritable],classOf[Text])
                 .saveAsTextFile("/software/java/idea/end")

    hive中也有对应的参数就是:

            set mapreduce.input.fileinputformat.split.minsize = 1024000000;
            set mapreduce.input.fileinputformat.split.maxsize = 1024000000;(默认256M)
            set mapreduce.input.fileinputformat.split.minsize.per.node= 1024000000;
            set mapreduce.input.fileinputformat.split.maxsize.per.node= 1024000000;(默认1b)
            set mapreduce.input.fileinputformat.split.minsize.per.rack= 1024000000; 
            set mapreduce.input.fileinputformat.split.maxsize.per.rack= 1024000000;(默认1b)

     写mysql

            sparksql里面可以直接写数据库,但是真正写数据库的时候是根据partation的个数来确定数据库连接数的,所以在写数据库之前最好根据数据条数去reparttation一下,达到最快的写入速度

     hive入库

            set hive.msck.path.validation=ignore;MSCK REPAIR TABLE tablename     //适合分区数较少的load,分区数较多的时候反而时间会加长

            ALTER TABLE externaltable_test  ADD PARTITION(ddate=20190920) LOCATION '/hive/table/table_test/dt=20190920';     //当分区数到达一定的数量之后就可以使用这个来load数据,也是只修改元数据的操作。

  • 相关阅读:
    BZOJ_2661_[BeiJing wc2012]连连看_费用流
    BZOJ_4867_[Ynoi2017]舌尖上的由乃_分块+dfs序
    BZOJ_3238_[Ahoi2013]差异_后缀自动机
    BZOJ_3207_花神的嘲讽计划Ⅰ_哈希+主席树
    [转载]快速幂与矩阵快速幂
    ACM的一点基础知识
    [转载]C++STL—vector的插入与删除
    C++STL—map的使用
    [转载]汇编语言assume伪指令的作用
    [转载]c++中的位运算符
  • 原文地址:https://www.cnblogs.com/wuxiaolong4/p/12595364.html
Copyright © 2011-2022 走看看