  • Spark ZooKeeper Data Recovery

    The logical flow of Spark's ZooKeeper-based data recovery is as follows:

    1. Initialization: create a CuratorFramework, a LeaderLatch, and a LeaderLatchListener for leader election, and a separate CuratorFramework for data recovery.

    2. Election: start the LeaderLatch; from this point on, Curator takes over the election work.

    3. Recovery: once a Master is elected leader, the LeaderLatchListener's isLeader() method is invoked, and the logical data-recovery work starts inside it. Concretely, an ElectedLeader message is sent to the Master; the Master reads data from the ZooKeeperPersistenceEngine into its in-memory caches, and the ZooKeeperPersistenceEngine in turn reads storedApps, storedDrivers, and storedWorkers from ZooKeeper's /spark/master_status/ directory.

    Below is a walkthrough of the source code, for future reference.

    1. Initialization: when the Master starts, it creates a ZooKeeperLeaderElectionAgent and a ZooKeeperPersistenceEngine; the former handles election, the latter data recovery.

    The Master initialization source looks like this:

       

     case "ZOOKEEPER" =>
            logInfo("Persisting recovery state to ZooKeeper")
            val zkFactory =
              new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
            (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serialization)
      extends StandaloneRecoveryModeFactory(conf, serializer) {
    
      def createPersistenceEngine(): PersistenceEngine = {
        new ZooKeeperPersistenceEngine(conf, serializer)
      }
    
      def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
        new ZooKeeperLeaderElectionAgent(master, conf)
      }
    }
    private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)
      extends PersistenceEngine
      with Logging {
    
      private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
      // create the ZooKeeper client
      private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
    
      // create the WORKING_DIR znode
      SparkCuratorUtil.mkdir(zk, WORKING_DIR)
    }
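    The engine's role is to serialize objects into znodes under WORKING_DIR and read them back by name prefix. That contract can be sketched with a hypothetical in-memory stand-in (all names below are illustrative, not Spark's API):

```scala
// Hypothetical in-memory stand-in for ZooKeeperPersistenceEngine: znode
// paths map to serialized payloads (plain strings here for simplicity).
object InMemoryPersistenceEngine {
  private val workingDir = "/spark/master_status"
  private var store = Map.empty[String, String]

  // the real engine serializes the object into a znode at WORKING_DIR/name
  def persist(name: String, data: String): Unit =
    store += (s"$workingDir/$name" -> data)

  def unpersist(name: String): Unit =
    store -= s"$workingDir/$name"

  // readPersistedData selects nodes by name prefix: app_, driver_, worker_
  def read(prefix: String): Seq[String] =
    store.toSeq.collect {
      case (path, data) if path.stripPrefix(workingDir + "/").startsWith(prefix) => data
    }
}
```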

    When the ZooKeeperLeaderElectionAgent is created, it builds the CuratorFramework, LeaderLatch, and LeaderLatchListener used for election. The LeaderLatch performs the actual leader election; once a given LeaderLatch wins, the isLeader() method of its LeaderLatchListener is invoked, as follows:

    private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
        conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {
    
      val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
    
      private var zk: CuratorFramework = _
      private var leaderLatch: LeaderLatch = _
      private var status = LeadershipStatus.NOT_LEADER
    
      start()
    
      private def start() {
        logInfo("Starting ZooKeeper LeaderElection agent")
        zk = SparkCuratorUtil.newClient(conf)
        leaderLatch = new LeaderLatch(zk, WORKING_DIR)
        leaderLatch.addListener(this)
        leaderLatch.start()
      }

    2. Election: calling the LeaderLatch's start() begins the election.

    3. Data recovery: if a Master is elected as the alive master, isLeader() is invoked. Internally it sends an ElectedLeader message to the Master, which then reads storedApps, storedDrivers, and storedWorkers from the ZooKeeperPersistenceEngine (that is, from ZooKeeper) and restores them into its in-memory caches.

       

      override def isLeader() {
        synchronized {
          // could have lost leadership by now.
          if (!leaderLatch.hasLeadership) {
            return
          }
    
          logInfo("We have gained leadership")
          updateLeadershipStatus(true)
        }
      }
      private def updateLeadershipStatus(isLeader: Boolean) {
        if (isLeader && status == LeadershipStatus.NOT_LEADER) {
          status = LeadershipStatus.LEADER
          masterActor.electedLeader()
        } else if (!isLeader && status == LeadershipStatus.LEADER) {
          status = LeadershipStatus.NOT_LEADER
          masterActor.revokedLeadership()
        }
      }
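    The transition logic above is a small state machine: events fire only on an actual change of status, so a repeated callback from Curator does not re-send ElectedLeader. A self-contained sketch, with a hypothetical RecordingMaster stub standing in for the Master actor:

```scala
// Self-contained sketch of updateLeadershipStatus as a state machine.
object LeadershipDemo {
  object LeadershipStatus extends Enumeration { val LEADER, NOT_LEADER = Value }

  // Hypothetical stub standing in for the Master (LeaderElectable) actor.
  class RecordingMaster {
    var events: List[String] = Nil
    def electedLeader(): Unit = events = events :+ "elected"
    def revokedLeadership(): Unit = events = events :+ "revoked"
  }

  class StatusTracker(master: RecordingMaster) {
    private var status = LeadershipStatus.NOT_LEADER

    // Fire only on an actual transition, mirroring updateLeadershipStatus:
    // duplicate isLeader()/notLeader() callbacks are ignored.
    def update(isLeader: Boolean): Unit = {
      if (isLeader && status == LeadershipStatus.NOT_LEADER) {
        status = LeadershipStatus.LEADER
        master.electedLeader()
      } else if (!isLeader && status == LeadershipStatus.LEADER) {
        status = LeadershipStatus.NOT_LEADER
        master.revokedLeadership()
      }
    }
  }
}
```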

    The actual data-recovery work then begins:

      case ElectedLeader => {
          val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
          state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
            RecoveryState.ALIVE
          } else {
            RecoveryState.RECOVERING
          }
          logInfo("I have been elected leader! New state: " + state)
          if (state == RecoveryState.RECOVERING) {
            beginRecovery(storedApps, storedDrivers, storedWorkers)
            recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
              CompleteRecovery)
          }
        }
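    The new-state decision above reduces to: stay ALIVE on a fresh cluster (nothing persisted), otherwise enter RECOVERING. A minimal sketch of that check (names hypothetical):

```scala
// Sketch of the state decision in the ElectedLeader handler: recover only
// if ZooKeeper held any persisted apps, drivers, or workers.
object RecoveryDecision {
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Recovering extends RecoveryState

  def nextState(storedApps: Seq[String],
                storedDrivers: Seq[String],
                storedWorkers: Seq[String]): RecoveryState =
    if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) Alive
    else Recovering
}
```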

    The persisted data lives under the /spark/master_status directory in ZooKeeper. Taking an app as an example: when an app with appId 1 is written to the ZooKeeperPersistenceEngine, a persistent znode /spark/master_status/app_1 is created whose payload is the serialized app object.

     

    /spark/master_status
        /app_appid
        /worker_workerId
        /driver_driverId
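    Given that layout, building the znode path for a persisted object is just prefix concatenation; a hypothetical helper:

```scala
// Hypothetical helpers mirroring the node-naming convention shown above.
object ZkPaths {
  val WorkingDir = "/spark/master_status"
  def appPath(appId: String): String = s"$WorkingDir/app_$appId"
  def driverPath(driverId: String): String = s"$WorkingDir/driver_$driverId"
  def workerPath(workerId: String): String = s"$WorkingDir/worker_$workerId"
}
```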

  • Original article: https://www.cnblogs.com/francisYoung/p/5459961.html