zoukankan      html  css  js  c++  java
  • Spark Shuffle(一)ShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去(转载)

    转载自:https://blog.csdn.net/raintungli/article/details/70807376

    当Executor进行reduce运算的时候,生成运算结果的临时Shuffle数据,并保存在磁盘中,被最后的Action算子调用,而这个阶段就是在ShuffleMapTask里执行的。

    前面博客中也提到了,用什么ShuffleWrite是由ShuffleHandler来决定的,在这篇博客里主要介绍最常见的SortShuffleWrite的核心算法ExternalSorter.

    2. 结构AppendOnlyMap

    在前面博客中介绍了SortShuffleWriter调用ExternalSorter.insertAll进行数据插入和数据合并的,ExternalSorted里使用了PartitionedAppendOnlyMap作为数据的存储方式

    先来看PartitionedAppendOnlyMap的结构

     

    虽然名字为Map,但是在这里和常见的Map的结构并不太一样,里面并没有使用链表结果保存相同的hash值的key,当插入的key的hashcode相同的时但key不相同,会通过i的叠加一直找到数组里空闲的位置。

    这里有几个注意点:

    • Key 注意这里的Key并不是通过Map里拆分的Key, 而是Tuple2(PartitionId,Key),由分片的段和key组合的联合key
    • 如何计算PartitionId? 这是由Partitioner来决定的

    2.1 Partitioner

    Partitioner的方法

    abstract class Partitioner extends Serializable {  
      def numPartitions: Int  
      def getPartition(key: Any): Int  
    }  

    通过调用getPartition方法找到对应的partition相应的块,而常用的是HashPartitioner

    def getPartition(key: Any): Int = key match {  
       case null => 0  
       case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)  
     }  

    计算 key的hashCode,进行总的分片数求余,分配到对应的片区

    3. Spill

    在大数据的情况下进行归并,由于合并的数据量非常大,仅仅使用AppendOnlyMap进行数据的归并内存显然是不足够的,在这种情况下需要将内存里的已经归并的数据刷到磁盘上避免OOM的风险。

    控制Spill到磁盘的阀值

    • 内存:虽然Java的堆内存管理是由JVM虚拟机管控,但是Spark自己实现了一个简单的但不精准的内存管理,内存的申请在TaskMemoryManager里进行管理
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {  
         // Claim up to double our current memory from the shuffle memory pool  
         val amountToRequest = 2 * currentMemory - myMemoryThreshold  
         val granted = acquireMemory(amountToRequest)  
         myMemoryThreshold += granted  
         // If we were granted too little memory to grow further (either tryToAcquire returned 0,  
         // or we already had more memory than myMemoryThreshold), spill the current collection  
         shouldSpill = currentMemory >= myMemoryThreshold  
       }  

    在每添加32个元素的时候,检查一下当前的内存状况,currentMemory是Map当前大概使用的内存,myMemoryThreshold是可以使用的内存址,初始的时候受参数控制:

    spark.shuffle.spill.initialMemoryThreshold  
    为何要尝试申请1倍的当前内存?AppendOnlyMap的每次扩容是1倍数组

    • 数据的数量:有的时候每条数据量比较小,但是数据的数量非常大,为了避免在AppendOnlyMap里有大量的数据,在Spill的时候同时还可以使用数量的控制:
    spark.shuffle.spill.numElementsForceSpillThreshold  

    3.1 如何Spill?

     
    当从AppendOnlyMap到SpilledFile磁盘总共有3个过程
    1. 整理数组,将数组里的不存在KV的空间移除
    2. 按照区块排序,对同一区块里的Key使用TimeSort进行排序,TimeSort不在此处讨论
    3. Spill到文件的时候,只是保存了序列化了Key,Value并没有保存Key的区块信息,但在SpilledFile的对象中有记录每个partitionkey的数量的数组
    SpilledFile的命名:temp_shuffle_UUID

    4. 生成ShuffleWrite的数据文件

    在3章节的时候,有没有考虑过为何要排序完才Spill到临时文件中?
    Spark中是不要求在reduce端进行排序的,生成Shuffle的结果文件并不要求排序,但是因为Spill到文件中后,有可能相同的Key会分布在不同的文件中,所以需要对不同的文件进行相同的Key的值的计算。如果Spill到文件是乱序的,那代表在最后生成Shuffle结果的时候,还是要Load所有文件才能确定哪些Key是重复的需要做合并,这样依然面对着内存不够的情况。
    生成Shuffle文件过程实际上就是个外排序的过程。
    • 首先对AppendOnlyMap进行归并,排序
    • 开始对同一区块的进行归并
    • 将AppendOnlyMap,SpilledFile的文件进行优先级的Queue的迭代,每次迭代出所有Queue中一个最小的Key,最小的Key就是HashCode最小
    private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])  
        : Iterator[Product2[K, C]] =  
    {  
      val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)  
      type Iter = BufferedIterator[Product2[K, C]]  
      val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {  
        // Use the reverse of comparator.compare because PriorityQueue dequeues the max  
        override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)  
      })  
      heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true  
      new Iterator[Product2[K, C]] {  
        override def hasNext: Boolean = !heap.isEmpty  
      
        override def next(): Product2[K, C] = {  
          if (!hasNext) {  
            throw new NoSuchElementException  
          }  
          val firstBuf = heap.dequeue()  
          val firstPair = firstBuf.next()  
          if (firstBuf.hasNext) {  
            heap.enqueue(firstBuf)  
          }  
          firstPair  
        }  
      }  
    }  
    • 当找到一个最小的Key的时候,并不能保存到ShuffleWrite文件中,因为有可能存在相同的最小的key,所以还需要在迭代找到下一个最小的Key,如果key的hashcode相同的时候,要进行相同的Key进行合并(因为Key的排序是依赖于HashCode的大小,所以相同的最小的Key代表的是HashCode相同的Key),如果不同则保存成相同HashCode的数组,进行下一次的优先queue的查找,直到找到的Key的hashcode大于最小的Key结束
    if (!hasNext) {  
             throw new NoSuchElementException  
           }  
           keys.clear()  
           combiners.clear()  
           val firstPair = sorted.next()  
           keys += firstPair._1  
           combiners += firstPair._2  
           val key = firstPair._1  
           while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) {  
             val pair = sorted.next()  
             var i = 0  
             var foundKey = false  
             while (i < keys.size && !foundKey) {  
               if (keys(i) == pair._1) {  
                 combiners(i) = mergeCombiners(combiners(i), pair._2)  
                 foundKey = true  
               }  
               i += 1  
             }  
             if (!foundKey) {  
               keys += pair._1  
               combiners += pair._2  
             }  
           }  
    • 将k,v内容写到shufflewrite的文件Shuffle_shuffleId_mapId_reduceId.data中去
    • 重复前面的行为直到所有的key被迭代结束
    • 前面的归并是以区块(Partition)为单位的,而data的文件里并没有保存区块的相关信息,但在每迭代完一个Partition的时候(SpilledFile文件里面也没有Partition的信息,但是是通过SpilledFile结构中的numPartition的数量来判断Partition的数据是否已经读完),会生成一个Segement,Segement 里记录了这个块保存在data文件里的长度
    • 最后生成Shuffle_shuffleId_mapId_reduceId.index文件,文件里记录了每个Partition在data文件中的位移
    这样一个完整的Shuffle结果写入data的逻辑执行完了

    5 总结

    • 使用AppendOnlyMap数据结构进行输入数据的合并计算
    • 输入的数据是进行分区合并计算,分区的方式是由Partitioner决定的
    • 当内存不够的时候,会进行相同区块下的数据整理排序,Spill到临时文件temp_shuffle_UUID
    • 最后对所有的数据集合(AppendOnlyMap里的数据和多个Spill的临时文件)进行区块的数据合并
    • 生成Shuffle_shuffleId_mapId_reduceId.data 分区的数据文件,Shuffle_shuffleId_mapId_reduceId.index记录分区的位置
  • 相关阅读:
    linux系统命令学习系列-用户切换命令su,sudo
    linux系统命令学习系列-用户组管理
    linux系统命令学习-用户管理
    python web开发-flask中sqlalchemy的使用
    python web开发-flask连接sqlite数据库
    python实现bt种子 torrent转magnet
    prefProvider.kt
    douyin-bot-代码
    pyadb关于python操作adb的资料
    bottle源码
  • 原文地址:https://www.cnblogs.com/itboys/p/9208033.html
Copyright © 2011-2022 走看看