  • Spark Message Communication Internals (3): Spark Runtime Message Communication

    I. The execution process of a Spark application:
            In Spark, each user "job" is called an application (Application), and every Application must have a SparkContext, which serves as the application's entry point, or, put another way, its environment. When a client submits an application, that application's SparkContext sends an application-registration message to the Master. The Master allocates Executor processes according to the resources the application requires; these Executors are distributed across the Worker machines and are created by each WorkerEndpoint using the command environment supplied by the Driver. Once an Executor has been created, it registers with the Driver as an executor of that application, and after a successful registration it sends a registration-success message to the SparkContext. When an action operator is invoked on one of the SparkContext's RDDs, execution is triggered: the Driver builds a DAG (directed acyclic graph) from the RDD, the DAGScheduler splits it into stages, and each stage is converted into a TaskSet (a set made up of multiple Tasks). The TaskScheduler then sends execution messages to the registered Executors, which launch and run the tasks on receipt. Finally, when all tasks have finished, the Driver processes the computation results from the individual executors and reclaims the resources.

    The two figures below show how an Application is divided internally:

            An Application is associated with a SparkContext (as described above, the Application connects to the cluster nodes through its SparkContext). Each Application contains one or more jobs, which may run serially or in parallel. A job contains multiple stages, divided at shuffle boundaries; a stage contains multiple tasks (one task per partition), and those tasks together form a TaskSet. The task is the smallest unit of work.
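    The hierarchy just described can be sketched with a few case classes. These are simplified, hypothetical types for illustration, not Spark's internal classes:

```scala
// Hypothetical, simplified model of the Application -> Job -> Stage -> TaskSet
// hierarchy; these case classes are illustrative only, not Spark internals.
case class Task(partitionId: Int)
case class Stage(id: Int, tasks: Seq[Task])          // one Task per partition
case class Job(id: Int, stages: Seq[Stage])          // stages split at shuffle boundaries
case class Application(name: String, jobs: Seq[Job]) // one SparkContext per Application

object HierarchyDemo {
  // An RDD with 4 partitions that shuffles once yields one job of 2 stages,
  // each stage holding one task per partition.
  def demoApp: Application = {
    val numPartitions = 4
    val mapStage    = Stage(0, (0 until numPartitions).map(Task(_))) // before the shuffle
    val reduceStage = Stage(1, (0 until numPartitions).map(Task(_))) // after the shuffle
    Application("demo", Seq(Job(0, Seq(mapStage, reduceStage))))
  }

  def totalTasks(app: Application): Int =
    app.jobs.flatMap(_.stages).map(_.tasks.size).sum

  def main(args: Array[String]): Unit =
    println(totalTasks(demoApp)) // 8 tasks in total: 2 stages x 4 partitions
}
```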

    II. The runtime message communication process in Spark


            Running an application requires starting a SparkContext. During SparkContext startup, a SchedulerBackend object is instantiated (in standalone mode, a SparkDeploySchedulerBackend), which works with two endpoints: DriverEndpoint and ClientEndpoint.

    (1) ClientEndpoint registers the Application with MasterEndpoint
            As mentioned in Part 2 of this series, the endpoints that register with the master have a tryRegisterAllMasters method for sending some kind of registration message. ClientEndpoint's tryRegisterAllMasters registers the Application with the master.

    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
        for (masterAddress <- masterRpcAddresses) yield {
            // Submit a registration thread to the thread pool; the thread exits
            // as soon as it sees the registration-success flag set to true
            registerMasterThreadPool.submit(new Runnable {
                override def run(): Unit = try {
                    // Check the registration-success flag
                    if (registered.get) {
                        return
                    }
                    // Obtain a reference to the Master endpoint, used to send the
                    // application-registration message
                    val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
                    // Send the application-registration message to the master
                    masterRef.send(RegisterApplication(appDescription, self))
                } catch {...}
            })
        }
    }
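    The pattern in this method can be shown as a self-contained sketch, under assumed names: one attempt is submitted to a thread pool per master address, and a shared AtomicBoolean lets every attempt stop as soon as any of them succeeds. The addresses and the println are stand-ins, not real RPC calls:

```scala
// Sketch of the register-with-retry pattern: N concurrent attempts, one shared
// success flag. Hypothetical names; the println stands in for the RPC send.
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{Executors, TimeUnit}

object RegisterSketch {
  val registered = new AtomicBoolean(false)

  def tryRegisterAllMasters(masterAddresses: Seq[String]): Unit = {
    val pool = Executors.newFixedThreadPool(masterAddresses.size)
    for (addr <- masterAddresses) {
      pool.submit(new Runnable {
        override def run(): Unit = {
          if (registered.get()) return // another attempt already succeeded
          // Stand-in for masterRef.send(RegisterApplication(appDescription, self));
          // only the first attempt to flip the flag "wins"
          if (registered.compareAndSet(false, true)) {
            println(s"registered application with $addr")
          }
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit = {
    tryRegisterAllMasters(Seq("master-a:7077", "master-b:7077"))
    println(registered.get()) // true
  }
}
```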

    (2) MasterEndpoint handles the application-registration request
            When the Master receives the registration message, its registerApplication method does two things: 1. it records the application's information and appends it to the list of waiting applications (executed FIFO); 2. once registration is complete, the Master sends a RegisteredApplication message to ClientEndpoint and calls startExecutorsOnWorkers, which sends LaunchExecutor messages telling the Workers to start Executors.

    When ClientEndpoint receives the RegisteredApplication message, it updates its state accordingly:

    case RegisteredApplication(appId_, masterRef) =>
        appId.set(appId_)
        registered.set(true)
        master = Some(masterRef)
        listener.connected(appId.get)

    startExecutorsOnWorkers first selects the worker nodes capable of running the application, then iterates over them, telling each to launch the appropriate executors (one or more):

    private def startExecutorsOnWorkers(): Unit = {
        // FIFO scheduling: first registered, first served
        for (app <- waitingApps if app.coresLeft > 0) {
            val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
     
            // Find workers that are alive, have at least as much free memory as
            // one Executor needs, and have at least one free core
            val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
                .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree >= coresPerExecutor.getOrElse(1))
                .sortBy(_.coresFree).reverse
     
            // Decide which workers the application runs on and how many cores each
            // contributes. There are two allocation strategies:
            // 1. spread the application across as many workers as possible;
            // 2. pack it onto as few workers as possible
            val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
     
            // Tell the chosen workers to launch executors
            for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
                allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
            }
        }
    }
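    The two allocation strategies can be contrasted with a minimal sketch. This is assumed logic for illustration, not Spark's actual scheduleExecutorsOnWorkers implementation: spreadOut = true hands out cores round-robin across all usable workers, while spreadOut = false fills one worker before moving to the next:

```scala
// Minimal sketch of the two core-allocation strategies (not Spark's real code).
object ScheduleSketch {
  def scheduleExecutorsOnWorkers(coresNeeded: Int,
                                 freeCores: Array[Int],
                                 spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    var remaining = coresNeeded
    var pos = 0
    // Keep assigning while cores remain to hand out and some worker has capacity
    while (remaining > 0 && assigned.indices.exists(i => assigned(i) < freeCores(i))) {
      if (assigned(pos) < freeCores(pos)) {
        assigned(pos) += 1
        remaining -= 1
      }
      // spreadOut: advance to the next worker after every core;
      // otherwise stay on the current worker until it is full
      if (spreadOut || assigned(pos) == freeCores(pos)) {
        pos = (pos + 1) % freeCores.length
      }
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val freeCores = Array(4, 4, 4)
    println(scheduleExecutorsOnWorkers(6, freeCores, spreadOut = true).mkString(","))  // 2,2,2
    println(scheduleExecutorsOnWorkers(6, freeCores, spreadOut = false).mkString(",")) // 4,2,0
  }
}
```

    With three workers of 4 free cores each and 6 cores requested, spreading out yields 2 cores on each worker, while packing yields 4 on the first worker and 2 on the second.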

    (3) The Worker creates a CoarseGrainedExecutorBackend object to launch the Executor process
            When a worker receives the LaunchExecutor message from the master, it first instantiates an ExecutorRunner. During that instantiation a ProcessBuilder is created, and the ProcessBuilder uses the command to create a CoarseGrainedExecutorBackend object, the container in which the Executor runs. Finally, the worker sends an ExecutorStateChanged message to the Master.

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
        if (masterUrl != activeMasterUrl) {
            logWarning("Invalid master (" + masterUrl + ") attempted to launch executor.")
        } else {
            try {
                // Create the Executor's working directory
                val executorDir = new File(workDir, appId + "/" + execId)
                if (!executorDir.mkdirs()) {
                    throw new IOException("Failed to create directory " + executorDir)
                }
     
                // Create the Executor's local directories on the worker via the
                // SPARK_EXECUTOR_DIRS environment variable; the worker deletes
                // them when the application finishes
                val appLocalDirs = appDirectories.getOrElse(appId, Utils.getOrCreateLocalRootDirs(conf).map { dir =>
                        val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                        Utils.chmod700(appDir)
                        appDir.getAbsolutePath()
                    }.toSeq)
                appDirectories(appId) = appLocalDirs
     
                // The ExecutorRunner creates the CoarseGrainedExecutorBackend object
                // using the command from the application description; that command
                // was built in the SchedulerBackend
                val manager = new ExecutorRunner(
                                appId,
                                execId,
                                appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
                                cores_,
                                memory_,
                                self,
                                workerId,
                                host,
                                webUi.boundPort,
                                publicAddress,
                                sparkHome,
                                executorDir,
                                workerUri,
                                conf,
                                appLocalDirs,
                                ExecutorState.RUNNING)
                executors(appId + "/" + execId) = manager
                manager.start()
                coresUsed += cores_
                memoryUsed += memory_
     
                // Notify the master via ExecutorStateChanged that the Executor's
                // state is now ExecutorState.RUNNING
                sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
            } catch {...}
        }

    (4) DriverEndpoint handles the RegisterExecutor message
            As noted in step (3), CoarseGrainedExecutorBackend is the Executor's container; it is created and started when the ExecutorRunner is instantiated, and on startup it sends a RegisterExecutor message to DriverEndpoint. When the Driver receives that message, it first checks whether the Executor is already in its registry. If it is, the Driver returns a RegisterExecutorFailed message to the CoarseGrainedExecutorBackend; if not, the Driver records the Executor's information and replies with a RegisteredExecutor message. Finally the Driver allocates the resources the tasks need and sends LaunchTask messages.

    case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
        if (executorDataMap.contains(executorId)) {
            // The executor is already in the registry
            executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
            context.reply(true)
        } else {
            ...
            // 1. Record the Executor's ID and the number of cores it provides
            addressToExecutorId(executorRef.address) = executorId
            totalCoreCount.addAndGet(cores)
            totalRegisteredExecutors.addAndGet(1)
            val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
            // 2. Map the Executor's ID to its details
            CoarseGrainedSchedulerBackend.this.synchronized {
                executorDataMap.put(executorId, data)
                if (currentExecutorIdCounter < executorId.toInt) {
                    currentExecutorIdCounter = executorId.toInt    // track the highest executor ID seen so far
                }
                if (numPendingExecutors > 0) {
                    numPendingExecutors -= 1
                }
            }
            // 3. Send the registration-success message to CoarseGrainedExecutorBackend
            executorRef.send(RegisteredExecutor(executorAddress.host))
            // 4. Post an executor-added event on the listener bus
            listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
     
            // 5. Allocate resources and send LaunchTask messages to the Executor
            makeOffers()
        }

    (5) CoarseGrainedExecutorBackend instantiates the Executor object
            When CoarseGrainedExecutorBackend receives the RegisteredExecutor message from the Driver, it instantiates the Executor object. Once the Executor is up, it sends heartbeats to the Driver at regular intervals. As steps (3), (4), and (5) show, the executor is not created directly by the WorkerEndpoint: the Worker first creates a CoarseGrainedExecutorBackend object, that object registers the Executor with the Driver, and only after a successful registration does CoarseGrainedExecutorBackend instantiate the Executor, which the Driver then manages. The source where CoarseGrainedExecutorBackend handles the RegisteredExecutor message:

    case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        // Instantiate (start) the Executor with the current environment
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

    The executor then sends heartbeats while waiting for the Driver to assign tasks:

    private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
    private def startDriverHeartbeater(): Unit = {
        val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
     
        // Wait a random initial delay so that the heartbeats from different
        // executors do not all fire in sync
        val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
        val heartbeatTask = new Runnable() {
            override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
        }
     
        // Send heartbeats at a fixed rate
        heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
    }

    (6) DriverEndpoint sends the Executor a LaunchTask message
            When the executor receives a LaunchTask message, it runs the task: it creates a TaskRunner, places it in a threadPool, and lets the Executor schedule it. When the task finishes, status changes are reported to both CoarseGrainedExecutorBackend and the Driver, and the executor goes back to waiting for task assignments (the Driver processes the execution results before assigning further tasks).

    case LaunchTask(data) =>
        if (executor == null) {
            // The executor has not been instantiated (started): log an error
            // and shut the process down
            logError("Received LaunchTask command but executor was null")
            System.exit(1)
        } else {
            val taskDesc = ser.deserialize[TaskDescription](data.value)
            logInfo("Got assigned task " + taskDesc.taskId)
     
            // Launch the TaskRunner
            executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
        }
     
     
    // The method that launches a TaskRunner
    def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ByteBuffer): Unit = {
        // Create a TaskRunner for the current task
        val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask)
        // Track the task and hand its TaskRunner to the threadPool,
        // which the Executor schedules centrally
        runningTasks.put(taskId, tr)
        threadPool.execute(tr)
    }
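    The launchTask pattern above can be reduced to a minimal, hypothetical sketch: each task is wrapped in a Runnable, tracked in a concurrent map while it runs, and executed on a shared thread pool owned by the executor. All names here are illustrative, not Spark's:

```scala
// Sketch of the track-then-execute task pattern (illustrative names only).
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object TaskRunnerSketch {
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  val threadPool = Executors.newFixedThreadPool(2)

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val runner = new Runnable {
      override def run(): Unit =
        try body() finally runningTasks.remove(taskId) // deregister when done
    }
    runningTasks.put(taskId, runner) // track the task while it is running
    threadPool.execute(runner)       // let the pool schedule it
  }

  def main(args: Array[String]): Unit = {
    (1L to 4L).foreach(id => launchTask(id, () => println(s"task $id done")))
    threadPool.shutdown()
    threadPool.awaitTermination(5, TimeUnit.SECONDS)
    println(runningTasks.size) // 0: every finished task deregistered itself
  }
}
```

    Keeping the map of running tasks is what lets the executor later report status for, or kill, an in-flight task by its ID.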

    (7) The Driver handles StatusUpdate
            When DriverEndpoint receives a StatusUpdate message from an Executor, it calls TaskSchedulerImpl's statusUpdate method, processes the result according to how each executor's task finished, and then sends the Executor another LaunchTask message.

    case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)    // scheduler is a reference to TaskSchedulerImpl
        if (TaskState.isFinished(state)) {
            executorDataMap.get(executorId) match {
                case Some(executorInfo) =>
                    executorInfo.freeCores += scheduler.CPUS_PER_TASK
                    // Send another LaunchTask message to the same Executor; this is
                    // the same makeOffers method the Driver called when handling
                    // RegisterExecutor in step (4)
                    makeOffers(executorId)
                case None =>
            }
        }

            From here, steps (6) and (7) repeat until all tasks have completed.

  • Original post: https://www.cnblogs.com/SysoCjs/p/11345395.html