  • 2.1.6 Creating the ShuffleManager in SparkEnv

    The ShuffleManager manages shuffle operations on both local and remote block data.

    The ShuffleManager is created in SparkEnv.

        // Let the user specify short names for shuffle managers
        val shortShuffleMgrNames = Map(
          "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
          "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
        val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
        val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
        // Instantiate the ShuffleManager via reflection
        val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
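
    To make the short-name lookup and the reflective instantiation concrete, here is a minimal sketch that runs without Spark. DemoShuffleManager, DemoSortShuffleManager and ShuffleManagerLookup are hypothetical stand-ins, not Spark classes, and the no-arg constructor is an assumption of the sketch (Spark's own instantiateClass also tries constructors that take a SparkConf).

```scala
// Hypothetical stand-ins for Spark's ShuffleManager classes (not the real ones).
trait DemoShuffleManager { def name: String }
class DemoSortShuffleManager extends DemoShuffleManager { val name = "sort" }

object ShuffleManagerLookup {
  // Short names a user may configure. Both map to the same class,
  // mirroring how "sort" and "tungsten-sort" share one implementation.
  val shortNames: Map[String, String] = Map(
    "sort" -> classOf[DemoSortShuffleManager].getName,
    "tungsten-sort" -> classOf[DemoSortShuffleManager].getName)

  // Resolve a configured value to a class name. A value that is not a known
  // short name is assumed to already be a fully qualified class name.
  def resolve(configured: String): String =
    shortNames.getOrElse(configured.toLowerCase, configured)

  // Instantiate via reflection. Assumption of this sketch: the class has a
  // public no-arg constructor.
  def instantiate(className: String): DemoShuffleManager =
    Class.forName(className).getDeclaredConstructor().newInstance()
      .asInstanceOf[DemoShuffleManager]
}
```

    Because unknown values fall through unchanged, a user can also set the configuration key to the class name of a custom implementation.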

    2.1.6.1 In my Spark version (2.1.1) there is only SortShuffleManager. HashShuffleManager, the default before Spark 1.2, has since been removed.

    As Spark has evolved, the ShuffleManager has been reworked repeatedly and has improved with each iteration.
    
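    Before Spark 1.2, the default shuffle engine was HashShuffleManager. HashShuffleManager has one very serious drawback: it produces a large number of intermediate disk files, and the resulting volume of disk I/O hurts performance. From Spark 1.2 onward, the default ShuffleManager was therefore changed to SortShuffleManager. The main improvement over HashShuffleManager is this: although each task may still produce many temporary disk files during the shuffle, it merges all of them into a single disk file at the end, so each task leaves only one disk file. When a shuffle read task in the next stage pulls its data, it only needs to use the index to read its own portion of each of those files.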
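
    The index-based read and the difference in file counts can be illustrated with a small sketch. ShuffleIndexSketch and its methods are hypothetical names. The sketch assumes the index holds cumulative byte offsets with one extra trailing entry, that an unconsolidated hash shuffle writes one file per (map task, reduce partition) pair, and that a sort shuffle writes one data file plus one index file per map task.

```scala
object ShuffleIndexSketch {
  // Build the index for one map task's merged output: cumulative byte offsets,
  // one per reduce partition, plus a final entry marking the end of the file.
  def buildIndex(partitionLengths: Seq[Long]): Vector[Long] =
    partitionLengths.scanLeft(0L)(_ + _).toVector

  // (offset, length) of the byte range that reduce partition `reduceId` reads.
  def segment(index: Vector[Long], reduceId: Int): (Long, Long) = {
    val start = index(reduceId)
    (start, index(reduceId + 1) - start)
  }

  // Assumed file counts, see the lead-in for the assumptions behind them.
  def hashShuffleFiles(numMaps: Int, numReduces: Int): Long =
    numMaps.toLong * numReduces
  def sortShuffleFiles(numMaps: Int): Long =
    numMaps.toLong * 2
}
```

    Under these assumptions, 1000 map tasks and 1000 reduce partitions give 1,000,000 files for the hash shuffle but only 2,000 for the sort shuffle.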

    Next, let's look at what SortShuffleManager does.

    Registering a shuffle

    Register a shuffle with the manager and obtain a handle to pass to tasks.

      /**
       * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
       */
      override def registerShuffle[K, V, C](
          shuffleId: Int,
          numMaps: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
        // Create the handle
        if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
          // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
          // need map-side aggregation, then write numPartitions files directly and just concatenate
          // them at the end. This avoids doing serialization and deserialization twice to merge
          // together the spilled files, which would happen with the normal code path.
          // The downside is having multiple files open at a time and thus more memory allocated
          // to buffers.

          // Bypass-merge-sort handle
          new BypassMergeSortShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
          // Serialized shuffle handle
          new SerializedShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else {
          // Otherwise, buffer map outputs in a deserialized form:
          // Everything else uses the deserialized (base) handle
          new BaseShuffleHandle(shuffleId, numMaps, dependency)
        }
      }

    There are three kinds of handle; the handle determines which shuffle path is used:

    BypassMergeSortShuffleHandle

    SerializedShuffleHandle

    BaseShuffleHandle
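
    The choice between the three handles can be sketched roughly as a decision function. DepInfo, HandleKind and HandleSelection are hypothetical names for this illustration. The bypass conditions follow the comment in registerShuffle, while the default threshold of 200 and the serialized-path conditions (a relocatable serializer, no aggregator, at most 2^24 partitions) are assumptions based on my reading of Spark 2.1.x and are not shown in the code above.

```scala
// Simplified, hypothetical model of the properties the selection depends on.
final case class DepInfo(
    mapSideCombine: Boolean,
    hasAggregator: Boolean,
    numPartitions: Int,
    serializerSupportsRelocation: Boolean)

sealed trait HandleKind
case object BypassMergeSort extends HandleKind
case object Serialized extends HandleKind
case object Base extends HandleKind

object HandleSelection {
  // Assumed default of spark.shuffle.sort.bypassMergeThreshold.
  val DefaultBypassMergeThreshold = 200
  // Assumed partition limit for the serialized path (2^24).
  val MaxSerializedPartitions = 1 << 24

  def choose(
      dep: DepInfo,
      bypassThreshold: Int = DefaultBypassMergeThreshold): HandleKind = {
    val canBypass = !dep.mapSideCombine && dep.numPartitions <= bypassThreshold
    val canSerialize = dep.serializerSupportsRelocation && !dep.hasAggregator &&
      dep.numPartitions <= MaxSerializedPartitions
    // Same order as registerShuffle: bypass first, then serialized, then base.
    if (canBypass) BypassMergeSort
    else if (canSerialize) Serialized
    else Base
  }
}
```

    The order of the checks matters: a small shuffle without map-side aggregation takes the bypass path even if it would also qualify for the serialized path.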

    Unregistering a shuffle

      The ShuffleBlockResolver uses the blockManager.diskBlockManager it holds to locate the data file and the index file, and then deletes them.

      /** Remove a shuffle's metadata from the ShuffleManager. */
      override def unregisterShuffle(shuffleId: Int): Boolean = {
        Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
          (0 until numMaps).foreach { mapId =>
            shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
          }
        }
        true
      }
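
    The Option(map.remove(...)) idiom used in unregisterShuffle is easy to misread, so here is a minimal sketch of it outside Spark. UnregisterSketch is a hypothetical name, and the `removed` buffer merely records which map outputs would be deleted instead of touching any files.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

object UnregisterSketch {
  // shuffleId -> number of map tasks for that shuffle.
  private val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
  // Stand-in for file deletion: records each (shuffleId, mapId) that was cleaned up.
  val removed = ArrayBuffer.empty[(Int, Int)]

  def register(shuffleId: Int, numMaps: Int): Unit =
    numMapsForShuffle.putIfAbsent(shuffleId, numMaps)

  def unregister(shuffleId: Int): Boolean = {
    // remove returns null for an unknown key; Option(...) turns that into None,
    // so the cleanup loop is skipped for shuffles that were never registered.
    Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
      (0 until numMaps).foreach { mapId => removed += ((shuffleId, mapId)) }
    }
    true
  }
}
```

    Calling unregister twice for the same shuffle is harmless in this sketch: the second call finds no entry and does nothing.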

    getWriter

      Get a ShuffleWriter for the given partition. Called on executors by map tasks.

      override def getWriter[K, V](
          handle: ShuffleHandle,
          mapId: Int,
          context: TaskContext): ShuffleWriter[K, V] = {
    
        // Record this shuffle's number of map tasks in numMapsForShuffle, if not already present
        numMapsForShuffle.putIfAbsent(
          handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)

        val env = SparkEnv.get

        // Pick the ShuffleWriter that matches the type of the ShuffleHandle
        handle match {
          case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
            new UnsafeShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              context.taskMemoryManager(),
              unsafeShuffleHandle,
              mapId,
              context,
              env.conf)
          case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
            new BypassMergeSortShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              bypassMergeSortHandle,
              mapId,
              context,
              env.conf)
          case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
            new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
        }
      }
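
    One detail of the match in getWriter is worth spelling out: the cast of every handle to BaseShuffleHandle implies that the two specialised handles are subtypes of it, so the order of the cases matters. The following sketch uses a hypothetical, simplified hierarchy (BaseHandle, SerializedHandle, BypassHandle) and returns writer names instead of constructing writers.

```scala
// Hypothetical, simplified handle hierarchy: both specialised handles
// extend the base handle.
class BaseHandle(val shuffleId: Int, val numMaps: Int)
class SerializedHandle(shuffleId: Int, numMaps: Int)
  extends BaseHandle(shuffleId, numMaps)
class BypassHandle(shuffleId: Int, numMaps: Int)
  extends BaseHandle(shuffleId, numMaps)

object WriterSelection {
  // Returns the name of the writer that would be created for a handle.
  def writerFor(handle: BaseHandle): String = handle match {
    // Subclass cases come first. A `case _: BaseHandle` at the top would match
    // every handle, and the specialised writers would never be chosen.
    case _: SerializedHandle => "UnsafeShuffleWriter"
    case _: BypassHandle     => "BypassMergeSortShuffleWriter"
    case _: BaseHandle       => "SortShuffleWriter"
  }
}
```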

    getReader

      Create a BlockStoreShuffleReader to read the data for a range of partitions. Called on executors by reduce tasks.

      /**
       * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
       * Called on executors by reduce tasks.
       */
      override def getReader[K, C](
          handle: ShuffleHandle,
          startPartition: Int,
          endPartition: Int,
          context: TaskContext): ShuffleReader[K, C] = {
        new BlockStoreShuffleReader(
          handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
      }
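
    The partition range in getReader is half-open: startPartition is included and endPartition is not. A tiny sketch, with the hypothetical name ReaderRangeSketch, makes the boundaries explicit.

```scala
object ReaderRangeSketch {
  // Reduce partitions covered by a reader: startPartition inclusive,
  // endPartition exclusive, i.e. startPartition to endPartition - 1.
  def partitions(startPartition: Int, endPartition: Int): Range =
    startPartition until endPartition
}
```

    A reader for a single reduce partition p would therefore be requested with startPartition = p and endPartition = p + 1.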
  • Original post: https://www.cnblogs.com/chengbao/p/10639889.html