
    Spark Source Code Reading Notes: DiskStore

    BlockManager delegates the actual storage of data to BlockStore. BlockStore is an abstract class with three implementations: DiskStore (disk-level persistence), MemoryStore (memory-level persistence), and TachyonStore (persistence in the Tachyon in-memory distributed file system).

    DiskStore uses DiskBlockManager to maintain the mapping between blocks and their files on disk, so that each block can be stored in a disk file. DiskBlockManager takes the local root directories configured through YARN_LOCAL_DIRS or LOCAL_DIRS (YARN mode), or SPARK_LOCAL_DIRS or spark.local.dir (other modes; default System.getProperty("java.io.tmpdir")); there may be several, separated by commas. Under them it creates the root directories where DiskStore stores block files (one per configured root directory, so there may also be several): .../blockmgr-UUID.randomUUID.toString (YARN mode) or .../spark-UUID.randomUUID.toString/blockmgr-UUID.randomUUID.toString (other modes).

    At the same time, DiskBlockManager creates conf.getInt("spark.diskStore.subDirectories", 64) subdirectories under each root directory to hold the block files. Each block is hashed to one of these subdirectories based on its name, and its file is stored there with the block's name as the file name, as sketched below.
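
    As a standalone sketch of this placement scheme (the placement function and its parameter names are illustrative, not Spark API, and the non-negative hash only approximates Spark's Utils.nonNegativeHash):

      // Map a block name to (root directory index, subdirectory index),
      // mirroring the arithmetic used by DiskBlockManager.getFile below.
      def placement(blockName: String, numRootDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
        val hash = math.abs(blockName.hashCode)                   // non-negative hash of the block name
        val dirId = hash % numRootDirs                            // which configured root directory
        val subDirId = (hash / numRootDirs) % subDirsPerLocalDir  // which "%02x"-named subdirectory
        (dirId, subDirId)
      }

      // e.g. placement("shuffle_0_0_0", numRootDirs = 2, subDirsPerLocalDir = 64)
      // tells you which root directory and subdirectory the block file lands in.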

    Creates and maintains the logical mapping between logical blocks and physical on-disk locations. By default, one block is mapped to one file with a name given by its BlockId. However, it is also possible to have a block map to only a segment of a file, by calling mapBlockToFileSegment().
    Block files are hashed among the directories listed in spark.local.dir (or in SPARK_LOCAL_DIRS, if it’s set).

    DiskBlockManager Properties

    • blockManager:BlockManager

    • subDirsPerLocalDir: Int = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
      The number of subdirectories created under each local root directory; the subdirectories exist to avoid accumulating too many inodes at the top level.

      Create one local directory for each path mentioned in spark.local.dir; then, inside this directory, create multiple subdirectories that we will hash files into, in order to avoid having really large inodes at the top level.

    • localDirs: Array[File]
      The local root directories that hold the block files, derived from YARN_LOCAL_DIRS or LOCAL_DIRS (YARN mode), or SPARK_LOCAL_DIRS or spark.local.dir (other modes; default System.getProperty("java.io.tmpdir")).

    • subDirs: Array[File](localDirs.length)(subDirsPerLocalDir)
      A two-dimensional array holding all of the subdirectories.

    DiskBlockManager Methods

    • getFile(filename: String): File
      Looks up the file for the given file name. The method first hashes filename to a subdirectory (subDirs[hash % localDirs.length][(hash / localDirs.length) % subDirsPerLocalDir]), then checks whether that subdirectory exists and creates it if it does not.
    /** Looks up a file by hashing it into one of our local subdirectories. */
    def getFile(filename: String): File = {
        // Figure out which local directory it hashes to, and which subdirectory in that
        val hash = Utils.nonNegativeHash(filename)
        val dirId = hash % localDirs.length
        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    
        // Create the subdirectory if it doesn't already exist
        var subDir = subDirs(dirId)(subDirId)
        if (subDir == null) {
          subDir = subDirs(dirId).synchronized {
            val old = subDirs(dirId)(subDirId)
            if (old != null) {
              old
            } else {
              val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
              if (!newDir.exists() && !newDir.mkdir()) {
                throw new IOException(s"Failed to create local dir in $newDir.")
              }
              subDirs(dirId)(subDirId) = newDir
              newDir
            }
          }
        }
    
        new File(subDir, filename)
      }
    • getFile(blockId: BlockId): File = getFile(blockId.name)
      Returns the file for the given BlockId.

    • containsBlock(blockId: BlockId): Boolean = getFile(blockId.name).exists()
      Returns whether the block with the given BlockId is stored on this local disk.

    • getAllFiles(): Seq[File]
      Returns all files currently stored.

      /** List all the files currently stored on disk by the disk manager. */
      def getAllFiles(): Seq[File] = {
        // Get all the files inside the array of array of directories
        subDirs.flatten.filter(_ != null).flatMap { dir =>
          val files = dir.listFiles()
          if (files != null) files else Seq.empty
        }
      }
    • getAllBlocks(): Seq[BlockId] = getAllFiles().map(f => BlockId(f.getName))
      Returns the BlockIds of all stored blocks.

    • createTempLocalBlock(): (TempLocalBlockId, File)
      Creates a local temporary file: a unique TempLocalBlockId (regenerated until it does not collide with an existing file) and its File.

      /** Produces a unique block id and File suitable for storing local intermediate results. */
      def createTempLocalBlock(): (TempLocalBlockId, File) = {
        var blockId = new TempLocalBlockId(UUID.randomUUID())
        while (getFile(blockId).exists()) {
          blockId = new TempLocalBlockId(UUID.randomUUID())
        }
        (blockId, getFile(blockId))
      }
    • createTempShuffleBlock(): (TempShuffleBlockId, File)
      Creates a temporary file used by sort-based shuffle.

      /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
      def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
        var blockId = new TempShuffleBlockId(UUID.randomUUID())
        while (getFile(blockId).exists()) {
          blockId = new TempShuffleBlockId(UUID.randomUUID())
        }
        (blockId, getFile(blockId))
      }

    DiskStore Properties

    • blockManager: BlockManager

    • diskManager: DiskBlockManager

    • minMemoryMapBytes: Long = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 1024L * 1024L)
      The threshold for memory-mapping files: when a file reaches this size, getBytes memory-maps it rather than reading its contents directly into a byte buffer.
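
      For example, to raise this threshold (a minimal sketch; the 8 MB value is arbitrary, and since this Spark version reads the property with conf.getLong, it takes a plain byte count):

      import org.apache.spark.SparkConf

      // Blocks smaller than the threshold are read into a heap ByteBuffer;
      // larger ones are memory-mapped by getBytes.
      val conf = new SparkConf()
        .set("spark.storage.memoryMapThreshold", (8 * 1024L * 1024L).toString)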

    DiskStore Methods

    • putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult
      Writes the given byte buffer for BlockId to a file on disk.
    override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
        // So that we do not modify the input offsets !
        // duplicate does not copy buffer, so inexpensive
        val bytes = _bytes.duplicate()
        logDebug(s"Attempting to put block $blockId")
        val startTime = System.currentTimeMillis
        val file = diskManager.getFile(blockId)
        val channel = new FileOutputStream(file).getChannel
        while (bytes.remaining > 0) {
          channel.write(bytes)
        }
        channel.close()
        val finishTime = System.currentTimeMillis
        logDebug("Block %s stored as %s file on disk in %d ms".format(
          file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
        PutResult(bytes.limit(), Right(bytes.duplicate()))
      }
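
    The loop in putBytes matters because a single FileChannel.write call may write fewer bytes than remain in the buffer, and duplicate() keeps the caller's position and limit untouched. A standalone sketch of the same pattern (writeFully and path are illustrative names, not Spark API):

      import java.io.FileOutputStream
      import java.nio.ByteBuffer

      // Write a ByteBuffer fully to a file, mirroring putBytes' write loop.
      def writeFully(path: String, _bytes: ByteBuffer): Unit = {
        val bytes = _bytes.duplicate()   // do not disturb the caller's position/limit
        val channel = new FileOutputStream(path).getChannel
        try {
          while (bytes.remaining > 0) {  // a single write() may be partial, so loop
            channel.write(bytes)
          }
        } finally {
          channel.close()
        }
      }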
    • putIterator(blockId: BlockId, values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult
      Stores the data of an Iterator for the given BlockId to disk: the values are serialized as a stream and written to the corresponding file.
    override def putIterator(
          blockId: BlockId,
          values: Iterator[Any],
          level: StorageLevel,
          returnValues: Boolean): PutResult = {
    
        logDebug(s"Attempting to write values for block $blockId")
        val startTime = System.currentTimeMillis
        val file = diskManager.getFile(blockId)
        val outputStream = new FileOutputStream(file)
        try {
          try {
            blockManager.dataSerializeStream(blockId, outputStream, values)
          } finally {
            // Close outputStream here because it should be closed before file is deleted.
            outputStream.close()
          }
        } catch {
          case e: Throwable =>
            if (file.exists()) {
              file.delete()
            }
            throw e
        }
    
        val length = file.length
    
        val timeTaken = System.currentTimeMillis - startTime
        logDebug("Block %s stored as %s file on disk in %d ms".format(
          file.getName, Utils.bytesToString(length), timeTaken))
    
        if (returnValues) {
          // Return a byte buffer for the contents of the file
          val buffer = getBytes(blockId).get
          PutResult(length, Right(buffer))
        } else {
          PutResult(length, null)
        }
      }
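
    The nested try/finally ordering in putIterator is the essential part: the stream must be closed before the partially written file can be deleted on failure. A standalone sketch of the same shape, substituting plain Java serialization for blockManager.dataSerializeStream (writeValues is an illustrative name):

      import java.io.{File, FileOutputStream, ObjectOutputStream}

      // Serialize values to file; on any failure, delete the partial file and rethrow.
      def writeValues(file: File, values: Iterator[AnyRef]): Long = {
        val out = new ObjectOutputStream(new FileOutputStream(file))
        try {
          try {
            values.foreach(out.writeObject)
          } finally {
            out.close()                  // close before any delete, as in putIterator
          }
        } catch {
          case e: Throwable =>
            if (file.exists()) file.delete()
            throw e
        }
        file.length
      }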
    • putArray(blockId: BlockId, values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult
      Stores an Array for the given BlockId to disk by converting it to an Iterator and delegating to putIterator, which serializes the values to the corresponding file.
    override def putArray(
          blockId: BlockId,
          values: Array[Any],
          level: StorageLevel,
          returnValues: Boolean): PutResult = {
        putIterator(blockId, values.toIterator, level, returnValues)
      }
    • getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer]
      Low-level method that reads length bytes starting at offset from the file. If length is smaller than minMemoryMapBytes, the bytes are read directly into a byte buffer; otherwise the file region is memory-mapped.
    private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
        val channel = new RandomAccessFile(file, "r").getChannel
    
        try {
          // For small files, directly read rather than memory map
          if (length < minMemoryMapBytes) {
            val buf = ByteBuffer.allocate(length.toInt)
            channel.position(offset)
            while (buf.remaining() != 0) {
              if (channel.read(buf) == -1) {
                throw new IOException("Reached EOF before filling buffer\n" +
                  s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
              }
            }
            buf.flip()
            Some(buf)
          } else {
            Some(channel.map(MapMode.READ_ONLY, offset, length))
          }
        } finally {
          channel.close()
        }
      }
    • getBytes(blockId: BlockId): Option[ByteBuffer]
      Reads the content stored on disk for the given BlockId.
    override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
        val file = diskManager.getFile(blockId.name)
        getBytes(file, 0, file.length)
      }
    • getBytes(segment: FileSegment): Option[ByteBuffer] = getBytes(segment.file, segment.offset, segment.length)
      Reads content according to a FileSegment, which holds a file plus the offset and length of the data to read: FileSegment(val file: File, val offset: Long, val length: Long).
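
      For instance (an illustrative fragment, not self-contained: the path is hypothetical, FileSegment is Spark's org.apache.spark.storage.FileSegment, and diskStore stands for an already-constructed DiskStore):

      import java.io.File

      // Describes bytes [100, 142) of a consolidated shuffle file.
      val segment = new FileSegment(new File("/tmp/merged_shuffle_0_0"), 100L, 42L)
      val bytes = diskStore.getBytes(segment)   // same as getBytes(segment.file, 100L, 42L)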

    • getValues(blockId: BlockId): Option[Iterator[Any]]
      Reads the content for the given BlockId and deserializes it into an Iterator.

    override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
        getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
      }
    • getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]]
      Reads the content for the given BlockId and deserializes it into an Iterator using the supplied custom Serializer.
      /**
       * A version of getValues that allows a custom serializer. This is used as part of the
       * shuffle short-circuit code.
       */
      def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
        // TODO: Should bypass getBytes and use a stream based implementation, so that
        // we won't use a lot of memory during e.g. external sort merge.
        getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
      }
    • getSize(blockId: BlockId): Long = diskManager.getFile(blockId.name).length
      Returns the size of the block stored on this local disk for the given BlockId.

    • remove(blockId: BlockId): Boolean
      Deletes the stored block for the given BlockId.

      override def remove(blockId: BlockId): Boolean = {
        val file = diskManager.getFile(blockId.name)
        // If consolidation mode is used with HashShuffleManager, the physical filename for the block
        // is different from blockId.name. So the file returned here may not exist; checking first
        // avoids deleting the whole consolidated file by mistake.
        if (file.exists()) {
          file.delete()
        } else {
          false
        }
      }
    • contains(blockId: BlockId): Boolean
      Returns whether a block for the given BlockId is stored.
    override def contains(blockId: BlockId): Boolean = {
        val file = diskManager.getFile(blockId.name)
        file.exists()
      }