  • 2.1.6. Creating the ShuffleManager in SparkEnv

    The ShuffleManager manages shuffle operations on both local and remote block data.

    The ShuffleManager is created in SparkEnv.

        // Let the user specify short names for shuffle managers
        val shortShuffleMgrNames = Map(
          "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
          "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
        val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
        val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
        // Create the ShuffleManager via reflection
        val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
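The lookup above can be tried out without a Spark dependency. The following is a minimal sketch, not Spark's code: `DemoShuffleManager`, `DemoSortShuffleManager` and `ShuffleManagerLookup` are hypothetical stand-ins, and the reflective call assumes a no-argument constructor, whereas SparkEnv's `instantiateClass` also tries constructors that take a `SparkConf`.

```scala
import java.util.Locale

// Hypothetical stand-ins for Spark's ShuffleManager types, so the sketch runs without Spark.
trait DemoShuffleManager { def name: String }
class DemoSortShuffleManager extends DemoShuffleManager { val name: String = "sort" }

object ShuffleManagerLookup {
  // Short aliases map to fully qualified class names, as in the SparkEnv snippet.
  val shortNames: Map[String, String] = Map(
    "sort" -> classOf[DemoSortShuffleManager].getName,
    "tungsten-sort" -> classOf[DemoSortShuffleManager].getName)

  // A known alias resolves to its class name; any other value is assumed to be a class name already.
  def resolve(configured: String): String =
    shortNames.getOrElse(configured.toLowerCase(Locale.ROOT), configured)

  // Simplification: instantiate reflectively through a no-argument constructor only.
  def instantiate(className: String): DemoShuffleManager =
    Class.forName(className).getDeclaredConstructor().newInstance().asInstanceOf[DemoShuffleManager]

  def main(args: Array[String]): Unit = {
    val manager = instantiate(resolve("Tungsten-Sort"))
    println(manager.getClass.getName)
  }
}
```

Because an unknown value falls through `getOrElse` unchanged, a user can set `spark.shuffle.manager` to the fully qualified name of a custom implementation instead of one of the short names.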

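    2.1.6.1. In my Spark version (2.1.1) there is only SortShuffleManager. HashShuffleManager, the default before Spark 1.2, was removed in Spark 2.0.

    The ShuffleManager has been reworked repeatedly over successive Spark releases.
    
    Before Spark 1.2, the default shuffle engine was HashShuffleManager. HashShuffleManager has one serious drawback: it produces a large number of intermediate disk files, and the resulting volume of disk I/O hurts performance. For that reason, starting with Spark 1.2 the default ShuffleManager became SortShuffleManager. The main improvement of SortShuffleManager over HashShuffleManager is that, although each task may still produce many temporary disk files during the shuffle, all of them are merged at the end into a single disk file, so each task leaves only one disk file. When a shuffle read task of the next stage pulls its data, it only needs to read its own portion of each disk file, located through the index.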
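To make the difference in file counts concrete, here is a small arithmetic sketch. It assumes the hash-based shuffle runs without file consolidation (one file per map task per reduce partition) and that the sort-based shuffle leaves one data file plus one index file per map task. The object and method names are hypothetical.

```scala
object ShuffleFileCounts {
  // Hash-based shuffle without consolidation: every map task writes one file per reduce partition.
  def hashShuffleFiles(numMapTasks: Long, numReducePartitions: Long): Long =
    numMapTasks * numReducePartitions

  // Sort-based shuffle: each map task ends up with one merged data file plus one index file.
  def sortShuffleFiles(numMapTasks: Long): Long =
    numMapTasks * 2

  def main(args: Array[String]): Unit = {
    println(hashShuffleFiles(1000, 1000)) // 1000000
    println(sortShuffleFiles(1000))       // 2000
  }
}
```

With 1000 map tasks and 1000 reduce partitions, the count drops from a million files to two thousand under these assumptions.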

    Next, let's look at what SortShuffleManager does.

    Registering a shuffle

    A shuffle is registered with the manager, which returns a handle that is then passed to tasks.

      /**
       * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
       */
      override def registerShuffle[K, V, C](
          shuffleId: Int,
          numMaps: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
        // Choose the handle, which determines the shuffle write path
        if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
          // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
          // need map-side aggregation, then write numPartitions files directly and just concatenate
          // them at the end. This avoids doing serialization and deserialization twice to merge
          // together the spilled files, which would happen with the normal code path.
          // The downside is having multiple files open at a time and thus more memory allocated to buffers.

          // Bypass-merge-sort handle
          new BypassMergeSortShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
          // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
          // Serialized shuffle handle
          new SerializedShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
        } else {
          // Otherwise, buffer map outputs in a deserialized form:
          // The remaining case is the deserialized (base) handle
          new BaseShuffleHandle(shuffleId, numMaps, dependency)
        }
      }

    There are three kinds of handle, and the handle determines which shuffle path is used:

    BypassMergeSortShuffleHandle

    SerializedShuffleHandle

    BaseShuffleHandle
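The selection order can be condensed into a small decision function. This is a sketch, not Spark's code: `DepInfo` is a hypothetical summary of a `ShuffleDependency`, and the conditions are my reading of `shouldBypassMergeSort` and `canUseSerializedShuffle` in Spark 2.1.x. The default of 200 for `spark.shuffle.sort.bypassMergeThreshold` and the 2^24 partition limit for serialized mode should both be checked against the version you run.

```scala
object HandleSelection {
  sealed trait HandleKind
  case object BypassMergeSort extends HandleKind
  case object Serialized extends HandleKind
  case object Base extends HandleKind

  // Hypothetical summary of the ShuffleDependency properties that drive the choice.
  final case class DepInfo(
      numPartitions: Int,
      mapSideCombine: Boolean,
      hasAggregator: Boolean,
      serializerSupportsRelocation: Boolean)

  // Assumed upper bound on partitions in serialized mode (2^24).
  val MaxSerializedPartitions: Int = 1 << 24

  def choose(dep: DepInfo, bypassMergeThreshold: Int = 200): HandleKind = {
    if (!dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold) {
      // Few partitions and no map-side aggregation: write one file per partition, then concatenate.
      BypassMergeSort
    } else if (dep.serializerSupportsRelocation && !dep.hasAggregator &&
        dep.numPartitions <= MaxSerializedPartitions) {
      // Records can be sorted in serialized form.
      Serialized
    } else {
      // Fall back to buffering deserialized records.
      Base
    }
  }
}
```

For example, `HandleSelection.choose(HandleSelection.DepInfo(100, false, false, false))` takes the bypass path, while a dependency with map-side combine always ends up on the base path.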

    Unregistering a shuffle

      The data file and index file are located through the blockManager.diskBlockManager held by the ShuffleBlockResolver, and then deleted.

      /** Remove a shuffle's metadata from the ShuffleManager. */
      override def unregisterShuffle(shuffleId: Int): Boolean = {
        Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
          (0 until numMaps).foreach { mapId =>
            shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
          }
        }
        true
      }
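The `Option(map.remove(...)).foreach` idiom is what makes unregistering safe to call repeatedly: `remove` on a Java map returns null when the key is absent, and `Option(null)` is `None`. The sketch below imitates that behaviour with hypothetical names (`UnregisterSketch`, `removed`), and it records which map outputs would be deleted instead of touching any files.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

object UnregisterSketch {
  // shuffleId -> number of map tasks, mirroring numMapsForShuffle.
  private val numMapsForShuffle = new ConcurrentHashMap[Int, Integer]()

  // Records the (shuffleId, mapId) pairs whose files would have been deleted.
  val removed: ArrayBuffer[(Int, Int)] = ArrayBuffer.empty[(Int, Int)]

  def register(shuffleId: Int, numMaps: Int): Unit = {
    numMapsForShuffle.putIfAbsent(shuffleId, Int.box(numMaps))
    ()
  }

  def unregister(shuffleId: Int): Boolean = {
    // remove returns null for an unknown shuffleId; Option(null) is None, so nothing happens.
    Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
      (0 until numMaps.intValue).foreach { mapId =>
        removed += ((shuffleId, mapId))
      }
    }
    true
  }
}
```

Calling `unregister` twice for the same shuffle cleans up once. The second call finds no entry and does nothing, yet still returns true.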

    getWriter

      Gets a ShuffleWriter for a given partition. Called on executors by map tasks.

      override def getWriter[K, V](
          handle: ShuffleHandle,
          mapId: Int,
          context: TaskContext): ShuffleWriter[K, V] = {

        // Record this shuffleId and its number of map tasks in numMapsForShuffle
        numMapsForShuffle.putIfAbsent(
          handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)

        // The current SparkEnv
        val env = SparkEnv.get

        // Pick the ShuffleWriter that matches the ShuffleHandle
        handle match {
          case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
            new UnsafeShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              context.taskMemoryManager(),
              unsafeShuffleHandle,
              mapId,
              context,
              env.conf)
          case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
            new BypassMergeSortShuffleWriter(
              env.blockManager,
              shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
              bypassMergeSortHandle,
              mapId,
              context,
              env.conf)
          case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
            new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
        }
      }
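The order of the cases in that match matters, because the two specialised handles are subclasses of BaseShuffleHandle; if the base case came first, it would capture every handle. Here is a minimal sketch with a hypothetical, simplified hierarchy (`BaseHandle`, `SerializedHandle`, `BypassHandle`) that returns the writer's name instead of constructing a writer.

```scala
object WriterSelection {
  // Hypothetical, simplified handle hierarchy: both special handles extend the base handle.
  class BaseHandle(val shuffleId: Int, val numMaps: Int)
  class SerializedHandle(shuffleId: Int, numMaps: Int) extends BaseHandle(shuffleId, numMaps)
  class BypassHandle(shuffleId: Int, numMaps: Int) extends BaseHandle(shuffleId, numMaps)

  // The most specific types are tested first; the base type acts as the fallback.
  def writerFor(handle: BaseHandle): String = handle match {
    case _: SerializedHandle => "UnsafeShuffleWriter"
    case _: BypassHandle => "BypassMergeSortShuffleWriter"
    case _: BaseHandle => "SortShuffleWriter"
  }
}
```

The same subclass relationship is why `getWriter` can cast any handle to `BaseShuffleHandle` to read `numMaps`.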

    getReader

      Creates a BlockStoreShuffleReader that reads the data for a range of partitions. Called on executors by reduce tasks.

      /**
       * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
       * Called on executors by reduce tasks.
       */
      override def getReader[K, C](
          handle: ShuffleHandle,
          startPartition: Int,
          endPartition: Int,
          context: TaskContext): ShuffleReader[K, C] = {
        new BlockStoreShuffleReader(
          handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
      }
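The partition range is half-open: `startPartition` is included and `endPartition` is not. The sketch below shows how such a range could be turned into byte segments of a single map output file. It assumes the index file stores cumulative byte offsets, one per partition plus a final end offset; `IndexRangeSketch` and its methods are hypothetical and are not what BlockStoreShuffleReader does internally.

```scala
object IndexRangeSketch {
  // Cumulative offsets from per-partition byte lengths: offsets(i) is where partition i starts.
  def offsetsFor(partitionLengths: Seq[Long]): Vector[Long] =
    partitionLengths.scanLeft(0L)(_ + _).toVector

  // One (partitionId, startOffset, length) segment per reduce partition in [startPartition, endPartition).
  def segments(
      offsets: Vector[Long],
      startPartition: Int,
      endPartition: Int): Vector[(Int, Long, Long)] =
    (startPartition until endPartition).map { p =>
      (p, offsets(p), offsets(p + 1) - offsets(p))
    }.toVector
}
```

For partition lengths of 10, 0, 25 and 5 bytes, the offsets are 0, 10, 10, 35 and 40, so partitions 1 to 2 map to an empty segment at offset 10 followed by 25 bytes starting at offset 10.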
  • Original article: https://www.cnblogs.com/chengbao/p/10639889.html