Spark SQL(5-2) CacheManage之InMemoryRelation
本来计划中是没有这节的,但是中午在看spark sql 内存管理模块的时候,脑子里面突然问到,spark sql 缓存到内存的数据是怎么组织的;上网查了下博客;然后自己也跟了下代码,就形成了这篇总结。
接着之前提到的CacheManage中有个CacheQuery的方法:
def cacheQuery( query: Dataset[_], tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { val planToCache = query.logicalPlan if (lookupCachedData(planToCache).nonEmpty) { logWarning("Asked to cache already cached data.") } else { val sparkSession = query.sparkSession val inMemoryRelation = InMemoryRelation( sparkSession.sessionState.conf.useCompression, sparkSession.sessionState.conf.columnBatchSize, storageLevel, sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan, tableName, planToCache.stats) cachedData.add(CachedData(planToCache, inMemoryRelation)) } }
这个方法里面可以看到缓存的数据是以CacheData的形式组织的:
/** Holds a cached logical plan and its data */ case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
阅读他的注释可以了解到,保存逻辑计划和他的数据,那么数据应该就在InMemoryRelation里面保存了;那么再看InMemoryRelation的apply方法:
object InMemoryRelation { def apply( useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, child: SparkPlan, tableName: Option[String], statsOfPlanToCache: Statistics): InMemoryRelation = new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)( statsOfPlanToCache = statsOfPlanToCache) }
在这里主要的方法是:
private def buildBuffers(): Unit = { val output = child.output val cached = child.execute().mapPartitionsInternal { rowIterator => new Iterator[CachedBatch] { def next(): CachedBatch = { val columnBuilders = output.map { attribute => ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) }.toArray var rowCount = 0 var totalSize = 0L while (rowIterator.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { val row = rowIterator.next() // Added for SPARK-6082. This assertion can be useful for scenarios when something // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat // hard to decipher. assert( row.numFields == columnBuilders.length, s"Row column number mismatch, expected ${output.size} columns, " + s"but got ${row.numFields}." + s" Row content: $row") var i = 0 totalSize = 0 while (i < row.numFields) { columnBuilders(i).appendFrom(row, i) totalSize += columnBuilders(i).columnStats.sizeInBytes i += 1 } rowCount += 1 } sizeInBytesStats.add(totalSize) val stats = InternalRow.fromSeq( columnBuilders.flatMap(_.columnStats.collectedStatistics)) CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) }, stats) } def hasNext: Boolean = rowIterator.hasNext } }.persist(storageLevel) cached.setName( tableName.map(n => s"In-memory table $n") .getOrElse(StringUtils.abbreviate(child.toString, 1024))) _cachedColumnBuffers = cached }
这里面可以看到这里面组织了一个CachedBatch的迭代器,对于CachedBatch:
/** * CachedBatch is a cached batch of rows. * * @param numRows The total number of rows in this batch * @param buffers The buffers for serialized columns * @param stats The stat of columns */ case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
这里面主要的就是buffers了,他是在上面提到的buildBuffers方法里面:
val stats = InternalRow.fromSeq( columnBuilders.flatMap(_.columnStats.collectedStatistics)) CachedBatch(rowCount, columnBuilders.map { builder => JavaUtils.bufferToArray(builder.build()) }, stats)
这段是调用了JavaUtils.bufferToArray(builder.build())方法返回了一个buye数组,columnBuilders也是数组的形式,所以就是之前提到的CachedBatch的buffers;其实这个二维数组里面第一维保存了ColumnBuilder数组,这个数组相当于是每一列的信息;之后在每一列的信息里面保存了JavaUtils.bufferToArray(builder.build())的数据(值),至此如果想拿某个列的数据,其实拿到数组的第一维度的描述,如果匹配获取,不匹配直接跳过;这样好似内存的列存储,这块理解不知道对不对,我觉得大概率是对的。
关于整个内存存储的逻辑总结如上,但是关于列存储的描述不确定对不对,有懂的可以留言讨论。