zoukankan      html  css  js  c++  java
  • Spark的Shuffle和MR的Shuffle异同

    介绍

    不论MapReduce还是RDD,shuffle都是非常重要的一环,也是影响整个程序执行效率的主要环节,但是在这两个编程模型里面shuffle却有很大的异同。

    shuffle的目的是对数据进行混洗,将各个节点的同一类数据汇集到某一个节点进行计算,为了就是分布式计算的可扩展性。

    可能大家多MR的shuffle比较清楚,相对来说MR的shuffle是比较清晰和粗暴的。shuffle阶段是介于Map和Reduce的一个中间阶段。

    具体详情见:

    高威:MapReduce编程模型​zhuanlan.zhihu.com图标

    而Spark的shuffle过程时出现在ShuffleMapTask过程中,和MR的map端shuffle以及reduce端shuffle类似,spark由于是一条链路不落盘的RDD开发模式,所以Spark的shuffle分为shuffle的读操作和shuffle的写操作。

    区别

    MR的shuffle分为:

    1. Map端的shuffle,主要是Partition、Collector、Sort、Spill、Merge几个阶段;
    2. Reduce端的shuffle,主要是Copy、Merge、Reduce几个阶段。

    但是MR的shuffle有一个很重要的特点:全局排序。

    MR的shuffle过程中在Map端会进行一个Sort,也会在Reduce端对Map的结果在进行一次排序。这样子最后就变成了有多个溢出文件(单个溢出文件是有序的,但是整体上是无序的),那么最后在merge成一个输出文件时还需再排序一次,同时,reduce在进行merge的时候同样需要再次排序(因为它从多个map处拉数据)

    注意:这个有序是指Key值有序,对于value依旧是无序的,如果想对value进行排序,需要借鉴二次排序的算法。
    二次排序的理论是利用MR的全局排序的功能,将value和key值合并,作为一个新的Key值,然后由MR的机制进行Key的排序,这个方法类似于在处理数据倾斜的时候在Key值上加随机数的方法。

    这个是排序的一种思想----合并排序。先进行小范围排序,最后再大范围排序。最后的复杂度为O(nlog(n)),比普通排序复杂度O(n的平方)快。

    但是问题是,排序是一个很耗资源的一种操作,而且很多的业务场景,是不需要进行排序的。所以MR的全局排序在很多的业务场景中是一个非常耗资源而且无用的操作。

    Spark的shuffle(对排序和合并进行了优化):

    为了避免不必要的排序,Spark提供了基于Hash的、基于排序和自定义的shuffleManager操作。

    1、shuffle的读写操作

    • 基于Hash的shuffle的写操作(一种是普通运行机制,另一种是合并的运行机制)。
    1. Mapper根据Reduce的数量创建相应的bucket(类似于MR的分区),bucket是一个抽象的概念,数量为M*R;
    2. Mapper生成的结果会根据设置的partition算法填充到每个bucket中;
    3. Reduce启动的时候,Mapper优先从本地或者然后远端(数据的本地性)取相应的bucket作为Reduce的输入进行处理
    4. Spark不在reduce端不做merge和sort,而是使用聚合(aggregator)。聚合是一个HashMap,将shuffle读到的每一个健值对更新或者插入到HashMap中,这样就不需要把所有的健值对进行merge sort,而是来一个插入一个。
    基于Hash的shuffle会出现一个问题:文件个数是M*R个,对于文件系统是一个负担,同时在shuffle的数据量不大的情况下,文件个数过多,随机写入会严重降低I/O的性能。同时如果后续任务很大的情况下,这些小文件所占用的缓存也是一个很大的开销。
    后续HashShuffle优化合并的运行机制,避免大量文件产生,把同一个core上的多个Mapper文件输出到同一个文件里面,这样文件个数就变成了R。
    在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。
    • 基于排序的shuffle的读写操作(spill和index)。

    根据不同的shuffle算子(是否combine),可选择不同的数据结构。

    如果不需要combine,会选择Array数据结构,直接写到内存,然后溢写到磁盘;

    如果需要combine,则选择Map数据结构,一边对Map进行排序聚合,一边写到内存,然后溢写到磁盘,最后在合并聚合。

    1. ShuffleMapTask不在为每个任务创建单独的文件,而是将所有的结果写到同一个文件中;
    2. 生成index文件进行索引,通过索引避免大量的文件的产生。
    3. 判断在Map端是否进行合并(combine),例如sortByKey等是需要排序的,但是HashMap是不会进行排序的,如果需要合并则在Map端进行聚合排序,如果不需要则HashMap;
    4. 排序中的Map内存超过域值,则溢写到磁盘。当所有的数据处完后,通过merge将内存和磁盘上的文件进行合并。
    5. 在下一个调度阶段,shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

    SortShuffle数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可以选用不同的数据结构。如果是有聚合操作的shuffle算子,就是用map的数据结构(边聚合边写入内存),如果是join的算子,就使用array的数据结构(直接写入内存)。接着,每写一条数据进入内存数据结构之后,就会判断是否达到了某个临界值,如果达到了临界值的话,就会尝试的将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

    在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,排序之后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批次1万条数据的形式分批写入磁盘文件,写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

    此时task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写,会产生多个临时文件,最后会将之前所有的临时文件都进行合并,最后会合并成为一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件(标识了下游各个task的数据在文件中的start offset与end offset)。最终再由下游的task根据索引文件读取相应的数据文件。

    当非聚合的情况下,同时分区数少于设定的阈值,会启动ByPass 机制,bypass的就是不排序,还是用hash去为key分磁盘文件,分完之后再合并,形成一个索引文件和一个合并后的key hash文件。省掉了排序的性能。

    总结

    1. 功能上,MR的shuffle和Spark的shuffle是没啥区别的,都是对Map端的数据进行分区,要么聚合排序,要么不聚合排序,然后Reduce端或者下一个调度阶段进行拉取数据,完成map端到reduce端的数据传输功能。
    2. 方案上,有很大的区别,MR的shuffle是基于合并排序的思想,在数据进入reduce端之前,都会进行sort,为了方便后续的reduce端的全局排序,而Spark的shuffle是可选择的聚合,特别是1.2之后,需要通过调用特定的算子才会触发排序聚合的功能。
    3. 流程上,MR的Map端和Reduce区分非常明显,两块涉及到操作也是各司其职,而Spark的RDD是内存级的数据转换,不落盘,所以没有明确的划分,只是区分不同的调度阶段,不同的算子模型。
    4. 数据拉取,MR的reduce是直接拉去Map端的分区数据,而Spark是根据索引读取,而且是在action触发的时候才会拉去数据。
    5. HashShuffle,虽然MR和shuffle读都会进行HashShuffle,但是如果在shuffle读没有combine操作的时候同时分区数少于设定的阈值,则不会在HashMap的时候预先对分区中所有的健值对进行merge和sort,从而省下了排序过程。
  • 相关阅读:
    ASP.NET Core 添加统一模型验证处理机制
    【Spark】开发Spark选择Java还是Scala?
    【设计模式】单例模式-为什么是静态变量
    【Spark】SparkStreaming-如何使用checkpoint
    【Java】Java-ShutDownHook-优雅关闭系统资源
    【Scala】Scala-None-null引发的血案
    【Spark】SparkStreaming-输出到Kafka
    【Spark】Spark-Redis连接池
    【Spark】提交Spark任务-ClassNotFoundException-错误处理
    【大数据】王加林-大数据学习资料
  • 原文地址:https://www.cnblogs.com/jeasonit/p/13638285.html
Copyright © 2011-2022 走看看