Apache Spark-1.0.0 Analysis (Part 10): Data Storage, Read and Write Operations

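    “An RDD is made up of partitions, and transformations and actions operate on those partitions. Inside the storage module, however, an RDD is viewed as a collection of blocks, and RDD data is read and written block by block. In essence a partition and a block are the same thing seen from two different angles. In Spark's storage module the block is the smallest unit of data access, and every operation works in units of blocks.”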

    BlockManager defines three main stores (tachyonStore is not analyzed here):

    private[storage] val memoryStore = new MemoryStore(this, maxMemory)
    private[storage] val diskStore = new DiskStore(this, diskBlockManager)
    private[storage] lazy val tachyonStore: TachyonStore  // initializer omitted in this excerpt

    1. DiskStore

    Start with diskStore. DiskStore is instantiated with a diskBlockManager argument:

    val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
        conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")))

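    When DiskBlockManager is initialized, it creates one local directory for each path listed in spark.local.dir (falling back to java.io.tmpdir). Inside each of these directories it creates multiple subdirectories into which files are hashed, so that no single top-level directory ends up with an extremely large inode: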

    // Create one local directory for each path mentioned in spark.local.dir; then, inside this
      // directory, create multiple subdirectories that we will hash files into, in order to avoid
      // having really large inodes at the top level.
      private val localDirs: Array[File] = createLocalDirs()
      private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

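    createLocalDirs creates the local directories. Each local directory is a folder named "spark-local-" followed by a timestamp (yyyyMMddHHmmss), a hyphen, and a random 4-digit hex number. Creation is retried up to MAX_DIR_CREATION_ATTEMPTS times, and the executor exits if no directory can be created. Blocks are stored as files inside these localDirs: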

    private def createLocalDirs(): Array[File] = {
        logDebug("Creating local directories at root dirs '" + rootDirs + "'")
        val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
        rootDirs.split(",").map { rootDir =>
          var foundLocalDir = false
          var localDir: File = null
          var localDirId: String = null
          var tries = 0
          val rand = new Random()
          while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
            tries += 1
            try {
              localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
              localDir = new File(rootDir, "spark-local-" + localDirId)
              if (!localDir.exists) {
                foundLocalDir = localDir.mkdirs()
              }
            } catch {
              case e: Exception =>
                logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
            }
          }
          if (!foundLocalDir) {
            logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
              " attempts to create local dir in " + rootDir)
            System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
          }
          logInfo("Created local directory at " + localDir)
          localDir
        }
      }
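    To make the directory naming concrete, here is a minimal standalone sketch of the same scheme. It is not Spark code: LocalDirSketch, localDirName, createLocalDir and the MaxAttempts value are hypothetical names chosen for illustration, and failure is reported as None instead of exiting the JVM.

```scala
import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
import scala.util.Random

// Standalone sketch of the createLocalDirs naming scheme (not Spark code):
// "spark-local-" + timestamp (yyyyMMddHHmmss) + "-" + a random 4-digit hex number.
object LocalDirSketch {
  // Hypothetical retry limit standing in for MAX_DIR_CREATION_ATTEMPTS.
  val MaxAttempts = 10

  def localDirName(now: Date, rand: Random): String = {
    val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
    "spark-local-" + "%s-%04x".format(dateFormat.format(now), rand.nextInt(65536))
  }

  // Try up to MaxAttempts random names under rootDir and return the first
  // directory that did not exist yet and was created successfully.
  def createLocalDir(rootDir: File): Option[File] = {
    val rand = new Random()
    Iterator
      .continually(new File(rootDir, localDirName(new Date, rand)))
      .take(MaxAttempts)
      .find(dir => !dir.exists && dir.mkdirs())
  }
}
```

    The random hex suffix is what makes a retry worthwhile: two executors started within the same second under the same root directory will usually pick different names.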

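    So every block operation in DiskBlockManager comes down to reading and writing Files, that is, creating and maintaining the mapping between logical blocks and physical locations. By default, a block maps to a file named after its blockId.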

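    getFile creates this mapping: it hashes the filename to choose a local directory and a subdirectory within it (creating the subdirectory on first use), and places the file there: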

    def getFile(filename: String): File = {
        // Figure out which local directory it hashes to, and which subdirectory in that
        val hash = Utils.nonNegativeHash(filename)
        val dirId = hash % localDirs.length
        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    
        // Create the subdirectory if it doesn't already exist
        var subDir = subDirs(dirId)(subDirId)
        if (subDir == null) {
          subDir = subDirs(dirId).synchronized {
            val old = subDirs(dirId)(subDirId)
            if (old != null) {
              old
            } else {
              val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
              newDir.mkdir()
              subDirs(dirId)(subDirId) = newDir
              newDir
            }
          }
        }
    
        new File(subDir, filename)
      }
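    The index arithmetic in getFile is easy to check by hand. The sketch below reproduces it outside Spark; GetFileSketch and its methods are hypothetical, and nonNegativeHash is written under the assumption that Utils.nonNegativeHash returns the absolute value of hashCode, mapping Int.MinValue to 0.

```scala
import java.io.File

// Sketch of getFile's arithmetic: a non-negative hash picks the local directory
// (dirId), and the quotient picks the subdirectory inside it (subDirId).
object GetFileSketch {
  // Assumed behavior of Utils.nonNegativeHash.
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  def locate(hash: Int, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, String) = {
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    (dirId, "%02x".format(subDirId)) // subdirectories are named with two hex digits
  }

  def fileFor(filename: String, localDirs: Array[File], subDirsPerLocalDir: Int): File = {
    val (dirId, subDirName) = locate(nonNegativeHash(filename), localDirs.length, subDirsPerLocalDir)
    new File(new File(localDirs(dirId), subDirName), filename)
  }
}
```

    For example, with 2 local directories and 64 subdirectories per directory, a hash of 1234 lands in directory 0 (1234 % 2) and subdirectory 41 ((1234 / 2) % 64), whose name is "29" in hex.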

    Writing data:

    For diskStore, BlockManager uses two write operations depending on the form of the data: putValues and putBytes.


    putValues looks up the File for the blockId, opens a FileOutputStream on it, and serializes the values into the stream:

    override def putValues(
          blockId: BlockId,
          values: Iterator[Any],
          level: StorageLevel,
          returnValues: Boolean)
        : PutResult = {
    
        logDebug("Attempting to write values for block " + blockId)
        val startTime = System.currentTimeMillis
        val file = diskManager.getFile(blockId)
        val outputStream = new FileOutputStream(file)
        blockManager.dataSerializeStream(blockId, outputStream, values)
        val length = file.length
    
        val timeTaken = System.currentTimeMillis - startTime
        logDebug("Block %s stored as %s file on disk in %d ms".format(
          file.getName, Utils.bytesToString(length), timeTaken))
    
        if (returnValues) {
          // Return a byte buffer for the contents of the file
          val buffer = getBytes(blockId).get
          PutResult(length, Right(buffer))
        } else {
          PutResult(length, null)
        }
      }

    putBytes looks up the File for the blockId, obtains a FileChannel from it, and writes the bytes, looping until the buffer is drained:

    override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
        // So that we do not modify the input offsets !
        // duplicate does not copy buffer, so inexpensive
        val bytes = _bytes.duplicate()
        logDebug("Attempting to put block " + blockId)
        val startTime = System.currentTimeMillis
        val file = diskManager.getFile(blockId)
        val channel = new FileOutputStream(file).getChannel()
        while (bytes.remaining > 0) {
          channel.write(bytes)
        }
        channel.close()
        val finishTime = System.currentTimeMillis
        logDebug("Block %s stored as %s file on disk in %d ms".format(
          file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
        return PutResult(bytes.limit(), Right(bytes.duplicate()))
      }
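    The key detail in putBytes is that it writes through _bytes.duplicate(), so the caller's buffer position is left untouched, and that it loops because FileChannel.write may write only part of the buffer. A minimal standalone sketch of that pattern (the DiskPutSketch name is hypothetical; unlike the excerpt, it closes the channel in a finally block):

```scala
import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer

object DiskPutSketch {
  // Write all bytes of `input` to `file` without moving input's position.
  def putBytes(file: File, input: ByteBuffer): Long = {
    val bytes = input.duplicate() // shares the content, has its own position/limit
    val channel = new FileOutputStream(file).getChannel
    try {
      while (bytes.remaining() > 0) {
        channel.write(bytes) // a single write may be partial, hence the loop
      }
    } finally {
      channel.close()
    }
    bytes.limit().toLong
  }
}
```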

    Reading data:

    Reads go through getValues and getBytes; getValues simply calls getBytes and deserializes the result:

    override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
        getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
      }

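    getBytes looks up the file segment (file, offset, length) for the blockId and opens a channel on it. Segments smaller than minMemoryMapBytes are read directly into a heap ByteBuffer; larger ones are memory-mapped: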

    override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
        val segment = diskManager.getBlockLocation(blockId)
        val channel = new RandomAccessFile(segment.file, "r").getChannel()
    
        try {
          // For small files, directly read rather than memory map
          if (segment.length < minMemoryMapBytes) {
            val buf = ByteBuffer.allocate(segment.length.toInt)
            channel.read(buf, segment.offset)
            buf.flip()
            Some(buf)
          } else {
            Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
          }
        } finally {
          channel.close()
        }
      }
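    The small-versus-large split in getBytes can be sketched on its own as follows. DiskGetSketch and its parameters are hypothetical; the threshold plays the role of minMemoryMapBytes, and the heap read loops because a single FileChannel.read call is not guaranteed to fill the buffer.

```scala
import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object DiskGetSketch {
  // Read `length` bytes starting at `offset`: copy small segments into a heap
  // buffer, memory-map segments at or above the threshold.
  def getBytes(path: String, offset: Long, length: Long, minMemoryMapBytes: Long): ByteBuffer = {
    val channel = new RandomAccessFile(path, "r").getChannel
    try {
      if (length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(length.toInt)
        // Positional reads until the buffer is full or EOF (-1) is reached.
        while (buf.hasRemaining && channel.read(buf, offset + buf.position()) >= 0) {}
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, offset, length)
      }
    } finally {
      channel.close() // a mapping stays valid after its channel is closed
    }
  }
}
```

    Mapping avoids copying large blocks into the Java heap, while for small segments a plain read is cheaper than setting up a mapping.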

    2. MemoryStore

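    MemoryStore keeps blocks in memory either as an ArrayBuffer of deserialized Java objects or as a serialized ByteBuffer. Whereas diskStore has to create local directories, memoryStore only creates a LinkedHashMap at instantiation, which maps each BlockId to its block Entry: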

    case class Entry(value: Any, size: Long, deserialized: Boolean)
    
    private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true)
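    The third constructor argument, true, puts the LinkedHashMap in access order: iteration runs from the least recently accessed entry to the most recently accessed one. ensureFreeSpace (not shown here) walks entries with this iterator when choosing blocks to evict, which gives the memory store its LRU behavior. A small demonstration with plain strings standing in for BlockIds (AccessOrderSketch and its methods are hypothetical):

```scala
import java.util.LinkedHashMap

object AccessOrderSketch {
  // Collect keys in the map's iteration order.
  def keysInOrder(m: LinkedHashMap[String, Long]): List[String] = {
    val it = m.keySet().iterator()
    var keys = List.empty[String]
    while (it.hasNext) keys = keys :+ it.next()
    keys
  }

  def demo(): List[String] = {
    // accessOrder = true: least recently accessed first, most recently accessed last
    val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_0_0", 100L)
    entries.put("rdd_0_1", 200L)
    entries.put("rdd_1_0", 300L)
    entries.get("rdd_0_0") // touching an entry moves it to the end
    keysInOrder(entries)
  }
}
```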

    Writing data:

    As with diskStore, BlockManager uses two write operations on memoryStore: putValues and putBytes.


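    If the storage level is deserialized, putValues estimates the in-memory size of the objects with SizeEstimator and calls tryToPut to try to store them in memory; otherwise it serializes the values into a ByteBuffer and calls tryToPut with the buffer's size: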

    override def putValues(
          blockId: BlockId,
          values: ArrayBuffer[Any],
          level: StorageLevel,
          returnValues: Boolean): PutResult = {
        if (level.deserialized) {
          val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
          val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
          PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
        } else {
          val bytes = blockManager.dataSerialize(blockId, values.iterator)
          val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
          PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
        }
      }

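    putBytes works the other way around: if the storage level is deserialized, it deserializes the bytes into an ArrayBuffer and estimates that buffer's size; otherwise it uses the ByteBuffer's limit as the size. Either way it then calls tryToPut: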

    override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
        // Work on a duplicate - since the original input might be used elsewhere.
        val bytes = _bytes.duplicate()
        bytes.rewind()
        if (level.deserialized) {
          val values = blockManager.dataDeserialize(blockId, bytes)
          val elements = new ArrayBuffer[Any]
          elements ++= values
          val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
          tryToPut(blockId, elements, sizeEstimate, true)
          PutResult(sizeEstimate, Left(values.toIterator))
        } else {
          tryToPut(blockId, bytes, bytes.limit, false)
          PutResult(bytes.limit(), Right(bytes.duplicate()))
        }
      }

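    Both operations end up in tryToPut. It synchronizes on putLock so that each put request, together with any block dropping it triggers, is handled by one thread at a time; otherwise, while one thread was dropping blocks to free memory for its block, another thread could use up the freed space for a different block. tryToPut calls ensureFreeSpace to make sure memory has room for the block. If there is enough space, it creates a new Entry and adds it to the LinkedHashMap; if not, it calls dropFromMemory on the block being put, which writes it to disk if its storage level allows disk storage: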

    /**
       * Try to put in a set of values, if we can free up enough space. The value should either be
       * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
       * size must also be passed by the caller.
       *
       * Lock on the object putLock to ensure that all the put requests and its associated block
       * dropping is done by only one thread at a time. Otherwise while one thread is dropping
       * blocks to free memory for one block, another thread may use up the freed space for
       * another block.
       *
       * Return whether put was successful, along with the blocks dropped in the process.
       */
      private def tryToPut(
          blockId: BlockId,
          value: Any,
          size: Long,
          deserialized: Boolean): ResultWithDroppedBlocks = {
    
        /* TODO: It's possible to optimize the locking by locking entries only when selecting blocks
         * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
         * been released, it must be ensured that those to-be-dropped blocks are not double counted
         * for freeing up more space for another block that needs to be put. Only then the actual
         * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
    
        var putSuccess = false
        val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    
        putLock.synchronized {
          val freeSpaceResult = ensureFreeSpace(blockId, size)
          val enoughFreeSpace = freeSpaceResult.success
          droppedBlocks ++= freeSpaceResult.droppedBlocks
    
          if (enoughFreeSpace) {
            val entry = new Entry(value, size, deserialized)
            entries.synchronized {
              entries.put(blockId, entry)
              currentMemory += size
            }
            if (deserialized) {
              logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
                blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
            } else {
              logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
                blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
            }
            putSuccess = true
          } else {
            // Tell the block manager that we couldn't put it in memory so that it can drop it to
            // disk if the block allows disk storage.
            val data = if (deserialized) {
              Left(value.asInstanceOf[ArrayBuffer[Any]])
            } else {
              Right(value.asInstanceOf[ByteBuffer].duplicate())
            }
            val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
            droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
          }
        }
        ResultWithDroppedBlocks(putSuccess, droppedBlocks)
      }
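    The interplay of putLock, ensureFreeSpace and the access-ordered map can be modeled with sizes alone. This is a simplified, hypothetical sketch (TinyMemoryStore is not a Spark class): it evicts least recently used entries until the new block fits and ignores details of the real ensureFreeSpace, such as refusing to evict blocks of the RDD being cached and handing evicted blocks to dropFromMemory. It also assumes each blockId is put at most once.

```scala
import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of tryToPut + ensureFreeSpace that tracks sizes only.
class TinyMemoryStore(maxMemory: Long) {
  private val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
  private val putLock = new Object
  private var currentMemory = 0L

  // Returns (success, ids of blocks evicted to make room).
  def tryToPut(blockId: String, size: Long): (Boolean, Seq[String]) = putLock.synchronized {
    if (size > maxMemory) {
      (false, Seq.empty[String]) // can never fit; the caller must drop it to disk or discard it
    } else {
      val dropped = ArrayBuffer.empty[String]
      val it = entries.entrySet().iterator()
      while (maxMemory - currentMemory < size && it.hasNext) {
        val e = it.next() // least recently used entry first
        currentMemory -= e.getValue
        dropped += e.getKey
        it.remove()
      }
      entries.put(blockId, size)
      currentMemory += size
      (true, dropped.toSeq)
    }
  }

  def contains(blockId: String): Boolean = entries.containsKey(blockId)
  def used: Long = currentMemory
}
```

    Putting a 50-byte block into a 100-byte store that already holds two 40-byte blocks evicts only the older of the two, since freeing 40 bytes is already enough.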

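    dropFromMemory first checks whether the storage level uses disk; if it does and the block is not already on disk, it calls diskStore.putValues or diskStore.putBytes to write the block to disk. It then removes the block from memoryStore. If the storage level does not use disk, the block is simply removed, and its BlockInfo is forgotten as well: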

    /**
       * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
       * store reaches its limit and needs to free up space.
       *
       * Return the block status if the given block has been updated, else None.
       */
      def dropFromMemory(
          blockId: BlockId,
          data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
    
        logInfo("Dropping block " + blockId + " from memory")
        val info = blockInfo.get(blockId).orNull
    
        // If the block has not already been dropped
        if (info != null)  {
          info.synchronized {
            // required ? As of now, this will be invoked only for blocks which are ready
            // But in case this changes in future, adding for consistency sake.
            if (!info.waitForReady()) {
              // If we get here, the block write failed.
              logWarning("Block " + blockId + " was marked as failure. Nothing to drop")
              return None
            }
    
            var blockIsUpdated = false
            val level = info.level
    
            // Drop to disk, if storage level requires
            if (level.useDisk && !diskStore.contains(blockId)) {
              logInfo("Writing block " + blockId + " to disk")
              data match {
                case Left(elements) =>
                  diskStore.putValues(blockId, elements, level, false)
                case Right(bytes) =>
                  diskStore.putBytes(blockId, bytes, level)
              }
              blockIsUpdated = true
            }
    
            // Actually drop from memory store
            val droppedMemorySize =
              if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
            val blockIsRemoved = memoryStore.remove(blockId)
            if (blockIsRemoved) {
              blockIsUpdated = true
            } else {
              logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
            }
    
            val status = getCurrentBlockStatus(blockId, info)
            if (info.tellMaster) {
              reportBlockStatus(blockId, info, status, droppedMemorySize)
            }
            if (!level.useDisk) {
              // The block is completely gone from this node; forget it so we can put() it again later.
              blockInfo.remove(blockId)
            }
            if (blockIsUpdated) {
              return Some(status)
            }
          }
        }
        None
      }
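    The branching in dropFromMemory boils down to three independent decisions. A minimal sketch, with hypothetical Level and Outcome types standing in for StorageLevel and the method's side effects:

```scala
// Hypothetical model of the decisions made in dropFromMemory.
object DropSketch {
  final case class Level(useMemory: Boolean, useDisk: Boolean)

  final case class Outcome(writtenToDisk: Boolean, removedFromMemory: Boolean, forgetBlockInfo: Boolean) {
    // dropFromMemory returns Some(status) only when something actually changed
    def reportsStatus: Boolean = writtenToDisk || removedFromMemory
  }

  def drop(level: Level, onDisk: Boolean, inMemory: Boolean): Outcome = {
    val writeToDisk = level.useDisk && !onDisk // spill only if disk is allowed and not done yet
    val removed = inMemory                     // memoryStore.remove succeeds only if present
    val forget = !level.useDisk                // the block is now gone from this node
    Outcome(writeToDisk, removed, forget)
  }
}
```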

    Reading data:

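    The two read methods, getValues and getBytes, are similar: both look up the Entry for the BlockId in the LinkedHashMap and return its data, converting between deserialized values and serialized bytes when the stored form differs from the requested one. getValues: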

    override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
        val entry = entries.synchronized {
          entries.get(blockId)
        }
        if (entry == null) {
          None
        } else if (entry.deserialized) {
          Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
        } else {
          val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
          Some(blockManager.dataDeserialize(blockId, buffer))
        }
      }

    getBytes:

    override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
        val entry = entries.synchronized {
          entries.get(blockId)
        }
        if (entry == null) {
          None
        } else if (entry.deserialized) {
          Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
        } else {
          Some(entry.value.asInstanceOf[ByteBuffer].duplicate())   // Doesn't actually copy the data
        }
      }
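    Both readers rely on ByteBuffer.duplicate(), which creates a new view with its own position and limit over the same bytes, so a caller reading the returned buffer never disturbs the stored one and no data is copied. A minimal sketch with a hypothetical MemoryGetSketch object; the serialize parameter stands in for blockManager.dataSerialize:

```scala
import java.nio.ByteBuffer

object MemoryGetSketch {
  final case class Entry(value: Any, size: Long, deserialized: Boolean)

  // Serialize deserialized values on demand; otherwise hand out a view of the stored bytes.
  def getBytes(entry: Entry, serialize: Seq[Any] => ByteBuffer): ByteBuffer =
    if (entry.deserialized) serialize(entry.value.asInstanceOf[Seq[Any]])
    else entry.value.asInstanceOf[ByteBuffer].duplicate() // new view, same underlying bytes
}
```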

    3. The BlockManager Wrapper

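    BlockManager wraps these stores behind put and get methods (built on doPut, doGetLocal and doGetRemote), so callers can store and fetch blocks without caring about the underlying implementation.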

    Put operations:

    There are three put variants, one for each data type (Iterator, ArrayBuffer and ByteBuffer), and all of them delegate to doPut:

    def put(
          blockId: BlockId,
          values: Iterator[Any],
          level: StorageLevel,
          tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
        doPut(blockId, IteratorValues(values), level, tellMaster)
      }
     
    def put(
          blockId: BlockId,
          values: ArrayBuffer[Any],
          level: StorageLevel,
          tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
        require(values != null, "Values is null")
        doPut(blockId, ArrayBufferValues(values), level, tellMaster)
      }

    def putBytes(
          blockId: BlockId,
          bytes: ByteBuffer,
          level: StorageLevel,
          tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
        require(bytes != null, "Bytes is null")
        doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
      }
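    The three overloads differ only in how they wrap their argument before calling doPut. A minimal sketch of that pattern, assuming a sealed Values type like the IteratorValues/ArrayBufferValues/ByteBufferValues wrappers matched inside doPut; PutDispatchSketch and its string results are hypothetical stand-ins for the real store calls:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

object PutDispatchSketch {
  sealed trait Values
  final case class IteratorValues(iterator: Iterator[Any]) extends Values
  final case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
  final case class ByteBufferValues(buffer: ByteBuffer) extends Values

  // Hypothetical doPut: reports which store method would handle the data.
  def doPut(data: Values): String = data match {
    case IteratorValues(_)    => "putValues(iterator)"
    case ArrayBufferValues(_) => "putValues(array)"
    case ByteBufferValues(b)  => s"putBytes(${b.remaining()} bytes)"
  }

  def put(values: Iterator[Any]): String = doPut(IteratorValues(values))

  def put(values: ArrayBuffer[Any]): String = {
    require(values != null, "Values is null")
    doPut(ArrayBufferValues(values))
  }

  def putBytes(bytes: ByteBuffer): String = {
    require(bytes != null, "Bytes is null")
    doPut(ByteBufferValues(bytes))
  }
}
```

    Because the trait is sealed, the compiler can warn if doPut's match forgets one of the wrappers.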

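    doPut requires a non-null blockId and a valid storageLevel. It creates a BlockInfo for the block and registers it atomically in blockInfo via putIfAbsent; until markReady is called on that BlockInfo, other threads cannot get() the block. It then stores the data in memory, Tachyon or disk according to the storageLevel, and calls markReady so that other threads can read it. Finally, if level.replication is greater than 1, it calls replicate to copy the block to other nodes; for ByteBuffer data, replication is started asynchronously before the local write, since the data is already serialized: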

    private def doPut(
          blockId: BlockId,
          data: Values,
          level: StorageLevel,
          tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
    
        require(blockId != null, "BlockId is null")
        require(level != null && level.isValid, "StorageLevel is null or invalid")
    
        // Return value
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    
        // Remember the block's storage level so that we can correctly drop it to disk if it needs
        // to be dropped right after it got put into memory. Note, however, that other threads will
        // not be able to get() this block until we call markReady on its BlockInfo.
        val putBlockInfo = {
          val tinfo = new BlockInfo(level, tellMaster)
          // Do atomically !
          val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    
          if (oldBlockOpt.isDefined) {
            if (oldBlockOpt.get.waitForReady()) {
              logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
              return updatedBlocks
            }
    
            // TODO: So the block info exists - but previous attempt to load it (?) failed.
            // What do we do now ? Retry on it ?
            oldBlockOpt.get
          } else {
            tinfo
          }
        }
    
        val startTimeMs = System.currentTimeMillis
    
        // If we're storing values and we need to replicate the data, we'll want access to the values,
        // but because our put will read the whole iterator, there will be no values left. For the
        // case where the put serializes data, we'll remember the bytes, above; but for the case where
        // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
        var valuesAfterPut: Iterator[Any] = null
    
        // Ditto for the bytes after the put
        var bytesAfterPut: ByteBuffer = null
    
        // Size of the block in bytes
        var size = 0L
    
        // If we're storing bytes, then initiate the replication before storing them locally.
        // This is faster as data is already serialized and ready to send.
        val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
          // Duplicate doesn't copy the bytes, just creates a wrapper
          val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
          Future {
            replicate(blockId, bufferView, level)
          }
        } else {
          null
        }
    
        putBlockInfo.synchronized {
          logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
            + " to get into synchronized block")
    
          var marked = false
          try {
            if (level.useMemory) {
              // Save it just to memory first, even if it also has useDisk set to true; we will
              // drop it to disk later if the memory store can't hold it.
              val res = data match {
                case IteratorValues(iterator) =>
                  memoryStore.putValues(blockId, iterator, level, true)
                case ArrayBufferValues(array) =>
                  memoryStore.putValues(blockId, array, level, true)
                case ByteBufferValues(bytes) =>
                  bytes.rewind()
                  memoryStore.putBytes(blockId, bytes, level)
              }
              size = res.size
              res.data match {
                case Right(newBytes) => bytesAfterPut = newBytes
                case Left(newIterator) => valuesAfterPut = newIterator
              }
              // Keep track of which blocks are dropped from memory
              res.droppedBlocks.foreach { block => updatedBlocks += block }
            } else if (level.useOffHeap) {
              // Save to Tachyon.
              val res = data match {
                case IteratorValues(iterator) =>
                  tachyonStore.putValues(blockId, iterator, level, false)
                case ArrayBufferValues(array) =>
                  tachyonStore.putValues(blockId, array, level, false)
                case ByteBufferValues(bytes) =>
                  bytes.rewind()
                  tachyonStore.putBytes(blockId, bytes, level)
              }
              size = res.size
              res.data match {
                case Right(newBytes) => bytesAfterPut = newBytes
                case _ =>
              }
            } else {
              // Save directly to disk.
              // Don't get back the bytes unless we replicate them.
              val askForBytes = level.replication > 1
    
              val res = data match {
                case IteratorValues(iterator) =>
                  diskStore.putValues(blockId, iterator, level, askForBytes)
                case ArrayBufferValues(array) =>
                  diskStore.putValues(blockId, array, level, askForBytes)
                case ByteBufferValues(bytes) =>
                  bytes.rewind()
                  diskStore.putBytes(blockId, bytes, level)
              }
              size = res.size
              res.data match {
                case Right(newBytes) => bytesAfterPut = newBytes
                case _ =>
              }
            }
    
            val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
            if (putBlockStatus.storageLevel != StorageLevel.NONE) {
              // Now that the block is in either the memory, tachyon, or disk store,
              // let other threads read it, and tell the master about it.
              marked = true
              putBlockInfo.markReady(size)
              if (tellMaster) {
                reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
              }
              updatedBlocks += ((blockId, putBlockStatus))
            }
          } finally {
            // If we failed in putting the block to memory/disk, notify other possible readers
            // that it has failed, and then remove it from the block info map.
            if (!marked) {
              // Note that the remove must happen before markFailure otherwise another thread
              // could've inserted a new BlockInfo before we remove it.
              blockInfo.remove(blockId)
              putBlockInfo.markFailure()
              logWarning("Putting block " + blockId + " failed")
            }
          }
        }
        logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
    
        // Either we're storing bytes and we asynchronously started replication, or we're storing
        // values and need to serialize and replicate them now:
        if (level.replication > 1) {
          data match {
            case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
            case _ => {
              val remoteStartTime = System.currentTimeMillis
              // Serialize the block if not already done
              if (bytesAfterPut == null) {
                if (valuesAfterPut == null) {
                  throw new SparkException(
                    "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
                }
                bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
              }
              replicate(blockId, bytesAfterPut, level)
              logDebug("Put block " + blockId + " remotely took " +
                Utils.getUsedTimeMs(remoteStartTime))
            }
          }
        }
    
        BlockManager.dispose(bytesAfterPut)
    
        if (level.replication > 1) {
          logDebug("Put for block " + blockId + " with replication took " +
            Utils.getUsedTimeMs(startTimeMs))
        } else {
          logDebug("Put for block " + blockId + " without replication took " +
            Utils.getUsedTimeMs(startTimeMs))
        }
    
        updatedBlocks
      }
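    The coordination between doPut and concurrent readers rests on BlockInfo: the writer registers it with putIfAbsent, readers block in waitForReady, and the writer finishes with markReady or markFailure. The sketch below models only that protocol with wait/notifyAll; SketchBlockInfo and BlockInfoSketch are hypothetical and much simpler than Spark's BlockInfo.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical model of the BlockInfo readiness protocol.
final class SketchBlockInfo {
  private var pending = true
  private var failed = false

  // Block until the writer finishes; true if the write succeeded.
  def waitForReady(): Boolean = synchronized {
    while (pending) wait()
    !failed
  }

  def markReady(): Unit = synchronized { pending = false; notifyAll() }

  def markFailure(): Unit = synchronized { failed = true; pending = false; notifyAll() }
}

object BlockInfoSketch {
  val blockInfo = new ConcurrentHashMap[String, SketchBlockInfo]()

  // True if this caller won the race and should perform the put.
  def register(blockId: String): Boolean =
    blockInfo.putIfAbsent(blockId, new SketchBlockInfo) == null
}
```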

    Get operations:

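    get first calls getLocal to look for the block on the local node; if it is not found there, it calls getRemote to fetch the block from another node's BlockManager. “Normally Spark assigns tasks according to where blocks are located, so a task tends to run on a node that already holds its block, and getLocal() finds it. When resources are limited, however, Spark may schedule a task on a node that does not hold the block, and the block then has to be fetched with getRemote().”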

    def get(blockId: BlockId): Option[Iterator[Any]] = {
        val local = getLocal(blockId)
        if (local.isDefined) {
          logInfo("Found block %s locally".format(blockId))
          return local
        }
        val remote = getRemote(blockId)
        if (remote.isDefined) {
          logInfo("Found block %s remotely".format(blockId))
          return remote
        }
        None
      }
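    The local-then-remote fallback in get maps directly onto Option.orElse, whose argument is evaluated only when the first Option is empty. A minimal sketch, where the two lookup functions are hypothetical stand-ins for getLocal and getRemote:

```scala
object GetSketch {
  // Try the local lookup first; the remote lookup runs only on a local miss,
  // because orElse takes its argument by name.
  def get[T](blockId: String, getLocal: String => Option[T], getRemote: String => Option[T]): Option[T] =
    getLocal(blockId).orElse(getRemote(blockId))
}
```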

    getLocal delegates to doGetLocal:

    /**
       * Get block from local block manager.
       */
      def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
        logDebug("Getting local block " + blockId)
        doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
      }

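    doGetLocal first waits until the block is ready. If the storage level uses memory, it looks in the memory store; on a miss, if the level uses off-heap storage, it looks in Tachyon; then, if the level uses disk, it reads the block from disk. A disk-only block is returned as read; if the level uses both disk and memory, the data is also stored back into the memory store before being returned. If the block is not registered locally, or it misses in memory and Tachyon and the level does not use disk, None is returned: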

    private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
        val info = blockInfo.get(blockId).orNull
        if (info != null) {
          info.synchronized {
    
            // If another thread is writing the block, wait for it to become ready.
            if (!info.waitForReady()) {
              // If we get here, the block write failed.
              logWarning("Block " + blockId + " was marked as failure.")
              return None
            }
    
            val level = info.level
            logDebug("Level for block " + blockId + " is " + level)
    
            // Look for the block in memory
            if (level.useMemory) {
              logDebug("Getting block " + blockId + " from memory")
              val result = if (asValues) {
                memoryStore.getValues(blockId)
              } else {
                memoryStore.getBytes(blockId)
              }
              result match {
                case Some(values) =>
                  return Some(values)
                case None =>
                  logDebug("Block " + blockId + " not found in memory")
              }
            }
    
            // Look for the block in Tachyon
            if (level.useOffHeap) {
              logDebug("Getting block " + blockId + " from tachyon")
              if (tachyonStore.contains(blockId)) {
                tachyonStore.getBytes(blockId) match {
                  case Some(bytes) => {
                    if (!asValues) {
                      return Some(bytes)
                    } else {
                      return Some(dataDeserialize(blockId, bytes))
                    }
                  }
                  case None =>
                    logDebug("Block " + blockId + " not found in tachyon")
                }
              }
            }
    
            // Look for block on disk, potentially storing it back into memory if required:
            if (level.useDisk) {
              logDebug("Getting block " + blockId + " from disk")
              val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
                case Some(bytes) => bytes
                case None =>
                  throw new Exception("Block " + blockId + " not found on disk, though it should be")
              }
              assert (0 == bytes.position())
    
              if (!level.useMemory) {
                // If the block shouldn't be stored in memory, we can just return it:
                if (asValues) {
                  return Some(dataDeserialize(blockId, bytes))
                } else {
                  return Some(bytes)
                }
              } else {
                // Otherwise, we also have to store something in the memory store:
                if (!level.deserialized || !asValues) {
                  // We'll store the bytes in memory if the block's storage level includes
                  // "memory serialized", or if it should be cached as objects in memory
                  // but we only requested its serialized bytes:
                  val copyForMemory = ByteBuffer.allocate(bytes.limit)
                  copyForMemory.put(bytes)
                  memoryStore.putBytes(blockId, copyForMemory, level)
                  bytes.rewind()
                }
                if (!asValues) {
                  return Some(bytes)
                } else {
                  val values = dataDeserialize(blockId, bytes)
                  if (level.deserialized) {
                    // Cache the values before returning them:
                    // TODO: Consider creating a putValues that also takes in a iterator?
                    val valuesBuffer = new ArrayBuffer[Any]
                    valuesBuffer ++= values
                    memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
                      case Left(values2) =>
                        return Some(values2)
                      case _ =>
                        throw new Exception("Memory store did not return back an iterator")
                    }
                  } else {
                    return Some(values)
                  }
                }
              }
            }
          }
        } else {
          logDebug("Block " + blockId + " not registered locally")
        }
        None
      }
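    Stripped of Tachyon, serialization and logging, the memory-then-disk search order in doGetLocal can be modeled as below. This is a hypothetical sketch (TieredGetSketch, string payloads, plain maps as stores); note that the real method throws if a disk-level block is missing on disk, while the sketch returns None.

```scala
import scala.collection.mutable

object TieredGetSketch {
  final case class Level(useMemory: Boolean, useDisk: Boolean)

  def getLocal(blockId: String, level: Level,
               memory: mutable.Map[String, String], disk: Map[String, String]): Option[String] = {
    val fromMemory = if (level.useMemory) memory.get(blockId) else None
    fromMemory.orElse {
      if (!level.useDisk) None
      else {
        val fromDisk = disk.get(blockId)
        // Memory-and-disk levels re-cache the disk copy so the next read hits memory.
        if (level.useMemory) fromDisk.foreach(v => memory.put(blockId, v))
        fromDisk
      }
    }
  }
}
```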

    Next, getRemote, which delegates to doGetRemote:

    /**
       * Get block from remote block managers.
       */
      def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
        logDebug("Getting remote block " + blockId)
        doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
      }

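    doGetRemote asks the master for all locations of the block, shuffles them, and requests the block from each location in turn; as soon as one remote node returns the block, it returns without sending any further requests: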

    private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
        require(blockId != null, "BlockId is null")
        val locations = Random.shuffle(master.getLocations(blockId))
        for (loc <- locations) {
          logDebug("Getting remote block " + blockId + " from " + loc)
          val data = BlockManagerWorker.syncGetBlock(
            GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
          if (data != null) {
            if (asValues) {
              return Some(dataDeserialize(blockId, data))
            } else {
              return Some(data)
            }
          }
          logDebug("The value of block " + blockId + " is null")
        }
        logDebug("Block " + blockId + " not found")
        None
      }
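    The remote strategy (shuffle the locations, then stop at the first one that answers) fits in a few lines. A minimal sketch in which fetch is a hypothetical stand-in for BlockManagerWorker.syncGetBlock that returns None instead of null on a miss:

```scala
import scala.util.Random

object RemoteGetSketch {
  // Shuffling spreads load across replicas; the lazy iterator stops at the first hit.
  def getRemote[T](locations: Seq[String], fetch: String => Option[T], rand: Random = new Random): Option[T] =
    rand.shuffle(locations).iterator.map(fetch).collectFirst { case Some(data) => data }
}
```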

    4. Partitions and Blocks

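    Finally, let's see how a partition is turned into a stored block. As analyzed in the earlier part on resource scheduling (task execution), a chain of transformations and actions on an RDD becomes a set of tasks that run on partitions, and those tasks ultimately call getOrCompute. getOrCompute first builds a key (the blockId) from the RDD id and the partition index, and uses it to fetch the corresponding block from the BlockManager. If the block exists, the RDD was already computed and stored in the BlockManager, so it can be read directly without recomputation. If it does not exist, getOrCompute calls the RDD's computeOrReadCheckpoint method to read the checkpoint or compute the partition, and stores the resulting block in the BlockManager. Note that computing and storing a block is blocking: if another thread needs the same block, it must wait until this thread has finished loading it.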
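    The get-or-compute logic, including the wait on a block that another thread is still loading, can be sketched as follows. CacheSketch is hypothetical and keeps blocks in a plain map instead of a BlockManager; the key format follows the rdd_<rddId>_<partitionIndex> naming used for RDD blocks.

```scala
import scala.collection.mutable

// Hypothetical model of getOrCompute with a "loading" set guarding in-flight blocks.
class CacheSketch[T] {
  private val store = mutable.Map.empty[String, T]
  private val loading = mutable.Set.empty[String]

  def key(rddId: Int, partitionIndex: Int): String = s"rdd_${rddId}_${partitionIndex}"

  def getOrCompute(rddId: Int, partitionIndex: Int)(compute: => T): T = {
    val id = key(rddId, partitionIndex)
    val cached: Option[T] = loading.synchronized {
      while (loading.contains(id)) loading.wait() // another thread is computing this block
      val hit = store.get(id)
      if (hit.isEmpty) loading += id              // claim the block: this thread computes it
      hit
    }
    cached.getOrElse {
      try {
        val v = compute
        loading.synchronized { store(id) = v }
        v
      } finally {
        // Release the claim even if compute failed, and wake any waiting threads.
        loading.synchronized { loading -= id; loading.notifyAll() }
      }
    }
  }
}
```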

    END

    Original article: https://www.cnblogs.com/kevingu/p/4770820.html