zoukankan      html  css  js  c++  java
  • Spark源码学习1.5——BlockManager.scala

    一、BlockResult类

    该类用来表示返回的匹配的block及其相关的参数。共有三个参数:

    data:Iterator [Any]。

    readMethod: DataReadMethod.Value。

    bytes: Long。

    实例化InputMetrics类。

    二、BlockManager类

    关系的参数有executorId,blockManagerMaster,mapOutPutTracker等。关系的类有ShuffleBlockManager类,DiskBlockManager类,ConnectionManager类,MemoryStore类,DiskStore类等。包含以下操作:

    1、检查broadcast,shuffle,rdds,shuffleSpill存储时是否被压缩。

    2、定义其slaveActor。

    3、建立一个BlockManagerSlaveActor(this, mapOutputTracker)。

    4、一个lazy val compressionCodec,压缩包解码器。lazy表示在调用它时才实例化一个解码器,主要是针对用户自定义的jar包。

    包含的方法有:

    1、initialize():初始化操作,主要是向master(BlockManagerMaster)注册BlockManager,并启动BlockManagerWorker。

    2、reportAllBlocks():再次将所有的blocks汇报给BlockManager。这个方法强调所有的blocks必须都能在BlockManager的管理下,因为可能会出现各种因素,如slave需要重新注册、进程冲突导致block变化等,让blocks产生变化。

    3、reregister():重新注册BlockManager,这个方法主要是在心跳进程发现BlockManager没有注册时调用。这个方法的调用无需在锁状态下执行。

    4、asyncReregister():异步地重新向master注册BlockManager。

    5、waitForAsyncReregister():如果有其他的异步重注册进程,则等待。

    6、getStatus(block Id: BlockId):根据blockId获取block的信息。

    7、getMatchingBlockIds():指定过滤器对所有的blocks进行过滤。

    8、reportBlockStatus():向master报告block所在存储位置的状况,这个信息不仅反映了block当前的状态,还用于更新block的信息。但是这个存储状态的信息,如磁盘、内存、cache存储等等,并不一定是block的Information中所期望的存储信息,例如MEMORY_AND_DISK等。

    9、tryToReportBlockStatus():实际上是发送UpdateBlockInfo信息,返回master返回的信息。block记录正确则返回true,slave需要重新注册则返回false。信息包括storageLevel, inMemSize, inTachyonSize, onDiskSize等

    10、getCurrentBlockStatus():返回指定block所在存储块的最新信息。特别的,当block从内存移到磁盘时,更改其存储级别并更新内存和磁盘大小。

    11、getLocationBlockIds():获取一系列block的位置。

    12、getLocalFromDisk():直接从磁盘获取block,主要是用来获取shuffle block的。

    13、getLocal():从本地block manager获取block。

    14、getLocakBytes():以序列化字节流的形式从本地block manager获取block。

    15、doGetLocal():getLocalBytes()的实际操作函数,用来获取block并以序列化形式输出。需要处理的问题主要是同步问题,既要保证block仍然在指定的存储位置,也要保证block没有处于写的锁状态(尽管如此,Spark提示它仍然存在一定的隐患,block被移除)。

    16、getRemote():从远程block manager获取block。

    17、getRemoteBytes():同getLocalBytes()。

    18、doGetRemote():与doGetLocal()类似。

    19、get():从block manager获取block,无论本地还是远程。

    20、getMultiple():根据BlockManagerIds获取一系列的block。

    21、putIterator():在Iterator中加入block的信息。

    22、getDiskWriter():创建一个能够直接将数据写到磁盘的writer,block通过文件名来指定写入的文件,这个方法通常用来在shuffle之后写入shuffle的输出文件。

    23、putArray():在block manager中写入新的block,block的值是Array数组。

    24、putBytes():在block manager中写入新的block,block的值是序列化的字符流。这个方法和putArray都是使用doPut()来实际写入信息到block manager。

    25、doPut():将给定的block根据给定的存储等级(内存还是磁盘)写入相应的block存储区,必要时复制其中的数据。实际有效的存储级依赖于block最终所挂载的区域,因此调用者可以指定存储级,而不是依靠最初由用户指定的存储级来确定(用户可能存在错误或恶意行为)。doPut()需要考虑的问题包括:block信息的写入,block的值的处理,block向block manager和master的注册,block存储的位置(先内存,后实际考虑),报告block所在存储区的信息,block manager信息的更新等。

    26、replicate():复制block到另外的节点上。

    27、putSingle():在block manager中写入新的block,block的值是一个对象。调用的方法是putIterator。

    28、dropFromMemory():将内存中的block溢出到磁盘中。往往在内存达到限制时调用。

    29、removeRdd():将指定RDD的block全部移除,返回移除的block的数量。

    30、removeBroadcast():将指定的broadcast的block全部移除。这个方法和removeRdd都是循环移除自身的所有block,移除的方法为removeBlock()。

    31、removeBlock():移除内存和磁盘中的指定block。同时需要告知master并更新block信息reportBlockStatuses。

    32、dropOldNonBroadcastBlocks()和dropOldBroadcastBlocks():移除旧的没有的/旧的broadcast block。

    33、dropOldBlocks():移除没有用的block。

    34、shouldCompress():判断是否经过压缩,共有四种压缩包——shuffle,broadcast,rdds,shuffleSpill。

    35、wrapForCompression():有两种加载方式,根据参数决定是压缩输入流还是压缩输出流。

    36、dataSerializeStram():序列化为流。

    37、dataSerialize():序列化为字符缓存。

    38、dateDeserialize():反序列化字符缓存为迭代的参数值,并且在末尾对该值进行处理。在对block进行处理的时候,如果block是shuffle的block,也就是会传递给reduce进行处理的block时,需要进行压缩(lazy的)。

    39、stop():清除各种类的实例化对象。

    三、BlockManager对象

    针对blockId进行了注册、处理等操作。

  • 相关阅读:
    mybatis 错误 Invalid bound statement (not found)
    Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.
    bug 记录 Unable to start ServletWebServerApplicationContext due to multiple ServletWebServerFactory beans
    解决:The Tomcat connector configured to listen on port 8182 failed to start. The port may already be in use or the connector may be misconfigured.
    jquery validate 验证插件 解决多个相同的Name 只验证第一个的方案
    phpStorm+xdebug调试(php7.3)
    小程序视频多个视频播放与暂停
    CSS实现单行、多行文本溢出显示省略号(…)
    Packet for query is too large (4,544,730 > 4,194,304). You can change this value on the server by setting the 'max_allowed_packet' variable.
    idea自动在文件头中添加作者和创建时间
  • 原文地址:https://www.cnblogs.com/zx247549135/p/4316654.html
Copyright © 2011-2022 走看看