zoukankan      html  css  js  c++  java
  • Spark 调优

    1. 资源调优

      (1). 在部署 spark 集群中指定资源分配的默认参数

      • 在 spark 安装包的 conf 下的 spark-env.sh

      • SPARK_WORKER_CORES

      • SPARK_WORKER_MEMORY

      • SPARK_WORKER_INSTANCES 每台机器启动 worker 数

      (2). 在提交 Application 的时候给当前的 Application 分配更多的资源

      • 提交命令选项: (在提交 Application 的时候使用选项)

        • --executor-cores
        • --executor-memory
        • --total-executor-cores
      • 配置信息: (Application 的代码设置或在 Spark-default.conf 中设置)

        • spark.executor.cores
        • spark.executor.memory
        • spark.max.cores
      • 动态分配资源

        • 启用 External shuffle Service 服务: spark.shuffle.service.enabled true
        • Shuffle Service 服务端口, 必须和 yarn-site中的一致: spark.shuffle.service.port 7337
        • 开启动态资源分配: spark.dynamicAllocation.enabled true
        • 每个 Application最小分配的 executor 数: spark.dynamicAllocation.minExecutors
        • 每个 Application 最大并发分配的 executor 数: spark.dynamicAllocation.maxExecutors
        • 调度第一次超时时间: spark.dynamicAllocation.schedulerBacklogTimeout(1s)
        • 调度第二次及之后超时时间: spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(5s)
        • 普通Executor空闲超时时间 : spark.dynamicAllocation.executorIdleTimeout(60s)
        • 含有cached blocks的Executor空闲超时时间: spark.dynamicAllocation.cachedExecutorIdleTimeout ( spark.dynamicAllocation.executorIdleTimeout的2倍 )
        • 注意事项:
          • 使用动态资源调度功能,必须配置External Shuffle Service。如果没有使用External Shuffle Service,Executor被杀时会丢失shuffle文件。
          • 配置了动态资源调度功能,就不能再单独配置Executor的个数,否则会报错退出。
          • 使用动态资源调度功能,能保证最少的executor的个数(spark.dynamicAllocation.minExecutors)
    2. 并行度调优

      (1). 如果读取的数据在 HDFS 中, 降低 block 大小, 相当于提高了 RDD中 partition 个数, sc.textFile(xx, numPartirions)

      (2). sc.parallelize(xxx, numPartitions)

      (3). sc.makeRDD(xxx, numPartitions)

      (4). sc.parallelizePairs(xxx, numPartitions)

      (5). repartitions/coalesce

      (6). reduceByKey/groupByKey/join ---(xxx, numPartitions)

      (7). spark.default.parallelism net set

      (8). spark.sql.shuffle.partitions --200

      (8). 自定义 Paritioner(分区器)

      (9). 如果读取数据是在 SparkStreaming中

      • Receiver(2.3后木了): spark.streaming.blockInterval-200ms

      • Direct: 读取的 topic 的分区数

    3. 代码调优

      (1). 避免创建重复的 RDD

      (2). 对多次使用的RDD进行持久化

      • 如何选择合适的持久化策略(存储级别)

        • 默认情况下, MEMORY_ONLY 性能最高, 但前提是内存必须足够大, 可以存放下整个RDD的所有数据, 且有余量。

          • 因为不进行序列化与反序列化操作, 就避免了这部分的性能开销: 对这个RDD的后续算子操作, 都是基于纯内存中的数据的操作, 不需要从磁盘文件中读取数据, 性能也很高;
          • 而且不需要复制一份数据副本, 并远程传送到其他节点上。
          • 需要注意的是, 在实际的生产环境中, 能直接用到次策略的场景还是有限的, 如果 RDD中数据比较多时(比如 几十亿), 直接用这种持久化级别, 会导致 JVM的 OOM 内存溢出异常。
        • 如果使用 MEMORY_ONLY 级别时发生了内存溢出, 那么建议尝试使用 MEMORY_ONLY_SER 级别。

          • 该级别会将 RDD 数据序列化后再保存在内存中, 此时每个 partition 仅仅是一个字节数组而已, 大大减少了对象数量, 并降低了内存占用。
          • 这种级别比MEMORY_ONLY 多出来的性能开销, 主要就是序列化与反序列化的开销。
          • 但是后续算子可以基于纯内存进行操作, 因此性能总体还是比较高的。
          • 此时, 可能发生的问题同上, 如果RDD中的 数据量过多的话, 还是可能会导致 OOM 内存溢出的异常。
        • 如果纯内存的级别都无法使用, 那么建议使用 MEMORY_AND_DISK_SER 策略, 而不是MEMORY_AND_DISK 策略。

        • 既然到了这一步, 就说明 RDD 的数据量很大, 内存无法完全放下。

        • 序列化后的数据比较少, 可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中, 内存缓存不下才会写入磁盘。

        • 通常不建议使用 DISK_ONLY 和后缀为_2的级别: 因为完全基于磁盘文件进行数据的读写, 会导致性能急剧降低, 有时还不如重新计算一次所有 RDD。

        • 后缀为 _2 的级别, 必须将所有数据都复制一份福本, 并发送到其他节点上, 数据复制以及网络传输会导致较大的性能开销, 除非是要求作业的高可用性, 否则不建议使用。

        • 持久化算子:

          • cache:
            • MEMORY_ONLY
          • persist:
            • MEMORY_ONLY
            • MEMORY_ONLY_SER
            • MEMORY_AND_DISK_SER
              • 一般不要选择带有_2的持久化级别
          • checkpoint:
            • 如果一个RDD的计算时间比较长或者计算起来比较复杂, 一般将这个 RDD 的计算结果保存到 HDFS 上, 这样数据会更加安全。
            • 如果一个 RDD 的依赖关系非常长, 也会使用 checkpoint, 会切断依赖关系, 提高容错的效率。

        (3). 尽量避免使用 shuffle 类的算子

      • 使用广播变量来模拟使用 join, 使用情况: 一个RDD比较大, 一个 RDD 比较小。

      • join 算子 = 广播变量 + filter 、广播变量 + map、广播变量 + flatMap

        (4). 使用 map-side 预聚合的 shuffle 操作

      • 尽量使用有combiner 的shuffle 类算子

      • combiner 概念:

        • 在 map 端, 每一个 map task 计算完毕后进行的局部聚合。
      • combiner 好处:

        • 降低 shuffle write 写磁盘的数据量。
        • 降低 shuffle read 拉取数据量的大小。
        • 降低 reduce 端聚合的次数。
      • 有 combiner 的 shuffle 类算子:

        • reduceByKey: 这个算子在 map 端是由 combiner 的, 在一些场景中可以使用 reduceByKey 代替 groupByKey。
        • aggregateByKey
        • combinerByKey

        (5). 尽量使用高性能的算子

      • 使用 reduceByKey 替代 groupByKey

      • 使用 mapPartition 替代 map

      • 使用 foreachPartition 替代 foreach

      • filter 后使用 coalesce 减少分区数

      • 使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作

      • 使用 repartition 与 coalesce 算子操作分区

        (6). 使用广播变量

      • 开发过程中, 会遇到需要在算子函数中使用外部变量的场景(尤其是打变量, 比如 100M 以上的大集合), 那么此时就应该使用 Spark 的广播(Broadcast) 功能来提升性能。

      • 函数中使用到外部变量时, 默认情况下, Spark会将改1变量复制多个副本, 通过网络传输到 task 中, 此时每个 task 都有一个变量副本。

      • 如果变量本身比较大的话(比如 100M, 甚至 1G), 那么大量的变量副本在网络组传输的性能开销, 以及在各个节点的 Executor 中占用过的内存导致的频繁 GC, 都会极大地影响性能。

      • 如果使用的外部变量比较大, 建议使用 Spark 的广播功能, 对该变量进行广播。

      • 广播后的变量, 会保证每个 Executor 的内存中, 只驻留一份变量副本, 而 Executor 中的 task 执行时共享该 Executor 中的内存中, 只驻留一份变量副本, 而 Executor 中的 task 执行时共享该 Executor 中的那份变量副本。

      • 这样的话, 可以大大减少变量副本的数量, 从而减少网络传输的性能开销, 并减少对 Executor 内存的占用开销, 降低 GC 的概率。

      • 广大变量发送方式: Executor 一开始并没有广播变量, 而是 task 运行需要用到 广播变量, 会找 executor 的 blockManager 要, 而 blockManager 找 Driver 里面的 blockManagerMaster 要。

      • 使用广播变量可以大大降低集群中变量的副本数。

      • 不使用广播变量, 变量的副本数 和 task 数一致。

      • 使用广播变量变量的 副本 和 Executor 数一致。

        (7). 使用 Kyro 优化序列化性能

      • 在 Spark 中, 主要有三个地方设计到了序列化:

        • 在算子函数中使用到外部变量时, 该变量会被序列化后进行网络传输。
        • 将自定义的类型作为 RDD 的泛型类型时(比如 JavaRDD, xxx 是自定义类型), 所有自定义类型对象, 都会进行序列化。因此这种情况下, 也要求自定义的类必须实现 Serializable 接口。
        • 使用可序列化的持久化策略时(比如 MEMORY_ONLY_SER), Spark会将 RDD 中的每个 partition 都序列化成一个大的字节数组。
      • Kryo 序列化器介绍:

        • Kryo序列化机制
      • Spark 中使用 Kryo:

        Sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(new Class[]{SpeedSortKey.class})
        

        (8). 优化数据结构

      • java中有三种类型比较消耗内存

        • 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
        • 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
        • 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。
      • Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。

        (9). 使用高性能的库 fastutil

      • fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue。

        • fastutil能够提供更小的内存占用,更快的存取速度。
        • 我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度。
        • fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。
    4. 数据本地化

      (1). 数据本地化的级别:

      • PROCESS_LOCAL

      • NODE_LOCAL

      • NO_PREF

      • RACK_LOCAL

      • ANY

        (2). Spark 数据本地化调优:

      image-20191105135132046

      • Spark 中任务调度时, TaskScheduler 在分发之前需要依据数据的位置来分发, 最好将 task 分发到数据所在的节点上, 如果 TaskScheduler 分发的 task 在默认 3 * 5 次, 如果依然无法执行, 那么 TaskScheduler 会降低一级数据本地化的级别再次发送 task。

      • 如何提高数据本地化的级别?

      • 如何查看数据本地化的级别?

    5. 内存调优

    6. Spark Shuffle 调优

      (1). buffer 大小 ------32KB

      (2). shuffle read 拉取数据量的大小 ---- 48M

      (3). shuffle 聚合内存的比例 ---- 20%

      (4). 拉取数据重试次数 ---- 5次

      (5). 重试间隔时间 60s

      (6). Spark Shuffle 的种类

      (7). SortShuffle bypass机制 200次

    7. Executor 堆外内存调优

      • Spark 底层 shuffle 的传输方式是使用 netty 传输, netty在进行网络传输的过程会申请堆外内存(netty 是零拷贝), 所以使用了堆外内存。

      • 默认情况下, 这个堆外内存上限默认是每一个 executor 的内存大小的 10%

      • 但真正处理大数据时, 这里都会出现问题, 导致 spark 作业反复崩溃, 无法运行, 此时就回去调节改参数到至少1G(1024M), 甚至说2G, 4G。

      • 如果 Executor 由于内存不足或者堆外内存不足, 挂掉了, Reducer 端不能够啦取数据。我们可以调节堆外内存的大小。

        • 在 ./spark-submit 提交任务的脚本中添加:
          • on yarn 模式:
            • --conf spark.yarn.executor.memoryOverhead=2048 (单位M)
          • standalone 模式:
            • --conf spark.executor.memoryOverhead=2048 (单位M)
      • executor 在进行 shuffle write, 优先从自己本地关联的 mapOutPutWorker 中获取某份 数据, 如果本地 mapOutPutWorkers 没有的话, 那么会通过 TransferService, 去远程连接其他节点上 executor 的 block manager 去获取, 尝试建立远程的网络连接, 并且去拉取数据。

      • 频繁创建对象让 JVM 堆内存满溢, 进行垃圾回收。

      • 处于垃圾回收过程中, 所有的工作现场全部停止, 相当于只要一旦进行垃圾回收, spark / executor 停止工作, 无法提供相应, spark默认的网络连接的超时时长是 120s;

      • 如果 lag 了 120s 都无法建立连接的话, 那么这个task就失败了。

      • task 失败了就会出现 shuffle file cannot find 的错误。

      • 调节等待的时长:

        • 在 ./spark-submit 提交任务的脚本里面添加:

          • --conf spark.core.connection.ack.wait.timeout=300
    8. 解决数据倾斜

      (1). 使用 Hive ETL 预处理数据

      • 方案适用场景
        • 如果导致数据倾斜的是 Hive 表。如果该 Hive 表中的数据本身很不均匀(比如某个 key 对应了100 万数据, 其他 key 才对应了 100 条数据)。
        • 而且 业务场景需要频繁使用 Spark 对 Hive 表执行某个分析操作, 那么比较适合使用这种技术方案。
      • 方案实现思路
        • 先评估一下, 是否可以通过 Hive 来进行数据预处理 (即通过 Hive ETL 预先对数据按照 key 进行聚合, 或者是预先和其他表进行 join), 然后在 Spark 作业中就不需要使用原先的 shuffle 类算子执行这类操作。
      • 方案实现原理
        • 由于该方案彻底避免了在 Spark 中执行 shuffle 类算子, 那么肯定就不会有数据倾斜的问题了。
        • 但是这种方式属于治标不治本, 数据本身就存在分布不均匀的问题, 所以 Hive ETL 中进行 group by 或者 join 等shuffle 操作时, 还是会出现数据倾斜, 导致 Hive ETL 的速度很慢。
        • 我们只是把数据倾斜的发生提前到了 Hive ETL 中, 避免 Spark 程序发生数据倾斜而已。

      (2). 过滤少数导致倾斜的 key

      • 方案适用场景

        • 如果发现导致倾斜的 key 就只有少数几个, 而且对计算本身的影响并不大的话, 那么很适合使用这种方案。比如 99% 的 key 就对应 10 条数据, 但是只有一个 key 对应了 100 万数据, 从而导致了数据倾斜。
      • 方案实现思路

        • 如果我们判断少数几个数据量特别多的 key, 对作业的执行和计算结果不是特别重要的话, 那么干脆就直接过滤掉那少数几个 key。
        • 比如, 在 Spark SQL 中 可以使用 where 子句过滤掉这些 key 或者 在 Spark Core 中对 RDD 执行 filter 算子过滤掉这些 key。
        • 如果需要每次作业执行时, 动态判定哪些 key 的数据量最多然后再进行过滤, 那么可以使用 sample 算子 对 RDD 进行采样, 然后计算出每个 key 的数量, 取数据量最多的 key 过滤掉即可。
      • 方案实现原理

        • 将导致数据倾斜的 key 给过滤掉之后, 这些 key 就不会参与计算了, 自然不可能产生数据倾斜。

      (3). 提高 shuffle 操作的并行度

      • 方案实现思路
        • 在对 RDD 执行 shuffle 算子时, 给 shuffle 算子传入一个参数, 比如 reduceByKey(1000), 该参数就设置了这个 shuffle 算子执行时 shuffle read task 的数量。
        • 对于 Spark SQL 中的 shuffle 类语句, 比如 group by、join 等, 需要设置一个参数, 即spark.sql.shuffle.partitions, 该参数代表了 shuffle read task 的并行度, 该值默认是 200, 对于很多场景来说都有点过小。
      • 方案实现原理
        • 增加 shuffle read task 的数量, 可以让原本分配给一个 task 的多个 key 分配给 多个 task, 从而让每个 task 处理比原来更少的数据。
        • 举例来说, 如果原本有 5 个不同的 key, 每个 key 对应 10 条数据, 这 5 个 key 都是分配给一个 task 的, 那么这个 task 就要处理 50 条数据。而增加了 shuffle read task 以后, 每个 task 就分配到 一个 key, 即每个 task 就处理 10 条数据, 那么自然每个 task 的执行时间就会变短了。

      (4). 双重聚合

      • 方案适用场景

        • 对 RDD 执行 reduceByKey 等聚合类 shuffle 算子 或者 在 SparkSQL 中 使用 group by 语句进行分组2聚合时, 比较适用这种方案。
      • 方案实现思路

        • 进行两阶段聚合。

          • 第一次是局部聚合, 先给 每个 key 都打上一个随机数, 比如 10 以内的随机数, 此时原先一样的 key 就变成不一样的了, 比如 (amd, 1)(amd, 1)(amd, 1)(amd, 1) => (1_amd, 1)(1_amd, 1)(2_amd, 1)(2_amd, 1)。 接着对打上随机数后的数据执行 reduceByKey 等聚合操作, 进行局部聚合, 那么局部聚合结果就会变成了(1_amd, 2)(2_amd, 2)。
          • 然后将各个 key 的前缀去掉, 就会变成(amd, 2)(amd, 2), 再次进行全局聚合操作, 就可以得到最终结果了, 比如(amd, 4)。

          image-20191106111254898

      • 方案实现原理

        • 将原本相同的 key 通过附加随机前缀的方式, 变成多个不同的 key, 就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合, 进而解决 单个 task 处理的数据分散到多个 task 上去做局部聚合, 进而解决单个 task 处理数据量过多的问题。接着去除掉随机前缀, 再次进行全局聚合, 就可以得到最终结果。
        • 如果一个 RDD 中有一个 key 导致数据倾斜, 同时还有其他的 key, 那么一般先对数据集进行抽样, 然后找出倾斜的 key, 再使用 filter 对原始的 RDD 进行分离为两个 RDD, 一个是由倾斜的 key 组成的 RDD1, 一个是由其他的 key 组成的 RDD2, 那么对于 RDD1 可以使用加随机前缀进行多分区多 task计算, 对于另一个 RDD2 正常聚合计算, 最后将两个结果合并起来。

      (5). 使用广播变量代替 Join

      • BroadCast + filter(或者 map)

      • 方案适用场景

        • 在对 RDD使用 join 类操作, 或者是在 Spark SQL中使用 join 语句时, 而且 join 操作中的一个 RDD 或表的数据量比较小 (比如几百 M 或者 1, 2 G), 比较适用此方案。
      • 方案实现思路:

        • 不使用 join 算子进行连接操作, 而使用 Broadcast 变量 与 filter/map 算子实现 join 操作, 进而完全规避掉 shuffle 类的操作, 彻底避免数据倾斜的发生和出现。
        • 将较小RDD中的数据直接通过 collect 算子拉取到 Driver 端的内存中来, 然后对其创建一个 Broadcast 变量。
        • 接着对另外一个 RDD 执行 map/filter 算子, 在算子函数内, 从 Broadcast 变量中获取较小 RDD 的全量数据, 与当前 RDD 的每一条数据按照连接 key 进行对比, 如果连接 key 相同的话, 那么就将两个 RDD 的数据用你需要的方式连接起来。
      • 方案实现原理:

        • 普通的 join 是会走 shuffle 过程的, 而一旦 shuffle, 就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join, 此时就是 reduce join。

        • 但是如果一个 RDD 是比较小的, 则可以采用广播小 RDD 全量数据 + map / filter 算子来实现 与 join 同样的效果, 也就是 map join, 此时就不会发生 shuffle 操作, 也就不会发生数据倾斜。

      • 方案实现原理

      (6). 采样倾斜 key 并分拆 join 操作

      • 方案适用场景

        • 两个 RDD/Hive 表进行 join 的时候, 如果数据量都比较大, 无法采用"解决方案五", 那么此时可以看一下两个 RDD/Hive 表中的 key 分布情况。

        • 如果出现数据倾斜, 是因为其中某一个 RDD/Hive 表中的少数几个 key 的数据量过大, 而另一个 RDD/Hive 表中所有 key 都分布比较均匀, 那么采用这个解决方案是比较合适的。

          image-20191106132500577

          image-20191106132520283

      • 方案实现思路

        • 对包含少数几个数据量过大的 key 的那个 RDD, 通过 sample 取出一份样本, 然后统计一下每个 key 的数量, 计算出来数据量最大的是哪几个 key。
        • 然后将这几个 key 对应的数据从原来的 RDD 中拆分出来, 形成一个单独的 RDD, 并给每个 key 都打上 n以内的随机数作为前缀, 从而防止倾斜的大部分 key 形成另外一个 RDD。
        • 接着将需要 join 的另一个 RDD, 也过滤出来那几个倾斜 key 所对应的数据并形成一个单独的 RDD, 将每条数据膨胀成 n 条数据, 这 n 条数据都按顺序附加一个 0~n的前缀, 不会导致倾斜的大部分 key 也形成另外一个 RDD。
        • 再将附加了随机前缀的独立 RDD 与 另一个膨胀 n 倍 的独立 RDD 进行 join, 此时就可以将原先相同的 key 打散成 n 份, 分散到多个 task 中进行 join了。
        • 而另外两个普通的 RDD 就照常 join 即可。
        • 最后将两次 join 的结果使用 union 算子 合并起来就是最终的join结果。

      (7). 使用随机前缀 和 扩容 RDD 进行 join

      • 方案适用场景

        • 如果在进行 join 操作时, RDD 中有大量 的 key 导致 数据倾斜, 那么进行分拆 key 也没什么意义, 此时就只能使用最后一种方案来解决问题了。
      • 方案实现思路

        • 该方案的实现思路基本 和 "解决方案六" 类似, 首先查看 RDD/Hive 表中的数据分布情况, 找到那个造成数据倾斜的 RDD/Hive 表。比如有多个 key都对应了超过 1 万 条数据。

        • 然后将该 RDD 的每条数据都打上一个 n 以内的随机前缀。

        • 同时对另外一个正常的 RDD 进行扩容, 将每条数据都扩容成 n 条数据, 扩容出来的每条数据都依次打上一个 0~n 的前缀。

        • 最后将两个处理后的 RDD 进行 join 即可。

          image-20191106141714464

          image-20191106141728089

          • 前者的 join 可以并行处理数据的 task 最多为2, 而优化后的后者 的 join 可以并行处理数据的 task 数最多为3。
    9. Spark故障解决(troubleshooting)

      (1). shuffle file cannot find: 磁盘小文件找不到。

      • connection timeout ---- shuffle file cannot find

        • 提高建立连接的超时时间, 或者降低 gc, 降低 gc 了那么 spark 不能在堆外提供服务的时间就少了, 那么超时的可能就会降低。
      • fetch data fail ---- shuffle file cannot find

        • 提高拉取数据的重试次数以及间隔时间。
      • OOM/executor lost ---- shuffle file cannot find

        • 提高堆 内/外 内存大小

      (2). reduce OOM

      • BlockManager 拉取的数据量大, reduce task 处理的数据量小
      • 解决方法:
        • 降低每次拉取的数据量
        • 提高 shuffle 聚合的内存比例
        • 提高 Executor 的内存大小
  • 相关阅读:
    vi命令大全
    理解proc文件系统
    读目录
    取得系统资源信息
    qtempinc
    我实现的一个正则表达式代码
    oracle内置函数大全
    STL算法
    unix基础教程
    两日期间的天数
  • 原文地址:https://www.cnblogs.com/ronnieyuan/p/11804962.html
Copyright © 2011-2022 走看看