Main components: BlockManager, BlockManagerMaster, BlockManagerWorker, BlockManagerMasterActor, BlockManagerSlaveActor, and others.
The storage-module components are all instantiated in SparkEnv, on the driver and on the workers alike:
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
  "BlockManagerMaster",
  new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
  serializer, conf, securityManager, mapOutputTracker)

val connectionManager = blockManager.connectionManager

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

val cacheManager = new CacheManager(blockManager)
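Note that the BlockManagerMaster constructor takes the result of registerOrLookup. That method checks whether the current process is the driver. If it is, it creates the actor locally; otherwise it reads the driver's host and port from the configuration and looks up the remote actor running on the driver. In other words, only the driver (wherever it happens to run) hosts the BlockManagerMasterActor; every other node holds only an ActorRef to it, wrapped in its own BlockManagerMaster object.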
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    actorSystem.actorOf(Props(newActor), name = name)
  } else {
    val driverHost: String = conf.get("spark.driver.host", "localhost")
    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
    Utils.checkHost(driverHost, "Expected hostname")
    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
    val timeout = AkkaUtils.lookupTimeout(conf)
    logInfo(s"Connecting to $name: $url")
    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
  }
}
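The branching in registerOrLookup can be illustrated without Akka. The following is a minimal sketch, not Spark code: `driverActorUrl`, `resolve`, `CreatedLocally`, and `LookedUpRemotely` are hypothetical names introduced here, and the default host and port simply mirror the fallback values in the listing above. It can be pasted into a Scala REPL or run as a script.

```scala
// Hypothetical helper (not part of Spark): builds the same URL string that
// registerOrLookup hands to actorSelection on a non-driver node.
def driverActorUrl(driverHost: String, driverPort: Int, name: String): String =
  s"akka.tcp://spark@$driverHost:$driverPort/user/$name"

// Hypothetical result types that describe which branch was taken.
sealed trait Resolution
case class CreatedLocally(name: String) extends Resolution
case class LookedUpRemotely(url: String) extends Resolution

// Mirrors the isDriver check: the driver creates the actor itself,
// every other node only computes where to find it.
def resolve(
    isDriver: Boolean,
    name: String,
    driverHost: String = "localhost", // same fallback as spark.driver.host above
    driverPort: Int = 7077            // same fallback as spark.driver.port above
): Resolution =
  if (isDriver) CreatedLocally(name)
  else LookedUpRemotely(driverActorUrl(driverHost, driverPort, name))
```

With the defaults, `resolve(false, "BlockManagerMaster")` yields a `LookedUpRemotely` value carrying the URL `akka.tcp://spark@localhost:7077/user/BlockManagerMaster`.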
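Next, the BlockManager is instantiated. Its class body calls initialize(), defined below. First, blockManagerMaster calls registerBlockManager to register this BlockManager with the driver, passing its own slaveActor as the third argument. Then startBlockManagerWorker is called through the BlockManagerWorker companion object to start the blockManagerWorker. Finally, unless heartbeats are disabled for testing, a periodic heartbeat task is scheduled.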
initialize()

/**
 * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
 * BlockManagerWorker actor.
 */
private def initialize() {
  master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
  BlockManagerWorker.startBlockManagerWorker(this)
  if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
    heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
      Utils.tryOrExit { heartBeat() }
    }
  }
}
Look at slaveActor first. It is defined as follows: the actorSystem creates a BlockManagerSlaveActor.
val slaveActor = actorSystem.actorOf(
  Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
  name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
Back to registerBlockManager, which calls tell to send a RegisterBlockManager message:
/** Register the BlockManager's id with the driver. */
def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
  logInfo("Trying to register BlockManager")
  tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
  logInfo("Registered BlockManager")
}
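tell sends a message to the master actor, that is, the blockManagerMasterActor running on the driver. Although its comment calls the message "one-way", the method goes through askDriverWithReply, so it waits for a Boolean acknowledgement and throws a SparkException if the reply is false.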
/** Send a one-way message to the master actor, to which we expect it to reply with true. */
private def tell(message: Any) {
  if (!askDriverWithReply[Boolean](message)) {
    throw new SparkException("BlockManagerMasterActor returned false, expected true.")
  }
}
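The "send, then insist on a true reply" behaviour of tell can be shown in isolation. This is a minimal sketch under stated assumptions: `MasterClient` is a hypothetical class, the `askDriver` function stands in for askDriverWithReply, and a plain IllegalStateException replaces SparkException so the snippet has no Spark dependency.

```scala
// Hypothetical stand-in for BlockManagerMaster: `askDriver` plays the role of
// askDriverWithReply[Boolean], returning the driver's acknowledgement.
class MasterClient(askDriver: Any => Boolean) {

  // Sends the message and treats anything other than `true` as a failure,
  // mirroring the check in tell above.
  def tell(message: Any): Unit = {
    if (!askDriver(message)) {
      // The original throws SparkException; a standard exception is used here.
      throw new IllegalStateException("Master returned false, expected true.")
    }
  }
}
```

A caller therefore never sees a silent failure: `new MasterClient(_ => true).tell("register")` returns normally, while a client whose driver answers `false` throws.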
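On the driver, BlockManagerMasterActor defines receive to handle incoming messages. For RegisterBlockManager it calls register and then replies true: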
def receive = {
  case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
    register(blockManagerId, maxMemSize, slaveActor)
    sender ! true

  case UpdateBlockInfo(
      blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
    // TODO: Ideally we want to handle all the message replies in receive instead of in the
    // individual private methods.
    updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)

  case GetLocations(blockId) =>
    sender ! getLocations(blockId)

  case GetLocationsMultipleBlockIds(blockIds) =>
    sender ! getLocationsMultipleBlockIds(blockIds)

  case GetPeers(blockManagerId, size) =>
    sender ! getPeers(blockManagerId, size)

  case GetMemoryStatus =>
    sender ! memoryStatus

  case GetStorageStatus =>
    sender ! storageStatus

  case GetBlockStatus(blockId, askSlaves) =>
    sender ! blockStatus(blockId, askSlaves)

  case GetMatchingBlockIds(filter, askSlaves) =>
    sender ! getMatchingBlockIds(filter, askSlaves)

  case RemoveRdd(rddId) =>
    sender ! removeRdd(rddId)

  case RemoveShuffle(shuffleId) =>
    sender ! removeShuffle(shuffleId)

  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    sender ! removeBroadcast(broadcastId, removeFromDriver)

  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    sender ! true

  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    sender ! true

  case StopBlockManagerMaster =>
    logInfo("Stopping BlockManagerMaster")
    sender ! true
    if (timeoutCheckingTask != null) {
      timeoutCheckingTask.cancel()
    }
    context.stop(self)

  case ExpireDeadHosts =>
    expireDeadHosts()

  case HeartBeat(blockManagerId) =>
    sender ! heartBeat(blockManagerId)

  case other =>
    logWarning("Got unknown message: " + other)
}
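register checks whether this BlockManager is already known. If it is not, it maps the executorId to the blockManagerId and adds a new BlockManagerInfo entry, which holds the slaveActor, to blockManagerInfo. A second, different BlockManager registering for the same executor is treated as a fatal error.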
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
  if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(manager) =>
        // A block manager of the same executor already exists.
        // This should never happen. Let's just quit.
        logError("Got two different block manager registrations on " + id.executorId)
        System.exit(1)
      case None =>
        blockManagerIdByExecutor(id.executorId) = id
    }
    blockManagerInfo(id) = new BlockManagerInfo(id,
      System.currentTimeMillis(), maxMemSize, slaveActor)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
}
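The two-map bookkeeping in register can be sketched as a standalone class. This is an illustration only: `ManagerId`, `ManagerInfo`, and `Registry` are simplified, hypothetical stand-ins for BlockManagerId, BlockManagerInfo, and the actor's state, the slaveActor and listener bus are omitted, and the duplicate-executor case throws instead of calling System.exit(1).

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins for Spark's BlockManagerId and BlockManagerInfo.
case class ManagerId(executorId: String, host: String, port: Int)
case class ManagerInfo(id: ManagerId, registeredAtMs: Long, maxMemSize: Long)

class Registry {
  // Counterparts of blockManagerInfo and blockManagerIdByExecutor.
  val infoById = mutable.HashMap.empty[ManagerId, ManagerInfo]
  val idByExecutor = mutable.HashMap.empty[String, ManagerId]

  /** Returns true if a new entry was added, false if the id was already registered. */
  def register(id: ManagerId, maxMemSize: Long): Boolean = {
    if (infoById.contains(id)) {
      false // re-registering the same id changes nothing
    } else {
      // The original logs an error and exits the JVM here; the sketch throws
      // IllegalArgumentException (via require) so the case can be observed.
      require(!idByExecutor.contains(id.executorId),
        s"Got two different block manager registrations on ${id.executorId}")
      idByExecutor(id.executorId) = id
      infoById(id) = ManagerInfo(id, System.currentTimeMillis(), maxMemSize)
      true
    }
  }
}
```

Registering the same id twice leaves a single entry, while a different id for an executor that already has one is rejected.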
Back in initialize(), the second statement, BlockManagerWorker.startBlockManagerWorker(this), instantiates a BlockManagerWorker:
def startBlockManagerWorker(manager: BlockManager) {
  blockManagerWorker = new BlockManagerWorker(manager)
}
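In the BlockManagerWorker class body, a callback is registered with the connectionManager so that remote block read and write requests are received and handled: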
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
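The callback registration above follows a common pattern: the worker hands one of its own methods to the connection layer at construction time. The sketch below illustrates that pattern only. `Connections` and `Worker` are hypothetical, messages are reduced to plain strings, and the real ConnectionManager callback works on Spark's own message types, which are not reproduced here.

```scala
// Hypothetical, simplified stand-in for ConnectionManager: it keeps one handler
// and invokes it for every delivered message.
class Connections {
  private var handler: String => Option[String] = _ => None

  def onReceiveMessage(callback: String => Option[String]): Unit = {
    handler = callback
  }

  // Simulates a message arriving from a remote node.
  def deliver(message: String): Option[String] = handler(message)
}

// Hypothetical stand-in for BlockManagerWorker, backed by an in-memory block store.
class Worker(connections: Connections, store: Map[String, String]) {
  // Registered in the class body, as in the line above.
  connections.onReceiveMessage(onBlockMessageReceive)

  // Treats the message as a block id and answers with the block if it is present.
  private def onBlockMessageReceive(blockId: String): Option[String] =
    store.get(blockId)
}
```

Once a `Worker` has been constructed, messages delivered through its `Connections` instance are answered from the worker's store, and unknown block ids yield `None`.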
Finally, the relationships among the storage-module components are shown in the figure below.