zoukankan      html  css  js  c++  java
  • spark系列(二)----shuffle

    一.hashShuffle

      在早期的spark中,其shuffle的机制是hashShuffle。在hashShuffle的过程中,每一个shuffleMapTask都会为每一个reduceTask创建一个bucket缓存,shuffleMapTask会对处理后的数据进行partitioner操作(默认是hash partition,即对key进行hashcode再将其值与reduceTask数量进行取余),划分好哪些数据该分配到哪个reduceTask中,并将数据写入对应的bucket缓存中,当数据都处理完毕之后,会将bucket缓存里面的数据刷到硬盘里面。

      这种shuffle模式有几个显而易见的缺点

        1.当处理的数据量很大,集群的规模很大的时候,ShuffleMapTask与reduceTask的数量会很多,假设它们的数量各为100,则shuffle的过程中需要打开100*100=10000个小文件,除了对IO造成极大的负担外,每打开一个文件句柄就需要消耗一定的内存去管理它,因此内存的消耗也非常巨大。

        2.bucket里面的数据需要待shuffleMapTask任务完成后才会刷新到硬盘,这里对内存的要求很大,当shuffleMapTask需要处理的数据量很大的时候,就容易造成OOM。

      后来,spark对hashShuffle进行了一次优化,优化后的机制叫做consolidateShuffle,与hashShuffle最大的区别是,对于同一个Executor-core,其执行的shuffleMapTask的输出都只写到同一个bucket中,也就是说,bucket的数量变成executor*executor-core*reduceTask,假如现在有100个shuffleMapTask,100个reduceTask,两个executor,每个executor有两个core,可以同时并行处理两个shuffleMapTask,这时候生成的bucket数量就是400个。这样就大幅度减少了文件的数量。

      但是在集群规模特别大的时候,这模式产生出来的文件数量还是非常惊人。

    二.sortShuffle

      针对hashShuffle的缺点,spark在1.2版本之后使用sortShuffle替代hashShuffle,其中,sortShuffle有三种模式,分别是BypassMergeSortShuffleWriter、SortShuffleWriter和UnasfeSortShuffleWrite。

      1.BypassMergeSortShuffleWriter

        BypassMergeSortShuffleWriter的原理与hashShuffle相似,它主要是用来处理再map的时候不需要对数据进行sort和aggregate操作的场景的,如groupByKey。

        由于它不需要考虑sort和aggregate,因此这种模式下,shuffleMapTask输出的数据将直接写入文件里面,与hashShuffle不同的是,它在数据都写入文件后,会对文件进行merge操作,将多个小文件合并成一个大文件,并同时生成一个索引文件,用于供reduceTask快速从这个大文件中定位到他需要拿到的数据。可以看到,这种模式虽然在merge后解决了文件数量多的问题,但是在merge之前,需要同时打开大量的小文件。因此这种模式的弊端与hashShuffle是相似的,都是不能满足分区多,数据规模大的场景。

        所以,spark集群会通过两个条件来判断是否应该使用BypassMergeSortShuffleWriter,第一是集群是否开启了mapSideCombine ,第二是当前分区数量是否小于spark.shuffle.sort.bypassMergeThreshold这个参数的值。

      2.Sort-Based Shuffle

        这种模式下,shuffleMapTask处理完的数据会先输出到一个内存区域(由spark.shuffle.spill决定,默认是true,如果设置为false,则不会溢写,假如内存不足,则会导致oom),在这个内存区域里面,spark会通过PartitionedAppendOnlyMap 或者 PartitionedPairBuffer(如果需要合并,则使用PartitionedAppendOnlyMap )这两种数据结构对数据进行排序。当内存区域里的数据量超过一定阈值之后,会启动溢写程序,将数据写到磁盘里面。最后会对磁盘里面的数据已经还在内存里面的数据进行一次mergeSort,意思是将所有数据合并成一个有序的大文件。

      针对这种模式,有几个关键的点需要提及一些

      appendOnlyMap

        appendOnlyMap实际上是一种只允许record添加或者对value更新的hashMap(这样提供了在线聚合的功能)。与Java的hashMap采用数组加链表的做法不同,appendOnlyMap只使用数组作为底层数据结构来存储元素,根据元素K值得hash值来确定元素该存放在数组得哪个位置,在遇到hash冲突的情况下,会使用二次地址探测法来解决Hash冲突。

        所谓的二次地址探测法,与hashtable的二次探测是一样的,若当前key与原来key产生相同的哈希地址,则当前key存在该地址后偏移量为(1,2,3...)的二次方地址处

          key1:hash(key)+0
          key2:hash(key)+1^2
          key3:hash(key)+2^2

        对于appendOnlyMap的扩容问题,当appendOnlyMap里面的record大于70%,appendOnlyMap就会扩容成原来的一倍,并且之前计算的hash值全部作废,需重新计算。

      externalAppendOnlyMap

        对于appendOnlyMap来说,它的缺点很明显,那就是它只能使用内存的存储record,当shuffleMapTask输出的数据非常大的话,容易内存溢出,因此spark设计了一种数据结构可以结合内存与磁盘来存储数据externalAppendOnlyMap。externalAppendOnlyMap其实是在appendOnlyMap的基础上,在内存空间快被填满的时候,将内存中的数据排序,然后将数据spill到磁盘里面,在运算过程中,externalAppendOnlyMap的内存空间可能会被多次填满,因此会产生多个spill文件,所以,在shuffle的最后,需要对内存中的数据以及已经写入磁盘里面的数据进行一次mergeSort,也就是全局聚合排序。

        但是这里有个技术难点,就是如何取确定externalAppendOnlyMap中的数据所占内存的大小,有一种方法就是每次插入或者更新record的时候,都将externalAppendOnlyMap内所有record都取出来并计算他们所占内存的值的总和,但是这样的操作太过于消耗资源,因此,spark设计了一个估算逻辑。spark会定期对externalAppendOnlyMap里面的record进行抽样,统计这些record的总个数,总大小,以及大小平均值,并作为历史计算值保存起来,然后每次插入或者更新的record的时候,将会通过历史计算值去估算现在的record所占内存的大小。

        最后还有一个难点就是全局聚合,为了使全局聚合操作更加方便,externalAppendOnlyMap在每次spill之前都会对内存中的record进行排序,以保证spill文件里面的数据也是有序的,当保证了这个前提之后,接下来的原理就很简单了,就是把还在内存里面的数据,以及已经写入到硬盘里面的各个文件里面,将他们开头的部分元素取出来形成steambuffer,将若干个streambuffer构造成一个最小堆,然后在最小堆里面寻找k相同的元素进行合并,最后输出堆顶的头部元素,然后对于数据不足的steambuffer,spark将会从文件或者内存中获取数据填补最小堆,直到所有record被处理完为止。

      PartitionedAppendOnlyMap

        partitionedAppendOnlyMap的实现逻辑与externalAppendOnlyMap一样,唯一不同的是他的K可以是partitionId+key,也可以是partitionId,以应对需要和不需要根据key排序的场景,例如sortBykey和groupBykey。

        从上图可以看到,merge 过程就是 每个文件读取部分数据(StreamBuffer)放到 mergeHeap 里面, 当前内存中的 PartitionedAppendOnlyMap 也进行 sort,形成一个 sortedMap 放在 mergeHeap 里面,  这个 heap 是一个 优先队列 PriorityQueue, 并且自定义了排序方式,就是取出堆元素StreamBuffer的head元素进行比较大小。这样的话,每次从堆顶的 StreamBuffer 中 pop 出的 head 元素就是全局最小的元素(记住是按照(partitionId,hash(Key))排序的), 如果需要 aggregation, 就把这些key 的hash值相同的元素放在一个mergeBuffers 中,  第一个被放入 mergeBuffers 的 StreamBuffer(堆顶StreamBuffer) 被称为 minBuffer,那么 minKey 就是 minBuffer 中第一个 record 的 key。当 merge-combine 的时候,与 minKey 有相同的Key的records 被 aggregate 一起,然后输出。

        其中,PriorityQueue的代码如下:

    val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
          // Use the reverse of comparator.compare because PriorityQueue dequeues the max
          override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
    })

    参考博客:https://www.cnblogs.com/itboys/p/9201750.html

  • 相关阅读:
    Android 序列化 反序列功能
    Android两个应用之间共享数据之SharedPrefence
    利用Asp.Net的Identity控制登录权限
    基元类型
    CLR概述
    python中的函数-基础
    较为复杂的sql
    oracle中的替换字段字符串的方法
    使用Sql查看表对应的外键关系。
    js中的除法
  • 原文地址:https://www.cnblogs.com/QicongLiang/p/13667565.html
Copyright © 2011-2022 走看看