Spark Analysis: The Standalone Execution Process

    1. Cluster Startup Process -- Starting the Master

    $SPARK_HOME/sbin/start-master.sh

    Key content of the start-master.sh script:

    spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

    Log output: $SPARK_HOME/logs/

    14/07/22 13:41:33 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@hadoop000:7077]
    14/07/22 13:41:33 INFO master.Master: Starting Spark master at spark://hadoop000:7077
    14/07/22 13:41:33 INFO server.Server: jetty-8.y.z-SNAPSHOT
    14/07/22 13:41:33 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8080
    14/07/22 13:41:33 INFO ui.MasterWebUI: Started MasterWebUI at http://hadoop000:8080
    14/07/22 13:41:33 INFO master.Master: I have been elected leader! New state: ALIVE
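
    For orientation, the class launched by the script above, org.apache.spark.deploy.master.Master, is a plain JVM process whose main method brings up an Akka actor system and registers the Master actor on the configured host and port (7077 by default); that startup is what produces the log lines above. A simplified sketch of the entry point (approximate, not the verbatim 1.0.1 source):

        // Simplified sketch of the Master entry point (Akka-based Spark 1.0.x era).
        // Names such as MasterArguments and startSystemAndActor follow the real source,
        // but argument lists and details are approximate.
        object Master {
          def main(argStrings: Array[String]) {
            val conf = new SparkConf
            val args = new MasterArguments(argStrings, conf)
            // start the actor system and the Master actor; the MasterWebUI is started from the actor
            val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
            actorSystem.awaitTermination()
          }
        }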

    2. Cluster Startup Process -- Starting the Workers

    $SPARK_HOME/sbin/start-slaves.sh

    Key content of the start-slaves.sh script:

    spark-daemon.sh start org.apache.spark.deploy.worker.Worker master-spark-URL

    When a Worker runs, it must register with the specified master URL, which here is spark://hadoop000:7077.

    After starting, the Worker does two main things:
      1) registers itself with the Master (RegisterWorker);
      2) periodically sends heartbeat messages to the Master.

    The Worker sends its registration message to the Master:

    Worker.scala
        ==>preStart
          ==>registerWithMaster
            ==>tryRegisterAllMasters
              ==> actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

    On the Master side, the RegisterWorker message is handled as follows:

    Master.scala
      ==>case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
            val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
              sender, workerUiPort, publicAddress)
            if (registerWorker(worker)) {
              persistenceEngine.addWorker(worker)
              sender ! RegisteredWorker(masterUrl, masterWebUiUrl)   // after a successful registration, tell the Worker it is registered
              schedule()
            }
          }

    After receiving the registration-success message from the Master, the Worker periodically sends heartbeat messages to it:

    Worker.scala
      ==>case SendHeartbeat =>
            masterLock.synchronized { if (connected) { master ! Heartbeat(workerId) } }
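
    Where does the SendHeartbeat message come from? In this Akka-based version the Worker schedules it to itself on a fixed interval once registration succeeds, so heartbeats only begin after RegisteredWorker arrives. A simplified sketch (the constant HEARTBEAT_MILLIS and the exact placement are approximations of the 1.0.x source):

        // Simplified sketch: on RegisteredWorker, remember the active master and ask the
        // Akka scheduler to deliver SendHeartbeat to this actor every HEARTBEAT_MILLIS ms.
        case RegisteredWorker(masterUrl, masterWebUiUrl) =>
          logInfo("Successfully registered with master " + masterUrl)
          changeMaster(masterUrl, masterWebUiUrl)
          context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)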

    When the Master receives a heartbeat from a Worker, it updates that Worker's last heartbeat time:

    Master.scala
      ==>case Heartbeat(workerId) => {
            idToWorker.get(workerId) match {
              case Some(workerInfo) =>
                workerInfo.lastHeartbeat = System.currentTimeMillis()
            }
          }

    The Master periodically removes Worker nodes whose heartbeats have timed out:

    Master.scala
      ==>preStart
        ==>CheckForWorkerTimeOut
          ==>case CheckForWorkerTimeOut => {timeOutDeadWorkers()} //Check for, and remove, any timed-out workers
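
    CheckForWorkerTimeOut is likewise a message the Master schedules to itself in preStart. A simplified sketch of the timeout check it triggers (field and constant names approximate the 1.0.x source):

        // Simplified sketch of Master.timeOutDeadWorkers(): drop any worker whose last
        // heartbeat is older than WORKER_TIMEOUT milliseconds.
        def timeOutDeadWorkers() {
          val currentTime = System.currentTimeMillis()
          val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
          for (worker <- toRemove) {
            logWarning("Removing worker %s because we got no heartbeat in %d seconds".format(
              worker.id, WORKER_TIMEOUT / 1000))
            removeWorker(worker)   // marks the worker DEAD and cleans up / reschedules its executors
          }
        }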

    Log output: $SPARK_HOME/logs/

    Excerpt from the Master log:

    14/07/22 13:41:36 INFO master.Master: Registering worker hadoop000:48343 with 1 cores, 2.0 GB RAM

    Excerpt from the Worker log:

    14/07/22 13:41:35 INFO Worker: Starting Spark worker hadoop000:48343 with 1 cores, 2.0 GB RAM
    14/07/22 13:41:35 INFO Worker: Spark home: /home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0
    14/07/22 13:41:35 INFO WorkerWebUI: Started WorkerWebUI at http://hadoop000:8081
    14/07/22 13:41:35 INFO Worker: Connecting to master spark://hadoop000:7077...
    14/07/22 13:41:36 INFO Worker: Successfully registered with master spark://hadoop000:7077

    3. Application Submission Process

    A. Submitting an Application

    Run spark-shell: $SPARK_HOME/bin/spark-shell --master spark://hadoop000:7077

    Log output: $SPARK_HOME/work

    spark-shell is itself an application. While SparkContext starts up, createTaskScheduler creates a SparkDeploySchedulerBackend, and during that process the following client is created:

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

    which sends a RegisterApplication request to the Master:

    AppClient.scala
      ==>preStart
        ==>registerWithMaster
          ==>tryRegisterAllMasters
            ==>actor ! RegisterApplication(appDescription)
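
    The payload of RegisterApplication is an ApplicationDescription assembled in SparkDeploySchedulerBackend.start; among other things it names the executor main class that the Worker will later launch. A rough sketch of how it is put together (field names and ordering are approximate, not the verbatim source):

        // Simplified sketch of SparkDeploySchedulerBackend.start (approximate). The Command
        // below is what eventually appears as the Worker's "Launch command: java -cp ..." log line.
        val command = Command(
          "org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args,                     // driver URL, executor id, hostname, cores, worker URL
          sc.executorEnvs)
        val appDesc = new ApplicationDescription(
          sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress)
        client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
        client.start()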

    B. The Master Handles the RegisterApplication Request

    On the Master side, the handling branch is RegisterApplication. After receiving the request, the Master runs its scheduling logic: if any workers have already registered, it sends LaunchExecutor commands to the appropriate workers.

    Master.scala
            ==>case RegisterApplication(description) => {
                logInfo("Registering app " + description.name)
                val app = createApplication(description, sender)
                registerApplication(app)
                logInfo("Registered app " + description.name + " with ID " + app.id)
                persistenceEngine.addApplication(app)
                sender ! RegisteredApplication(app.id, masterUrl)
                schedule()
            }
    ==>schedule()
      ==>launchExecutor(worker, exec)
        ==>worker.addExecutor(exec)
           worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
           exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
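
    schedule() is where free worker resources are matched against waiting applications. A much simplified sketch of that logic (the real 1.0.x code also supports spreading an application across many workers via spark.deploy.spreadOut; that branch is omitted here):

        // Simplified sketch of Master.schedule(): give each waiting app executors on ALIVE
        // workers that still have enough free memory and at least one free core.
        private def schedule() {
          for (app <- waitingApps if app.coresLeft > 0) {
            val usableWorkers = workers.toArray
              .filter(_.state == WorkerState.ALIVE)
              .filter(w => w.memoryFree >= app.desc.memoryPerSlave && w.coresFree > 0)
            for (worker <- usableWorkers if app.coresLeft > 0) {
              val coresToUse = math.min(worker.coresFree, app.coresLeft)
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec)   // sends LaunchExecutor / ExecutorAdded as shown above
            }
          }
        }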

    C. Launching the Executor

    After receiving the LaunchExecutor command, the Worker starts an Executor process:

    Worker.scala
        ==>case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
              logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
              val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
                self, workerId, host,
                appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
                workDir, akkaUrl, ExecutorState.RUNNING)
              executors(appId + "/" + execId) = manager
              manager.start()
              coresUsed += cores_
              memoryUsed += memory_
              masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, manager.state, None, None) }
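
    ExecutorRunner itself does not run any tasks; it builds a java command line from the ApplicationDescription and forks a CoarseGrainedExecutorBackend process, which is the "Launch command: java -cp ..." line in the Worker log further down. A simplified sketch of what its worker thread does (method names approximate the 1.0.x source):

        // Simplified sketch of ExecutorRunner.fetchAndRunExecutor (approximate): substitute
        // runtime values into the app's command, fork the executor JVM, wait for it to exit,
        // and report the final state back to the Worker.
        def fetchAndRunExecutor() {
          val fullCommand = getCommandSeq      // java -cp ... CoarseGrainedExecutorBackend <args>
          val builder = new ProcessBuilder(fullCommand: _*).directory(executorDir)
          process = builder.start()
          val exitCode = process.waitFor()
          // in the real code the reported state and message reflect how the process exited
          worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some("exit code " + exitCode), Some(exitCode))
        }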

    D. Registering the Executor

    Using the arguments it was launched with, the Executor process registers itself with the SchedulerBackend running in the Driver:

    SparkDeploySchedulerBackend.scala
        ==>preStart   (CoarseGrainedSchedulerBackend)
            ==> case RegisterExecutor(executorId, hostPort, cores) =>
                logInfo("Registered executor: " + sender + " with ID " + executorId)
                sender ! RegisteredExecutor(sparkProperties)
                executorActor(executorId) = sender
                executorHost(executorId) = Utils.parseHostPort(hostPort)._1
                totalCores(executorId) = cores
                freeCores(executorId) = cores
                executorAddress(executorId) = sender.path.address
                addressToExecutorId(sender.path.address) = executorId
                totalCoreCount.addAndGet(cores)
                makeOffers()
    
    CoarseGrainedExecutorBackend.scala
        case RegisteredExecutor(sparkProperties) =>
            logInfo("Successfully registered with driver")
            executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,false)
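
    The RegisterExecutor message handled above is sent by the executor backend itself as soon as its actor starts, using the driver URL that was passed on its command line (visible in the Worker's launch-command log below). A simplified sketch (approximate):

        // Simplified sketch of CoarseGrainedExecutorBackend.preStart (approximate):
        // look up the driver actor from the URL given on the command line and register.
        override def preStart() {
          logInfo("Connecting to driver: " + driverUrl)
          driver = context.actorSelection(driverUrl)
          driver ! RegisterExecutor(executorId, hostPort, cores)
        }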

    Executor log location: console output / $SPARK_HOME/logs

    E. Running Tasks

    Example code:

    sc.textFile("hdfs://hadoop000:8020/hello.txt").flatMap(_.split('\t')).map((_,1)).reduceByKey(_+_).collect

    After the SchedulerBackend receives the Executor's registration message, it breaks the submitted Spark job down into concrete tasks and distributes them to the Executors with LaunchTask commands, where they actually run:

    CoarseGrainedSchedulerBackend.scala
        def makeOffers() {
            launchTasks(scheduler.resourceOffers(
                executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
        }
        ==>executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))

    CoarseGrainedExecutorBackend.scala
        case LaunchTask(data) =>
            if (executor == null) {
                logError("Received LaunchTask command but executor was null")
                System.exit(1)
            } else {
                val ser = SparkEnv.get.closureSerializer.newInstance()
                val taskDesc = ser.deserialize[TaskDescription](data.value)
                logInfo("Got assigned task " + taskDesc.taskId)
                executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
            }
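
    On the executor side, launchTask simply wraps the serialized task in a TaskRunner and hands it to a thread pool; deserialization and the actual user code run inside TaskRunner.run. A simplified sketch (approximate):

        // Simplified sketch of Executor.launchTask (approximate): queue the task on the
        // executor's thread pool and track it so it can be killed or reported on later.
        def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
          val taskRunner = new TaskRunner(context, taskId, serializedTask)
          runningTasks.put(taskId, taskRunner)
          threadPool.execute(taskRunner)
        }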

    Excerpt from the Master log:

    14/07/22 15:25:27 INFO master.Master: Registering app Spark shell
    14/07/22 15:25:27 INFO master.Master: Registered app Spark shell with ID app-20140722152527-0001
    14/07/22 15:25:27 INFO master.Master: Launching executor app-20140722152527-0001/0 on worker worker-20140722134135-hadoop000-48343

    Excerpt from the Worker log:

    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    14/07/22 15:25:27 INFO Worker: Asked to launch executor app-20140722152527-0001/0 for Spark shell
    Spark assembly has been built with Hive, including Datanucleus jars on classpath
    14/07/22 15:25:28 INFO ExecutorRunner: Launch command: "java" "-cp" "::/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/conf:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/spark-assembly-1.0.1-hadoop2.3.0-cdh5.0.0.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-core-3.2.2.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-api-jdo-3.2.1.jar" "-XX:MaxPermSize=128m" "-Xms1024M" "-Xmx1024M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@hadoop000:50515/user/CoarseGrainedScheduler" "0" "hadoop000" "1" "akka.tcp://sparkWorker@hadoop000:48343/user/Worker" "app-20140722152527-0001"

    Excerpt from the console (driver) log:

    14/07/22 15:25:31 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hadoop000:45150/user/Executor#-791712793] with ID 0
    14/07/22 15:25:31 INFO CoarseGrainedExecutorBackend: Successfully registered with driver

    Whenever a new application registers with the Master, the Master runs schedule() to assign the application to appropriate workers; the corresponding ExecutorBackend processes are started on those workers, and the tasks ultimately run inside those ExecutorBackends.

    Original article: https://www.cnblogs.com/luogankun/p/3912956.html