
    Spark SQL (5-2): CacheManager and InMemoryRelation

    This section wasn't in my original plan, but while reading the Spark SQL memory-management module I suddenly wondered: how does Spark SQL organize the data it caches in memory? I looked through some blog posts, then traced the code myself, and that turned into this write-up.

    As mentioned before, CacheManager has a cacheQuery method:

    def cacheQuery(
          query: Dataset[_],
          tableName: Option[String] = None,
          storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
        val planToCache = query.logicalPlan
        if (lookupCachedData(planToCache).nonEmpty) {
          logWarning("Asked to cache already cached data.")
        } else {
          val sparkSession = query.sparkSession
          val inMemoryRelation = InMemoryRelation(
            sparkSession.sessionState.conf.useCompression,
            sparkSession.sessionState.conf.columnBatchSize, storageLevel,
            sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
            tableName,
            planToCache.stats)
          cachedData.add(CachedData(planToCache, inMemoryRelation))
        }
      }
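
    For context, cacheQuery is what Dataset.cache() and persist() ultimately route into through the session's CacheManager. A minimal usage sketch (the DataFrame below is made up for illustration):

    val df = spark.range(100).selectExpr("id", "id * 2 AS doubled")
    df.cache()   // lazily registers an InMemoryRelation via CacheManager.cacheQuery
    df.count()   // the first action actually materializes the cached column buffers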
    

      From this method you can see that cached entries are organized as CachedData:

    /** Holds a cached logical plan and its data */
    case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
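
    The lookupCachedData call used in cacheQuery searches this collection by comparing logical plans structurally, not by object reference. A simplified sketch of the lookup (not the exact Spark source; cachedData stands for CacheManager's internal list of CachedData entries):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // A plan hits the cache when an existing entry's plan is structurally
    // equivalent to it (sameResult), even if the two objects differ.
    def lookupCachedData(plan: LogicalPlan): Option[CachedData] =
      cachedData.asScala.find(cd => plan.sameResult(cd.plan))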
    

      Its comment says it holds a cached logical plan together with its data, so the data itself must live inside InMemoryRelation. Next, look at InMemoryRelation's apply method:

    object InMemoryRelation {
      def apply(
          useCompression: Boolean,
          batchSize: Int,
          storageLevel: StorageLevel,
          child: SparkPlan,
          tableName: Option[String],
          statsOfPlanToCache: Statistics): InMemoryRelation =
        new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
          statsOfPlanToCache = statsOfPlanToCache)
    }
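
    The useCompression and batchSize arguments passed to apply come from two SQL configs, shown here with what I believe are their Spark 2.x defaults:

    // useCompression: whether to compress the in-memory column buffers (default true)
    spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    // batchSize: maximum number of rows per CachedBatch (default 10000)
    spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")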
    

      The key method here is buildBuffers; the InMemoryRelation constructor calls it when no cached buffers have been built yet, and the RDD it produces stays lazy until the first action runs:

    private def buildBuffers(): Unit = {
        val output = child.output
        val cached = child.execute().mapPartitionsInternal { rowIterator =>
          new Iterator[CachedBatch] {
            def next(): CachedBatch = {
              val columnBuilders = output.map { attribute =>
                ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
              }.toArray
    
              var rowCount = 0
              var totalSize = 0L
              while (rowIterator.hasNext && rowCount < batchSize
                && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
                val row = rowIterator.next()
    
                // Added for SPARK-6082. This assertion can be useful for scenarios when something
                // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
                // may result in malformed rows, causing ArrayIndexOutOfBoundsException, which is
                // somewhat hard to decipher.
                assert(
                  row.numFields == columnBuilders.length,
                  s"Row column number mismatch, expected ${output.size} columns, " +
                    s"but got ${row.numFields}." +
                    s"\nRow content: $row")
    
                var i = 0
                totalSize = 0
                while (i < row.numFields) {
                  columnBuilders(i).appendFrom(row, i)
                  totalSize += columnBuilders(i).columnStats.sizeInBytes
                  i += 1
                }
                rowCount += 1
              }
    
              sizeInBytesStats.add(totalSize)
    
              val stats = InternalRow.fromSeq(
                columnBuilders.flatMap(_.columnStats.collectedStatistics))
              CachedBatch(rowCount, columnBuilders.map { builder =>
                JavaUtils.bufferToArray(builder.build())
              }, stats)
            }
    
            def hasNext: Boolean = rowIterator.hasNext
          }
        }.persist(storageLevel)
    
        cached.setName(
          tableName.map(n => s"In-memory table $n")
            .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
        _cachedColumnBuffers = cached
      }
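
    The core pattern in buildBuffers is an iterator whose next() drains up to batchSize rows (bounded also by MAX_BATCH_SIZE_IN_BYTE) into a single batch. A toy, Spark-free sketch of the same chunking idea:

    import scala.collection.mutable.ArrayBuffer

    // Wrap a row iterator so that each next() drains up to batchSize
    // elements, mirroring the while loop in buildBuffers above.
    def batched[T](rows: Iterator[T], batchSize: Int): Iterator[Seq[T]] =
      new Iterator[Seq[T]] {
        def hasNext: Boolean = rows.hasNext
        def next(): Seq[T] = {
          val buf = ArrayBuffer.empty[T]
          while (rows.hasNext && buf.size < batchSize) buf += rows.next()
          buf.toSeq
        }
      }

    // batched(Iterator(1, 2, 3, 4, 5), 2).toList == List(Seq(1, 2), Seq(3, 4), Seq(5))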
    

      You can see that buildBuffers organizes the rows into an iterator of CachedBatch instances. As for CachedBatch itself:

    /**
     * CachedBatch is a cached batch of rows.
     *
     * @param numRows The total number of rows in this batch
     * @param buffers The buffers for serialized columns
     * @param stats The stat of columns
     */
    case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
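
    The stats row gathers each column's collected statistics (such as lower/upper bounds and null counts) for the batch; the in-memory scan can later use them to skip batches that cannot match a filter. A toy illustration of that pruning idea (not Spark's actual implementation):

    // With per-batch min/max, a predicate like "col = 42" can skip any
    // batch whose [min, max] range excludes 42.
    case class BatchStats(min: Int, max: Int)

    def mightContain(stats: BatchStats, v: Int): Boolean =
      stats.min <= v && v <= stats.max

    // mightContain(BatchStats(0, 10), 42) == false, so that batch is skipped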
    

      The key field here is buffers, and it is filled in the buildBuffers method above:

    val stats = InternalRow.fromSeq(
      columnBuilders.flatMap(_.columnStats.collectedStatistics))
    CachedBatch(rowCount, columnBuilders.map { builder =>
      JavaUtils.bufferToArray(builder.build())
    }, stats)
    

      This code calls JavaUtils.bufferToArray(builder.build()) to turn each ColumnBuilder's buffer into a byte array. Since columnBuilders is itself an array with one ColumnBuilder per output column, the result is exactly the two-dimensional buffers array of CachedBatch: the first dimension indexes columns, and each entry holds the serialized bytes (the values) of that column for the whole batch. So to read a particular column, you go to its slot in the first dimension and read just those bytes, skipping every other column entirely. That is in-memory columnar storage. I'm not completely sure this reading is right, but I think it most likely is.
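
    To make the layout concrete, here is a tiny Spark-free sketch of the row-to-column pivot that buildBuffers performs: three (Int, Long) rows become two byte arrays, one per column, matching the shape of CachedBatch.buffers:

    import java.nio.ByteBuffer

    // Three rows of (Int, Long) pivoted into one byte array per column.
    val rows = Seq((1, 10L), (2, 20L), (3, 30L))
    val intCol  = ByteBuffer.allocate(rows.size * 4)   // 4 bytes per Int
    val longCol = ByteBuffer.allocate(rows.size * 8)   // 8 bytes per Long
    rows.foreach { case (i, l) => intCol.putInt(i); longCol.putLong(l) }

    // Same shape as CachedBatch.buffers: the first dimension is the column index.
    val buffers: Array[Array[Byte]] = Array(intCol.array(), longCol.array())
    // Reading column 0 touches only buffers(0); column 1's bytes are never scanned.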

      That sums up the in-memory storage logic. I'm still not certain the columnar-storage description above is correct, so feel free to leave a comment if you know better.

  • Original post: https://www.cnblogs.com/ldsggv/p/13405548.html