zoukankan      html  css  js  c++  java
  • 【原】 Spark中Worker源码分析(一)

    Worker作为对于Spark集群的健壮运行起着举足轻重的作用,作为Master的奴隶,每15s向Master告诉自己还活着,一旦主人(Master》有了任务(Application),立马交给属于它的奴隶们(Workers),那么奴隶们就会数数自己有多少家当(比如内存、核数),量力而行地交给主人完成的任务,如果奴隶不量力而行在执行任务过程中不幸死了的话,作为主人的Master只会等待60s,如果奴隶在这生死攸关的紧要关头不理睬主人,那么主人只能认为它死了,那么就会把它抛弃了。下面,我们一起了解一下Worker究竟有哪些不为人知的故事。

    1.家当(静态属性)


    我们只列出一些重要的属性:
    1.一个守护单线程的调度器用于在特殊的时间发送消息,执行的任务包括:向Master注册Worker信息、发送心跳信息、定期清理任务等。
      private val forwordMessageScheduler =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
    2.一个独立的线程用于清理工作空间,执行任务:定期清理执行过程中创建的本地文件。
      private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
    3.shuffle服务默认没有开启除非用户自己配置,之所以会开启外部的Shuffle服务,是为了避免Executor进程任务过重,导致不能为其他的Executor提供Shuffle数据,影响任务的执行。比如,如果使用YARN模式时,可以在yarn-site.xml文件中配置及其端口号,从而在NodeManger上开启Shuffle服务,减轻Executor的负担。
      private val shuffleService = new ExternalShuffleService(conf, securityMgr)
    4.一个masters的线程池。因为master注册Worker是一个阻塞操作,所以这个线程池必须能同时创建"masterRpcAddresses.size"大小的线程,这样我们就能将worker注册到所有的master上。
    private val registerMasterThreadPool = new ThreadPoolExecutor(
        0,
        masterRpcAddresses.size, // Make sure we can register with all masters at the same time
        60L, TimeUnit.SECONDS,
        new SynchronousQueue[Runnable](),
        ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))

    2.技能(方法)


    由于Worker本质上是一个RpcEndpoint,所以我们按照它的声明周期进行介绍。
    1.构造函数就是Worker默认的构造器
    2.onStart方法


    <code>
    //worker的启动
      override def onStart() {
        assert(!registered)
        logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
          host, port, cores, Utils.megabytesToString(memory)))
        logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
        logInfo("Spark home: " + sparkHome)
        createWorkDir()
        //如果用户已经配置外部的Shuffle,那么就启动该服务
        shuffleService.startIfEnabled()
        //该WebUI只仅限于Standalone模式下
        webUi = new WorkerWebUI(this, workDir, webUiPort)
        webUi.bind()
        //将worker注册到master上,详情如下(1)
        registerWithMaster()
        metricsSystem.registerSource(workerSource)
        metricsSystem.start()
        //metricsSystem启动后,将worker的metrics的servlet handler添加到web ui
        metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
      }
    </code>



    (1)将worker注册到master上的registerWithMaster()代码如下所示:



    <code>
    private def registerWithMaster() {
        //如果work与master可能多次失去连接,所以不要尝试太多次的注册
        registrationRetryTimer match {
          case None =>
            registered = false
            //将woker注册到所有的master上返回一个Future的数组,详情如下(2)
            registerMasterFutures = tryRegisterAllMasters()
            connectionAttemptCount = 0
            //一个单线程不定时向master发送注册信息
            registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
              new Runnable {
                override def run(): Unit = Utils.tryLogNonFatalError {
                  Option(self).foreach(_.send(ReregisterWithMaster))
                }
              },
              INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
              INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
              TimeUnit.SECONDS))
          case Some(_) =>
            logInfo("Not spawning another attempt to register with the master, since there is an" +
              " attempt scheduled already.")
        }
      }
    </code>



    (2)tryRegisterAllMasters代码如下:



    <code>
    //将worker注册到所有的master上面
      private def tryRegisterAllMasters(): Array[JFuture[_]] = {
        masterRpcAddresses.map { masterAddress =>
          registerMasterThreadPool.submit(new Runnable {
            override def run(): Unit = {
              try {
                logInfo("Connecting to master " + masterAddress + "...")
                //在Client的Rpc中根据master的systemname、address、endpointname返回一个master的远程引用
                val masterEndpoint =
                  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
                //调用master的远程引用将worker注册到master上
                masterEndpoint.send(RegisterWorker(
                  workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            }
          })
        }
      }
    </code>



    3.onStop()方法,把关于Worker的一切都停止掉,比如线程、executors、drivers、shuffleService等



    <code>
    override def onStop() {
        cleanupThreadExecutor.shutdownNow()
        metricsSystem.report()
        cancelLastRegistrationRetry()
        forwordMessageScheduler.shutdownNow()
        registerMasterThreadPool.shutdownNow()
        executors.values.foreach(_.kill())
        drivers.values.foreach(_.kill())
        shuffleService.stop()
        webUi.stop()
        metricsSystem.stop()
      }
    </code>



    还有一个很重要的receive方法,都放到这儿可能有点拥挤,留到下一篇吧。

  • 相关阅读:
    Vue 封装axios(四种请求)及相关介绍
    简单的按钮样式,两个连在一起的按钮
    http接口访问正常,https访问报错,基础连接已经关闭: 发送时发生错误.
    单点登录思路,多台服务器共用一个数据库,登录信息解决方案
    Wait()在过滤器中卡住 ,在异步代码上阻塞时的常见死锁问题
    接口对接,接口通过原有代码无法访问,解决办法,用postman解决
    svg基础标签说明
    server 2016部署网络负载平衡(NLB)
    写 JS 逻辑判断,不要只知道用 if-else 和 switch
    .NetCore部署IIS出错原因未安装ASP.N.NetCore部署IIS出错原因未安装ASP.NET Core Module v2ET Core Module v2
  • 原文地址:https://www.cnblogs.com/yourarebest/p/5300202.html
Copyright © 2011-2022 走看看