zoukankan      html  css  js  c++  java
  • spark性能调优01-常规调优

    1、分配更多的资源

      1.1 分配的资源有:executor、cup per executor、memory per executor、driver memory

      1.2 如何分配:在spark-submit提交时设置相应的参数  

    /usr/local/spark/bin/spark-submit 
    --class cn.spark.sparktest.core.WordCountCluster 
    --num-executors 3   配置executor的数量
    --driver-memory 100m   配置driver的内存(影响不大)
    --executor-memory 100m   配置每个executor的内存大小
    --executor-cores 3   配置每个executor的cpu core数量
    /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar 

      1.3 调节到多大(原则:能使用的资源有多大,就尽量调节到最大的大小)

        第一种,spark standalone,公司集群上,搭建了一套spark集群,应该清楚每台机器还能够给你使用的,还有多少内存和多少个cpu,然后根据此来进行配置;

          比如:有20台机器,每台有2个cpu和4G的内存,那么如果配置20个executor,那个每个executor内存分配4G和2个cpu

        第二种,yarn,应该去查看spark作业提交到的资源队列大概有多少资源;

          比如:如果有500G内存和100个cpu core,那么如果分配50个executor,那么每个executor分配的cpu core为2个

    2、调节并行度

      2.1 并行度:其实是指,spark作业中,各个stage的task数量,也就代表了spark作业在各个stage的并行度

      2.2 配置方法:

    spark.default.parallelism 
    SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")

      2.3 调节原则:应该配置到足够大,大到可以完全利用你的集群资源

        2.3.1 task数量:至少设置成与spark application的总cpu数量相同

          比如:总共150个cpu core,配置150个task,一起运行,差不多同一时间运行完毕

        2.3.2 官方推荐 task数据设置为spark application的总cpu数量的2~3倍

          比如:总共150个cupcore,配置300~500个task

    3、将rdd进行持久化

      3.1 持久化的原则

        3.1.1 Rdd的架构重构和优化

          尽量复用Rdd,差不多的Rdd进行抽象为一个公共的Rdd,供后面使用

        3.1.2 公共Rdd一定要进行持久化

          对应对次计算和使用的Rdd,一定要进行持久化

        3.1.3 持久化是可以序列化的

          首先采用纯内存的持久化方式,如果出现OOM异常,则采用纯内存+序列化的方法,如果依然存在OOM异常,使用内存+磁盘,以及内存+磁盘+序列化的方法

        3.1.4 为了数据的高可靠性,而且内存充足时,可以使用双副本机制进行持久化

      3.2 持久化的代码实现

        .persist(StorageLevel.MEMORY_ONLY())

      3.3 持久化等级

        StorageLevel.MEMORY_ONLY()    纯内存    等效于   .cache()

        序列化的:后缀带有_SER 如:StorageLevel.MEMORY_ONLY_SER()   内存+序列化

            后缀带有_DISK 表示磁盘,如:MEMORY_AND_DISK() 内存+磁盘

            后缀带有_2表示副本数,如:MEMORY_AND_DISK_2() 内存+磁盘且副本数为2

    4、将每个task中都使用的大的外部变量作为广播变量

      4.1 没有使用广播变量的缺点

        默认情况,task使用到了外部变量,每个task都会获取一份外部变量的副本,会占用不必要的内存消耗,导致在Rdd持久化时不能写入到内存,只能持久化到磁盘中,增加了IO读写操作。

        同时,在task创建对象时,内存不足,进行频繁的GC操作,降低效率

      4.2 使用广播变量的好处

        广播变量不是每个task保存一份,而是每个executor保存一份。

        广播变量初始化时,在Driver上生成一份副本,task运行时需要用到广播变量中的数据,首次使用会在本地的Executor对应的BlockManager中尝试获取变量副本;如果本地没有,那么就会从Driver远程拉取变量副本,并保存到本地的BlockManager中;此后这个Executor中的task使用到的数据都从本地的BlockManager中直接获取。

        Executor中的BlockManager除了从远程的Driver中拉取变量副本,也可能从其他节点的BlockManager中拉取数据,距离越近越好。

    5、使用KryoSerializer进行序列化

      5.1 使用KryoSerializer序列化的好处

        默认情况,spark使用的是java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化。

        该序列化的好处是方便使用,但必须实现Serializable接口,缺点是效率低,速度慢,序列化后的占用空间大

        KryoSerializer序列化机制,效率高,速度快,占用空间小(只有java序列化的1/10),可以减少网络传输

      5.2 使用方法

          //配置使用KryoSerializer进行序列化
            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            //(为了使序列化效果达到最优)注册自定义的类型使用KryoSerializer序列化
            .registerKryoClasses(new Class[]{ExtractSession.class,FilterCount.class,SessionDetail.class,Task.class,Top10Session.class,Top10.class,VisitAggr.class});

      5.3 使用KryoSerializer序列化的场景

        5.3.1 算子函数中使用到的外部变量,使用KryoSerializer后,可以优化网络传输效率,优化集群中内存的占用和消耗

        5.3.2 持久化Rdd,优化内存占用,task过程中创建对象,减少GC次数

        5.3.3 shuffle过程,优化网络的传输性能

    6、使用fastutil代替java标准的集合框架

      6.1 fastutil是什么

        fastutil扩展了java标准的集合框架,占用内存更小,存取速度更快,还提供了双向迭代器,并对引用类型使用等号(=)进行比较

      6.2 使用方法

        6.2.1 在pom.xml文件中引入相应的jar包

    <dependency>
        <groupId>fastutil</groupId>
        <artifactId>fastutil</artifactId>
        <version>5.0.9</version>
    </dependency>

        6.2.2 在java代码中使用fastutil相应的集合框架

    //使用fastutil代替java util
    final Map<String, IntList> extractSessionIndexs=new HashMap<String, IntList>();
    //final Map<String, List<Integer>> extractSessionIndexs=new HashMap<String, List<Integer>>();
    
    //使用fastutil代替java util
    IntList  extractIndexSet= new IntArrayList();
    //extractIndexSet= new ArrayList<Integer>();

      6.3 适用场景

        6.3.1 fastutil尽量提供了在任何情况下都是速度最快的集合框架

        6.3.2 如果算子函数中使用到了外部变量,第一,可以使用广播变量进行优化;第二、可以使用KryoSerializer序列化进行优化;第三可以使用fastutil代替java 标准的集合框架进行优化

        6.3.3 算子函数中如果出现较大的集合,可以考虑使用fastutil进行重构

    7、调节数据本地化等待时长

      7.1 什么事数据本地化等待时长

        每个task在哪个节点上执行是根据spark的task分配算法进行预先计算好的,但是可能由于该节点的资源或者计算能力满了,该task无法分配到该节点上,默认会等待3s,如果还是不能分配到该节点上,就会选择比较差的本地化级别,比如说,将task分配到原节点比较近的节点进行计算。

      7.2 数据本地化级别

        PROCESS_LOCAL:(默认),进程本地化,在同一个节点中执行,数据在执行task的executor中的BlockManager中,性能最优

        NODE_LOCAL:节点本地化,数据和task在同一节点的不同executor中,数据需要进行进程间的传输

        NO_PRE:对于task来说,数据从哪里获取都一样,没有好坏之分

        RACK_LOCAL:机架本地化,数据和task在一个机架的不同节点上,数据需要进行网络传输

        ANY:数据和task可能在集群中的任何地方,而且不在一个机架上,性能最差

      7.3 如何调节

        通过查看日志,日志里会显示,starting task ....,PROCESS_LOCAL、NODE_LOCAL、等信息  

    //设置数据本地化等待时间(单位为s)
    conf.set("spark.locality.wait", "5");
  • 相关阅读:
    iOS resign code with App Store profile and post to AppStore
    HTTPS科普扫盲帖 对称加密 非对称加密
    appid 评价
    使用Carthage安装第三方Swift库
    AngularJS:何时应该使用Directive、Controller、Service?
    xcode7 The operation couldn't be completed.
    cocoapods pod install 安装报错 is not used in any concrete target
    xcode7 NSAppTransportSecurity
    learning uboot how to set ddr parameter in qca4531 cpu
    learning uboot enable protect console
  • 原文地址:https://www.cnblogs.com/lifeone/p/6428885.html
Copyright © 2011-2022 走看看