zoukankan      html  css  js  c++  java
  • Spark消息通信原理(三)——Spark运行时消息通信

    一、Spark的应用程序执行过程:
            在Spark中,每一个“作业”称为一个应用程序(Application),每一个Application都必须有一个SparkContext,相当于application的入口,或者理解为环境。当用户(Client)提交应用程序(Application)时,该application的SparkContext就会向Master发送应用注册消息,master会根据该应用所需要的资源来分配Executor进程,Executor进程分布在各个Worker机器上面,并由WorkerEndpoint利用Driver的command环境来创建;Executor创建完成后,向Driver注册为该application的执行器,注册成功后,就向SparkContext发送注册成功消息;当SparkContext的RDD执行action算子时,就会触发执行操作,Driver根据RDD生成DAG图(有向无环图),通过DAGSecheduler进行划分stage,并将stage转化为TaskSet(一个TaskSet由多个Task组成,即Task的集合);接着由TaskSecheduler向已经注册的Executor发送执行消息,Executor接收到任务消息后,启动并运行;最后当所有任务运行结束后,由Driver处理各个executor的计算结果,并回收资源。

    下面两图是Application的内部划分结构图:

            Spark的Application和SparkContext关联(正如上面所说的,Application通过SparkContext与各节点进行关联),每个Application中有一个或者多个job,可以并行或者串行运行job;job里面包含多个stage,stage以shuffle进行划分,stage包含多个task(一个partition就是一个task),多个task构成TaskSet,task则是任务的最小工作单元。

    二、Spark运行消息通信交互过程


            执行应用程序需要启动SparkContext,在SparkContext启动中,会先在DriverEndpoint中实例化SchedulerBackend对象(Standalone模式下,实例化的是SparkDeploySchedulerBackend对象),该对象继承DriverEndpoint和ClientEndpoint两个终端点。

    (1)ClientEndpoint向MasterEndpoint注册Application
            在Spark消息通信原理(二)提过,所有终端点都有tryRegisterAllMasters方法,用于向master注册某些消息。ClientEndpoint的tryRegisterAllMasters方法,则是用于向master注册Application的消息。

    ptivate def tryRegisterAllMasters(): Array[JFuture[_]] = {
        for(masterAddress <- masterRpcAddresses) yield {
            //想线程池中启动注册线程,只要当线程读到的注册成功标识为true时,退出注册线程
            registerMasterThreadPool.submit(new Runnable{
                override def run():Unit = try{
                    //判断注册成功标识
                    if(registered){
                        return
                    }
                    //获取Master终端点的引用,用来发送注册应用信息
                    val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
                    //向master发送注册应用信息
                    masterRef.send(RegisterApplication(appDescription, self))
                }catch{...}
            })
        }
    }

    (2)MasterEndpoint处理申请注册Application的消息
            当Master接收到注册应用消息时,master在registerApplication方法中做了两件事:1、记录application信息,并加入到应用列表中(FIFO执行);2、注册完毕后,master发送RegisteredApplication消息给ClientEndpoint,同时调用startExecutorsOnWorkers方法,发送LaunchExecutor消息,通知Worker启动Executor;

    ClientEndpoint收到RegisteredApplication消息时会更新相关状态:

    case RegisteredApplication(appId_, masterRef) =>
        appId.set(appId_)
        registered.set(true)
        master = Some(masterRef)
        listener.connected(appId.get)

    startExecutorsOnWorkers方法中,首先获取符合执行应用的worker节点,然后遍历通知这些worker启动相应的executor(可能是一个或多个):

    private def startExecutorsOnWorkers() :Unit = {
        //使用FIFO调度算法,先注册,先执行
        for(app <- waitingApps if app.coresLeft > 0){
            val coresPerExecutor:Option[Int] = app.desc.coresPerExecutor
     
            //找出存活的、剩余内存大于等于启动Executor所需大小的、核数大于等于1的worker
            val usableWorkers = workers.toArray.filter(_.state==WorkerState.ALIVE)
                .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1))
                .sortBy(_.coresFree).reverse
     
            //确定应用运行在哪些worker上,以及每个worker分配用于运行的核数
            //分配算法有两种:1、将应用运行在尽可能多的worker上;2、将应用运行在尽可能少的worker上
            val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
     
            //通知分配的worker启动worker
            for(pos <- 0 until usableWorkers.length if assignedCores(pos) > 0){
                allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
            }
        }
    }

    (3)Worker创建CoarseGrainedExecutorBackend对象,用于启动Executor进程
            当worker收到master发送的LaunchExecutor消息时,先实例化ExecutorRunner对象,实例化过程中会创建进程生成器(ProcessBuilder),然后由该生成器使用command创建CoarseGrainedExecutorBackend对象,该对象就是Executor运行的容器,最后worker发送ExecutorStateChanged消息给Master。

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
        if (masterUrl != activeMasterUrl){
            logWarning("InvalidMaster (" + masterUrl + ") attempted to launch executor.")
        }else{
            try{
                //创建Executor执行目录
                val executorDir =  new File(workDir, appId + "/" + execId)
                if(!executorDir.mkdirs()){
                    throw new IOException("Failed to creata directory " + executorDir)
                }
     
                //通过SPARK_EXECUTOR_DIRS环境变量,在worker中创建Executor执行目录,当程序执行完毕后,由worker进行删除
                val appLocalDirs = appDirectories.getOrElse(appId, Utils.getOrCreateLocalRootDirs(conf).map{ dir =>
                        val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                        Utils.chmod700(appDir)
                        appDir.getAbsolutePath()
                    }.toSeq)
                appDirectories(appId) = appLocalDirs
     
                //在ExecutorRunner中创建CoarseGrainedExecutorBackend对象,使用的是应用信息中的command,command则是在SchedulerBackend中创建的
                val manager = new ExecutorRunner(
                                appId,
                                execId,
                                appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command,conf)),
                                cores_,
                                memory_,
                                self,
                                workerId,
                                host,
                                webUi.boundPort,
                                publicAddress,
                                sparkHome,
                                executorDir,
                                workerUri,
                                conf,
                                appLocalDirs,
                                ExecutorState.RUNNING)
                executors(appId + "/" + execId) = manager
                manager.start()
                coresUsed += cores_
                memoryUsed += memory_
     
                //向master发送ExecutorStateChanged消息,表示Executor状态已经更改为Executor.RUNNING
                sendToMaster(ExecutorStateChanged(appId,execId,manager.state,None,None))
            }catch{...}
        }

    (4)DriverEndpoint处理RegisterExecutor消息
            在第(3)点有提到,CoarseGrainedExecutorBackend对象是Executor的容器,该对象是在ExecutorRunner实例化时被创建,以及启动,启动时,会向DriverEndpoint发送RegisterExecutor消息。Driver接收到注册消息后,先判断需要注册的Executor是否已经被注册在列表当中,如果存在,则返回RegisterExecutorFailed消息返回CoarseGrainedExecutorBackend;如果不存在,则Driver会记录该Executor信息,并发送RegisteredExecutor消息。最后Driver分配任务所需资源,并发送LaunchTask消息。

    case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
        if(executorDataMap.contains(executorId)){
            //判断列表是否已经存在该executor
            executorRef.send(RegisterExecutorFailed("Duplicate executor ID:" + executorId))
            context.reply(true)
        }else{
            ...
            //1、记录该Executor的编号,以及需要的核数
            addressToExecutorId(executorRef.address) = executorId
            totalCoreCount.addAndGet(cores)
            totalRegisteredExecutors.addAndGet(1)
            val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
            //2、创建Executor编号和其具体信息的键值列表
            CoarseGrainedSchedulerBackend.this.synchronized{
                executorDataMap.put(executorId, data)
                if(currentExecutorIdCounter < executorId.toInt){
                    currentExecutorIdCounter = executorId.toInt    //记录、更新当前executor数量
                }
                if(numPendingExecutors > 0){
                    numPendingExecutors -= 1
                }
            }
            //3、向CoarseGrainedSchedulerBackend发送注册成功信息;
            executorRef.send(RegisteredExecutor(executorAddress.host))
            //4、并监听在总线中加入添加Executor事件
            listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
     
            //5、分配资源,并向Executor发送LaunchTask任务消息
            makeOffers()
        }

    (5)CoarseGrainedExecutorBackend实例化Executor对象
            当CoarseGrainedExecutorBackend收到来自Driver发过来的RegisteredExecutor消息时,就会实例化Executor对象。启动Executor完毕,就会定时向Driver发送心跳。由(3)(4)、(5)步骤来看,executor并不是由WorkerEndpoint直接创建,而是Worker先创建CoarseGrainedExecutorBackend对象,然后CoarseGrainedExecutorBackend对象向Driver注册Executor,注册成功后,才让CoarseGrainedExecutorBackend实例化Executor对象,最后Executor交给Driver管理。CoarseGrainedExecutorBackend处理RegisteredExecutor消息的源码:

    case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        //根据环境实例化(启动)Executor
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

    executor发送心跳,等待Driver下发任务:

    private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
    private def startDriverHeartbeater() : Unit = {
        val intevalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
     
        //等待随机的时间间隔,这样,心跳在同步中不会结束
        val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
        val heartbeatTask = new Runnable(){
            override def run():Unit = Utils.logUncaughtExceptions(reportHeartBeat())
        }
     
        //发送心跳
        heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
    }

    (6)DriverEndpoint向Executor发送LaunchTask消息
            executor接收到LaunchTask消息之后,就会执行任务。执行任务时,会创建TaskRunner进程,放到thredPool中,统一由Executor进行调度。任务执行完成后,分别给CoarseGrainedExecutorBackend和Driver发送状态变更,然后继续等待任务分配(Driver继续分配任务前,会先对执行结果进行处理)。

    case LaunchTask(data) =>
        if(executor == null){
            //当executor没有实例化(启动),输出异常日志,并关闭Executor
            logError("Received LaunchTask command but executor was null")
            System.exit(1)
        }else{
            val taskDesc = ser.desrialize[TaskDescription](data.value)
            logInfo("Got assigned task " + taskDesc.taskId)
     
            //启动TaskRunner进程
            executor.launchTask(this, taskId=taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
        }
     
     
    //启动TaskRunner进程的方法
    def launchTask(context:ExecutorBackend, taskId:Long, attemptNumber:Int, taskName:String, serializedTask:ByteBuffer) : Unit = {
        //创建当前task的TaskRunner
        val tr = new TaskRunner(context, taskId=taskId, attemptNumber=attemptNumber, taskName, serializedTask)
        //将当前task的TaskRunner放进threadPool里面,统一由Executor调度
        runningTasks.put(taskId, tr)
        threadPool.execute(tr)
    }

    (7)Driver进行StatusUpdate
            当DriverEndpoint接收到Executor发送过来的StatusUpdate消息后,调用TaskSchedulerImpl的statusUpdate方法,根据不同executor执行后的结果进行处理,处理完毕后,继续给Executor发送LaunchTask消息。

    case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)    //scheduler是TaskSchedulerImpl的一个引用
        if(TaskState.isFinished(state)){
            executorDataMap.get(executorId) match{
                case Some(executorInfo) =>
                    executorInfo.freeCores += scheduler.CPUS_PER_TASK
                    //继续向刚才的Executor发送LaunchTask消息,跟(4)中,Driver处理RegisterExecutor消息时调用的是同一个方法
                    makeOffers(executorId)    
                case None =>
            }
        }

            至此,不断重复(6)、(7)操作,直至所有任务执行完毕。

  • 相关阅读:
    Datax streamreader json测试样例
    dbeaver 连接 elasticsearch 记录
    灾害链开发记录资料汇总
    mxgraph
    drawio www.diagrams.net 画图应用程序开发过程资料汇总
    neo4j学习记录
    GraphVis 图可视化分析组件
    D3学习记录
    Kubernetes K8S之固定节点nodeName和nodeSelector调度详解
    记一次性能优化,单台4核8G机器支撑5万QPS
  • 原文地址:https://www.cnblogs.com/SysoCjs/p/11345395.html
Copyright © 2011-2022 走看看