zoukankan      html  css  js  c++  java
  • Spark存储介绍

    记录一下Spark的存储相关内容

    @


    Spark虽说是计算引擎,但存储也是比较重要的一块。

    在cache和shuffle等地方用到了存储,存储介质包括有内存和磁盘。

    整体架构

    Spark存储采用主从模式(Master/Slave),模块间使用RPC进行通信。

    Master负责运行期间数据块元数据的管理和维护。

    Slave一方面将本地数据块的状态报告给Master;另一方面接收Master传过来的执行命令,如获取数据块状态、删除RDD/数据块等命令。

    Slave之间存在数据传输通道,可以进行远程数据的读取和写入。


    存储相关类

    在看整体架构之前,先看一下Spark存储模块相关类,以下是类图:
    存储模块相关类

    可以看到是以BlockManager为核心。

    BlockManager:存在于Driver和Executor中,Driver端的BlockManager保存了数据的元数据信息,Executor端的BlockManager根据接收到的消息进行操作。

    BlockManagerMaster:Driver端特有的Master类,用来接收处理Executor发送来的请求。

    BlockManagerMasterEndpoint:Master的消息终端点,用于与远程Slave进行消息通信。

    BlockManagerSlaveEndpoint:Slave的消息终端点,用于与Master进行通信。

    BlockTransferService:在远程节点间提供数据传输服务。

    BlockManagerInfo:维护了BlockManager的一些信息。

    DiskBlockManager:对数据块进行磁盘读写的管理者。

    DiskStore:在磁盘上存储BlockManager块。

    MemoryStore:将BlockManager存储在内存中。

    MapOutputTracker:跟踪shuffle map stage输出位置的类。

    ShuffleManager:shuffle的管理器,可以用于获取shuffle读写的组件。


    接下来看看Spark存储的消息通信架构:

    以下是架构图,

    Spark存储的消息通信架构

    图中根据数据的生命周期描述了四个步骤:

    1. RegisterBlockManager。应用程序启动时、初始化相关组件。
    2. UpdateBlockInfo。增删改后更新数据块信息。
    3. GetLocations、GetMemoryStatus。查询数据存放的位置,对数据进行读取。
    4. RemoveBlock、RemoveRDD。提供了删除的功能。

    依次看看四个步骤的具体过程:

    应用启动时

    应用程序启动时,SparkContext创建Driver端的SparkEnv,在该SparkEnv中实例化BlockManager和BlockManagerMaster,在BlockManagerMaster内部创建消息通信的BlockManagerMasterEndpoint。

    Executor启动时也会创建其SparkEnv,在该SparkEnv中实例化BlockManager和负责网络数据传输服务的BlockTransferService。在BlockManager初始化过程中,一方面会加入BlockManagerMasterEndpoint终端点的引用,另一方面会创建Executor消息通信的BlockManagerSlaveEndpoint终端点,并把终端点的引用注册到Driver中,Driver和Executor相互持有引用,在应用执行过程中就可以进行通信了。

    增删改后更新元数据

    当写入、更新或删除数据完毕后,发送数据块的最新状态消息UpdateBlockInfo给BlockManagerMasterEndpoint终端点,由其更新数据块的元数据。该终端点的元数据存放BlockManagerMasterEndpoint的3个HashMap中,如下:

    // 该HashMap中存放了BlockManagerId与BLockManagerInfo的对应,其中BlockManagerInfo包含了Executor的内存使用情况、数据块的使用情况、已被缓存的数据块和Executor终端点的引用
    private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
    
    // 该HashMap存放了ExecutorId和BlockManagerId的对应
    private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
    
    // 该HashMap存放了BlockId和BlockManagerId序列 的对应,原因在于一个数据块可能存储有多个副本,保存在多个Executor中
    private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
    

    获取数据存放位置

    应用数据存储后,在获取远程节点数据、获取RDD执行的首选位置时需要根据数据块的编号查询数据块所处的位置,通过发送GetLocations或GetLocationsMultipleBlockIds等消息给BlockManagerMasterEndpoint,通过对元数据的查询获取数据块的位置信息。

    数据块的删除

    当数据需要删除时,提交删除消息给BlockManagerSlaveEndpoint终端点,在该终端店发起删除操作。删除操作一方面需要删除Driver端的元数据信息,另一方面发送消息通知Executor,删除对应的物理数据。

    RDD存储调用

    RDD和Block的关系:RDD包含了多个Partition,每个Partition对应一个数据块,那么每个RDD中包含一个或多个数据块Block。

    我们知道RDD是懒执行的,只有在遇到行动操作的时候,才会提交作业、划分阶段、执行任务,其真正发生数据操作是调用RDD.iterator()时发生的。

    我们看看RDD的iterator方法:

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        // 如果存在存储级别,则从尝试从缓存中读取数据,缓存不存在时再进行计算
        if (storageLevel != StorageLevel.NONE) {
            getOrCompute(split, context)
        // 不存在缓存,直接计算或者从checkpoint中读取
        } else {
            computeOrReadCheckpoint(split, context)
        }
    }
    

    iterator中,会判断是否存在存储级别(其实就是缓存),如果存在调用getOrCompute(),如果不存在调用computeOrReadCheckpoint()

    先看一下不存在缓存的时候,调用的computeOrReadCheckpoint()

    // 如果RDD存在检查点,则从检查点读取它。不存在,则计算
    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
    {	
        if (isCheckpointedAndMaterialized) {
        	// 存在时,调用父RDD的iterator读取数据   
            firstParent[T].iterator(split, context)
        } else {
            // 不存在时,直接调用compute方法对数据进行计算
            compute(split, context)
        }
    }
    

    computeOrReadCheckpoint()会从checkpoint中读取数据或重新计算数据,进行返回。

    再看一下存在缓存时,调用的getOrCompute()

    private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
        // 通过RDD的编号和Partition序号获取数据块Block的编号
        val blockId = RDDBlockId(id, partition.index)
        var readCachedBlock = true
        // 根据数据块编号先读取数据,然后再更新数据,这里是读写数据的入口
        SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
            readCachedBlock = false
            // 如果缓存中不存在数据块,则尝试调用computeOrReadCheckpoint()从检查点读取或重新计算
            computeOrReadCheckpoint(partition, context)
        }) match {
            // 对返回结果进行处理,该结果表示处理成功
            case Left(blockResult) =>
            if (readCachedBlock) {
                val existingMetrics = context.taskMetrics().inputMetrics
                existingMetrics.incBytesRead(blockResult.bytes)
                new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
                    override def next(): T = {
                        existingMetrics.incRecordsRead(1)
                        delegate.next()
                    }
                }
            } else {
                new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
            }
            // 处理失败把结果返回调用者
            case Right(iter) =>
            new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
        }
    }
    
    // 从缓存中获取数据;如果缓存中不存在,重新计算并写入缓存
    def getOrElseUpdate[T](
        blockId: BlockId,
        level: StorageLevel,
        classTag: ClassTag[T],
        makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
        // 读数据的入口,尝试从本地或远程读取数据
        get[T](blockId)(classTag) match {
            case Some(block) =>
            	return Left(block)
            case _ =>
        }
        // 写数据入口
        doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
            case None =>
                val blockResult = getLocalValues(blockId).getOrElse {
                    releaseLock(blockId)
                }
                releaseLock(blockId)
                Left(blockResult)
            case Some(iter) =>
            	Right(iter)
        }
    }
    

    getOrCompute()中会调用getOrElseUpdate()方法从缓存中读取数据;如果缓存中不存在数据则重新计算,并写入缓存。


    RDD的计算就是基于对Iterator中数据的不断转换,只有需要存储的时候,才会做对应的存储操作。

    数据读取

    BlockManager的get方法时读数据的入口。

    读数据时分为本地读取和远程节点读取。

    本地读取时使用getLocalValues方法,在该方法中根据不同的存储级别调用不同的实现。

    远程读取时使用getRemoteValues方法,最终调用BlockTransferService的fetchBlockSync进行处理,使用Netty的fetchBlocks方法获取数据。数据读取调用图如下:
    数据读取调用图

    数据写入

    BlockManager的doPutIterator方法是写数据的入口点。

    在该方法中,根据数据是否缓存到内存中进行处理。

    如果不缓存到内存中,调用BlockManager的putIterator方法直接存储磁盘;如果缓存到内存中,先判断数据是否进行了反序列化。

    如果设置反序列化,说明数据为值类型,调用putIteratorAsValues把数据存入内存;如果没有设置反序列化,说明数据为字节类型,调用putIteratorAsBytes把数据写入内存。

    在把数据存入内存过程中,需要判断在内存中展开该数据是否足够,当足够时调用BlockManager的putArray方法写入内存,否则把数据写入磁盘。


    写入完成后,一方面把数据块的元数据发送给Driver端的BlockManagerMasterEndpoint终端点,请求其更新数据元数据;另一方面判断是否需要创建副本,如果需要则调用replicate方法,把数据写到远程节点上。

    写入调用图如下:
    数据读取调用图

    cache & checkpoint

    cache:将RDD的数据缓存到内存或磁盘中。

    checkpoint:将计算过程中重要的中间数据建立检查点,类似于快照。

    cache的应用主要是对一个RDD的进行复用,避免重复计算。

    相对于cache而言,checkpoint将切断与该RDD之前的依赖关系。设置检查点对包含宽依赖的长血统RDD是非常重要的,可以避免失败时重新计算的高成本。

    贴两个缓存和检查点讲的比较清晰的链接:

    https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md

    https://blog.csdn.net/qq_20641565/article/details/76223002


    end. 以上内容来自看书和自己的理解,如果偏差,欢迎指正。

    Reference

    《图解Spark核心技术与案例实践》



    个人公众号:码农峰,定时推送行业资讯,持续发布原创技术文章,欢迎大家关注。

  • 相关阅读:
    Java RunTime Environment (JRE) or Java Development Kit (JDK) must be available in order to run Eclipse. ......
    UVA 1597 Searching the Web
    UVA 1596 Bug Hunt
    UVA 230 Borrowers
    UVA 221 Urban Elevations
    UVA 814 The Letter Carrier's Rounds
    UVA 207 PGA Tour Prize Money
    UVA 1592 Database
    UVA 540 Team Queue
    UVA 12096 The SetStack Computer
  • 原文地址:https://www.cnblogs.com/upupfeng/p/12373038.html
Copyright © 2011-2022 走看看