zoukankan      html  css  js  c++  java
  • Spark调优


    下面调优主要基于2.0以后。

    代码优化

    1.语言选择

    如果是ETL并进行单节点机器学习,SparkR或Python。优点:语言相对简单;缺点:使用语言自身的数据结构时,效率低,因为这些数据需要转换。

    如果用到自定义transformations或自定义类,Scala或Java。优点:性能好;缺点:语言相对复杂。

    2.API选择

    • DataFrames
      • 大多数情况下的最佳选择
      • 更有效率的储存(Tungsten,减少GC开销)和处理(Catalyst优化查询)
      • 全阶段代码生成
      • 直接内存访问
      • 没有提供域对象编程和编译时检查
      • 部分算子在特定情况可优化,如order后take、window function等
      • 少数功能没有相应的算子
    • DataSets
      • 适用于性能影响可接受的复杂ETL管道
      • 通过Catalyst提供查询优化
      • 提供域对象编程和编译时检查
      • 较高GC开销
      • 打破整个阶段的代码生成
    • RDDs
      • 在Spark 2.x中,基本不需要使用,除非某些功能上面两种API没有
      • 增加序列化/反序列化开销。上面两种结构不必对整个对象进行反序列化也可访问对象属性。
      • 没有通过Catalyst进行查询优化
      • 没有全阶段代码生成
      • 高GC开销

    3.内存

    • 数据类型

    尽量用DF,默认通常比自定义有更多优化。

    另外,优先用原始数据结构和数组,次之为String而非其他集合和自定义对象。可以把自定义类改写成String的形式来表示,即用逗号,竖线等隔开每个成员变量,又或者用json字符串存储。

    数字或枚举key优于string

    估计内存消耗的方法:1.cache并在UI的Storage页查看。2.org.apache.spark.util.SizeEstimator可估计object的大小

    • GC(1.6以后)

      • spark.memory.fraction默认0.6,即堆内存减去reserved的300MB后的60%,它用来存储计算和cache所需的数据(storageFraction设置这两类数据的边界。例如0.4代表当execution需要空间时会赶走超过40%的那部分storage空间),剩下的40%存储Spark的元数据。

      • GC调优:在UI或者配置一些选项后能在worker节点的日志查看GC信息。

      • 过多full GC,则要增加executor内存。

      • 过多minor GC,则调大伊甸区。

      • 如果老年代空间不足,减少用于cache的内存,即减少fraction,同时减少storageFraction。或者直接调小伊甸区。

      • 调用G1回收器,如果heap size比较大,要调大-XX:G1HeapRegionSize。

      • 关于伊甸区大小的设置,可以根据每个executor同时处理的任务数估计。比如一个executor同时处理4个task,而每份HDFS压缩数据为128M(解压大概为2到3倍),可以把伊甸区设置为4 x 3 x 128MB

    • OffHeap(2.0以后)

      尝试使用Tungsten内存管理,设置spark.memory.ofHeap.enabled开启,spark.memory.ofHeap.size控制大小。

    4.Caching

    cache不同操作逻辑都需要用到的RDD或DF。会占用storage空间。

    目前Spark自带的cache适合小量数据,如果数据量大,建议用Alluxio,配合in-memory and ssd caching,或者直接存到HDFS。

    cache()是persist(MEMORY_ONLY),首选。内存不够部分(整个partition)在下次计算时会重新计算。
    如下所示,persist可以使用不同的存储级别进行持久化。能不存disk尽量不存,有时比重新算还慢
    MEMORY_AND_DISK
    MEMORY_ONLY_SER(序列化存,节省内存(小2到5倍),但要反序列化效率稍低,第二种选择),MEMORY_AND_DISK_SER
    DISK_ONLY
    MEMORY_ONLY_2, MEMORY_AND_DISK_2 备2份

    对已经不需要的RDD或DF调用unpersist

    当persist数据而storage不足时,默认会执行LRU算法,可通过persistencePriority控制,来淘汰之前缓存的数据。不同persist选项有不同操作,比如memory_only时,重用溢出的persisted RDD要重新计算,而memory and disk会将溢出部分写到磁盘。

    4.filter、map、join、partition、UDFs等

    • filter:尽早filter,过滤不必要的数据,有时还能避免读取部分数据(Catalyst的PushdownPredicate)。

    • mapPartitions替代map(foreachPartitions同理):

      该算子比map更高效,但注意算子内运用iterator-to-iterator转换,而非一次性将iterator转换为一个集合(容易OOM)。详情看high performance spark的“Iterator-to-Iterator Transformations with mapPartitions”

    • broadcast join:实际上是设置spark.sql.autoBroadcastJoinThreshold,是否主动用broadcast join交给Spark DF判断,因为我们只能知道数据源的大小而不一定知道经过处理后join前数据的大小

    • partition:减少partition用coalesce不会产生shuffle(把同节点的partition合并),例如filter后数据减少了不少时可以考虑减少分块

      repartition能尽量均匀分布data,在join或cache前用比较合适。

      自定义partitioner(很少用)

    • UDFs:在确定没有合适的内置sql算子才考虑UDFs

    • groupByKey/ reduceByKey

      对于RDD,少用前者,因为它不会在map-side进行聚合。但注意不要与DF的groupBy + agg和DS的groupByKey + reduceGroups混淆,它们的行为会经过Catalyst优化,会有map-side聚合。

    • flatMap:用来替代map后filter

    5.I/O

    将数据写到数据库中时,开线程池,并使用foreachPartition,分batch提交等。

    开启speculation(有些storage system会产生重复写入),HDFS系统和Spark在同一个节点

    6.广播变量

    比如一个函数要调用一个大的闭包变量时,比如用于查询的map、机器学习模型等

    配置优化

    1.并行度

    num-executors * executor-cores的2~3倍spark.default.parallelismspark.sql.shuffle.partitions

    2.数据序列化Kryo

    Spark涉及序列化的场景:闭包变量、广播变量、自定义类、持久化。

    Kryo不是所有可序列化类型都支持,2.0之后,默认情况下,simple types, arrays of simple types, or string type的shuffle都是Kryo序列化。

    //通过下面设置,不单单shuffle,涉及序列化的都会用Kryo。开启之后,scala的map,set,list,tuple,枚举类等都会启用。
    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //对于自定义类,要登记。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
    val sc = new SparkContext(conf)
    

    优化缓存大小spark.kryoserializer.buffr.mb(默认2M)

    3.数据本地化

    调节任务等待时间 ,通过spark.locality.xxx,可以根据不同级别设置等待时间。

    4.规划

    资源均分:spark.scheduler.modeFAIR

    限制:--max-executor-cores或修改默认spark.cores.max减少core的情况:

    1. Reduce heap size below 32 GB to keep GC overhead < 10%.
    2. Reduce the number of cores to keep GC overhead < 10%.

    5.数据储存

    存储格式:Parquet首选

    压缩格式:选择splittable文件如 zip(在文件范围内可分), bzip2(压缩慢,其他都很好), LZO(压缩速度最快)。上传数据分开几个文件,每个最好不超几百MB,用maxRecordsPerFile控制。文件不算太大,用gzip(各方面都好,但不能分割)。在conf中通过spark.sql.partuet.compression.codec设置。

    6.shuffle

    spark.reducer.maxSizeInFlight(48m): reduce端拉取数据的buffer大小,如果内存够大,且数据量大时,可尝试96m,反之调低。

    spark.reducer.maxReqsInFlight(Int.MaxValue):当reduce端的节点比较多时,过多的请求对发送端不利,为了稳定性,有时可能需要减少。

    spark.reducer.maxBlocksInFlightPerAddress(Int.MaxValue):和上面对应,控制每次向多少个节点拉取数据。

    spark.maxRemoteBlockSizeFetchToMem(Long.MaxValue):当一个block的数据超过这个值就把这个block拉取到到磁盘。

    spark.shuffle.compress(true)

    spark.shuffle.file.buffer:所产生准备shuffle的文件的大小,调大可减少溢出磁盘的次数,默认32k,可尝试64k。

    spark.shuffle.sort.bypassMergeThreshold(200):小于这个值时用BypassMergeSortShuffleWriter。

    Netty only:

    maxRetries * retryWait两个参数:有可能CG时间长导致拉取不到数据

    spark.shuffle.io.preferDirectBufs(true):如果对外内存紧缺,可以考虑关掉。

    spark.shuffle.io.numConnectionsPerPeer(1):对于具有许多硬盘和少数主机的群集,这可能导致并发性不足以使所有磁盘饱和,因此用户可能会考虑增加此值。

    7.executor内存压力和Garbage Collection

    收集统计数据:在Spark-submit添加--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps"。这之后能在worker节点的logs里看到信息。也可以在UI查看。

    如果full GC被触发多次,则表明老年代空间不够用,可以增加executor的内存或减少spark.memory.fraction的值(会影响性能)

    如果很多minor GC,分配更多内存给Eden区(-Xmn=4/3*E)。关于伊甸区大小的设置,可以根据每个executor同时处理的任务数估计。比如一个executor同时处理4个task,而每份HDFS压缩数据为128M(解压大概为2到3倍),可以把伊甸区设置为4 x 3 x 128MB。

    启用G1收集器也有可能提高效率。

    spark.executor.memory是不包含overhead的,所以实际上占用的内存比申请的多。在executor内存中默认40%存一些数据和Spark的元数据,剩下的60%中,默认50% execution(计算) and 50% storage(用于caching) memory,但这是动态变动的,storage可借用execution的空间,当execution需要空间时又会赶走超过50%的那部分storage空间。这里数据量都是按block计算的。

    storage过小会cache丢失,代价取决于persist等级,可能直接删除,需要时重计算淘汰数据或溢出淘汰数据到磁盘,execution过小会有很多I/O。

    在1.6之前,内存空间划分为

    • Executionspark.shuffle.memoryFraction(default0.2):buffering intermediate data when performing shuffles, joins, sorts and aggregations
    • Storagespark.storage.memoryFraction(default0.6): caching, broadcasts and large task results
    • Other default 0.2: data structures allocated by user code and internal Spark metadata.

    在1.6及之后,内存空间划分为

    • Execution and Storage spark.memory.fraction (1.6时0.75,2.0后0.6)
      • spark.memory.storageFraction(default0.5): 当上面的内存中,storage超过50%,cached data may be evicted
    • Other

    8.集群配置

    Resource and Job Scheduling

    每个spark application运行一系列独立的executor进程。在application中,多个job可能同时运行,只要他们被分配到不同的线程。

    executor调度

    Spark’s standalone, YARN modes 和 coarse-grained Mesos mode都是静态划分,即app被分配最大量的executor,并拥有这些executor,直到app完成。Mesos可以实现动态划分。

    Spark可以设置动态调整executor,根据workload决定是否回收executor,这更适合多服务。这机制对上面提到的三种集群部署模式都适用。在动态调整下,如果有task等待executor且有空余资源,就会启动executor。这里有两个参数,一个规定task的等待时间,一个规定每次提供executor的间隔时间(开始提供1、然后2、4等)。移除则是根据idletimeout参数设定时间。这种动态调整会带来一个问题,即reducer端未得到所需数据,mapper端进程就因空闲而退出,导致数据无法获取。为解决这个问题,在动态调整阶段,executor的退出并非马上退出,而是会持续一段时间。这个做法通过外部shuffle服务完成(所以启动动态调整时也要启动外部shuffle服务),reducer端会从service处拉取数据而不是executor。

    cache数据也有类似的参数设置spark.dynamicAllocation.cachedExecutorIdleTimeout。

    参数调整经验

    • spark.dynamicAllocation.initialExecutors决定了Executor初始数目。这个值的选取可以根据历史任务的Executor数目的统计,按照二八原则来设置,例如80%的历史业务的Executor数目都不大于参数值。同时,也要考虑集群的资源紧张度,当资源比较紧张时,这个值需要设置得小一点。
    • spark.dynamicAllocation.maxExecutors决定了业务最大可以拥有的Executor数目。默认无穷大,要注意过大会使大业务独占大部分资源,造成小任务没有资源的情况;过小会导致大任务执行时间超出业务要求。
    • spark.dynamicAllocation.executorIdleTimeout决定了Executor空闲多长时间后会被动态删除。当这个值比较小时,集群资源会比较充分地共享,但会影响业务的执行时间(在Executor被删除后,可能需要重新申请新的Executor来执行task);当这个值比较大时,不利于资源的共享,若一些比较大的任务占用资源,迟迟不释放,就会造成其他任务得不到资源。这个值的选取需要在用户业务的执行时间和等待时间上做一个权衡。需要注意的是,当spark.dynamicAllocation.maxExecutors为有限值时,spark.dynamicAllocation.executorIdleTimeout过小会导致某些任务不能申请新的资源。例如maxExecutors=10,而某个业务所需的资源大于或等于10个Executor,业务在申请到10个Executor之后,申请到Executor由于空闲(有可能因为task还没来得及分配到其上)而被删除,目前社区Spark SQL的逻辑是不会再申请新的Executor的,这样就会导致任务执行速度变慢。

    job调度

    FIFO:如果前面的job不需要所有资源,那么第二个job也可启动。适合长作业、不适合短作业;适合CPU繁忙型。

    FAIR:轮询分配,大概平均的资源,适合多服务。该模式还可以设置pool,对job进行分组,并对各组设置优先级,从而实现job的优先级。可设置的参数有schedulingMode(FIFO和FAIR)、weight(池的优先级,如某个为2,其他为1,则2的那个会获得比其他多一倍的资源)、minShare(至少资源数)

    小文件合并

    当Reducer数目比较多时,可能会导致小文件过多。最好在输出前将文件进行合并。另外,后台应定期对数据进行压缩和合并。

    冷热数据分离

    • 冷:性能低、数据块大、备份数少、压缩率高
    • 热:反之(压缩速度快)

    参考

    书籍:

    Spark: The Definitive Guide

    High Performance Spark

    Spark SQL 内核剖析

    文章:

    官网Tunning Guide

    官网Spark Configuration

    Microsoft Azure's Optimize Spark jobs

  • 相关阅读:
    POJ 3126 Prime Path
    POJ 2429 GCD & LCM Inverse
    POJ 2395 Out of Hay
    【Codeforces 105D】 Bag of mice
    【POJ 3071】 Football
    【POJ 2096】 Collecting Bugs
    【CQOI 2009】 余数之和
    【Codeforces 258E】 Devu and Flowers
    【SDOI 2010】 古代猪文
    【BZOJ 2982】 combination
  • 原文地址:https://www.cnblogs.com/code2one/p/10159719.html
Copyright © 2011-2022 走看看