zoukankan      html  css  js  c++  java
  • 【原】Spark中Master源码分析(一)

    Master作为集群的Manager,对于集群的健壮运行发挥着十分重要的作用。下面,我们一起了解一下Master是听从Client(Leader)的号召,如何管理好Worker的吧。

    1.家当(静态属性)

    1.设置一个守护单线程的消息发送器,
    private val forwardMessageThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
    2.根据sparkConf得到hadoopConf
    private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    3.一个bool类型的标识,如果设置为true,那么app的执行将会尽量分步到尽可能多的worker上,否则app的执行将会先用完一个worker的资源,然后再使用下一个worker的资源
    private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
    4.设置执行app默认的最大核数为Int类型的最大值
    private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
    5.还有一些关于worker、driver、app等的字段信息,都比较简单,限于篇幅限制就不一一列出了

    2.技能(方法)

    由于Master上本质上是一个RpcEndpoint,所以我们按照它的生命周期进行介绍。如果不明白,请看文章

    Spark Rpc通信源码分析 http://www.cnblogs.com/yourarebest/p/5297157.html

    1.构造函数就是Master默认的主构造器
    2.onStart方法,主要功能是启动Jetty的WebUI服务,Rest服务、选出持久化引擎及持久化代理

    override def onStart(): Unit = {
    logInfo("Starting Spark master at " + masterUrl)
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    webUi = new MasterWebUI(this, webUiPort)
    //启动JettyServer并绑定webUI端口号
    webUi.bind()
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
    //forwardMessageThread线程每1min中检查Worker是否宕了
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
    self.send(CheckForWorkerTimeOut)
    }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    //启动Rest服务,默认端口6066
    if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
    }
    //返回绑定的端口号
    restServerBoundPort = restServer.map(.start())
    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()
    //当metrics系统启动后,将master和app的metrics servlet的hadnler给webui
    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    //序列化Spark的配置文件
    val serializer = new JavaSerializer(conf)
    //支持三种持久化引擎,将Spark的配置参数持久化,便于以后恢复使用
    val (persistenceEngine
    , leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
    new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
    val fsFactory =
    new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
    .newInstance(conf, serializer)
    .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
    }

    3.onStop方法,停止master的metrics系统、停止app的metrics系统、取消异步执行的任务、停止WebUi服务、停止rest服务以及持久化引擎和选举代理的停止。

    override def onStop() {
    masterMetricsSystem.report()
    applicationMetricsSystem.report()
    //避免异步发出的CompleteRecovery消息导致master的重启
    if (recoveryCompletionTask != null) {
    recoveryCompletionTask.cancel(true)
    }
    if (checkForWorkerTimeOutTask != null) {
    checkForWorkerTimeOutTask.cancel(true)
    }
    forwardMessageThread.shutdownNow()
    webUi.stop()
    restServer.foreach(_.stop())
    masterMetricsSystem.stop()
    applicationMetricsSystem.stop()
    persistenceEngine.close()
    leaderElectionAgent.stop()
    }

    还有一个重要的方法receive方法,留到下一篇吧。

  • 相关阅读:
    Angular 一个简单的指令实现 阻止事件扩散
    怎样group by一列 select多列
    Angular Viewchild undefined
    TypeScript扩展类方法
    vmware station-ubuntu18.04 共享剪贴板
    基于R统计软件的三次样条和平滑样条模型数据拟合及预测
    R语言析因设计分析:线性模型中的对比
    R语言逻辑回归、方差分析 、伪R平方分析
    R语言多重比较方法
    R语言逐步多元回归模型分析长鼻鱼密度影响因素
  • 原文地址:https://www.cnblogs.com/yourarebest/p/5312965.html
Copyright © 2011-2022 走看看