zoukankan      html  css  js  c++  java
  • Spark存储介绍

    记录一下Spark的存储相关内容

    @


    Spark虽说是计算引擎,但存储也是比较重要的一块。

    在cache和shuffle等地方用到了存储,存储介质包括有内存和磁盘。

    整体架构

    Spark存储采用主从模式(Master/Slave),模块间使用RPC进行通信。

    Master负责运行期间数据块元数据的管理和维护。

    Slave一方面将本地数据块的状态报告给Master;另一方面接收Master传过来的执行命令,如获取数据块状态、删除RDD/数据块等命令。

    Slave之间存在数据传输通道,可以进行远程数据的读取和写入。


    存储相关类

    在看整体架构之前,先看一下Spark存储模块相关类,以下是类图:
    存储模块相关类

    可以看到是以BlockManager为核心。

    BlockManager:存在于Driver和Executor中,Driver端的BlockManager保存了数据的元数据信息,Executor端的BlockManager根据接收到的消息进行操作。

    BlockManagerMaster:Driver端特有的Master类,用来接收处理Executor发送来的请求。

    BlockManagerMasterEndpoint:Master的消息终端点,用于与远程Slave进行消息通信。

    BlockManagerSlaveEndpoint:Slave的消息终端点,用于与Master进行通信。

    BlockTransferService:在远程节点间提供数据传输服务。

    BlockManagerInfo:维护了BlockManager的一些信息。

    DiskBlockManager:对数据块进行磁盘读写的管理者。

    DiskStore:在磁盘上存储BlockManager块。

    MemoryStore:将BlockManager存储在内存中。

    MapOutputTracker:跟踪shuffle map stage输出位置的类。

    ShuffleManager:shuffle的管理器,可以用于获取shuffle读写的组件。


    接下来看看Spark存储的消息通信架构:

    以下是架构图,

    Spark存储的消息通信架构

    图中根据数据的生命周期描述了四个步骤:

    1. RegisterBlockManager。应用程序启动时、初始化相关组件。
    2. UpdateBlockInfo。增删改后更新数据块信息。
    3. GetLocations、GetMemoryStatus。查询数据存放的位置,对数据进行读取。
    4. RemoveBlock、RemoveRDD。提供了删除的功能。

    依次看看四个步骤的具体过程:

    应用启动时

    应用程序启动时,SparkContext创建Driver端的SparkEnv,在该SparkEnv中实例化BlockManager和BlockManagerMaster,在BlockManagerMaster内部创建消息通信的BlockManagerMasterEndpoint。

    Executor启动时也会创建其SparkEnv,在该SparkEnv中实例化BlockManager和负责网络数据传输服务的BlockTransferService。在BlockManager初始化过程中,一方面会加入BlockManagerMasterEndpoint终端点的引用,另一方面会创建Executor消息通信的BlockManagerSlaveEndpoint终端点,并把终端点的引用注册到Driver中,Driver和Executor相互持有引用,在应用执行过程中就可以进行通信了。

    增删改后更新元数据

    当写入、更新或删除数据完毕后,发送数据块的最新状态消息UpdateBlockInfo给BlockManagerMasterEndpoint终端点,由其更新数据块的元数据。该终端点的元数据存放BlockManagerMasterEndpoint的3个HashMap中,如下:

    // 该HashMap中存放了BlockManagerId与BLockManagerInfo的对应,其中BlockManagerInfo包含了Executor的内存使用情况、数据块的使用情况、已被缓存的数据块和Executor终端点的引用
    private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
    
    // 该HashMap存放了ExecutorId和BlockManagerId的对应
    private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
    
    // 该HashMap存放了BlockId和BlockManagerId序列 的对应,原因在于一个数据块可能存储有多个副本,保存在多个Executor中
    private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
    

    获取数据存放位置

    应用数据存储后,在获取远程节点数据、获取RDD执行的首选位置时需要根据数据块的编号查询数据块所处的位置,通过发送GetLocations或GetLocationsMultipleBlockIds等消息给BlockManagerMasterEndpoint,通过对元数据的查询获取数据块的位置信息。

    数据块的删除

    当数据需要删除时,提交删除消息给BlockManagerSlaveEndpoint终端点,在该终端店发起删除操作。删除操作一方面需要删除Driver端的元数据信息,另一方面发送消息通知Executor,删除对应的物理数据。

    RDD存储调用

    RDD和Block的关系:RDD包含了多个Partition,每个Partition对应一个数据块,那么每个RDD中包含一个或多个数据块Block。

    我们知道RDD是懒执行的,只有在遇到行动操作的时候,才会提交作业、划分阶段、执行任务,其真正发生数据操作是调用RDD.iterator()时发生的。

    我们看看RDD的iterator方法:

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
        // 如果存在存储级别,则从尝试从缓存中读取数据,缓存不存在时再进行计算
        if (storageLevel != StorageLevel.NONE) {
            getOrCompute(split, context)
        // 不存在缓存,直接计算或者从checkpoint中读取
        } else {
            computeOrReadCheckpoint(split, context)
        }
    }
    

    iterator中,会判断是否存在存储级别(其实就是缓存),如果存在调用getOrCompute(),如果不存在调用computeOrReadCheckpoint()

    先看一下不存在缓存的时候,调用的computeOrReadCheckpoint()

    // 如果RDD存在检查点,则从检查点读取它。不存在,则计算
    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
    {	
        if (isCheckpointedAndMaterialized) {
        	// 存在时,调用父RDD的iterator读取数据   
            firstParent[T].iterator(split, context)
        } else {
            // 不存在时,直接调用compute方法对数据进行计算
            compute(split, context)
        }
    }
    

    computeOrReadCheckpoint()会从checkpoint中读取数据或重新计算数据,进行返回。

    再看一下存在缓存时,调用的getOrCompute()

    private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
        // 通过RDD的编号和Partition序号获取数据块Block的编号
        val blockId = RDDBlockId(id, partition.index)
        var readCachedBlock = true
        // 根据数据块编号先读取数据,然后再更新数据,这里是读写数据的入口
        SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
            readCachedBlock = false
            // 如果缓存中不存在数据块,则尝试调用computeOrReadCheckpoint()从检查点读取或重新计算
            computeOrReadCheckpoint(partition, context)
        }) match {
            // 对返回结果进行处理,该结果表示处理成功
            case Left(blockResult) =>
            if (readCachedBlock) {
                val existingMetrics = context.taskMetrics().inputMetrics
                existingMetrics.incBytesRead(blockResult.bytes)
                new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
                    override def next(): T = {
                        existingMetrics.incRecordsRead(1)
                        delegate.next()
                    }
                }
            } else {
                new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
            }
            // 处理失败把结果返回调用者
            case Right(iter) =>
            new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
        }
    }
    
    // 从缓存中获取数据;如果缓存中不存在,重新计算并写入缓存
    def getOrElseUpdate[T](
        blockId: BlockId,
        level: StorageLevel,
        classTag: ClassTag[T],
        makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
        // 读数据的入口,尝试从本地或远程读取数据
        get[T](blockId)(classTag) match {
            case Some(block) =>
            	return Left(block)
            case _ =>
        }
        // 写数据入口
        doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
            case None =>
                val blockResult = getLocalValues(blockId).getOrElse {
                    releaseLock(blockId)
                }
                releaseLock(blockId)
                Left(blockResult)
            case Some(iter) =>
            	Right(iter)
        }
    }
    

    getOrCompute()中会调用getOrElseUpdate()方法从缓存中读取数据;如果缓存中不存在数据则重新计算,并写入缓存。


    RDD的计算就是基于对Iterator中数据的不断转换,只有需要存储的时候,才会做对应的存储操作。

    数据读取

    BlockManager的get方法时读数据的入口。

    读数据时分为本地读取和远程节点读取。

    本地读取时使用getLocalValues方法,在该方法中根据不同的存储级别调用不同的实现。

    远程读取时使用getRemoteValues方法,最终调用BlockTransferService的fetchBlockSync进行处理,使用Netty的fetchBlocks方法获取数据。数据读取调用图如下:
    数据读取调用图

    数据写入

    BlockManager的doPutIterator方法是写数据的入口点。

    在该方法中,根据数据是否缓存到内存中进行处理。

    如果不缓存到内存中,调用BlockManager的putIterator方法直接存储磁盘;如果缓存到内存中,先判断数据是否进行了反序列化。

    如果设置反序列化,说明数据为值类型,调用putIteratorAsValues把数据存入内存;如果没有设置反序列化,说明数据为字节类型,调用putIteratorAsBytes把数据写入内存。

    在把数据存入内存过程中,需要判断在内存中展开该数据是否足够,当足够时调用BlockManager的putArray方法写入内存,否则把数据写入磁盘。


    写入完成后,一方面把数据块的元数据发送给Driver端的BlockManagerMasterEndpoint终端点,请求其更新数据元数据;另一方面判断是否需要创建副本,如果需要则调用replicate方法,把数据写到远程节点上。

    写入调用图如下:
    数据读取调用图

    cache & checkpoint

    cache:将RDD的数据缓存到内存或磁盘中。

    checkpoint:将计算过程中重要的中间数据建立检查点,类似于快照。

    cache的应用主要是对一个RDD的进行复用,避免重复计算。

    相对于cache而言,checkpoint将切断与该RDD之前的依赖关系。设置检查点对包含宽依赖的长血统RDD是非常重要的,可以避免失败时重新计算的高成本。

    贴两个缓存和检查点讲的比较清晰的链接:

    https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md

    https://blog.csdn.net/qq_20641565/article/details/76223002


    end. 以上内容来自看书和自己的理解,如果偏差,欢迎指正。

    Reference

    《图解Spark核心技术与案例实践》



    个人公众号:码农峰,定时推送行业资讯,持续发布原创技术文章,欢迎大家关注。

  • 相关阅读:
    【C语言】中的版本规范(C89 C99等)
    【微机】计算机系统组成
    【微机】验证负数以补码存储程序 C语言
    katalon studio升级到6.3.3版本后如何生成测试报告
    使用Katalon Studio进行数据驱动测试的方法(转)
    katalon 参数化
    Katalon中的测试对象、用例和套件的命名规范
    转载kalaton故障处理
    Katalon Studio IE浏览器 不好用 无法录制
    Katalon Studio操作界面详细说明(转载)
  • 原文地址:https://www.cnblogs.com/upupfeng/p/12373038.html
Copyright © 2011-2022 走看看