zoukankan      html  css  js  c++  java
  • Spark消息通信原理(二)——Spark启动消息通信

            Spark启动过程中,主要是进行Master和Worker之间的通信。

     

            首先,由Worker节点向Master发送注册信息,然后,Master处理完毕,返回注册成功或者失败消息,如果注册成功,Worker会定时发送心跳给Master。

    具体过程如下:

            当master节点启动后,随之启动各worker节点,worker启动时会创建RpcEnv,以及EndPoint终端,并向master发送注册worker的消息RegisterWorker。(上一篇文章也提到过,先启动master,再启动worker,因为worker的数量根据实际任务来决定)

    (1)worker节点向master节点请求注册
            在worker中有一个tryRegisterAllMasters方法,里面会创建一个注册线程池(因为一个worker可能需要注册到多个master中,如HA环境),将注册请求放进线程池中,通过线程池启动注册线程(即并发的进行注册请求)。tryRegisterAllMasters部分源码:

    private def tryRegisterAllMasters():Array[JFuture[_]] = {
        masterRpcAddress.map{ masterAddress =>
            registerMasterThreadPool.submit(new Runnable{
                override def run(): Unit = {
                    try{
                        logInfo("Connecting to master " + masterAddress + "......")
                        //获取master终端引用
                        val masterEndPoint = rpcEnv.setupEndPointRef(masterAddress, Master.ENDPOINT_NAME)
                        //注册信息
                        registerWithMaster(masterEndPoint)
                    }catch{......}
     
    ......
     
    }

            上述源码里面,可以看到是遍历masterRpcAddress,一个worker向每个master都发送一条注册信息请求,但这个请求并不是马上执行,而是放在线程池中。registerWIthMaster方法则是对注册请求的处理,registerWIthMaster方法部分源码:

    private def registerWithMaster(masterEndPoint:RpcEndpointRef):Unit = {
        //根据Master终端引用,发送注册信息
        masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(workerId,host,port,self,cores,memory,workerWebUiUrl))
        .onComplete{
            //RegisterWorker处理之后会有两种结果,一个是Success,另一个是Failure。根据结果,进行不同的后续处理。
            case Success(msg) => Utils.tryLogNonFatalError{handleRegisterResponse(msg)}
            case Failure(e) => logError(s"Cannot register with master: ${masterEndpoint.address}", e)
            System.exit(1)    //退出
        }(ThreadUtils.sameThread)
    }

    (2)Master节点处理处理来自Worker节点的注册请求
            master收到消息后,需要对worker发送的信息进行验证、记录。如果注册成功,则发送RegisterWorker消息给对应的worker,worker接收到成功信息,则定期发送心跳信息给master;如果注册失败,则会发送RegisterWorkerFailed消息,Worker打印出错日志并结束Worker启动。上面的RegisterWorker源码如下:

    case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>{
        if(state == RecoveryState.STANDBY){
            //如果master处于standby状态,返回“Master处于StandBy状态”信息
            context.reply(MasterInStandby)
        }else if(idWorker.coontains(id)){
            //如果列表中已经存在该worker信息,则返回RegisterWorkerFailed信息
            context.reply(RegisterWorkerFailed("Duplicate worker ID"))
        }else{
            val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, workerRef, workerWebUiUrl)
            //registerWorker(worker)用于将worker放到列表中
            if(registerWorker(worker)){
                //放入列表成功
                persistenceEngine.addWorker(worker)
                context.reply(RegisteredWorker(self, masterWebUiUrl))    //心跳发送
                schedule()
            }else{
                //放入列表失败
                val workerAddress = worker.endpoint.address
                context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "+ workerAddress))
            }
        }
    }

    (3)Worker节点注册成功后的心跳通信
            当worker节点接收到注册成功信息时,会进行两个操作:1、记录日志并更新master信息;2、向master发送worker的各个executor的最新状态信息,以及定时发送心跳信息。心跳时间可以通过spark.worker.timeout设置,这个是worker“罢工”时间间隔,即判断worker存活的时间,而心跳时间则是该值的1/4。

    private val HEARTBEAT_MILLS = conf.getLong("spark.worker.timeout",60) * 1000 / 4

    心跳发送RegisteredWorker(self, masterWebUiUrl)的部分源码如下:

    case RegisteredWorker(masterRef, masterWebUiUrl) =>{
        loginInfo("Successfully registered with master" + masterRef.address.toSparkURL)
        registered = true
        changeMaster(masterRef, masterWebUiUrl)     //更新worker所持有的master信息
        //定时发送的心跳
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable{
                override def run():Unit = Utils.tryLogNonFataError{
                    self.send(SendHearbeat)
                }
            }, 0, HEARTBEAT_MILLS, TimeUnit.MILLISECONDS)
        //如果设置清理以前应用使用的文件夹,就进入该逻辑
        if(CLEANUP_ENABLED){
            ...
        }
     
        //获取worker中各个Executor的最新状态
        val execs = executors.values.map{
            e=>new ExecutorDescription(e.appId, e.execId, e.cores, e.states=)
        }
        //向master汇报worker中各个Executor的最新状态
        masterRef.send(WorkerLatesState(workerId, execs.toList, drivers.keys.toSeq))
    }

    总结:Spark的Worker启动消息通信过程

            worker并发启动注册请求线程——>对应的master接收到请求——>master处理请求:若注册失败,返回RegisterMasterFailed提示信息;若注册成功,发送RegisteredMaster提示信息——>worker接收返回信息:若是RegisterMasterFailed信息,则结束线程;若是RegisteredMaster信息,则向master发送executors状态,以及定时发送心跳信息。

  • 相关阅读:
    Fiddler: Creation of interception certificate failed.
    ip地址检查正则表达式 兼容ipv4,ipv6
    母版页与子页的启动过程
    erlang 读取confg文件异常 could not start kernel pid error in config file
    转义字符 显示形式 转换成 实际形式 \\n to \n
    How to use epoll? A complete example in C
    Lex & Flex 词法分析器实践(未完,持续更新)
    我理解的爱情———柳智宇 (转载)
    Learning by doing 系列文章概述
    锁与RCU数据共享机制
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11345327.html
Copyright © 2011-2022 走看看