zoukankan      html  css  js  c++  java
  • Spark性能调优

    一、分配资源
    最大调节分配资源:
    常用的资源调度模式有Spark Standalone和Spark On Yarn。比如说你的每台机器能够给你使用60G内存,10个cpu core,20台机器。那么executor的数量是20。平均每个executor所能分配60G内存和10个cpu core。
    1.增加executor数量:提升了并行度
    2.增加每个executor的CPU Core:提高了并行度
    3.增加每个executor的内存量:
    对于RDD的cache,减少了磁盘IO
    对于shuffle操作的reduce端:减少了磁盘IO
    对于task的执行,可能会创建很多对象,内存小的话会频繁导致JVM堆内存满了,就会频繁的进行GC。
    二、executor、task、cpu Core数量关系调优
    1.并行度
    task数量,至少设置与Spark Application的总CPU Core 数量相同。官方推荐,task数量,设置成spark application总CPU core数量的2-3倍。
    假如集群有50个executor,每个executor10个G内存,每个executor有3个CPU core。150个CPU Core最多并行执行150个task
    三、RDD架构重构以及RDD持久化
    1、RDD架构重构
    复用RDD,抽取共同的RDD
    2、公共RDD的持久化
    将RDD的数据缓存到内存中/磁盘中(BlockManager
    3、持久化的序列化
    减少内存开销
    4、内存充足,双副本持久化
    四、广播变量
    一台节点有一个或者多个Executor,一个Executor有多个Task(task数量由Executor里的Cpu Core数量一对一决定)
    广播变量:
    将固定的,只读的数据变量提前广播给各个Executor,该Executor上的各个Task再从所在节点的BlockManager获取广播变量,如果本地的Executor没有就从Driver远程拉取变量副本,并保存到本地的BlockManager中.解决了每次将固定的,只读的数据用Driver广播到各个Task上的繁重且效率低下的问题.
    使用方法:
    1.调用SparkContext.broadcast创建一个Broadcast[T]对象.任何序列化的类型都可以这么实现
    2.通过 .value 获取对象的值
    3.变量只会被发送到各个节点一次,应该作为只读值处理(修改这个值不会影响到别的节点)
    五、使用Kryo序列化
    默认Spark内部的序列化机制:
    Java的ObjectOutputStream/ObjectInputStream:
    优点: 使用方便,实现Serializable接口即可
    缺点: 效率低,序列化速度慢,序列化后的数据相对大
    Spark支持使用Kryo序列化机制:
    比Java速度快
    序列化后的数据小 Java的十分之一大小
    所以网络传输的数据变少,在集群中耗费的内存资源大大减少
    使用Kryo后影响的几个地方
    1.优化网络传输的性能,优化集群中内存占用和消耗
    2.持久化RDD,优化内存的占用和消耗.持久化RDD的内存占用的内存越小,task执行的时候创建的对象,就不至于频繁的占满内存,频繁发生GC
    3.shuffle 优化网络传输的性能
    Kryo的使用:
    Spark中Scala语法的Kryo使用:
    //创建一个集合存储输入输出,字典文件,停用词库
    val Array(inputPath,outputPath,dicPath,stopwords,date) = args
    val conf = new SparkConf()
    .setAppName(s"${this.getClass.getName}").setMaster("local[*]")
    //使用Kryo序列胡
    .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sQLContext = new SQLContext(sc)
    //spark 1.6 版本时候默认的压缩方式还是snappy,2.0之后才默认snappy
    sQLContext.setConf("spark.sql.parquet.compression.codec","snappy")
    六、使用fastutil优化数据格式
    Fastutil:(最新版本要求Java7及以上版本)
    概念:
    是扩展了Java标准的集合框架(Map,List,Set,HashMap,ArrayList,HashSet)的类库,提供了特殊类型的map,set,list和queue
    优势:
    代替JDK的原生的集合框架,占内存小,存取速度快
    提供了64位的array,set,list的集合列表,实用的IO类,来处理二进制和文本类型的文件
    每种集合类型都实现了对应的java中的标准接口
    支持双向迭代器
    支持引用类型 但是对引用类型是使用等于号(=)进行比较的,而不是equals()方法
    应用:
    1.算子函数使用了外部变量:
    使用fastutil改写外部变量,首先从源头上就减少内存的占用,通过广播变量进一步减少内存占用,再通过Kryo序列化类库进一步减少内存占用
    2.在task任务执行的计算逻辑里(算子函数),如果逻辑中有要创建较大的Map,List集合,可以使用fastutil类库重写,减少了task创建出来的集合类型的内存占用,避免executor内存频繁占满,频繁唤起GC,导致性能下降.
    fastutil调优的说明:
    广播变量、Kryo序列化类库、fastutil,都是之前所说的,对于性能来说,类似于一种调味品,烤鸡,本来就很好吃了,然后加了一点特质的孜然麻辣粉调料,就更加好吃了一点。分配资源、并行度、RDD架构与持久化,这三个就是烤鸡。broadcast、kryo、fastutil,类似于调料。
    比如:经过一些调优之后30分钟的任务,经过broadcast,kryo,fastutil调优可能是29,28或者25,经过shuffle调优15分,groupbykey用reducebykey改写,执行本地聚合也许就只有10分钟,利用资源更大的YARN队列,1分钟.
    fastutil使用:
    在pom.xml中引用fastutil的包
    <dependency>
        <groupId>fastutil</groupId>
        <artifactId>fastutil</artifactId>
        <version>5.0.9</version>
    </dependency>
    List<Integer> 相当于 IntList
    基本都是类似于IntList的格式,前缀就是集合的元素类型。特殊的就是Map,Int2IntMap,代表了key-value映射的元素类型。除此之外,还支持object、reference
    七、调节数据本地化等待时长
    task的locality的五种方式:
    PROCESS_LOCAL:进程本地化
    NODE_LOCAL:节点本地化
    NO_PREF:对于task来说,数据从哪里获取都一样,没有优势之分
    RACK_LOCAL:机架本地化
    ANY:数据和task可能在集群中的任何地方,而且不再一个机架中,性能最差.
    Spark的任务调度:
    Spark在Driver上,对Application的每一个stage的task进行分配之前都会计算出每个task要计算的是哪个分片数据.
    首先会优先将task正好分配到他要计算的数据所在的节点,这样就避免了网络传输数据.
    但是可能会遇到那个节点的计算资源和计算能力都满了.这种时候,Spark默认等待3秒(不是绝对,还有很多种情况,对于不同的本地化级别,都会去等待),到最后实在等不下去,就会选择一个差的本地化级别.
    对于第二种差的情况,肯定会发生数据传输,task会通过其所在的节点的blockManager来获取数据,BlockManager发现自己本地没有数据,会通过一个getRemote()方法,通过TransferService(网络传输组件)从数据所在节点的BlockManager中,获取数据.
    适当的调节本地化等待时长:
    观察Spark作业的运行日志,看数据本地化级别的信息,如果大多是PROCESS_LOCALL,就不用调节,若大多是NODE_LOCAL,ANY就需要调节,反复的看日志,以及作业运行时间,不要一味地提升本地化级别造成了本末倒置.
    调节方式:
    spark.locality.wait,默认是3s。6s,10s
    默认情况下,下面3个的等待时长,都是跟上面那个是一样的,都是3s
    spark.locality.wait.process
    spark.locality.wait.node
    spark.locality.wait.rack
    new SparkConf().set("spark.locality.wait", "10")
  • 相关阅读:
    20145339《网络对抗》后门原理与实践
    20145339顿珠达杰 《网络对抗技术》 逆向与Bof基础
    20145339《信息安全系统设计基础》课程总结
    20145339《信息安全系统设计基础》第14周学习总结
    20145339《信息安全系统设计基础》第12周学习总结
    国考准备
    20145339《信息安全系统设计基础》第十一周学习总结
    git安装
    20145339顿珠《信息安全系统设计基础》第十周学习总结
    第十五周学习总结
  • 原文地址:https://www.cnblogs.com/gentle-awen/p/9999001.html
Copyright © 2011-2022 走看看