zoukankan      html  css  js  c++  java
  • spark性能调优01-常规调优

    1、分配更多的资源

      1.1 分配的资源有:executor、cup per executor、memory per executor、driver memory

      1.2 如何分配:在spark-submit提交时设置相应的参数  

    /usr/local/spark/bin/spark-submit 
    --class cn.spark.sparktest.core.WordCountCluster 
    --num-executors 3   配置executor的数量
    --driver-memory 100m   配置driver的内存(影响不大)
    --executor-memory 100m   配置每个executor的内存大小
    --executor-cores 3   配置每个executor的cpu core数量
    /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar 

      1.3 调节到多大(原则:能使用的资源有多大,就尽量调节到最大的大小)

        第一种,spark standalone,公司集群上,搭建了一套spark集群,应该清楚每台机器还能够给你使用的,还有多少内存和多少个cpu,然后根据此来进行配置;

          比如:有20台机器,每台有2个cpu和4G的内存,那么如果配置20个executor,那个每个executor内存分配4G和2个cpu

        第二种,yarn,应该去查看spark作业提交到的资源队列大概有多少资源;

          比如:如果有500G内存和100个cpu core,那么如果分配50个executor,那么每个executor分配的cpu core为2个

    2、调节并行度

      2.1 并行度:其实是指,spark作业中,各个stage的task数量,也就代表了spark作业在各个stage的并行度

      2.2 配置方法:

    spark.default.parallelism 
    SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")

      2.3 调节原则:应该配置到足够大,大到可以完全利用你的集群资源

        2.3.1 task数量:至少设置成与spark application的总cpu数量相同

          比如:总共150个cpu core,配置150个task,一起运行,差不多同一时间运行完毕

        2.3.2 官方推荐 task数据设置为spark application的总cpu数量的2~3倍

          比如:总共150个cupcore,配置300~500个task

    3、将rdd进行持久化

      3.1 持久化的原则

        3.1.1 Rdd的架构重构和优化

          尽量复用Rdd,差不多的Rdd进行抽象为一个公共的Rdd,供后面使用

        3.1.2 公共Rdd一定要进行持久化

          对应对次计算和使用的Rdd,一定要进行持久化

        3.1.3 持久化是可以序列化的

          首先采用纯内存的持久化方式,如果出现OOM异常,则采用纯内存+序列化的方法,如果依然存在OOM异常,使用内存+磁盘,以及内存+磁盘+序列化的方法

        3.1.4 为了数据的高可靠性,而且内存充足时,可以使用双副本机制进行持久化

      3.2 持久化的代码实现

        .persist(StorageLevel.MEMORY_ONLY())

      3.3 持久化等级

        StorageLevel.MEMORY_ONLY()    纯内存    等效于   .cache()

        序列化的:后缀带有_SER 如:StorageLevel.MEMORY_ONLY_SER()   内存+序列化

            后缀带有_DISK 表示磁盘,如:MEMORY_AND_DISK() 内存+磁盘

            后缀带有_2表示副本数,如:MEMORY_AND_DISK_2() 内存+磁盘且副本数为2

    4、将每个task中都使用的大的外部变量作为广播变量

      4.1 没有使用广播变量的缺点

        默认情况,task使用到了外部变量,每个task都会获取一份外部变量的副本,会占用不必要的内存消耗,导致在Rdd持久化时不能写入到内存,只能持久化到磁盘中,增加了IO读写操作。

        同时,在task创建对象时,内存不足,进行频繁的GC操作,降低效率

      4.2 使用广播变量的好处

        广播变量不是每个task保存一份,而是每个executor保存一份。

        广播变量初始化时,在Driver上生成一份副本,task运行时需要用到广播变量中的数据,首次使用会在本地的Executor对应的BlockManager中尝试获取变量副本;如果本地没有,那么就会从Driver远程拉取变量副本,并保存到本地的BlockManager中;此后这个Executor中的task使用到的数据都从本地的BlockManager中直接获取。

        Executor中的BlockManager除了从远程的Driver中拉取变量副本,也可能从其他节点的BlockManager中拉取数据,距离越近越好。

    5、使用KryoSerializer进行序列化

      5.1 使用KryoSerializer序列化的好处

        默认情况,spark使用的是java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化。

        该序列化的好处是方便使用,但必须实现Serializable接口,缺点是效率低,速度慢,序列化后的占用空间大

        KryoSerializer序列化机制,效率高,速度快,占用空间小(只有java序列化的1/10),可以减少网络传输

      5.2 使用方法

          //配置使用KryoSerializer进行序列化
            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            //(为了使序列化效果达到最优)注册自定义的类型使用KryoSerializer序列化
            .registerKryoClasses(new Class[]{ExtractSession.class,FilterCount.class,SessionDetail.class,Task.class,Top10Session.class,Top10.class,VisitAggr.class});

      5.3 使用KryoSerializer序列化的场景

        5.3.1 算子函数中使用到的外部变量,使用KryoSerializer后,可以优化网络传输效率,优化集群中内存的占用和消耗

        5.3.2 持久化Rdd,优化内存占用,task过程中创建对象,减少GC次数

        5.3.3 shuffle过程,优化网络的传输性能

    6、使用fastutil代替java标准的集合框架

      6.1 fastutil是什么

        fastutil扩展了java标准的集合框架,占用内存更小,存取速度更快,还提供了双向迭代器,并对引用类型使用等号(=)进行比较

      6.2 使用方法

        6.2.1 在pom.xml文件中引入相应的jar包

    <dependency>
        <groupId>fastutil</groupId>
        <artifactId>fastutil</artifactId>
        <version>5.0.9</version>
    </dependency>

        6.2.2 在java代码中使用fastutil相应的集合框架

    //使用fastutil代替java util
    final Map<String, IntList> extractSessionIndexs=new HashMap<String, IntList>();
    //final Map<String, List<Integer>> extractSessionIndexs=new HashMap<String, List<Integer>>();
    
    //使用fastutil代替java util
    IntList  extractIndexSet= new IntArrayList();
    //extractIndexSet= new ArrayList<Integer>();

      6.3 适用场景

        6.3.1 fastutil尽量提供了在任何情况下都是速度最快的集合框架

        6.3.2 如果算子函数中使用到了外部变量,第一,可以使用广播变量进行优化;第二、可以使用KryoSerializer序列化进行优化;第三可以使用fastutil代替java 标准的集合框架进行优化

        6.3.3 算子函数中如果出现较大的集合,可以考虑使用fastutil进行重构

    7、调节数据本地化等待时长

      7.1 什么事数据本地化等待时长

        每个task在哪个节点上执行是根据spark的task分配算法进行预先计算好的,但是可能由于该节点的资源或者计算能力满了,该task无法分配到该节点上,默认会等待3s,如果还是不能分配到该节点上,就会选择比较差的本地化级别,比如说,将task分配到原节点比较近的节点进行计算。

      7.2 数据本地化级别

        PROCESS_LOCAL:(默认),进程本地化,在同一个节点中执行,数据在执行task的executor中的BlockManager中,性能最优

        NODE_LOCAL:节点本地化,数据和task在同一节点的不同executor中,数据需要进行进程间的传输

        NO_PRE:对于task来说,数据从哪里获取都一样,没有好坏之分

        RACK_LOCAL:机架本地化,数据和task在一个机架的不同节点上,数据需要进行网络传输

        ANY:数据和task可能在集群中的任何地方,而且不在一个机架上,性能最差

      7.3 如何调节

        通过查看日志,日志里会显示,starting task ....,PROCESS_LOCAL、NODE_LOCAL、等信息  

    //设置数据本地化等待时间(单位为s)
    conf.set("spark.locality.wait", "5");
  • 相关阅读:
    小学数学计算出题小程序(Excel版)
    网页自动化测试技术---SeleniumBasic(VBA网页外挂)
    ODBC链接数据源(PQ学习)
    WPF动态绑定矢量图标
    由数据转为树杈的js 和由一个子节点的id获取所有的父类的id
    面试上机题目--采用vue实现以下页面效果
    html前端上机面试题
    在vue项目中的跨域解决办法
    vue-quill-editor富文本编辑器使用
    vue项目eslint配置 以及 解释
  • 原文地址:https://www.cnblogs.com/lifeone/p/6428885.html
Copyright © 2011-2022 走看看