zoukankan      html  css  js  c++  java
  • spark driver HA

    实验环境: 
    zookeeper-3.4.6 
    Spark:1.6.0 
    简介: 
    本篇博客将从以下几点组织文章: 
    一:Spark 构建高可用HA架构 
    二:动手实战构建高可用HA 
    三:提交程序测试HA

    一:Spark 构建高可用HA架构 
    这里写图片描述 
    Spark本身是Master和Slave,而这这里的 
    Master是指Spark资源调度和分配。负责整个集群的资源调度和分配。 
    Worker是管理单个节点的资源。 
    这里面的资源主要指:内存和CPU。 
    1. Master-Slave模型很容易出现单节点故障的问题。所以为了应用这个问题,解决办法是通过Zookeeper来解决,在实际开发的时候一般都是三台,一个active,两个standby,当一个active挂掉后,Zookeeper会根据自己的选举机制,从standby的Master选举出来一个作为leader。这个leader从standby模式变成active模式的话,做的最重要的事:是从Zookeeper中获取整个集群的状态信息,恢复整个集群的Worker,Driver,Application,这样才能接管整个集群的工作,而只有它成功完成之后,leader的Master才可以恢复成active的Master,才可以对外继续提供服务(作业的提交和资源的申请请求。),当active的master挂掉以后,standby的master变成active的master之前我们是不可以向集群提交新的程序。但是在Zookeeper切换期间,在这个时间集群的运行时正常的,例如,一个程序依然可以正常运行。因为程序在运行之前已经向Master申请资源了,Driver与我们所有worker分配的executors进行通信,这个过程一般不需要master参与,除非executor有故障。Master是粗粒度分配,粗粒度的好处当Master出故障以后,可以让Worker和executor交互完成计算。 
    2. Zookeper包含的内容有哪些:所有的Worker,Driver(代表了正在运行的程序),Application(应用程序)


    如果Spark的部署方式选择Standalone,一个采用Master/Slaves的典型架构,那么Master是有SPOF(单点故障,Single Point of Failure)。Spark可以选用ZooKeeper来实现HA。

         ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master但是只有一个是Active的,其他的都是Standby,当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。


    1. Master的重启策略

    Master在启动时,会根据启动参数来决定不同的Master故障重启策略:

    1. ZOOKEEPER实现HA
    2. FILESYSTEM:实现Master无数据丢失重启,集群的运行时数据会保存到本地/网络文件系统上
    3. 丢弃所有原来的数据重启

    Master::preStart()可以看出这三种不同逻辑的实现。

    [java] view plain copy
    1. override def preStart() {  
    2.     logInfo("Starting Spark master at " + masterUrl)  
    3.     ...  
    4.     //persistenceEngine是持久化Worker,Driver和Application信息的,这样在Master重新启动时不会影响  
    5.     //已经提交Job的运行  
    6.     persistenceEngine = RECOVERY_MODE match {  
    7.       case "ZOOKEEPER" =>  
    8.         logInfo("Persisting recovery state to ZooKeeper")  
    9.         new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)  
    10.       case "FILESYSTEM" =>  
    11.         logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)  
    12.         new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))  
    13.       case _ =>  
    14.         new BlackHolePersistenceEngine()  
    15.     }  
    16.     //leaderElectionAgent负责Leader的选取。  
    17.     leaderElectionAgent = RECOVERY_MODE match {  
    18.         case "ZOOKEEPER" =>  
    19.           context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))  
    20.         case _ => // 仅仅有一个Master的集群,那么当前的Master就是Active的  
    21.           context.actorOf(Props(classOf[MonarchyLeaderAgent], self))  
    22.       }  
    23.   }  

    RECOVERY_MODE是一个字符串,可以从spark-env.sh中去设置。

    [java] view plain copy
    1. val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode""NONE")  

    如果不设置spark.deploy.recoveryMode的话,那么集群的所有运行数据在Master重启是都会丢失,这个结论是从BlackHolePersistenceEngine的实现得出的。

    [java] view plain copy
    1. private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {  
    2.   override def addApplication(app: ApplicationInfo) {}  
    3.   override def removeApplication(app: ApplicationInfo) {}  
    4.   override def addWorker(worker: WorkerInfo) {}  
    5.   override def removeWorker(worker: WorkerInfo) {}  
    6.   override def addDriver(driver: DriverInfo) {}  
    7.   override def removeDriver(driver: DriverInfo) {}  
    8.   
    9.   override def readPersistedData() = (Nil, Nil, Nil)  
    10. }  

    它把所有的接口实现为空。PersistenceEngine是一个trait。作为对比,可以看一下ZooKeeper的实现。

    [java] view plain copy
    1. class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)  
    2.   extends PersistenceEngine  
    3.   with Logging  
    4. {  
    5.   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir""/spark") + "/master_status"  
    6.   val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)  
    7.   
    8.   SparkCuratorUtil.mkdir(zk, WORKING_DIR)  
    9.   // 将app的信息序列化到文件WORKING_DIR/app_{app.id}中  
    10.   override def addApplication(app: ApplicationInfo) {  
    11.     serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)  
    12.   }  
    13.   
    14.   override def removeApplication(app: ApplicationInfo) {  
    15.     zk.delete().forPath(WORKING_DIR + "/app_" + app.id)  
    16.   }  

    Spark使用的并不是ZooKeeper的API,而是使用的org.apache.curator.framework.CuratorFramework 和 org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch} 。Curator在ZooKeeper上做了一层很友好的封装。


    2. 集群启动参数的配置

    简单总结一下参数的设置,通过上述代码的分析,我们知道为了使用ZooKeeper至少应该设置一下参数(实际上,仅仅需要设置这些参数。通过设置spark-env.sh:

    [java] view plain copy
    1. spark.deploy.recoveryMode=ZOOKEEPER  
    2. spark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181  
    3. spark.deploy.zookeeper.dir=/dir     
    4. // OR 通过一下方式设置  
    5. export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER "  
    6. export SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.deploy.zookeeper.url=zk_server1:2181,zk_server_2:2181"  

    各个参数的意义:

    参数
    默认值
    含义
    spark.deploy.recoveryMode
    NONE
    恢复模式(Master重新启动的模式),有三种:1, ZooKeeper, 2, FileSystem, 3 NONE
    spark.deploy.zookeeper.url

    ZooKeeper的Server地址
    spark.deploy.zookeeper.dir
    /spark
    ZooKeeper 保存集群元数据信息的文件目录,包括Worker,Driver和Application。


    3. CuratorFramework简介 

    CuratorFramework极大的简化了ZooKeeper的使用,它提供了high-level的API,并且基于ZooKeeper添加了很多特性,包括

    • 自动连接管理:连接到ZooKeeper的Client有可能会连接中断,Curator处理了这种情况,对于Client来说自动重连是透明的。
    • 简洁的API:简化了原生态的ZooKeeper的方法,事件等;提供了一个简单易用的接口。
    • Recipe的实现(更多介绍请点击Recipes):
      • Leader的选择
      • 共享锁
      • 缓存和监控
      • 分布式的队列
      • 分布式的优先队列


    CuratorFrameworks通过CuratorFrameworkFactory来创建线程安全的ZooKeeper的实例。

    CuratorFrameworkFactory.newClient()提供了一个简单的方式来创建ZooKeeper的实例,可以传入不同的参数来对实例进行完全的控制。获取实例后,必须通过start()来启动这个实例,在结束时,需要调用close()。

    [java] view plain copy
    1. /** 
    2.      * Create a new client 
    3.      * 
    4.      * 
    5.      * @param connectString list of servers to connect to 
    6.      * @param sessionTimeoutMs session timeout 
    7.      * @param connectionTimeoutMs connection timeout 
    8.      * @param retryPolicy retry policy to use 
    9.      * @return client 
    10.      */  
    11.     public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)  
    12.     {  
    13.         return builder().  
    14.             connectString(connectString).  
    15.             sessionTimeoutMs(sessionTimeoutMs).  
    16.             connectionTimeoutMs(connectionTimeoutMs).  
    17.             retryPolicy(retryPolicy).  
    18.             build();  
    19.     }  

    需要关注的还有两个Recipe:org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}。

    首先看一下LeaderlatchListener,它在LeaderLatch状态变化的时候被通知:

    1. 在该节点被选为Leader的时候,接口isLeader()会被调用
    2. 在节点被剥夺Leader的时候,接口notLeader()会被调用

    由于通知是异步的,因此有可能在接口被调用的时候,这个状态是准确的,需要确认一下LeaderLatch的hasLeadership()是否的确是true/false。这一点在接下来Spark的实现中可以得到体现。

    [java] view plain copy
    1. /** 
    2. * LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed. 
    3. * 
    4. * Note that just because you are in the middle of one of these method calls, it does not necessarily mean that 
    5. * hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes 
    6. * before these methods get called. The contract is that if that happens, you should see another call to the other 
    7. * method pretty quickly. 
    8. */  
    9. public interface LeaderLatchListener  
    10. {  
    11.   /** 
    12. * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true. 
    13. * 
    14. * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If 
    15. * this occurs, you can expect {@link #notLeader()} to also be called. 
    16. */  
    17.   public void isLeader();  
    18.   
    19.   /** 
    20. * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false. 
    21. * 
    22. * Note that it is possible that by the time this method call happens, hasLeadership has become true. If 
    23. * this occurs, you can expect {@link #isLeader()} to also be called. 
    24. */  
    25.   public void notLeader();  
    26. }  

    LeaderLatch负责在众多连接到ZooKeeper Cluster的竞争者中选择一个Leader。Leader的选择机制可以看ZooKeeper的具体实现,LeaderLatch这是完成了很好的封装。我们只需要要知道在初始化它的实例后,需要通过

    [java] view plain copy
    1. public class LeaderLatch implements Closeable  
    2. {  
    3.     private final Logger log = LoggerFactory.getLogger(getClass());  
    4.     private final CuratorFramework client;  
    5.     private final String latchPath;  
    6.     private final String id;  
    7.     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);  
    8.     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);  
    9.     private final AtomicReference<String> ourPath = new AtomicReference<String>();  
    10.     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();  
    11.     private final CloseMode closeMode;  
    12.     private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();  
    13. .  
    14. .  
    15. .  
    16.     /** 
    17.      * Attaches a listener to this LeaderLatch 
    18.      * <p/> 
    19.      * Attaching the same listener multiple times is a noop from the second time on. 
    20.      * <p/> 
    21.      * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded 
    22.      * executor so that you can be certain that listener methods are called in sequence, but if you are fine with 
    23.      * them being called out of order you are welcome to use multiple threads. 
    24.      * 
    25.      * @param listener the listener to attach 
    26.      */  
    27.     public void addListener(LeaderLatchListener listener)  
    28.     {  
    29.         listeners.addListener(listener);  
    30.     }  


    通过addListener可以将我们实现的Listener添加到LeaderLatch。在Listener里,我们在两个接口里实现了被选为Leader或者被剥夺Leader角色时的逻辑即可。


    4. ZooKeeperLeaderElectionAgent的实现

    实际上因为有Curator的存在,Spark实现Master的HA就变得非常简单了,ZooKeeperLeaderElectionAgent实现了接口LeaderLatchListener,在isLeader()确认所属的Master被选为Leader后,向Master发送消息ElectedLeader,Master会将自己的状态改为ALIVE。当noLeader()被调用时,它会向Master发送消息RevokedLeadership时,Master会关闭。

    [java] view plain copy
    1. private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,  
    2.     masterUrl: String, conf: SparkConf)  
    3.   extends LeaderElectionAgent with LeaderLatchListener with Logging  {  
    4.   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir""/spark") + "/leader_election"  
    5.   // zk是通过CuratorFrameworkFactory创建的ZooKeeper实例  
    6.   private var zk: CuratorFramework = _  
    7.   // leaderLatch:Curator负责选出Leader。  
    8.   private var leaderLatch: LeaderLatch = _  
    9.   private var status = LeadershipStatus.NOT_LEADER  
    10.   
    11.   override def preStart() {  
    12.   
    13.     logInfo("Starting ZooKeeper LeaderElection agent")  
    14.     zk = SparkCuratorUtil.newClient(conf)  
    15.     leaderLatch = new LeaderLatch(zk, WORKING_DIR)  
    16.     leaderLatch.addListener(this)  
    17.   
    18.     leaderLatch.start()  
    19.   }  


    在prestart中,启动了leaderLatch来处理选举ZK中的Leader。就如在上节分析的,主要的逻辑在isLeader和noLeader中。

    [java] view plain copy
    1. override def isLeader() {  
    2.   synchronized {  
    3.     // could have lost leadership by now.  
    4.     //现在leadership可能已经被剥夺了。。详情参见Curator的实现。  
    5.     if (!leaderLatch.hasLeadership) {  
    6.       return  
    7.     }  
    8.   
    9.     logInfo("We have gained leadership")  
    10.     updateLeadershipStatus(true)  
    11.   }  
    12. }  
    13.   
    14. override def notLeader() {  
    15.   synchronized {  
    16.     // 现在可能赋予leadership了。详情参见Curator的实现。  
    17.     if (leaderLatch.hasLeadership) {  
    18.       return  
    19.     }  
    20.   
    21.     logInfo("We have lost leadership")  
    22.     updateLeadershipStatus(false)  
    23.   }  
    24. }  

    updateLeadershipStatus的逻辑很简单,就是向Master发送消息。

    [java] view plain copy
    1. def updateLeadershipStatus(isLeader: Boolean) {  
    2.     if (isLeader && status == LeadershipStatus.NOT_LEADER) {  
    3.       status = LeadershipStatus.LEADER  
    4.       masterActor ! ElectedLeader  
    5.     } else if (!isLeader && status == LeadershipStatus.LEADER) {  
    6.       status = LeadershipStatus.NOT_LEADER  
    7.       masterActor ! RevokedLeadership  
    8.     }  
    9.   }  

    5. 设计理念

    为了解决Standalone模式下的Master的SPOF,Spark采用了ZooKeeper提供的选举功能。Spark并没有采用ZooKeeper原生的Java API,而是采用了Curator,一个对ZooKeeper进行了封装的框架。采用了Curator后,Spark不用管理与ZooKeeper的连接,这些对于Spark来说都是透明的。Spark仅仅使用了100行代码,就实现了Master的HA。当然了,Spark是站在的巨人的肩膀上。谁又会去重复发明轮子呢?


    正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
  • 相关阅读:
    查找算法之——符号表(引入篇)
    排序算法之——优先队列经典实现(基于二叉堆)
    C# Timer和多线程编程、委托、异步、Func/Action
    Tomcat汇总-部署多个项目(不同端口)
    数据库汇总(MySQL教材)
    基础知识
    常用工具&网址
    Phython开发
    单元测试
    软件项目过程和文档
  • 原文地址:https://www.cnblogs.com/candlia/p/11920304.html
Copyright © 2011-2022 走看看