zoukankan      html  css  js  c++  java
  • spark存储管理之磁盘存储--DiskStore

    DiskStore

    接着上一篇,本篇,我们分析一下实现磁盘存储的功能类DiskStore,这个类相对简单。在正式展开之前,我觉得有必要大概分析一下BlockManager的背景,或者说它的运行环境,运行的作用范围。Blockmanager这个类其实在运行时的每个节点都会有一个实例(包括driver和executor进程),因为不论是driver端进行广播变量的创建,还是executor端shuffle过程中写shuffle块,或者是任务运行时结果太大需要通过BlockManager传输,或者是RDD的缓存,其实在每个运行节点上都会通过Blockmanager来管理程序内部对于本地的内存和磁盘的读写,所以综上,我想表达的核心意思就是每个进程(driver和executor)都有一Blockmanager实例,而这些Blockmanager实例是通过BlockManagerId类来进行唯一区分的,BlockManagerId实际上是对进程物理位置的封装。

    DiskStore.put

    首先我们来看一个最常用的写入方法

    def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
        // 通过DiskBlockManager对象检查这个blockId对应的文件名的文件是否存在
        if (contains(blockId)) {
          throw new IllegalStateException(s"Block $blockId is already present in the disk store")
        }
        logDebug(s"Attempting to put block $blockId")
        val startTime = System.currentTimeMillis
        // 通过DiskBlockManager获取一个文件用于写入数据
        val file = diskManager.getFile(blockId)
        // 用CountingWritableChannel包装一下,以便于记录写入的字节数
        val out = new CountingWritableChannel(openForWrite(file))
        var threwException: Boolean = true
        try {
          writeFunc(out)
          // 关键步骤,记录到内部的map结构中
          blockSizes.put(blockId, out.getCount)
          threwException = false
        } finally {
          try {
            out.close()
          } catch {
            case ioe: IOException =>
              if (!threwException) {
                threwException = true
                throw ioe
              }
          } finally {
             if (threwException) {
              remove(blockId)
            }
          }
        }
        val finishTime = System.currentTimeMillis
        logDebug("Block %s stored as %s file on disk in %d ms".format(
          file.getName,
          Utils.bytesToString(file.length()),
          finishTime - startTime))
      }
    

    这个方法很简单,没什么好说的,但是调用了一个比较重要的类DiskBlockManager,这个类的功能就是对磁盘上的目录和文件进行管理,会在磁盘上按照一定规则创建一些目录和子目录,在分配文件名时也会尽量均匀第分配在这些目录和子目录下。

    DiskStore.putBytes

    这个方法就不说了,简单处理一下直接调用put方法。

      def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
        put(blockId) { channel =>
          bytes.writeFully(channel)
        }
      }
    

    DiskStore.getBytes

    我们来看一下这个方法,首先通过DiskBlockManager获取对应的文件名,然后将其包装成一个BlockData对象,分为加密和不加密两种。

      def getBytes(blockId: BlockId): BlockData = {
        val file = diskManager.getFile(blockId.name)
        val blockSize = getSize(blockId)
      
        securityManager.getIOEncryptionKey() match {
          case Some(key) =>
            // Encrypted blocks cannot be memory mapped; return a special object that does decryption
            // and provides InputStream / FileRegion implementations for reading the data.
            new EncryptedBlockData(file, blockSize, conf, key)
      
          case _ =>
            // 看一下DiskBlockData
            new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
        }
      }
    

    DiskBlockData

    这个类作为磁盘文件的包装类,主要功能是提供了几个方便的接口,将磁盘文件中的数据读取出来并生成缓冲对象。
    这个类中有两个重要的方法toChunkedByteBuffer和toByteBuffer,toByteBuffer就不说了,调用ReadableByteChannel.read(ByteBuffer dst)方法读取文件数据,我们看一下toChunkedByteBuffer

    DiskBlockData.toChunkedByteBuffer

    这个方法也很简单,在数据量比较大的时候,由于每次申请的内存块大小有限制maxMemoryMapBytes,所以需要切分成多个块

      override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
        // Utils.tryWithResource调用保证在使用完资源后关闭资源
        // 基本等同于java中的try{}finally{}
        Utils.tryWithResource(open()) { channel =>
          var remaining = blockSize
          val chunks = new ListBuffer[ByteBuffer]()
          while (remaining > 0) {
            // 这里取剩余大小和maxMemoryMapBytes的较小值,
            // 也就是说每次申请的内存块大小不超过maxMemoryMapBytes
            val chunkSize = math.min(remaining, maxMemoryMapBytes)
            val chunk = allocator(chunkSize.toInt)
            remaining -= chunkSize
            JavaUtils.readFully(channel, chunk)
            chunk.flip()
            chunks += chunk
          }
          new ChunkedByteBuffer(chunks.toArray)
        }
      }
    

    DiskBlockManager

    这个类之前也分析过,主要是用来管理spark运行过程中写入的一些临时文件,以及目录的管理。

    • 首先会根据参数配置创建本地目录(可以是逗号分隔的多个目录),参数的优先顺序是:如果是运行在yarn上,则会使用yarn参数LOCAL_DIRS配置的本地目录;否则获取环境变量SPARK_LOCAL_DIRS的值;否则获取spark.local.dir参数的值;最后如果都没有配置,那么就用java系统参数java.io.tmpdir的值作为临时目录。

    • 其次,关于文件在目录之间分配的问题,使用文件名的hash值对目录数量取余的方法来尽量将文件均匀地分配到不同的目录下。

    • 另外一点要说的是文件名的命名规则,是根据不同作用的Block来区别命名的,例如RDD缓存写入的block的id就是RDDBlockId,它的文件名拼接规则是"rdd_" + rddId + "_" + splitIndex

  • 相关阅读:
    JavaScript 格式化数字
    浅析C#中单点登录的原理和使用
    从银行转账失败到分布式事务:总结与思考
    计算机网络资料
    阿里巴巴Java开发规约插件p3c详细教程及使用感受
    程序员如何打造属于自己的云笔记服务
    sql server2016里面的json功能
    mac pro 开启三只滑动选中文本
    技术网站
    idea gradle项目导入
  • 原文地址:https://www.cnblogs.com/zhuge134/p/11007328.html
Copyright © 2011-2022 走看看