zoukankan      html  css  js  c++  java
  • Block Manager

    在Spark中,将数据抽象为Block(不论是shuffle数据,还是节点本身存储的数据),而每个driver/executor中的block都是由`BlockManager`这个类来负责管理的。对于block实际的读取,`BlockManager`根据block存储位置的不同是交由`ShuffleManager`, `MemoryStore`或`DiskStore`来实际处理的。`BlockManager`管理的block可能是shuffle后的文件,也可能是缓存的数据。对于shuffle文件中的block是通过`ShuffleManager`来管理的。对于节点缓存的数据,对于保存在内存中的block,由`MemoryStore`来管理,对于保存在磁盘的block,由`DiskStore`来管理。Spark在不开启`spark.shuffle.service.enabled`(设为false)配置的情况下,结构如下:

    在开启`spark.shuffle.service.enabled`(设为true)配置的情况下,结构如下:

    可见在Spark中,每一个driver/executor节点都有一个`BlockManager`与之对应,用来管理节点数据以及向其他节点请求/返回数据。`BlockManager`中主要与两类节点进行通信:一种是executor中的`BlockManager`需要与driver节点通信,来上报executor中`BlockManager`管理的block,同时接受来自driver节点操作executor上block的指令(图中与executor与driver之间的连线)。另一种是executor节点之间的`BlockManager`需要相互通信,来从彼此获取需要的block(图中executor之间,以及executor与`ExternalShuffleService`之间的连线)。

    数据结构

    在正式介绍Spark的存储结构之前,先来了解一下Spark存储结构中的数据结构,为下文的介绍打下基础。

    BlockManagerId

    在Spark中,每个`BlockManager`都有一个唯一的id与之对应,而这个id不是一个普通的string或者long型,而是一个特殊的数据结构`BlockManagerId`:

    class BlockManagerId private (
        private var executorId_ : String,
        private var host_ : String,
        private var port_ : Int)
    

    可见`BlockManagerId`中保存了executorId(executor节点的唯一标识),host(executor节点的地址),port(`NettyBlockTransferService`的端口(`NettyBlockTransferService`在`BlockManager`中称为ShuffleClient,是用来向其他节点请求/提供Block数据的server,关于`NettyBlockTransferService`的分析见下文))

    BlockManagerInfo

    `BlockManagerInfo`用来记录`BlockManager`的元数据:

    class BlockManagerInfo(
        val blockManagerId: BlockManagerId,
        timeMs: Long,
        val maxMem: Long,
        val slaveEndpoint: RpcEndpointRef)
    

    其中blockManagerId为上文介绍的`BlockManagerId`类型,timeMs为`BlockManager`注册到driver的时间,maxMem为`BlockManager`中`MemoryManager`管理的最大可用的堆内Storage内存大小(关于`MemoryManager`的分析见【Spark内存管理】),slaveEndpoint为`BlockManager`的`BlockManagerSlaveEndpoint`对应的句柄`RpcEndpointRef`(`BlockManagerSlaveEndpoint`分析见下文)。

    BlockId

    `BlockId`用来唯一标识Spark中的一个Block:

    sealed abstract class BlockId {
      // 全局唯一的块标识符,用来序列化/反序列化块
      def name: String
    
      // convenience methods
      def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
      def isRDD: Boolean = isInstanceOf[RDDBlockId]
      def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
      def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
    
      override def toString: String = name
      override def hashCode: Int = name.hashCode
      override def equals(other: Any): Boolean = other match {
        case o: BlockId => getClass == o.getClass && name.equals(o.name)
        case _ => false
      }
    }
    

    这个类是一个抽象类,在Spark中有10种类型的BlockId,如`RDDBlockId`,`ShuffleBlockId`,`TaskResultBlockId`等。这些子类都继承了`BlockId`并重写了name字段。在Spark中实际就是靠name字段的不同来区分不同类型的`BlockId`。

    BlockInfo

    `BlockInfo`用来保存Block的元数据:

    class BlockInfo(
        val level: StorageLevel,
        val classTag: ClassTag[_],
        val tellMaster: Boolean)
    

    其中level保存了Block的存储级别(存储级别见[rdd-persistence](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)),classTag保存了Block用来序列化/反序列化的类型,tellMaster用来标示在Block状态发生变化时是否要上报给driver。

    Block数据管理

    Block数据管理,包括executor向driver上报`BlockManager`所管理的Block元数据,executor向driver请求获取/更新Block元数据,driver向executor发送指令来删除某个Block,driver向executor返回Block元数据信息等。可见Block数据管理是executor与driver间的相互通信。还记得我们在【Spark Rpc分析】中的介绍,Spark中应用程序要实现节点间的通信只需要实现`RpcEndpoint`类型即可,在这里也不例外。其中在driver节点负责通信的类为`BlockManagerMasterEndpoint`。而在executor节点负责通信的类为`BlockManagerSlaveEndpoint`。这两个类都是在driver/executor启动时注册到各自的`RpcEnv`中,并向其他节点暴露`RpcEndpointRef`句柄。这样executor就可以使用`BlockManagerMasterEndpoint`的`RpcEndpointRef`向driver发送请求,driver可以使用executor `BlockManagerSlaveEndpoint`的`RpcEndpointRef`向executor发送响应。需要注意的是,由于Spark只在driver节点才会建立`TransportServer`,而不会在executor建立`TransportServer`,所以driver是不会主动向executor建立连接发送请求。而只会executor主动向driver建立连接发送请求,driver在收到请求后利用建立的连接向executor发送请求/响应。下面详细分析一下Block数据管理中这两个核心的`RpcEndpoint`类型:

    BlockManagerMasterEndpoint

    `BlockManagerMasterEndpoint`类型是注册在driver节点上的。在`BlockManagerMasterEndpoint`中维护了几个map用来管理与Block相关的映射关系:

    // 保存BlockManagerId到BlockManager的映射
    private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
    
    // 保存executorId到BlockManagerId的映射
    private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
    
    // 保存BlockId到BlockManagerId set的映射
    private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
    

    通过blockMangerInfo,driver可以根据`BlockManagerId`找到对应`BlockManager`的元数据`BlockManagerInfo`,而我们知道`BlockManagerInfo`中包含了`BlockManager`与driver通信的`RpcEndpointRef`句柄,driver就可以通过这个与executor的`BlockManager`进行通信。通过blockManagerIdByExecutor,driver可以根据executorId找到对应的`BlockManagerId`。通过blockLocations,driver维护了一个`BlockId`所有副本在集群中的位置,这样在executor发现自身没有某个Block时,可以从driver获取到集群中保存了这个缺少的Block的节点列表,之后executor就可以向保存了Block的节点发送请求来获取Block数据。

    BlockManagerSlaveEndpoint

    `BlockManagerSlaveEndpoint`类是executor或driver上的`BlockManager`用来接收来自driver `BlockManagerMasterEndpoint`发来的指令的类。主要用来删除`BlockManager`管理的Block,以及返回Block状态等,`BlockManagerSlaveEndpoint`可以处理的消息类型很多,这里我们以`RemoveBlock`消息为例:

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RemoveBlock(blockId) =>
        doAsync[Boolean]("removing block " + blockId, context) {
        blockManager.removeBlock(blockId)
        true
      }
    }
    
    private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {
      val future = Future {
        logDebug(actionMessage)
        body
      }
      future.onSuccess { case response =>
        logDebug("Done " + actionMessage + ", response is " + response)
        context.reply(response)
        logDebug("Sent response: " + response + " to " + context.senderAddress)
      }
      future.onFailure { case t: Throwable =>
        logError("Error in " + actionMessage, t)
        context.sendFailure(t)
      }
    }
    

    可见其中的很多方法都调用了`BlockManagerSlaveEndpoint.doAsync`方法,用来实现异步调用,在处理完成时,调用对应的`Future.onSuccess`或`Future.onFailure`方法向driver上报处理结果。

    BlockManagerMaster

    在Spark中,executor/driver的`BlockManager`在向driver的`BlockManagerMasterEndpoint`发送请求时,不是直接使用`BlockManagerMasterEndpoint`的句柄`RpcEndpointRef`来发送,而是调用了`BlockManagerMaster`这个类来发送请求。`BlockManagerMaster`可以看做是对发送方法做了一层封装,其中的方法绝大部分是阻塞请求,即在发送请求后需要等待driver返回响应才会返回。

    总结一下Block数据管理,Block数据管理是executor/driver中的`BlockManager`需要将管理的Block信息上报给driver,在driver中维护了集群中所有Block与executor对应关系,Block所在位置等信息。集群中节点都是通过driver保存的Block信息来实现集群Block相互发现的。Block数据管理主要是网络通信,其中`BlockManager`使用`BlockManagerMaster`来向driver发送请求。driver使用`BlockManagerMasterEndpoint`来接收来自`BlockManager`的请求,以及向`BlockManager`发送响应/指令。`BlockManager`使用`BlockManagerSlaveEndpoint`来接收来自driver的指令。他们的关系如下图所示:

    `BlockManager`通过`BlockManagerMaster`将Block元数据上报给driver的`BlockManagerMasterEndpoint`,driver使用`BlockManagerSlaveEndpoint`的句柄向`BlockManager`发送指令。需要注意的是driver不会主动与executor的`BlockManager`建立连接,只有当executor的`BlockManager`主动与driver建立连接发送请求后,driver才可以使用这个建立的连接向executor的`BlockManager`发送指令。

    Block数据传输

    `BlockManager`管理的Block按照产生来源可以分为两大类:一类是缓存类型的数据(不论是存在内存中还是磁盘上),一类是在进行map-reduce操作时,map阶段生成的shuffle数据。对于缓存类型的Block,可以保存在内存中,也可以保存在磁盘上。而对于shuffle类型的Block,都是保存在磁盘上。Spark使用`MemoryStore`来管理保存在内存的数据(数据的增删改查),使用`DiskStore`来管理保存在磁盘的数据(数据的增删改查)。`BlockManager`针对Block的在内存或磁盘的管理实质上都是委托给`MemoryStore`和`DiskStore`来实现的。

    ShuffleClient

    在Spark的`BlockManager`中`ShuffleClient`是executor用来从其他executor获取shuffle后的map文件。

    `ShuffleClient`是一个抽象类,在Spark中有两种实现:`NettyBlockTransferService`和`ExternalShuffleClient`。还记得我们之前所说,`BlockManager`管理了两类数据:缓存数据和shuffle数据。其中`NettyBlockTransferService`不仅是可以请求/返回shuffle数据,还用来请求/返回缓存数据(如果整个action都没有进行缓存,则最终会触发从数据源获取数据)。而`ExternalShuffleClient`是与`ExternalShuffleService`搭配使用的。 `ExternalShuffleClient`只用来向`ExternalShuffleService`请求shuffle数据。而`ExternalShuffleService`是每台机器在启动时启动的server,用来管理这台机器上所有executor产生的shuffle数据。

    那么Spark中为何要实现这两种`ShuffleClient`呢,都是用`NettyBlockTransferService`不就好了?这里我们要说明一下,`NettyBlockTransferService`的生命周期与其所在的executor的生命周期相同,也就是说当executor退出时,`NettyBlockTransferService`也就退出了,不能再向其他节点返回其所管理的shuffle数据或缓存数据。而`ExternalShuffleClient`只是用来向`ExternalShuffleService`请求数据的客户端,虽然`ExternalShuffleClient`随着executor的退出而退出,但是由于shuffle数据是单独由独立于executor的`ExternalShuffleService`管理的,所以其他节点的`ExternalShuffleClient`还可以向`ExternalShuffleService`来获取数据。也就是说`NettyBlockTransferService`可以看做是内嵌在executor中的一个服务器,而`ExternalShuffleClient`配套使用的`ExternalShuffleService`可以看做是独立于executor的服务器,可以在executor退出后依旧提供服务。这里可能读者会有疑问了,executor难道不都是在整个job执行完成后退出的吗,executor退出了,job不也执行完成了吗,使用独立于executor的服务器的意义何在呢?

    我们知道Spark的几种集群部署方式(standalone,yarn,mesos)都是支持资源的动态调整的(可以根据集群情况动态的增加或减少executor数量),也就是说在job没有执行完之前,之前使用的executor就可能退出了。这样就会遇到当集群资源充足需要回收executor时,导致executor上的数据丢失。对于executor的`BlockManager`中缓存的数据还好,可以在下次计算时从上游计算出来再次保存到新的executor上。而对于shuffle类型的数据,executor的退出导致shuffle数据的丢失对性能的影响就比较大了。我们知道map-reduce操作是一个极其耗时间的操作,如果有shuffle的文件丢失,会导致shuffle到这个退出节点的executor的操作重算。为了避免这种情况,Spark提供了独立于executor的外部shuffle管理器`ExternalShuffleService`(这个管理器在每个机器上启动一个,而不是每一个executor上,详见[dynamic-resource-allocation](http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation))。如果配置了\`spark.shuffle.service.enabled=true\`,则在\`BlockManager\`创建时,会将\`BlockManager\`的\`DiskStore\`管理的目录相关信息上报给机器上的\`ExternalShuffleService\`,目录信息最终会保存在\`ExternalShuffleBlockResolver\`的map中。调用链路如下:

    BlockManager.registerWithExternalShuffleService -> ExternalShuffleClient.registerWithShuffleServer -> 向ExternalShuffleService发送RegisterExecutor消息 -> ExternalShuffleBlockHandler.receive -> ExternalShuffleBlockResovler.registerExecutor
    

    最终消息是保存在`ExternalShuffleBlockResovler`中的executors这个map中:

    final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
    
    public static class AppExecId {
      public final String appId;
      public final String execId;
    }
    
    public class ExecutorShuffleInfo implements Encodable {
      /** The base set of local directories that the executor stores its shuffle files in. */
      public final String[] localDirs;
      /** Number of subdirectories created within each localDir. */
      public final int subDirsPerLocalDir;
      /** Shuffle manager (SortShuffleManager) that the executor is using. */
      public final String shuffleManager;
    }
    

    而在shuffle阶段,在执行reduce任务的executor会调用`BlockManager.shuffleClient`来向执行map任务的executor获取shuffle文件,这时就会根据`ShuffleClient`的不同,来决定是调用`NettyBlockTransferService`中的方法还是`ExternalShuffleClient`中的方法来从其他executor还是机器单独的`ExternalShuffleService`获取shuffle文件。

    Block保存

    在`BlockManager`中提供了序列化数据的保存方法:`putBytes`,非序列化数据的保存方法:`putIterator`以及Block数据的保存方法:`putBlockData`三种方法,其中`putBlockData`实际就是调用`putBytes`方法。

    上文我们提到,Spark可以通过配置来设置数据的保存级别(详见[rdd-persistence](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence))

    在保存序列化数据的方法`putBytes`方法中,直接调用了`doPutBytes`,所以让我们直接看一下`doPutBytes`方法:

    private def doPutBytes[T](
          blockId: BlockId,
          bytes: ChunkedByteBuffer,
          level: StorageLevel,
          classTag: ClassTag[T],
          tellMaster: Boolean = true,
          keepReadLock: Boolean = false): Boolean = {
      // 省略非关键代码和异常检查
      doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
        // 由于存储的是bytes(序列化的数据),这里在将数据存储到本地之前就将副本写出
        // 由于数据已经是序列化的,所以发送数据会很快
        val replicationFuture = if (level.replication > 1) {
          Future {
            replicate(blockId, bytes, level, classTag)
          }(futureExecutionContext)
        } else {
          null
        }
        val size = bytes.size
        // 存储级别使用内存存储
        if (level.useMemory) {
          // Put it in memory first, even if it also has useDisk set to true;
          // We will drop it to disk later if the memory store can't hold it.
          // 如果存储时需要反序列化,则首先反序列化数据,并调用memoryStore的putIteratorAsValues方法
          val putSucceeded = if (level.deserialized) {
            val values = serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
            memoryStore.putIteratorAsValues(blockId, values, classTag) match {
              case Right(_) => true
              case Left(iter) =>
                // If putting deserialized values in memory failed, we will put the bytes directly to
                // disk, so we don't need this iterator and can close it to free resources earlier.
                iter.close()
                false
              }
            } else {
              // 不需要反序列化则调用memoryStore的putBytes直接存储
              memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
            }
          // 如果保存到内存上失败,并且存储级别包括磁盘,则将数据直接存到磁盘上(序列化的数据)
          if (!putSucceeded && level.useDisk) {
            diskStore.putBytes(blockId, bytes)
          }
        } else if (level.useDisk) {
          // 如果存储级别只包括磁盘,则直接存到磁盘上
          diskStore.putBytes(blockId, bytes)
        }
        // 获取存储状态,如果成功向master会报存储状态
        val putBlockStatus = getCurrentBlockStatus(blockId, info)
        val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
        if (blockWasSuccessfullyStored) {
          info.size = size
          if (tellMaster) {
            reportBlockStatus(blockId, info, putBlockStatus)
          }
        }
        // 如果需要写副本,则在这里等待上文调用方法的回调
        if (level.replication > 1) {
          // Wait for asynchronous replication to finish
            Await.ready(replicationFuture, Duration.Inf)
        }
      }.isEmpty
    }
    

    可见`doPutBytes`流程如下:

    1. 如果当前配置的存储级别为需要副本,则首先启动一个异步线程在集群中随机选取另一个节点,将数据备份到另一个节点上

    2. 如果当前配置的存储级别包含内存存储,则首先尝试将数据保存到内存中。如果数据在内存中需要以非序列化形式保存,则要先将数据反序列化。如果保存到内存失败(如内存空间不足),且存储级别包含磁盘存储,则将数据保存到磁盘上

    3. 如果当前配置的存储级别只包含磁盘存储,则直接将数据写到磁盘上

    4. 在保存成功后,如果需要向driver节点上报保存结果,则需要将保存的Block信息上报给driver

    5. 如果当前配置的存储级别为需要副本,则在方法返回前需要等待数据备份的完成

    在保存非序列化的方法`putIterator`中,实际是调用`doPutIterator`方法:

    private def doPutIterator[T](
          blockId: BlockId,
          iterator: () => Iterator[T],
          level: StorageLevel,
          classTag: ClassTag[T],
          tellMaster: Boolean = true,
          keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
      // 省略非关键代码和异常检查
      doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
        var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
        // Size of the block in bytes
        var size = 0L
        if (level.useMemory) {
          // Put it in memory first, even if it also has useDisk set to true;
          // We will drop it to disk later if the memory store can't hold it.
          if (level.deserialized) {
            memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
              case Right(s) =>
                size = s
              case Left(iter) =>
                // Not enough space to unroll this block; drop to disk if applicable
                if (level.useDisk) {
                  diskStore.put(blockId) { fileOutputStream =>
                      serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
                  }
                  size = diskStore.getSize(blockId)
                } else {
                  iteratorFromFailedMemoryStorePut = Some(iter)
                }
              }
          } else { // !level.deserialized
            // 如果需要序列化存储,则调用memoryStore的putIteratorAsBytes方法
            memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
              case Right(s) =>
                  size = s
              case Left(partiallySerializedValues) =>
                // Not enough space to unroll this block; drop to disk if applicable
                if (level.useDisk) {
                  diskStore.put(blockId) { fileOutputStream =>
                    partiallySerializedValues.finishWritingToStream(fileOutputStream)
                  }
                  size = diskStore.getSize(blockId)
                } else {
                  iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
                }
            }
          }
        } else if (level.useDisk) {
          // 首先将数据序列化,然后在存到磁盘上
          diskStore.put(blockId) { fileOutputStream =>
            serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
        }
          size = diskStore.getSize(blockId)
        }
        val putBlockStatus = getCurrentBlockStatus(blockId, info)
        val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
        if (blockWasSuccessfullyStored) {
          // Now that the block is in either the memory, externalBlockStore, or disk store,
          // tell the master about it.
          info.size = size
          if (tellMaster) {
            reportBlockStatus(blockId, info, putBlockStatus)
          }
          if (level.replication > 1) {
            val remoteStartTime = System.currentTimeMillis
            val bytesToReplicate = doGetLocalBytes(blockId, info)
            try {
              replicate(blockId, bytesToReplicate, level, classTag)
            } finally {
              bytesToReplicate.dispose()
            }
          }
        }
        iteratorFromFailedMemoryStorePut
      }
    }
    

    可见`doPutIterator`的流程如下:

    1. 如果当前配置的存储级别包含内存存储,则首先尝试将数据保存到内存中。如果内存中数据需要序列化存储,则首先将数据序列化,否则直接尝试将数据保存到内存。如果保存失败(说明内存空间不足),且当前配置的存储级别包含磁盘存储,则首先将数据序列化后,再保存到磁盘

    2. 如果当前存储级别只包含内存存储,则将数据序列化后保存到磁盘

    3. 在数据保存成功后,如果需要将结果上报给driver,则将保存的Block信息上报给driver

    4. 在数据保存成功后,如果配置的存储级别需要备份,则需要在集群中随机选择另一个节点,将数据备份到另一个节点上

    无论是`doPutBytes`还是`doPutIterator`方法中,我们看到实际都是调用了`MemoryStore`或`DiskStore`类来保存数据。对于这两个类以及对应的内存存储/磁盘存储分析见文章【内存存储】和【磁盘存储】。存储级别详见[rdd-persistence](http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)

    Block读取

    `BlockManager`提供了五种获取Block的方法:`getBlockData`(获取本地的Block数据),`getLocalValues`(获取本地的非序列化数据),`getLocalBytes`(获取本地的序列化数据),`getRemoteValues`(获取远端的非序列化数据),`getRemoteBytes`(获取远端的序列化数据)。其中`getBlockData`实际是调用了`getLocalBytes`方法,而`getRemoteValues`方法实际是在调用`getRemoteBytes`方法获取序列化数据后,进行反序列化并返回。所以下面我们主要看一下`getLocalValues`,`getLocalBytes`和`getRemoteBytes`这三个方法。

    首先我们看一下`getLocalValues`方法:

    def getLocalValues(blockId: BlockId): Option[BlockResult] = {
      // 省略非关键代码和异常检查
      val level = info.level
      if (level.useMemory && memoryStore.contains(blockId)) {
        val iter: Iterator[Any] = if (level.deserialized) {
          memoryStore.getValues(blockId).get
        } else {
          serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
        }
        val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
        Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
      } else if (level.useDisk && diskStore.contains(blockId)) {
        val iterToReturn: Iterator[Any] = {
          val diskBytes = diskStore.getBytes(blockId)
          if (level.deserialized) {
            val diskValues = serializerManager.dataDeserializeStream(
              blockId,
              diskBytes.toInputStream(dispose = true))(info.classTag)
            maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
          } else {
            val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
              .map {_.toInputStream(dispose = false)}
              .getOrElse { diskBytes.toInputStream(dispose = true) }
            serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
          }
        }
        val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
        Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
      } else {
        handleLocalReadFailure(blockId)
      }
    }
    

    可见`getLocalValues`流程如下:

    1. 如果配置的存储级别包含内存存储,且内存中存在查询的Block,则直接从内存查询。如果内存中的数据是以序列化方式保存的,则在返回之前需要反序列化数据,否则直接返回

    2. 如果配置的存储级别只包含磁盘存储,且磁盘中存在查询的Block,则直接从磁盘查询。在查出数据后,需要将数据缓存到内存中。如果内存中的数据以非序列化的方式保存,则首先需要将查出的数据序列化,否则直接保存到内存中。最后将查出的数据反序列化后返回

    让我们在看一下`getLocalBytes`方法:

    def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
      // 省略非关键代码和异常检查
      // As an optimization for map output fetches, if the block is for a shuffle, return it
      // without acquiring a lock; the disk store never deletes (recent) items so this should work
      if (blockId.isShuffle) {
        val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
        // TODO: This should gracefully handle case where local block is not available. Currently
        // downstream code will throw an exception.
        Option(new ChunkedByteBuffer(
          shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
      } else {
        blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
      }
    }
    
    private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
      val level = info.level
      // In order, try to read the serialized bytes from memory, then from disk, then fall back to
      // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
      if (level.deserialized) {
        // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
        if (level.useDisk && diskStore.contains(blockId)) {
          // Note: we purposely do not try to put the block back into memory here. Since this branch
          // handles deserialized blocks, this block may only be cached in memory as objects, not
          // serialized bytes. Because the caller only requested bytes, it doesn't make sense to
          // cache the block's deserialized objects since that caching may not have a payoff.
          diskStore.getBytes(blockId)
        } else if (level.useMemory && memoryStore.contains(blockId)) {
          // The block was not found on disk, so serialize an in-memory copy:
          serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
        } else {
          handleLocalReadFailure(blockId)
        }
      } else {  // storage level is serialized
        if (level.useMemory && memoryStore.contains(blockId)) {
          memoryStore.getBytes(blockId).get
        } else if (level.useDisk && diskStore.contains(blockId)) {
          val diskBytes = diskStore.getBytes(blockId)
          maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
        } else {
          handleLocalReadFailure(blockId)
        }
      }
    }
    

    在`getLocalBytes`方法中首先会判断查询的Block是否是shuffle类型的Block。如果是shuffle类型的Block,则直接调用`IndexShuffleBlockResolver`来查询(关于`IndexShuffleBlockResolver`的详细介绍见文章【Shuffle详解】)。如果不是shuffle类型的Block,则调用`doGetLocalBytes`方法。`doGetLocalBytes`方法流程如下:

    1. 如果内存中的存储级别为非序列化,且如果当前配置的存储级别包含磁盘存储,且磁盘中包含查询的Block,则直接从磁盘查询并返回(这里直接查磁盘,而不是内存是为了减少从内存读取时序列化数据的时间)

    2. 如果内存中存储的级别为非序列化,且只有内存中包含查询的Block,则从内存中查询数据,并进行序列化后返回

    3. 如果内存中的存储级别为序列化,且在内存中包含查询的Block,则直接读取内存中的数据并返回。否则如果存储级别包含磁盘存储,且磁盘中包含Block,则从磁盘中读取数据,并将数据缓存到内存中,并返回

    最后让我们看一下`getRemoteBytes`方法:

    def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
      // 省略非关键代码和异常检查
      var runningFailureCount = 0
      var totalFailureCount = 0
      val locations = getLocations(blockId)
      val maxFetchFailures = locations.size
      var locationIterator = locations.iterator
      while (locationIterator.hasNext) {
        val loc = locationIterator.next()
        // 直接通过一次网络请求获取到需要的blockId的数据
        val data = try {
          blockTransferService.fetchBlockSync(
            loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
        } catch {
          case NonFatal(e) =>
            runningFailureCount += 1
            totalFailureCount += 1
    
            // 如果失败次数超过了含有块的地址数量,则直接抛出异常
            if (totalFailureCount >= maxFetchFailures) {
              // Give up trying anymore locations. Either we've tried all of the original locations,
              // or we've refreshed the list of locations from the master, and have still
              // hit failures after trying locations from the refreshed list.
              throw new BlockFetchException(s"Failed to fetch block after" +
                s" ${totalFailureCount} fetch failures. Most recent failure cause:", e)
            }
            // If there is a large number of executors then locations list can contain a
            // large number of stale entries causing a large number of retries that may
            // take a significant amount of time. To get rid of these stale entries
            // we refresh the block locations after a certain number of fetch failures
            // 如果失败次数超过了规定的最大尝试次数,则重新获取块的位置信息
            if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
              locationIterator = getLocations(blockId).iterator
              runningFailureCount = 0
            }
            // This location failed, so we retry fetch from a different one by returning null here
            null
          }
          if (data != null) {
            return Some(new ChunkedByteBuffer(data))
          }
        }
        None
      }
    

    可见`getRemoteBytes`的执行流程如下:

    1. 首先调用`getLocations`方法,从driver节点获取保存了查询的Block的节点列表,之后选择第一个节点调用`BlockTransferService.fetchBlockSync`方法向这个节点请求数据,如果请求成功则直接返回

    2. 如果请求失败,重新向driver请求保存了查询的Block的节点列表,然后执行`BlockTransferService.fetchBlockSync`方法,请求数据

    3. 如果重试次数超过重试上限,则直接返回空数据

    总结一下Block数据传输:在Spark中,`BlockManager`管理两类的数据,即shuffle数据和缓存数据。其中shuffle数据来自Spark的shuffle操作(也称为map-reduce操作)中map阶段产生的shuffle文件,缓存数据来自用户程序中调用cache或checkpoint方法时将来自上游RDD计算出的数据缓存到本地内存或磁盘,以便在之后迭代时,无需再执行上游RDD生成数据的计算过程。由于`BlockManager`随着所在的executor退出而终止,所以在可以动态分配资源的集群中(如standalone,yarn,mesos),Spark可以在每台机器上启动一个专门管理shuffle数据的服务`ExternalShuffleService`,这样所有读取shuffle文件的操作实际都通过向`ExternalShuffleService`请求来获取的。在`BlockManager`中提供了对于非序列化数据/序列化数据的读写接口,并通过底层的`MemoryStore`和`DiskStore`来实际保存和读取数据。在`BlockManager`中如果设置了需要副本的存储级别,则在保存数据到本节点的同时,需要向集群的另外一个节点备份写入的数据。

  • 相关阅读:
    DOM几个重要的函数
    手指点赞动画
    随机颜色值
    自定义单选框radio样式
    判断是否是微信浏览器的函数
    JAVA开发微信支付-公众号支付/微信浏览器支付(JSAPI)
    微信授权获取用户openid前端实现
    CSS动画 animation与transition
    JS判断指定dom元素是否在屏幕内的方法实例
    希尔伯特曲线
  • 原文地址:https://www.cnblogs.com/cenglinjinran/p/8476639.html
Copyright © 2011-2022 走看看