zoukankan      html  css  js  c++  java
  • 【大数据开发工程师】面试——Spark Shuffle

    引起shuffle的算子

    所谓shuffle就是指把数据打乱重新组合。指数据从map task输出到reduce task输入的这段过程。

    引起shuffle的算子有:

    1. repartition类的操作:repartition, coaleasce等
    2. ByKey类的操作:reduceByKey, groupByKey,  SortByKey等。相同的Key会到同一个节点上进行处理。
    3. join类的操作:join, cogroup等

    所谓分区,就是对key执行hash算法。

    Shuffle的原理

    Spark Shuffle分为map side 和 reduce side。

    一般将map task端的数据准备的shuffle阶段称为Shuffle Write,将reduce task端数据拷贝的阶段称为Shuffle Read。

    reduce task会跨节点去拉取其他节点上的map task结果,一边拉取一边聚合。这一过程会消耗网络资源、内存和磁盘IO。

    Hadoop Shuffle

     另:map个数等于切片个数,reduce个数等于分区个数。

    Spark Shuffle

    reduce task的个数 是 上个stage最后一个task的partition个数,也即下一个stage 的task的个数。

    Spark Shuffle的分类

     Spark中,负责Shuffle过程的执行、计算和处理的组件主要就是ShuffleManager。ShuffleManager有两种实现方式:HashShuffleManager和SortShuffleManager,因此Spark Shuffle有两种:Hash Shuffle 和Sort Shuffle

    • Hash Shuffle。 一个map task 产生reduce task个文件
      • 未优化。普通运行机制:
        • 2个map task * 3个reduce task = 6个小文件(一个executor上)
      • 优化。合并运行机制:  复用buffer
        • core * 3个reduce task  = 3个小文件(一个executor上)
    • Sort Shuffle 一个map task产生一个文件。有索引文件+数据文件
      • 未优化.普通运行机制。
      • 优化。bypass运行机制。排序

    Hash Shuffle

    1.2 之前,默认是HashShuffleManager引擎。

    假定一个executor只有一个CPU core,同一时间执行执行一个task线程。

    map task主要是进行分区然后将数据写入不同文件这样一个工作。

    未优化的普通运行机制

     每个reduce read task都有自己的buffer缓冲(默认32k),每次只能拉取与buffer缓冲相同大小的数据,然后通过内存中的map等操作进行聚合。聚合完一批数据后,再拉取下一批数据,直到所有的数据拉取完。

    存在的问题

    1. 海量小文件。导致建立通讯和拉取数据次数变多,产生大量耗时IO操作。

    优化的合并运行机制

     复用shuffleFileGroup的write buffer。

    如何开启:spark.shuffle.consolidateFiles=true (默认是false)

    一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,每个shuffleFileGroup会对应一批磁盘文件,并将数据写入对应的磁盘文件内。

    执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

    Sort Shuffle

    当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),或不是聚合类的shuffle算子,就会启用bypass机制。

    map task:缓存(分区+排序)、溢写、merge,一个task对应一个数据文件和一份索引文件。

    普通运行机制

     在该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

    bypass运行机制

    而该机制与普通SortShuffleManager运行机制的不同在于:

    第一,磁盘写机制不同;

    第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。


    参考文章:

    https://www.cnblogs.com/itboys/p/9226479.html

  • 相关阅读:
    win10 uwp 如何判断一个对象被移除
    win10 uwp 如何判断一个对象被移除
    上传代码 CodePlex
    上传代码 CodePlex
    如何使用 Q#
    让 AE 输出 MPEG
    让 AE 输出 MPEG
    解决 vs 出现Error MC3000 给定编码中的字符无效
    解决 vs 出现Error MC3000 给定编码中的字符无效
    PHP date_date_set() 函数
  • 原文地址:https://www.cnblogs.com/lintong-zf/p/14231356.html
Copyright © 2011-2022 走看看