zoukankan      html  css  js  c++  java
  • Spark的Shuffle是怎么回事

    Spark的Shuffle是怎么回事

    ​ Shuffle的中文含义是混洗,官方定义是:一种让数据重新分布以使得某些数据被放在同一分区里的一种机制。Shuffle的过程中,存在着大量的网络消耗传输数据,会在磁盘上产生大量的中间文件,在平时的工作中了解shuffle的运行机制能帮助我们写出更优秀的代码。此篇文章从shuffle的含义开始讲起,按照spark中shuffle的几中不同运行机制进行了解析,并最终附上了一些shuffle调优的建议。

    1. Shuffle的概述

    ​ spark中对于stage,划分了两个种类,除了最后一个stage称为resultStage,其余stage均称为ShuffleMapStage。

    ​ ResultStage 基本上对应代码中的 action 算子,即将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束。而ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘。

    ​ 在spark中,shuffle分为map阶段和reduce阶段,也可称之为shuffle read和shuffle write阶段,这里和hadoop的mapreduce不是一个东西。map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task会先执行,将上一个stage得到的最后结果写出,后执行的reduce task拉取上一个stage进行合并

    ​ 对于一次shuffle,map过程和reduce过程都有若干个task来执行。对于task的个数,map端的task个数和RDD的partition个数相同,reduce 端的 stage 默认取spark.default.parallelism 这个配置项的值作为分区数,如果没有配置,则以 map 端的最后一个 RDD 的分区数作为其分区数,分区数就将决定 reduce 端的 task 的个数。

    2. Shuffle机制详解

    ​ 在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager。在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。

    2.1 spark早期的HashShuffleManager

    在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。它的计算模式比较的简单粗暴,详细如下:

    • shuffle write阶段

    这个阶段将stage中每个task处理的数据根据算子进行“划分”。比如reduceByKey,就是对相同的key执行hash算法,从而将相同都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

    • shuffle read阶段

    stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

    ​ 那么针对这种简单粗暴的HashShuffleManager,有着一个非常严重的弊端:会产生大量的中间磁盘文件,这样大量的磁盘IO操作会很影响性能。磁盘文件的数量由下一个stage的task数量决定,即下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个 stage 总共有 100 个 task,那么当前 stage 的每个 task 都要创建 100 份磁盘文件,如果当前stage有50个 task,那么总共会建立5000个磁盘文件。

    2.2 优化后的 HashShuffleManager

    ​ 由于原版的HashShuffleManager,HashShuffleManager后期进行了优化,这里说的优化是指可以设置一个参数,spark.shuffle.consolidateFiles=true。该参数默认值为false,通常来说如果我们使用HashShuffleManager,那么都建议开启这个选项。

    ​ 开启consolidate机制之后,在shuffle write过程中,task不会为下游stage的每个task创建一个磁盘文件,此时会出现shuffleFileGroup的概念,每个 shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。而此时就会根据Executor数,并行执行task。第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

    ​ 当Executor执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。即运行在同一个Executor的task会复用之前的磁盘文件。 这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

    2.3 当前默认的SortShuffleManager

    在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

    • 普通运行模式

    ​ 在普通模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可以选用不同的数据结构。如果是由聚合操作的shuffle算子,就是用map的数据结构(边聚合边写入内存),如果是join的算子,就使用array的数据结构(直接写入内存)。等到内存容量到了临界值就准备溢写到磁盘。
      在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,排序之后,会分批将数据写入磁盘文件,每批次默认1万条数据。
      此时task往磁盘溢写,会产生多个临时文件,最后会将所有的临时文件都进行合并,合并成一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件,标识了下游各个task的数据在文件中的start offset与end offset。下游的task根据索引文件读取相应的数据文件。需要注意的是,此处所说的两个文件,是指上游一个task生成两个文件,而非所有的task最终只有两个文件。

    • bypass运行模式

    触发bypass机制的条件:

    1. shuffle map task的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认200)

    2. 不是聚合类的shuffle算子(比如groupByKey)

    ​ 我们都知道,排序的时间复杂度最高不能优于O(nlogn),那么如果将排序的时间复杂度省下,那么shuffle性能将会提升很多。bypass机制与普通SortShuffleManager运行机制的不同在于,bypass机制就是利用了hash的O(1)时间复杂度取代了排序的操作开销,提升了这部分的性能。

    ​ task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。如上,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

    附: 一些shuffle调优参数

    spark.shuffle.file.buffer
    参数说明该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

    spark.reducer.maxSizeInFlight
    参数说明该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

    spark.shuffle.io.maxRetries & spark.shuffle.io.retryWait
    spark.shuffle.io.retryWait:huffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次)
    spark.shuffle.io.retryWait该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
    调优建议:一般的调优都是将重试次数调高,不调整时间间隔。

    spark.shuffle.memoryFraction
    参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例。

    spark.shuffle.manager
    参数说明该参数用于设置shufflemanager的类型(默认为sort)。Spark1.5x以后有三个可选项:

    Hash:spark1.x版本的默认值,HashShuffleManager
    Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制
    tungsten-sort:
    

    spark.shuffle.sort.bypassMergeThreshold
    参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
    调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些

    spark.shuffle.consolidateFiles
    参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,也就是开启优化后的HashShuffleManager。
    调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%

  • 相关阅读:
    避免因为Arcgis Server服务设置不当导致Oracle Process溢出的方法
    ArcSOC进程数不断增长导致oracle processes溢出原因分析
    PostgreSQL中的Toast Pointer
    SQLite中字段顺序和PAGE_SIZE对性能的影响
    PG数据库CPU和内存满负荷运转优化案
    ArcGIS Server浏览地图服务无响应原因分析说明
    PythonWeb全栈开发-虚拟环境搭建
    C 语言学习——错误处理
    C 语言学习——Valgrind 内存问题简述
    HTML学习--拖放API实现拖放排序
  • 原文地址:https://www.cnblogs.com/valjeanshaw/p/12549202.html
Copyright © 2011-2022 走看看