zoukankan      html  css  js  c++  java
  • Spark调优


    下面调优主要基于2.0以后。

    代码优化

    1.语言选择

    如果是ETL并进行单节点机器学习,SparkR或Python。优点:语言相对简单;缺点:使用语言自身的数据结构时,效率低,因为这些数据需要转换。

    如果用到自定义transformations或自定义类,Scala或Java。优点:性能好;缺点:语言相对复杂。

    2.API选择

    • DataFrames
      • 大多数情况下的最佳选择
      • 更有效率的储存(Tungsten,减少GC开销)和处理(Catalyst优化查询)
      • 全阶段代码生成
      • 直接内存访问
      • 没有提供域对象编程和编译时检查
      • 部分算子在特定情况可优化,如order后take、window function等
      • 少数功能没有相应的算子
    • DataSets
      • 适用于性能影响可接受的复杂ETL管道
      • 通过Catalyst提供查询优化
      • 提供域对象编程和编译时检查
      • 较高GC开销
      • 打破整个阶段的代码生成
    • RDDs
      • 在Spark 2.x中,基本不需要使用,除非某些功能上面两种API没有
      • 增加序列化/反序列化开销。上面两种结构不必对整个对象进行反序列化也可访问对象属性。
      • 没有通过Catalyst进行查询优化
      • 没有全阶段代码生成
      • 高GC开销

    3.内存

    • 数据类型

    尽量用DF,默认通常比自定义有更多优化。

    另外,优先用原始数据结构和数组,次之为String而非其他集合和自定义对象。可以把自定义类改写成String的形式来表示,即用逗号,竖线等隔开每个成员变量,又或者用json字符串存储。

    数字或枚举key优于string

    估计内存消耗的方法:1.cache并在UI的Storage页查看。2.org.apache.spark.util.SizeEstimator可估计object的大小

    • GC(1.6以后)

      • spark.memory.fraction默认0.6,即堆内存减去reserved的300MB后的60%,它用来存储计算和cache所需的数据(storageFraction设置这两类数据的边界。例如0.4代表当execution需要空间时会赶走超过40%的那部分storage空间),剩下的40%存储Spark的元数据。

      • GC调优:在UI或者配置一些选项后能在worker节点的日志查看GC信息。

      • 过多full GC,则要增加executor内存。

      • 过多minor GC,则调大伊甸区。

      • 如果老年代空间不足,减少用于cache的内存,即减少fraction,同时减少storageFraction。或者直接调小伊甸区。

      • 调用G1回收器,如果heap size比较大,要调大-XX:G1HeapRegionSize。

      • 关于伊甸区大小的设置,可以根据每个executor同时处理的任务数估计。比如一个executor同时处理4个task,而每份HDFS压缩数据为128M(解压大概为2到3倍),可以把伊甸区设置为4 x 3 x 128MB

    • OffHeap(2.0以后)

      尝试使用Tungsten内存管理,设置spark.memory.ofHeap.enabled开启,spark.memory.ofHeap.size控制大小。

    4.Caching

    cache不同操作逻辑都需要用到的RDD或DF。会占用storage空间。

    目前Spark自带的cache适合小量数据,如果数据量大,建议用Alluxio,配合in-memory and ssd caching,或者直接存到HDFS。

    cache()是persist(MEMORY_ONLY),首选。内存不够部分(整个partition)在下次计算时会重新计算。
    如下所示,persist可以使用不同的存储级别进行持久化。能不存disk尽量不存,有时比重新算还慢
    MEMORY_AND_DISK
    MEMORY_ONLY_SER(序列化存,节省内存(小2到5倍),但要反序列化效率稍低,第二种选择),MEMORY_AND_DISK_SER
    DISK_ONLY
    MEMORY_ONLY_2, MEMORY_AND_DISK_2 备2份

    对已经不需要的RDD或DF调用unpersist

    当persist数据而storage不足时,默认会执行LRU算法,可通过persistencePriority控制,来淘汰之前缓存的数据。不同persist选项有不同操作,比如memory_only时,重用溢出的persisted RDD要重新计算,而memory and disk会将溢出部分写到磁盘。

    4.filter、map、join、partition、UDFs等

    • filter:尽早filter,过滤不必要的数据,有时还能避免读取部分数据(Catalyst的PushdownPredicate)。

    • mapPartitions替代map(foreachPartitions同理):

      该算子比map更高效,但注意算子内运用iterator-to-iterator转换,而非一次性将iterator转换为一个集合(容易OOM)。详情看high performance spark的“Iterator-to-Iterator Transformations with mapPartitions”

    • broadcast join:实际上是设置spark.sql.autoBroadcastJoinThreshold,是否主动用broadcast join交给Spark DF判断,因为我们只能知道数据源的大小而不一定知道经过处理后join前数据的大小

    • partition:减少partition用coalesce不会产生shuffle(把同节点的partition合并),例如filter后数据减少了不少时可以考虑减少分块

      repartition能尽量均匀分布data,在join或cache前用比较合适。

      自定义partitioner(很少用)

    • UDFs:在确定没有合适的内置sql算子才考虑UDFs

    • groupByKey/ reduceByKey

      对于RDD,少用前者,因为它不会在map-side进行聚合。但注意不要与DF的groupBy + agg和DS的groupByKey + reduceGroups混淆,它们的行为会经过Catalyst优化,会有map-side聚合。

    • flatMap:用来替代map后filter

    5.I/O

    将数据写到数据库中时,开线程池,并使用foreachPartition,分batch提交等。

    开启speculation(有些storage system会产生重复写入),HDFS系统和Spark在同一个节点

    6.广播变量

    比如一个函数要调用一个大的闭包变量时,比如用于查询的map、机器学习模型等

    配置优化

    1.并行度

    num-executors * executor-cores的2~3倍spark.default.parallelismspark.sql.shuffle.partitions

    2.数据序列化Kryo

    Spark涉及序列化的场景:闭包变量、广播变量、自定义类、持久化。

    Kryo不是所有可序列化类型都支持,2.0之后,默认情况下,simple types, arrays of simple types, or string type的shuffle都是Kryo序列化。

    //通过下面设置,不单单shuffle,涉及序列化的都会用Kryo。开启之后,scala的map,set,list,tuple,枚举类等都会启用。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //对于自定义类,要登记。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
    val sc = new SparkContext(conf)
    

    优化缓存大小spark.kryoserializer.buffr.mb(默认2M)

    3.数据本地化

    调节任务等待时间 ,通过spark.locality.xxx,可以根据不同级别设置等待时间。

    4.规划

    资源均分:spark.scheduler.modeFAIR

    限制:--max-executor-cores或修改默认spark.cores.max减少core的情况:

    1. Reduce heap size below 32 GB to keep GC overhead < 10%.
    2. Reduce the number of cores to keep GC overhead < 10%.

    5.数据储存

    存储格式:Parquet首选

    压缩格式:选择splittable文件如 zip(在文件范围内可分), bzip2(压缩慢,其他都很好), LZO(压缩速度最快)。上传数据分开几个文件,每个最好不超几百MB,用maxRecordsPerFile控制。文件不算太大,用gzip(各方面都好,但不能分割)。在conf中通过spark.sql.partuet.compression.codec设置。

    6.shuffle

    spark.reducer.maxSizeInFlight(48m): reduce端拉取数据的buffer大小,如果内存够大,且数据量大时,可尝试96m,反之调低。

    spark.reducer.maxReqsInFlight(Int.MaxValue):当reduce端的节点比较多时,过多的请求对发送端不利,为了稳定性,有时可能需要减少。

    spark.reducer.maxBlocksInFlightPerAddress(Int.MaxValue):和上面对应,控制每次向多少个节点拉取数据。

    spark.maxRemoteBlockSizeFetchToMem(Long.MaxValue):当一个block的数据超过这个值就把这个block拉取到到磁盘。

    spark.shuffle.compress(true)

    spark.shuffle.file.buffer:所产生准备shuffle的文件的大小,调大可减少溢出磁盘的次数,默认32k,可尝试64k。

    spark.shuffle.sort.bypassMergeThreshold(200):小于这个值时用BypassMergeSortShuffleWriter。

    Netty only:

    maxRetries * retryWait两个参数:有可能CG时间长导致拉取不到数据

    spark.shuffle.io.preferDirectBufs(true):如果对外内存紧缺,可以考虑关掉。

    spark.shuffle.io.numConnectionsPerPeer(1):对于具有许多硬盘和少数主机的群集,这可能导致并发性不足以使所有磁盘饱和,因此用户可能会考虑增加此值。

    7.executor内存压力和Garbage Collection

    收集统计数据:在Spark-submit添加--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。这之后能在worker节点的logs里看到信息。也可以在UI查看。

    如果full GC被触发多次,则表明老年代空间不够用,可以增加executor的内存或减少spark.memory.fraction的值(会影响性能)

    如果很多minor GC,分配更多内存给Eden区(-Xmn=4/3*E)。关于伊甸区大小的设置,可以根据每个executor同时处理的任务数估计。比如一个executor同时处理4个task,而每份HDFS压缩数据为128M(解压大概为2到3倍),可以把伊甸区设置为4 x 3 x 128MB。

    启用G1收集器也有可能提高效率。

    spark.executor.memory是不包含overhead的,所以实际上占用的内存比申请的多。在executor内存中默认40%存一些数据和Spark的元数据,剩下的60%中,默认50% execution(计算) and 50% storage(用于caching) memory,但这是动态变动的,storage可借用execution的空间,当execution需要空间时又会赶走超过50%的那部分storage空间。这里数据量都是按block计算的。

    storage过小会cache丢失,代价取决于persist等级,可能直接删除,需要时重计算淘汰数据或溢出淘汰数据到磁盘,execution过小会有很多I/O。

    在1.6之前,内存空间划分为

    • Executionspark.shuffle.memoryFraction(default0.2):buffering intermediate data when performing shuffles, joins, sorts and aggregations
    • Storagespark.storage.memoryFraction(default0.6): caching, broadcasts and large task results
    • Other default 0.2: data structures allocated by user code and internal Spark metadata.

    在1.6及之后,内存空间划分为

    • Execution and Storage spark.memory.fraction (1.6时0.75,2.0后0.6)
      • spark.memory.storageFraction(default0.5): 当上面的内存中,storage超过50%,cached data may be evicted
    • Other

    8.集群配置

    Resource and Job Scheduling

    每个spark application运行一系列独立的executor进程。在application中,多个job可能同时运行,只要他们被分配到不同的线程。

    executor调度

    Spark’s standalone, YARN modes 和 coarse-grained Mesos mode都是静态划分,即app被分配最大量的executor,并拥有这些executor,直到app完成。Mesos可以实现动态划分。

    Spark可以设置动态调整executor,根据workload决定是否回收executor,这更适合多服务。这机制对上面提到的三种集群部署模式都适用。在动态调整下,如果有task等待executor且有空余资源,就会启动executor。这里有两个参数,一个规定task的等待时间,一个规定每次提供executor的间隔时间(开始提供1、然后2、4等)。移除则是根据idletimeout参数设定时间。这种动态调整会带来一个问题,即reducer端未得到所需数据,mapper端进程就因空闲而退出,导致数据无法获取。为解决这个问题,在动态调整阶段,executor的退出并非马上退出,而是会持续一段时间。这个做法通过外部shuffle服务完成(所以启动动态调整时也要启动外部shuffle服务),reducer端会从service处拉取数据而不是executor。

    cache数据也有类似的参数设置spark.dynamicAllocation.cachedExecutorIdleTimeout。

    参数调整经验

    • spark.dynamicAllocation.initialExecutors决定了Executor初始数目。这个值的选取可以根据历史任务的Executor数目的统计,按照二八原则来设置,例如80%的历史业务的Executor数目都不大于参数值。同时,也要考虑集群的资源紧张度,当资源比较紧张时,这个值需要设置得小一点。
    • spark.dynamicAllocation.maxExecutors决定了业务最大可以拥有的Executor数目。默认无穷大,要注意过大会使大业务独占大部分资源,造成小任务没有资源的情况;过小会导致大任务执行时间超出业务要求。
    • spark.dynamicAllocation.executorIdleTimeout决定了Executor空闲多长时间后会被动态删除。当这个值比较小时,集群资源会比较充分地共享,但会影响业务的执行时间(在Executor被删除后,可能需要重新申请新的Executor来执行task);当这个值比较大时,不利于资源的共享,若一些比较大的任务占用资源,迟迟不释放,就会造成其他任务得不到资源。这个值的选取需要在用户业务的执行时间和等待时间上做一个权衡。需要注意的是,当spark.dynamicAllocation.maxExecutors为有限值时,spark.dynamicAllocation.executorIdleTimeout过小会导致某些任务不能申请新的资源。例如maxExecutors=10,而某个业务所需的资源大于或等于10个Executor,业务在申请到10个Executor之后,申请到Executor由于空闲(有可能因为task还没来得及分配到其上)而被删除,目前社区Spark SQL的逻辑是不会再申请新的Executor的,这样就会导致任务执行速度变慢。

    job调度

    FIFO:如果前面的job不需要所有资源,那么第二个job也可启动。适合长作业、不适合短作业;适合CPU繁忙型。

    FAIR:轮询分配,大概平均的资源,适合多服务。该模式还可以设置pool,对job进行分组,并对各组设置优先级,从而实现job的优先级。可设置的参数有schedulingMode(FIFO和FAIR)、weight(池的优先级,如某个为2,其他为1,则2的那个会获得比其他多一倍的资源)、minShare(至少资源数)

    小文件合并

    当Reducer数目比较多时,可能会导致小文件过多。最好在输出前将文件进行合并。另外,后台应定期对数据进行压缩和合并。

    冷热数据分离

    • 冷:性能低、数据块大、备份数少、压缩率高
    • 热:反之(压缩速度快)

    参考

    书籍:

    Spark: The Definitive Guide

    High Performance Spark

    Spark SQL 内核剖析

    文章:

    官网Tunning Guide

    官网Spark Configuration

    Microsoft Azure's Optimize Spark jobs

  • 相关阅读:
    Linux source命令
    pythoy 基础一: python的特点 if 语句 whlie语句
    linux 基础
    python的游戏之旅( 数字 字符串 列表 元组 字典 即为游戏职业)
    html 基础
    《SQL Server 2008 从入门到精通》 学习笔记 第五天
    [转载] Visual Studio 2010 MSDN Help Library文档位置、错误、重新安装及安装注意事项
    ASP.NET后台通过输出JavaScript弹出窗口小结
    网上搜集的webbrower的资料,很有借鉴价值
    解决MSSQL 2008不能用IP登录的问题 和 打开可以用SA登录SQL2008的方法
  • 原文地址:https://www.cnblogs.com/code2one/p/10159719.html
Copyright © 2011-2022 走看看