zoukankan      html  css  js  c++  java
  • Spark分析之Master、Worker以及Application三者之间如何建立连接

    Master.preStart(){
    
      webUi.bind()
      context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除
    
      case CheckForWorkerTimeOut => {
        timeOutDeadWorkers()
      }
    
      /** Check for, and remove, any timed-out workers */  
      def timeOutDeadWorkers() {
        ...
        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
          workers -= worker 
        }
      }
    
    }
    Worker.preStart(){
    
      override def preStart() {
        webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
        webUi.bind()
        registerWithMaster()  //注册该Worker到Master
      }
    
      def tryRegisterAllMasters() {
        for (masterUrl <- masterUrls) {
          logInfo("Connecting to master " + masterUrl + "...")
          val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
          actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
        }
      }
    
    }
    Master.scala
    
    case RegisterWorker(...) => {
    
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)  //向Worker发送Worker注册成功事件
    
      schedule()  //调度部分后续章节分析  
    
    }
    Worker.scala
    
    case RegisteredWorker(...) => {
    
      registered = true
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)  //Worker注册成功后,定时向Master发送心跳信息
    
    }
    
    case SendHeartbeat =>
      masterLock.synchronized {
      if (connected) { master ! Heartbeat(workerId) }
    }
    Master.scala
    
    case Heartbeat(workerId) => {
      idToWorker.get(workerId) match {
      case Some(workerInfo) =>
        workerInfo.lastHeartbeat = System.currentTimeMillis()  //更新该worker的上次发送心跳信息的时间
      case None =>
        logWarning("Got heartbeat from unregistered worker " + workerId)
      }
    }

    =================如上步骤完成了Worker到Master的连接===============================================

    SparkContext启动时:

    SparkContext.createTaskScheduler()
    
      ==>new SparkDeploySchedulerBackend()
    
        ==>创建AppClient并启动
    
          ==>ClientActor.preStart():registerWithMaster(){actor ! RegisterApplication(appDescription)}  //向Master发起RegisterApplication事件
    Master.scala
    
    case RegisterApplication(description) => {
    
      val app = createApplication(description, sender)
    
      registerApplication(app)
      persistenceEngine.addApplication(app)
      sender ! RegisteredApplication(app.id, masterUrl)  //向ClientActor(即发起注册的AppClient)发送RegisteredApplication事件,表示该Application已经注册成功
      schedule()  //调度部分后续章节分析
    }

    =======================如上步骤完成了Application到Master的连接===============================================

    小结:

    1、Master的主要功能:

      1)Master Leader选举;

      2)Master对Worker、Application等的管理(接收worker的注册并管理所有的worker,接收client提交的application,(FIFO)调度等待的application并向worker提交);

    2、Worker的主要功能:

      1)通过RegisterWorker注册到Master;

      2)定时发送心跳给Master;

          3)根据master发送的application配置进程环境,并启动StandaloneExecutorBackend

    3、运行spark-shell:

      1)ClientActor通过RegisterApplication注册到Master;

      2)Master收到RegisterApplication后,通过schedule方法进行调度,如有满足要求的Worker,则发送LaunchExecutor给相应的Worker;

  • 相关阅读:
    Linux 禁用笔记本触摸板
    Linux 下安装android
    关于JAVA多线程的那些事__初心者
    ADT下开发环境的配置--个人配置啦 Eclipse Color Themes
    关于权限系统的一些思考
    关于线程安全的单例模式的讨论
    说下Fedora下把SpiderMonkey放入Eclipse内编译的过程
    基于Eclipse构建Hadoop源码阅读环境
    Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
    CentOS6.5安装配置
  • 原文地址:https://www.cnblogs.com/luogankun/p/3826026.html
Copyright © 2011-2022 走看看