zoukankan      html  css  js  c++  java
  • Spark-源码-Spark-StartAll Master Worler启动流程

    Spark start-all>>
    
    
    
    """Master启动流程"""
    
    
    Master类
    class Master(
        host: String,
        port: Int,
        webUiPort: Int,
        val securityMgr: SecurityManager,
        val conf: SparkConf) extends Actor with ActorLogReceive with Logging with LeaderElectable
    
    
    
    
    
    
    
    Master端
    def main(){
    	val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
        actorSystem.awaitTermination()
    }
    
    Master端
    def startSystemAndActor(System, Int, Int, Option[Int]) = {
    	//调用AkkaUtils创建ActorSystem
    	val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
    	  securityManager = securityMgr)
    	//创建属于Master的actor, 在创建actor的同时, 会使用classOf[Master]初始化Master
    	val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
    }
    
    Master端
    """初始化Master时由于Master继承了 trait Actor 重写了preStart方法, 
    Actor的初始化会启动preStart方法 因此找到Master的 override def preStart()
    preStart属于生命周期方法, 在构造器之后, receiver之前"""
    override def preStart() {
    	// 启动一个定时器, 定时检查超时的Worker, WORKER_TIMEOUT:每六十秒检查一次, 
    	// self:先对着自己来一下(检查)试试
    	context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
     	// 调用 timeOutDeadWorkers() 方法,
     	override def receiveWithLogging = { 
    	 	case CheckForWorkerTimeOut => {
    	      timeOutDeadWorkers()
    	    }
    	}
    
    	// 用来检查并移除所有超时的workers
    	def timeOutDeadWorkers(){
    		// 事实上是移除了一个存有WorkInfo的HashSet[WrokInfo]中的对象
    		val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
    		for (worker <- toRemove) {
    	      if (worker.state != WorkerState.DEAD) {
    	        removeWorker(worker)
    	      }
    	    }
    	}
    
    	def removeWorker(worker: WorkerInfo){
    		// 删除内存里的workInfo
    		idToWorker -= worker.id
    	    addressToWorker -= worker.endpoint.address
    	}
    }
        
    """之后执行receive方法(1.3版本), 在后来的1.6版本中叫 def receive: PartialFunction[Any, Unit]"""
    Master端
    override def receiveWithLogging () {}
    会不断的接收actor发送过来的请求
    
    
    
    
    """Worker启动流程"""
    
    Worker类
    class Worker(
        host: String,
        port: Int,
        webUiPort: Int,
        cores: Int,
        memory: Int,
        masterAkkaUrls: Array[String],
        actorSystemName: String,
        actorName: String,
        workDirPath: String = null,
        val conf: SparkConf,
        val securityMgr: SecurityManager)
      extends Actor
    
    def preStart() => {
      registerWithMaster()
    }
    
    // 向Master注册的方法
    def registerWithMaster() {
      	// 向所有的Master注册Worker
      	tryRegisterAllMasters()
      	
      	// 其中内容
      	def tryRegisterAllMasters()=>{
    	  	// 通过Master的Url获取Master的actor
    		val actor = context.actorSelection(masterAkkaUrl)
    		// 向Master发送注册信息
    	    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    	}	
     }
    
    Master端
    // 接收Worker发送的注册信息
    override def receiveWithLogging = {
    	case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>{
    		// 判断是否是StandBy状态, doNothing
    		idToWorker.contains(id), 已经注册过, doNothing
    		
    		正常情况下(Active状态, 且没有注册过):{
    			// 把发送来的 WorkerInfo 添加到 Master的 WorkerInfo中
    			val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort, publicAddress)
    		}
    		// 如果将Worker Info存入内存成功, 则调用持久化引擎, 将信息存入磁盘中, 
    		// 目的是防止数据丢失. 如果Master宕机, 内存中会丢失数据, 
    		// 切换状态(Standby和Active)后, 需要切换的节点拿不到WorkerInfo, Worker会再次注册, 非常消耗资源, 存在磁盘则可以直接去磁盘拿取数据不需要重新注册
    		if (registerWorker(worker)) {
    	      persistenceEngine.addWorker(worker)
    	      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
    	      schedule()
    	    }
    
    		// 向worker响应注册成功信息
    		sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
    		// 开始调度资源, 调度资源不仅仅是集群启动的时候调动资源, 运行Job的时候也会调度资源, 其有两种方式 一种是尽量分散, 一种是尽量集中
    		schedule()
    	}
    }
    
    Worker端
    // 接收注册成功的信息, 其实是将 Active Master 的Url和rWebUiUrl传回并更新, 之后向他发送心跳~
    def receiveWithLogging() = {
    	case RegisteredWorker(masterUrl, masterWebUiUrl) =>{
    		//更新MasterUrl
    		changeMaster(masterUrl, masterWebUiUrl)
    		//向Master发送心跳信息, HEARTBEAT_MILLIS =15秒, 每十五秒发送一次心跳信息, 发送逻辑为 SendHeartbeat
      		context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
    	}
    
      	//向Master发送心跳信息, 实际上是将自己的WorkerId发送给Master
      	case SendHeartbeat =>
        	if (connected) { master ! Heartbeat(workerId) }
    }
    
    Master端 
    def receiveWithLogging() = {
    	case Heartbeat(workerId) => {
    		//正常情况下, 更新上次心跳时间
    		workerInfo.lastHeartbeat = System.currentTimeMillis()
    		//启动完成
    	}
    }
    

      

  • 相关阅读:
    学习redis-安装和基本一些命令
    Eclipse启动报错Java was started but returned exit code=13
    踩过的坑系列之InputStream.read(byte[])方法
    <<深入Java虚拟机>>-虚拟机类加载机制-学习笔记
    <<深入Java虚拟机>>-第三章-垃圾收集器与内存分配策略-学习笔记
    <<深入Java虚拟机>>-第二章-Java内存区域-学习笔记
    创建线程的两种方式比较Thread VS Runnable
    Java设计模式之--代理模式学习
    shell脚本中$参数的介绍
    (转)使用DataTime这个类来获取当前的时间
  • 原文地址:https://www.cnblogs.com/chinashenkai/p/9977672.html
Copyright © 2011-2022 走看看