zoukankan      html  css  js  c++  java
  • Spark分析之BlockManager

    BlockManager中存储block的流程: doPut()方法  

    入参:blockId, data, level, tellMaster

    1)为block创建BlockInfo并加锁使其不能被其他线程访问;

    2)按照block的存储级别:useMemory, useOffHeap, useDisk进行存储,并标识该block可以被其他线程访问;

      注:只要使用了useMemory,就算也使用了useDisk,一开始也只会存在内存中,而不会立即存储到硬盘上,只有等内存不够时才会将部分partition数据drop到硬盘上

    3)tellMaster=true(默认就时true): reportBlockStatus(blockId, putBlockInfo, putBlockStatus)

      通知BlockManagerMaster有新的数据写入,在BlockManagerMaster中更新Block信息

    4)根据block的replication数决定是否将该block备份到其他节点(异步)

      1)存储结果是序列化后的字节数组

      2) 存储结果是没有序列化的值

        备份数据的序列化:序列化成字节数组;先压缩再序列化

        默认的压缩是snappy,可以通过spark.io.compression.codec参数进行配置;

        序列化默认使用的是org.apache.spark.serializer.JavaSerializer,可以通过spark.serializer参数进行配置;在创建BlockManager时设定;

    bytesAfterPut = dataSerialize(blockId, valuesAfterPut)  //数据序列化
    replicate(blockId, bytesAfterPut, putLevel){ //数据备份到其他节点
        val putBlock = PutBlock(blockId, data, eLevel)
        val cmId = new ConnectionManagerId(host, port)
        BlockManagerWorker.syncPutBlock(putBlock, cmId)
    }

    BlockManagerWorker 以防止数据丢失的时候还能够恢复,进行数据的备份操作,将数据拷贝到其他节点(异步)
    ConnectionManager 负责与其它计算结点建立连接,并负责数据的发送和接收

    BlockManager获取block的流程:get()方法

    //Get a block from the block manager (either local or remote).
    def get(blockId: BlockId): Option[BlockResult] = {
        val local = getLocal(blockId)  //调用doGetLocal()方法
        if (local.isDefined) {
          return local
        }
        val remote = getRemote(blockId) //调用doGetRemote()方法
        if (remote.isDefined) {
          return remote
        }
        None
    }

    1)先从本地的BlockManager查找:依次从useMemory, useOffHeap, useDisk去查找;

    根据blockid获得到对应的blockinfo(该blockinfo被加锁了),获取到该blockinfo的storagelevel,进入如下分支进行查找:

      level.useMemory    从Memory中取出block并返回,如果没有就进入下一个分支;

      level.useOffHeap   从Tachyon中取出block并返回,如果没有就进入下一个分支;

      level.useDisk

        level.useMemory==true 将block从disk中读出并写入内存以便下次使用时从内存中获取,同时返回该block;

        level.useMemory==false 将block从disk中读出并返回;

    2)本地获取不到再从远端(executor)的BlockManager去查找(BlockManagerWorker.syncGetBlock)

      获得该block的location信息;

      根据location向远端发送请求获取block,只要有一个远端返回block该函数就返回而不继续发送请求;

    注:通常情况下spark任务的分配时根据block的分布决定的,任务往往会被分配到拥有block的节点上,因此getLocal()就能找到所需要的block;但在资源有限的情况下,spark会将任务调度到与block不同的节点上,这样就必须通过getRemote()来获得block。

  • 相关阅读:
    C++——STL内存清除
    c++——智能指针学习(unique_ptr)
    linux下将tomcat加入服务
    linux下oracle远程连接的问题
    oracle计算容量的方式
    oracle删除表的方式
    阻塞与非阻塞的区别
    java中queue的使用
    yum源
    VMware Tools 安装
  • 原文地址:https://www.cnblogs.com/luogankun/p/3924917.html
Copyright © 2011-2022 走看看