  • Spark Internals: How Standalone Mode Runs

    1. Cluster Startup: Starting the Master

    $SPARK_HOME/sbin/start-master.sh

    Key line in start-master.sh:

    spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

    Log location: $SPARK_HOME/logs/

    14/07/22 13:41:33 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@hadoop000:7077]
    14/07/22 13:41:33 INFO master.Master: Starting Spark master at spark://hadoop000:7077
    14/07/22 13:41:33 INFO server.Server: jetty-8.y.z-SNAPSHOT
    14/07/22 13:41:33 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8080
    14/07/22 13:41:33 INFO ui.MasterWebUI: Started MasterWebUI at http://hadoop000:8080
    14/07/22 13:41:33 INFO master.Master: I have been elected leader! New state: ALIVE

    2. Cluster Startup: Starting the Workers

    $SPARK_HOME/sbin/start-slaves.sh

    Key line in start-slaves.sh:

    spark-daemon.sh start org.apache.spark.deploy.worker.Worker master-spark-URL

    At runtime, a Worker must register with the specified master URL, here spark://hadoop000:7077.

    After starting, a Worker mainly does two things:
      1) registers itself with the Master (RegisterWorker);
      2) periodically sends heartbeat messages to the Master.

    The Worker sends its registration message to the Master:

    Worker.scala
        ==>preStart
          ==>registerWithMaster
            ==>tryRegisterAllMasters
              ==> actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

    On the Master side, the RegisterWorker message is handled as follows:

    Master.scala
      ==>case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
          val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
            sender, workerUiPort, publicAddress)
          if (registerWorker(worker)) {
            persistenceEngine.addWorker(worker)
            sender ! RegisteredWorker(masterUrl, masterWebUiUrl)   // on success, reply to the Worker that registration succeeded
            schedule()
          }
        }

    After receiving the registration-success reply from the Master, the Worker periodically sends heartbeat messages to the Master:

    Worker.scala
      ==>case SendHeartbeat =>
          masterLock.synchronized { if (connected) { master ! Heartbeat(workerId) } }
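
    The snippet below is a minimal, self-contained sketch of the periodic-heartbeat idea. It is not the Spark source: the Heartbeat case class, the interval value, and the println standing in for the actor send are all illustrative.

    import java.util.concurrent.{Executors, TimeUnit}

    // Stand-in for Spark's Heartbeat message, for illustration only.
    case class Heartbeat(workerId: String)

    object HeartbeatSketch {
      def main(args: Array[String]): Unit = {
        val workerId = "worker-20140722134135-hadoop000-48343"
        val intervalSeconds = 15L   // illustrative; the real Worker derives its interval from the worker timeout

        val scheduler = Executors.newSingleThreadScheduledExecutor()
        scheduler.scheduleAtFixedRate(new Runnable {
          // In the real Worker this is where `master ! Heartbeat(workerId)` happens.
          def run(): Unit = println(s"sending ${Heartbeat(workerId)} to master")
        }, 0L, intervalSeconds, TimeUnit.SECONDS)
      }
    }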

    When the Master receives a heartbeat from a Worker, it updates that Worker's last-heartbeat timestamp:

    Master.scala
      ==>case Heartbeat(workerId) => {
          idToWorker.get(workerId) match {
            case Some(workerInfo) =>
              workerInfo.lastHeartbeat = System.currentTimeMillis()
          }
        }

    The Master periodically removes Worker nodes that have not sent a heartbeat within the timeout:

    Master.scala
      ==>preStart
        ==>CheckForWorkerTimeOut
          ==>case CheckForWorkerTimeOut => {timeOutDeadWorkers()} //Check for, and remove, any timed-out workers
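
    A minimal model of what such a timeout check does (not the Spark source; WorkerInfoSketch and the timeout value are made up for illustration): drop any worker whose last heartbeat is older than the configured timeout.

    import scala.collection.mutable

    case class WorkerInfoSketch(id: String, var lastHeartbeat: Long)

    object CheckForWorkerTimeOutSketch {
      val workerTimeoutMs = 60 * 1000L   // illustrative; Spark reads the timeout from configuration

      def timeOutDeadWorkers(workers: mutable.Set[WorkerInfoSketch]): Unit = {
        val deadline = System.currentTimeMillis() - workerTimeoutMs
        // Collect the timed-out workers first, then remove them from the registry.
        val dead = workers.filter(_.lastHeartbeat < deadline)
        dead.foreach { w =>
          println(s"Removing worker ${w.id}: no heartbeat in ${workerTimeoutMs / 1000} seconds")
          workers -= w
        }
      }
    }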

    Log location: $SPARK_HOME/logs/

    Master log excerpt:

    14/07/22 13:41:36 INFO master.Master: Registering worker hadoop000:48343 with 1 cores, 2.0 GB RAM

    Worker log excerpt:

    14/07/22 13:41:35 INFO Worker: Starting Spark worker hadoop000:48343 with 1 cores, 2.0 GB RAM
    14/07/22 13:41:35 INFO Worker: Spark home: /home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0
    14/07/22 13:41:35 INFO WorkerWebUI: Started WorkerWebUI at http://hadoop000:8081
    14/07/22 13:41:35 INFO Worker: Connecting to master spark://hadoop000:7077...
    14/07/22 13:41:36 INFO Worker: Successfully registered with master spark://hadoop000:7077

    3. Application Submission Process

    A. Submitting the Application

    Run spark-shell: $SPARK_HOME/bin/spark-shell --master spark://hadoop000:7077

    Log location: $SPARK_HOME/work

    spark-shell is itself an application. When the SparkContext starts, createTaskScheduler creates a SparkDeploySchedulerBackend, which in turn creates and starts an AppClient:

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

    The AppClient then sends a RegisterApplication request to the Master:

    AppClient.scala
      ==>preStart
        ==>registerWithMaster
          ==>tryRegisterAllMasters
            ==>actor ! RegisterApplication(appDescription)

    B. The Master Handles the RegisterApplication Request

    On the Master side, the request is handled in the RegisterApplication branch. After receiving it, the Master runs its scheduler: if any workers have already registered, it sends a LaunchExecutor command to the appropriate worker.

    Master.scala
            ==>case RegisterApplication(description) => {
                logInfo("Registering app " + description.name)
                val app = createApplication(description, sender)
                registerApplication(app)
                logInfo("Registered app " + description.name + " with ID " + app.id)
                persistenceEngine.addApplication(app)
                sender ! RegisteredApplication(app.id, masterUrl)
                schedule()
            }
    ==>schedule
      ==>launchExecutor(worker, exec)
        ==> worker.addExecutor(exec)
            worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
            exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
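
    The idea behind schedule() can be modeled roughly as follows. This is a simplified sketch, not the actual Master.schedule: WorkerSketch, AppSketch, and the println standing in for the LaunchExecutor/ExecutorAdded messages are illustrative, and the real scheduler additionally spreads executors across workers and handles multiple waiting applications.

    case class WorkerSketch(id: String, var freeCores: Int)
    case class AppSketch(id: String, var coresLeft: Int)

    object ScheduleSketch {
      def schedule(workers: Seq[WorkerSketch], app: AppSketch): Unit = {
        for (worker <- workers if worker.freeCores > 0 && app.coresLeft > 0) {
          val assigned = math.min(worker.freeCores, app.coresLeft)
          worker.freeCores -= assigned
          app.coresLeft -= assigned
          // In the real Master this is where LaunchExecutor goes to the worker
          // and ExecutorAdded goes to the application's driver.
          println(s"Launching executor for ${app.id} on ${worker.id} with $assigned cores")
        }
      }
    }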

    C. Launching the Executor

    After receiving the LaunchExecutor command, the Worker starts an Executor process:

    Worker.scala
        ==>case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
            logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
            val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
              self, workerId, host,
              appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
              workDir, akkaUrl, ExecutorState.RUNNING)
            executors(appId + "/" + execId) = manager
            manager.start()
            coresUsed += cores_
            memoryUsed += memory_
            masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, manager.state, None, None) }
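
    What ExecutorRunner ultimately does is assemble a java command line for CoarseGrainedExecutorBackend and start it as a child process (compare the "Launch command" line in the Worker log further below). The sketch here is a rough model only, assuming the argument order shown in that log; the ExecutorLaunchSketch object and its launch helper are hypothetical, and the real ExecutorRunner also redirects stdout/stderr into the application's work directory and reports state changes back to the Worker.

    import java.io.File

    object ExecutorLaunchSketch {
      // Build and start the executor JVM; the arguments mirror the logged launch command.
      def launch(classpath: String, driverUrl: String, execId: String, host: String,
                 cores: Int, workerUrl: String, appId: String, workDir: File): Process = {
        val command = Seq(
          "java", "-cp", classpath, "-Xms1024M", "-Xmx1024M",
          "org.apache.spark.executor.CoarseGrainedExecutorBackend",
          driverUrl, execId, host, cores.toString, workerUrl, appId)
        new ProcessBuilder(command: _*).directory(workDir).start()
      }
    }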

    D. Registering the Executor

    Based on its startup arguments, the launched Executor process registers itself with the SchedulerBackend in the Driver:

    SparkDeploySchedulerBackend.scala
        ==>preStart   (CoarseGrainedSchedulerBackend)
            ==> case RegisterExecutor(executorId, hostPort, cores) =>
                logInfo("Registered executor: " + sender + " with ID " + executorId)
                sender ! RegisteredExecutor(sparkProperties)
                executorActor(executorId) = sender
                executorHost(executorId) = Utils.parseHostPort(hostPort)._1
                totalCores(executorId) = cores
                freeCores(executorId) = cores
                executorAddress(executorId) = sender.path.address
                addressToExecutorId(sender.path.address) = executorId
                totalCoreCount.addAndGet(cores)
                makeOffers()
    
    CoarseGrainedExecutorBackend.scala
        case RegisteredExecutor(sparkProperties) =>
            logInfo("Successfully registered with driver")
            executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, false)

    Executor log location: console / $SPARK_HOME/logs

    E. Running Tasks

    Sample code:

    sc.textFile("hdfs://hadoop000:8020/hello.txt").flatMap(_.split('	')).map((_,1)).reduceByKey(_+_).collect

    After the SchedulerBackend receives the Executor's registration message, it breaks the submitted Spark job into concrete Tasks and distributes them to the Executors via LaunchTask commands, where they actually run:

    CoarseGrainedSchedulerBackend.scala
        def makeOffers() {
          launchTasks(scheduler.resourceOffers(
            executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
        }
        ==>launchTasks ==> executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))

    CoarseGrainedExecutorBackend.scala
        case LaunchTask(data) =>
          if (executor == null) {
            logError("Received LaunchTask command but executor was null")
            System.exit(1)
          } else {
            val ser = SparkEnv.get.closureSerializer.newInstance()
            val taskDesc = ser.deserialize[TaskDescription](data.value)
            logInfo("Got assigned task " + taskDesc.taskId)
            executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
          }
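
    On the executor side, launchTask hands the task to a thread pool so that several tasks can run concurrently in the same JVM. A minimal model of that idea (not the Spark source; ExecutorSketch and the println body are illustrative):

    import java.nio.ByteBuffer
    import java.util.concurrent.Executors

    object ExecutorSketch {
      private val threadPool = Executors.newCachedThreadPool()

      def launchTask(taskId: Long, serializedTask: ByteBuffer): Unit = {
        threadPool.execute(new Runnable {
          def run(): Unit = {
            // The real TaskRunner deserializes and runs the task, then reports
            // the result back to the driver via a status update.
            println(s"Running task $taskId (${serializedTask.remaining()} bytes)")
          }
        })
      }
    }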

    Master log excerpt:

    14/07/22 15:25:27 INFO master.Master: Registering app Spark shell
    14/07/22 15:25:27 INFO master.Master: Registered app Spark shell with ID app-20140722152527-0001
    14/07/22 15:25:27 INFO master.Master: Launching executor app-20140722152527-0001/0 on worker worker-20140722134135-hadoop000-48343

    Worker log excerpt:

    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    14/07/22 15:25:27 INFO Worker: Asked to launch executor app-20140722152527-0001/0 for Spark shell
    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    14/07/22 15:25:28 INFO ExecutorRunner: Launch command: "java" "-cp" "::/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/conf:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/spark-assembly-1.0.1-hadoop2.3.0-cdh5.0.0.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-core-3.2.2.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-api-jdo-3.2.1.jar" "-XX:MaxPermSize=128m" "-Xms1024M" "-Xmx1024M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@hadoop000:50515/user/CoarseGrainedScheduler" "0" "hadoop000" "1" "akka.tcp://sparkWorker@hadoop000:48343/user/Worker" "app-20140722152527-0001"

    Console log excerpt:

    14/07/22 15:25:31 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hadoop000:45150/user/Executor#-791712793] with ID 0
    14/07/22 15:25:31 INFO CoarseGrainedExecutorBackend: Successfully registered with driver

    Whenever a new application registers with the Master, the Master calls schedule() to assign the application to suitable Workers; each such Worker launches an ExecutorBackend, and the Tasks ultimately run inside those ExecutorBackends.
