zoukankan      html  css  js  c++  java
  • Spark-BlockManager

    简单说明

    BlockManager是管理整个Spark运行时数据的读写,包含数据存储本身,在数据存储的基础之上进行数据读写。由于Spark是分布式的,所有BlockManager也是分布式的,BlockManager本身相对而言是一个比较大的模块,Spark中有非常多的模块:调度模块、资源管理模块等等。BlockManager是另外一个非常重要的模块,BlockManager本身源码量非常大。本篇从BlockManager原理流程对BlockManager做深刻的理解。在Shuffle读写数据的时候, 我们需要读写BlockManager。因此BlockManager是至关重要的内容。

    BlockManager是整个Spark底层负责数据存储与管理的一个组件,Driver和Executor的所有数据都由对应的BlockManager进行管理。

    相关组件

    BlockManagerMaster

    Driver上有BlockManagerMaster,负责对各个节点(Executor)上的BlockManager内部管理的数据的元数据进行维护,比如block的增删改等操作,都会在这里维护好元数据的变更。

    BlockManagerMaster与BlockManager的关系非常像NameNode与DataNode的关系,BlockManagerMaster中保存中BlockManager内部管理数据的元数据,进行维护,当BlockManager进行Block增删改等操作时,都会在BlockManagerMaster中进行元数据的变更,这与NameNode维护DataNode的元数据信息,DataNode中数据发生变化时NameNode中的元数据信息也会相应变化是一致的。

    • 4,BlockManagerMaster  tell,如果message为false,抛异常

    message为 BlockManagerMasterEndpoint中的receiveAndReply方法发送过去的

    BlockManager

    每个节点(Executor)都有一个BlockManager,(每启动一个CoarseGrainedExecutorBackend都会实例化一个BlockManager),每个BlockManager创建之后,第一件事即是去向BlockManagerMaster进行注册,实质上是:Executor中的BlockManager在启动的时候注册给了Driver上的BlockManagerMasterEndpoint。

    • 1,在Executor中创建BlockManager

    当Executor实例化的时候会通过env.blockManager.initialize(conf.getAppId)来实例化Executor上的BlockManager并且创建BlockManagerSlaveEndpoint这个消息循环体来接受Driver中的BlockManagerMasterEndpoint发过来的指令,例如:删除Block。(Executor中)

    BlockManager中有三个非常重要的组件:

    1. MemoryStore

      负责对内存数据进行读写。
    2. DiskStore

      负责对磁盘数据进行读写。
    3. BlockTransferService

      负责建立BlockManager到远程其他节点的BlockManager的连接,负责对远程其他节点的BlockManager的数据进行读写;例如:
      1. 使用BlockManager进行写操作时,比如说,RDD运行过程中的一些中间数据,或者我们手动指定了persist(),会优先将数据写入内存中,如果内存大小不够,会使用自己的算法,将内存中的部分数据写入磁盘;此外,如果persist()指定了要replica,那么会使用BlockTransferService将数据replicate一份到其他节点的BlockManager上去。

      2. 使用BlockManager进行读操作时,比如说,shuffleRead操作,如果能从本地读取,就利用DiskStore或者MemoryStore从本地读取数据,但是本地没有数据的话,那么会用BlockTransferService与有数据的BlockManager建立连接,然后用BlockTransferService从远程BlockManager读取数据。

    BlockManagerMasterEndpoint

    传入BlockManagerMaster类(Driver)里的一个消息循环体,会负责通过远程消息通信的方式去管理所有节点的BlockMaster。

    • 3,BlockManagerMaster接收到Executor上的注册信息并进行处理。

    (BlockManagerMasterEndpoint中的receiveAndReply方法)

    BlockManagerSlaveEndpoint

    BlockManager上的消息循环体。(BlockManager里面的一个属性)

    Executor上的BlockManager创建BlockManagerSlaveEndpoint这个消息循环体来接受Driver中的BlockManagerMasterEndpoint发过来的指令。

    • 2BlockManagerBlockManagerMaster注册

    当BlockManagerSlaveEndpoint实例化后,Executor上的BlockManager需要向Driver上的BlockManagerMasterEndpoint注册:

    (BlockManager中的initialize(appId: String)方法中)

    BlockManagerInfo

    每个BlockManager创建之后,第一件事即是去向BlockManagerMaster进行注册,此时BlockManagerMaster会为其创建对应的BlockManagerInfo来进行元数据管理。

    BlockStatus

    只要使用BlockManager执行了数据增删改的操作,那么必须将Block的BlockStatus上报到BlockManagerMaster,在BlockManagerMaster上会对指定BlockManager的BlockManagerInfo内部的BlockStatus进行增删改操作,从而达到元数据的维护功能。

    // Mapping from block id to its status.

      private val _blocks = new JHashMap[BlockId, BlockStatus]

    MapOutPutTrackerMaster

    在Application启动的时候,会在SparkEnv中注册BlockManager和MapOutPutTracker。

    MapOutPutTrackerMaster负责跟踪所有的Mapper输出的。

    当执行第二个 Stage 时,第二个 Stage 会向 Driver 中的 MapOutputTrackerMasterEndpoint 发消息请求上一个 Stage 中相应的输出,此時 MapOutputTrackerMaster 会把上一個 Stage 的输出数据的元数据信息发送给当前请求的 Stage。

    DiskBlockManager

    管理Logical Block与Disk上的Physical Block之间的映射关系并负责磁盘的文件的创建、读写等。

    负责管理逻辑级别和物理级别的映射关系,根据BlockID映射一个文件。在目录spark.local.dir或者SPARK_LOCAL_DIRS中,Block文件进行hash生成。通过createLocalDirs 生成本地目录。

    运行原理示意图

    图片下载地址:https://www.processon.com/embed/5e96a0a0e401fd262e1a14d6

    上述文字描述中标序号的红色加粗部分为BlockManager向BlockManagerMaster注册的过程。

  • 相关阅读:
    一、
    一、AJAX
    一、RequireHttps
    【2019-08-23】被环境影响时,想想初心
    【2019-08-22】任何收获,是需要成本的
    【2019-08-20】有点目标,有点计划,有点目的
    【2019-08-21】承认自己错误,就是正确的开始
    【2019-08-19】新,是一种魔力
    【2019-08-18】时间是有密度的
    【2019-08-17】工作太多是适得其反
  • 原文地址:https://www.cnblogs.com/Rabcheng/p/12728143.html
Copyright © 2011-2022 走看看