Spark Internals: Master High Availability (HA) Based on ZooKeeper, a Source-Code Walkthrough

    If Spark is deployed in Standalone mode, a typical Master/Slaves architecture, the Master is a SPOF (Single Point of Failure). Spark can use ZooKeeper to implement HA.

    ZooKeeper provides a Leader Election mechanism. With it, the cluster can run multiple Masters while guaranteeing that only one of them is Active; the others are Standby. When the Active Master fails, one of the Standby Masters is elected to take over.

    Because the cluster state, including the information about Workers, Drivers and Applications, has already been persisted (to a file system or to ZooKeeper, depending on the recovery mode), a failover only affects the submission of new Jobs; Jobs that are already running are not affected at all. The overall cluster architecture with ZooKeeper added is shown in the figure below.


    1. Master restart strategies

    When the Master starts, it chooses among different failure-recovery strategies based on its startup parameters:

    1. ZOOKEEPER: implement HA through ZooKeeper
    2. FILESYSTEM: restart the Master without losing data; the cluster's runtime state is saved to a local or network file system
    3. NONE: discard all previous data and restart with a clean state

    The implementation of these three strategies can be seen in Master::preStart():

    override def preStart() {
        logInfo("Starting Spark master at " + masterUrl)
        ...
        //persistenceEngine persists the Worker, Driver and Application information, so that
        //a Master restart does not affect Jobs that have already been submitted
        persistenceEngine = RECOVERY_MODE match {
          case "ZOOKEEPER" =>
            logInfo("Persisting recovery state to ZooKeeper")
            new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
          case "FILESYSTEM" =>
            logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
            new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
          case _ =>
            new BlackHolePersistenceEngine()
        }
        //leaderElectionAgent is responsible for electing the Leader
        leaderElectionAgent = RECOVERY_MODE match {
            case "ZOOKEEPER" =>
              context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
            case _ => // in a cluster with only one Master, the current Master is the Active one
              context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
          }
      }

    RECOVERY_MODE is a string that can be set from spark-env.sh.

    val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

    If spark.deploy.recoveryMode is not set, all of the cluster's runtime data is lost when the Master restarts. This follows from the implementation of BlackHolePersistenceEngine:

    private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
      override def addApplication(app: ApplicationInfo) {}
      override def removeApplication(app: ApplicationInfo) {}
      override def addWorker(worker: WorkerInfo) {}
      override def removeWorker(worker: WorkerInfo) {}
      override def addDriver(driver: DriverInfo) {}
      override def removeDriver(driver: DriverInfo) {}
    
      override def readPersistedData() = (Nil, Nil, Nil)
    }

    It implements every interface method as a no-op.
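
    For reference, here is a rough sketch of the PersistenceEngine trait itself, reconstructed from the methods overridden above; the actual Spark source may declare additional members (for example a close() hook), so treat it as an approximation rather than the exact definition:

    private[spark] trait PersistenceEngine {
      def addApplication(app: ApplicationInfo): Unit
      def removeApplication(app: ApplicationInfo): Unit
      def addWorker(worker: WorkerInfo): Unit
      def removeWorker(worker: WorkerInfo): Unit
      def addDriver(driver: DriverInfo): Unit
      def removeDriver(driver: DriverInfo): Unit

      // Returns the persisted (applications, drivers, workers) so that a newly
      // elected Master can rebuild its in-memory state after a failover.
      def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
    }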

    As a contrast to the no-op engine, here is part of the ZooKeeper-backed implementation:

    class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
      extends PersistenceEngine
      with Logging
    {
      val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
      val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
    
      SparkCuratorUtil.mkdir(zk, WORKING_DIR)
      // Serialize the application's information into the znode WORKING_DIR/app_{app.id}
      override def addApplication(app: ApplicationInfo) {
        serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
      }
    
      override def removeApplication(app: ApplicationInfo) {
        zk.delete().forPath(WORKING_DIR + "/app_" + app.id)
      }

    Spark does not use the raw ZooKeeper API; it uses org.apache.curator.framework.CuratorFramework and org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}. Curator provides a very friendly wrapper layer on top of ZooKeeper.


    2. Configuring the cluster startup parameters

    To briefly summarize the parameter settings: from the code analysis above, we know that to use ZooKeeper you need to set at least the following parameters (in fact, these are the only ones required). They can be configured in spark-env.sh:

    spark.deploy.recoveryMode=ZOOKEEPER
    spark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181
    spark.deploy.zookeeper.dir=/dir
    # OR set them via SPARK_DAEMON_JAVA_OPTS:
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER "
    export SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.deploy.zookeeper.url=zk_server1:2181,zk_server_2:2181"
    

    The meaning of each parameter:

    Parameter                    Default    Meaning
    spark.deploy.recoveryMode    NONE       Recovery mode (the mode in which the Master restarts); one of ZOOKEEPER, FILESYSTEM, NONE
    spark.deploy.zookeeper.url   (none)     Addresses of the ZooKeeper servers
    spark.deploy.zookeeper.dir   /spark     ZooKeeper directory that stores the cluster metadata, including Workers, Drivers and Applications


    3. A brief introduction to CuratorFramework

    CuratorFramework greatly simplifies the use of ZooKeeper. It provides a high-level API and adds many features on top of ZooKeeper, including:

    • Automatic connection management: a client connected to ZooKeeper may have its connection interrupted. Curator handles this, and the reconnection is transparent to the client.
    • A cleaner API: it simplifies the raw ZooKeeper methods and events and offers an easy-to-use interface.
    • Recipe implementations (see the Curator Recipes documentation for more), including:
      • Leader election
      • Shared locks
      • Caches and watches
      • Distributed queues
      • Distributed priority queues


    Thread-safe CuratorFramework instances (wrappers around a ZooKeeper connection) are created through CuratorFrameworkFactory.

    CuratorFrameworkFactory.newClient() offers a simple way to create such an instance, and different parameters can be passed in to control it fully. After obtaining an instance, you must call start() before using it and close() when you are finished.

    /**
         * Create a new client
         *
         *
         * @param connectString list of servers to connect to
         * @param sessionTimeoutMs session timeout
         * @param connectionTimeoutMs connection timeout
         * @param retryPolicy retry policy to use
         * @return client
         */
        public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
        {
            return builder().
                connectString(connectString).
                sessionTimeoutMs(sessionTimeoutMs).
                connectionTimeoutMs(connectionTimeoutMs).
                retryPolicy(retryPolicy).
                build();
        }
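
    As a quick illustration, here is a minimal Scala sketch of the create/start/use/close lifecycle described above (nothing here is taken from the Spark code base; the object name, connect string and retry policy are illustrative):

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry

    object CuratorClientLifecycle {
      def main(args: Array[String]): Unit = {
        // create a client for the given ZooKeeper ensemble, retrying failed operations with exponential backoff
        val client = CuratorFrameworkFactory.newClient(
          "zk_server_1:2181,zk_server_2:2181",
          new ExponentialBackoffRetry(1000, 3))

        client.start()    // the instance must be started before it can be used
        try {
          // ... use the client, e.g. hand it to a LeaderLatch or create znodes ...
        } finally {
          client.close()  // release the ZooKeeper session when done
        }
      }
    }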

    Two other recipes deserve attention: org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}.

    First, look at LeaderLatchListener. It is notified whenever the state of the LeaderLatch changes:

    1. When this node is elected Leader, isLeader() is called.
    2. When this node loses leadership, notLeader() is called.

    Because these notifications are asynchronous, the state may no longer be accurate by the time the callback runs; you need to confirm whether LeaderLatch.hasLeadership() really is true/false. This point is reflected in Spark's implementation below.

    /**
     * LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed.
     *
     * Note that just because you are in the middle of one of these method calls, it does not necessarily mean that
     * hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes
     * before these methods get called. The contract is that if that happens, you should see another call to the other
     * method pretty quickly.
     */
    public interface LeaderLatchListener
    {
      /**
       * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
       *
       * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
       * this occurs, you can expect {@link #notLeader()} to also be called.
       */
      public void isLeader();

      /**
       * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
       *
       * Note that it is possible that by the time this method call happens, hasLeadership has become true. If
       * this occurs, you can expect {@link #isLeader()} to also be called.
       */
      public void notLeader();
    }

    LeaderLatch is responsible for electing a single Leader among all the contenders connected to the ZooKeeper cluster.

    The details of the election mechanism live in ZooKeeper itself; LeaderLatch just wraps them nicely. All we need to know is that after initializing an instance, we register our listener through its addListener() method and call start() to join the election. The relevant part of the class looks like this:

    public class LeaderLatch implements Closeable
    {
        private final Logger log = LoggerFactory.getLogger(getClass());
        private final CuratorFramework client;
        private final String latchPath;
        private final String id;
        private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
        private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
        private final AtomicReference<String> ourPath = new AtomicReference<String>();
        private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
        private final CloseMode closeMode;
        private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();

        // ...

        /**
         * Attaches a listener to this LeaderLatch
         * <p/>
         * Attaching the same listener multiple times is a noop from the second time on.
         * <p/>
         * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
         * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
         * them being called out of order you are welcome to use multiple threads.
         *
         * @param listener the listener to attach
         */
        public void addListener(LeaderLatchListener listener)
        {
            listeners.addListener(listener);
        }


    Through addListener() we attach our own listener to the LeaderLatch; inside the listener, the two callback methods hold the logic to run when this node is elected Leader or loses the Leader role.
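
    Putting the two recipes together, a minimal sketch of this wiring could look as follows (the helper name, latch path and the println bodies are purely illustrative; Spark's own version is the ZooKeeperLeaderElectionAgent discussed in the next section):

    import org.apache.curator.framework.CuratorFramework
    import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}

    def startElection(zk: CuratorFramework): LeaderLatch = {
      val latch = new LeaderLatch(zk, "/spark/leader_election")  // latch path is illustrative
      latch.addListener(new LeaderLatchListener {
        // called when this contender goes from hasLeadership = false to true
        override def isLeader(): Unit = println("We have gained leadership")
        // called when this contender goes from hasLeadership = true to false
        override def notLeader(): Unit = println("We have lost leadership")
      })
      latch.start()  // join the election; the callbacks fire as leadership changes
      latch
    }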


    4. The implementation of ZooKeeperLeaderElectionAgent

    In fact, thanks to Curator, implementing Master HA becomes very easy for Spark. ZooKeeperLeaderElectionAgent implements the LeaderLatchListener interface. Once isLeader() confirms that its Master has been elected Leader, it sends the ElectedLeader message to the Master, and the Master changes its state to ALIVE.

    When notLeader() is called, it sends the RevokedLeadership message to the Master, and the Master shuts itself down.


    private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
        masterUrl: String, conf: SparkConf)
      extends LeaderElectionAgent with LeaderLatchListener with Logging  {
      val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
      // zk is the ZooKeeper client instance created through CuratorFrameworkFactory
      private var zk: CuratorFramework = _
      // leaderLatch: the Curator recipe that performs the actual Leader election
      private var leaderLatch: LeaderLatch = _
      private var status = LeadershipStatus.NOT_LEADER

      override def preStart() {
        logInfo("Starting ZooKeeper LeaderElection agent")
        zk = SparkCuratorUtil.newClient(conf)
        leaderLatch = new LeaderLatch(zk, WORKING_DIR)
        leaderLatch.addListener(this)
        leaderLatch.start()
      }


    In preStart(), the leaderLatch is started so that this agent takes part in the Leader election in ZooKeeper. As analyzed in the previous section, the core logic lives in isLeader() and notLeader():

      override def isLeader() {
        synchronized {
          // could have lost leadership by now; see Curator's implementation for details
          if (!leaderLatch.hasLeadership) {
            return
          }
          logInfo("We have gained leadership")
          updateLeadershipStatus(true)
        }
      }

      override def notLeader() {
        synchronized {
          // could have gained leadership by now; see Curator's implementation for details
          if (leaderLatch.hasLeadership) {
            return
          }
          logInfo("We have lost leadership")
          updateLeadershipStatus(false)
        }
      }


    The logic of updateLeadershipStatus is very simple: it just sends the corresponding message to the Master.

    def updateLeadershipStatus(isLeader: Boolean) {
        if (isLeader && status == LeadershipStatus.NOT_LEADER) {
          status = LeadershipStatus.LEADER
          masterActor ! ElectedLeader
        } else if (!isLeader && status == LeadershipStatus.LEADER) {
          status = LeadershipStatus.NOT_LEADER
          masterActor ! RevokedLeadership
        }
      }
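
    On the receiving side, the Master handles these two messages roughly as follows (a simplified sketch based on the Spark 1.x Master actor; the real ElectedLeader handler also schedules a recovery pass when persisted state is found):

    // inside Master.receive (error handling and recovery scheduling omitted)
    case ElectedLeader => {
      // read back whatever the PersistenceEngine stored before the failover
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE        // nothing to recover, become the active Master right away
      } else {
        RecoveryState.RECOVERING   // re-register the persisted Workers/Drivers/Applications first
      }
    }

    case RevokedLeadership => {
      logError("Leadership has been revoked -- master shutting down.")
      System.exit(0)
    }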

    5. Design philosophy

    To remove the Master SPOF in Standalone mode, Spark relies on the election facility provided by ZooKeeper. Spark does not use ZooKeeper's native Java API; instead it uses Curator, a framework that wraps ZooKeeper. With Curator, Spark does not have to manage the connection to ZooKeeper itself; all of that is transparent to Spark.

    With only about a hundred lines of code, Spark implements Master HA. Of course, Spark is standing on the shoulders of giants; who would want to reinvent the wheel?



