zoukankan      html  css  js  c++  java
  • Curator典型应用场景之Master选举

    分布式锁和Master选举相似点
    分布式锁和 Master选举有几种相似点,实际上其实现机制也相近:

    同一时刻只有一个获取锁 / 只能有一个leader
    对于分布式排他锁来说,任意时刻,只能有一个进程(对于单进程内的锁是单线程)可以获得锁。
    对于领导选举来说,任意时刻,只能有一个成功当选为leader。否则就会出现脑裂。

    锁重入 / 确认自己是leader
    对于分布式锁,需要保证获得锁的进程在释放锁之前可再次获得锁,即锁的可重入性。
    对于领导选举,Leader需要能够确认自己已经获得领导权,即确认自己是Leader。

    释放锁 / 放弃领导权
    锁的获得者应该能够正确释放已经获得的锁,并且当获得锁的进程宕机时,锁应该自动释放,从而使得其它竞争方可以获得该锁,从而避免出现死锁的状态。
    领导应该可以主动放弃领导权,并且当领导所在进程宕机时,领导权应该自动释放,从而使得其它参与者可重新竞争领导而避免进入无主状态。

    感知锁释放 / 感知领导权释放
    当获得锁的一方释放锁时,其它对于锁的竞争方需要能够感知到锁的释放,并再次尝试获取锁。
    原来的Leader放弃领导权时,其它参与方应该能够感知该事件,并重新发起选举流程。

    Curator中选举分为两种:

    Leader Latch和Leader Election

    Leader Latch
    LeaderLatch方式就是以一种抢占方式来决定选主,是一种非公平的领导选举,谁抢到就是谁,会随机从候选者中选择一台作为leader, 选中后除非leader自己 调用close()释放leadership,否则其他的候选者不能成为leader。

    选主过程
    假设现在有三个zookeeper的客户端,如下图所示,同时竞争leader。这三个客户端同时向zookeeper集群注册Ephemeral且Non-sequence类型的节点,路径都为/zkroot/leader。

    如上图所示,由于是Non-sequence节点,这三个客户端只会有一个创建成功,其它节点均创建失败。此时,创建成功的客户端(即上图中的Client 1)即成功竞选为 Leader 。其它客户端(即上图中的Client 2和Client 3)此时匀为 Follower。

    放弃领导权
    如果Leader打算主动放弃领导权,直接删除/zkroot/leader节点即可。
    如果Leader进程意外宕机,其与Zookeeper间的Session也结束,该节点由于是Ephemeral类型的节点,因此也会自动被删除。
    此时/zkroot/leader节点不复存在,对于其它参与竞选的客户端而言,之前的Leader已经放弃了领导权。

    感知领导权的放弃
    由上图可见,创建节点失败的节点,除了成为 Follower 以外,还会向/zkroot/leader注册一个 Watch ,一旦 Leader 放弃领导权,也即该节点被删除,所有的 Follower 会收到通知。

    重新选举
    感知到旧 Leader 放弃领导权后,所有的 Follower 可以再次发起新一轮的领导选举,如下图所示。

    从上图中可见
    新一轮的领导选举方法与最初的领导选举方法完全一样,都是发起节点创建请求,创建成功即为Leader,否则为Follower,且Follower会Watch该节点。
    新一轮的选举结果,无法预测,与它们在第一轮选举中的顺序无关。这也是该方案被称为非公平模式的原因。

    Leader Latch模式总结

    1. Leader Latch实现很简单,每一轮的选举算法都一样。
    2. 非公平模式,每一次选举都是随机,谁抢到就是谁的,假如是第二次选举,每个 Follower 通过 Watch 感知到节点被删除的时间不完全一样,只要有一个 Follower 得到通知即发起竞选。
    3. 给zookeeper造成的负载大,假如有上万个客户端都参与竞选,意味着同时会有上万个写请求发送给 Zookeper。同时一旦 Leader 放弃领导权,Zookeeper 需要同时通知上万个 Follower,负载较大。

    使用过程

    相关的类
    LeaderLatch
    构造LeaderLatch ,构造方法如下:

    public LeaderLatch(CuratorFramework client, String latchPath);
    public LeaderLatch(CuratorFramework client, String latchPath, String id);

    启动
    通过start()方法启动之后,再等待几秒钟后,Curator会自动从中选举出Leader。

    public void start() throws Exception;

    可以调用实例的hasLeadership()判断该实例是否为leader。

    public boolean hasLeadership();

    尝试获取leadership
    调用await()方法会使线程一直阻塞到获得leadership为止。

    public void await() throws InterruptedException, EOFException;
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    释放leadership
    只能通过close()释放leadership, 只有leader将leadership释放时,其他的候选者才有机会被选为leader

    public void close() throws IOException;
    public synchronized void close(CloseMode closeMode) throws IOException;

    示例代码

    public class TestLeaderLatch {
    
      private static final String PATH = "/demo/leader";
      /** 5个客户端 */
      private static final Integer CLIENT_COUNT = 5;
    
      public static void main(String[] args) throws Exception {
        //5个线程,5个客户端
        ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
        for (int i = 0; i < CLIENT_COUNT ; i++) {
          final int index = i;
          service.submit(new Runnable() {
            @Override
            public void run() {
              try {
                new TestLeaderLatch().schedule(index);
              } catch (Exception e) {
                e.printStackTrace();
              }
            }
          });
        }
        //休眠50秒之后结束main方法
        Thread.sleep(30 * 1000);
        service.shutdownNow();
      }
    
      private void schedule(int thread) throws Exception {
    
        //获取一个client
        CuratorFramework client = this.getClient(thread);
        //获取一个latch
        LeaderLatch latch = new LeaderLatch(client, PATH,String.valueOf(thread));
    
        //给latch添加监听,在
        latch.addListener(new LeaderLatchListener() {
    
          @Override
          public void notLeader() {
            //如果不是leader
            System.out.println("Client [" + thread + "] I am the follower !");
          }
    
          @Override
          public void isLeader() {
            //如果是leader
            System.out.println("Client [" + thread + "] I am the leader !");
          }
        });
    
        //开始选取 leader
        latch.start();
    
        //每个线程 休眠时间不一样,但是最大不能超过 main方法中的那个休眠时间,那个是50秒 到时候main方法结束 会中断休眠时间
        Thread.sleep(2 * (thread + 5) * 1000);
        if (latch != null) {
          //释放leadership
          //CloseMode.NOTIFY_LEADER 节点状态改变时,通知LeaderLatchListener
          latch.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
        }
        if (client != null) {
          client.close();
        }
        System.out.println("Client [" + latch.getId() + "] Server closed...");
      }
    
      private CuratorFramework getClient(final int thread) {
        RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
        // Fluent风格创建
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.58.42:2181")
            .sessionTimeoutMs(1000000)
            .connectionTimeoutMs(3000)
            .retryPolicy(rp)
            .build();
        client.start();
        System.out.println("Client [" + thread + "] Server connected...");
        return client;
      }
    
    }

    程序运行,输出以下结果:

    Client [3] Server connected…
    Client [2] Server connected…
    Client [4] Server connected…
    Client [0] Server connected…
    Client [1] Server connected…
    Client [1] I am the leader !
    Client [0] Server closed…
    Client [1] I am the follower !
    Client [1] Server closed…
    Client [2] I am the leader !
    Client [2] I am the follower !
    Client [2] Server closed…
    Client [4] I am the leader !
    Client [3] Server closed…
    Client [4] I am the follower !
    Client [4] Server closed…

    在上面的程序中,启动了5个zookeeper客户端,程序会随机选中其中一个作为leader。通过注册监听的方式来判断自己是否成为leader。调用close()方法释放当前领导权。有可能优先close的并不是leader节点,但是当leader节点close的时候,可以继续在已有的节点中重新选举leader节点。

    LeaderElection
    上面讲了怎么使用LeaderLatch方式进行master选举,Curator提供了两种选举,一种是LeaderLatch,提供的另一种Leader选举策略是Leader Election。

    跟LeaderLatch选举策略相比,LeaderElection选举策略不同之处在于每个实例都能公平获取领导权,而且当获取领导权的实例在释放领导权之后,该实例还有机会再次获取领导权。

    另外,选举出来的leader不会一直占有领导权,当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权。LeaderElection属于公平的选举方式,通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。

    选主过程
    如下图所示,LeaderElection选举中,各客户端均创建/zkroot/leader节点,且其类型为Ephemeral与Sequence。

    由于是Sequence类型节点,故上图中三个客户端均创建成功,只是序号不一样。此时,每个客户端都会判断自己创建成功的节点的序号是不是当前最小的。如果是,则该客户端为 Leader,否则即为 Follower。

    在上图中,Client1 创建的节点序号为1 ,Client2创建的节点序号为2,Client3创建的节点序号为3。由于最小序号为 1 ,且该节点由Client1创建,故Client 1为 Leader 。

    放弃领导权
    Leader 如果主动放弃领导权,直接删除其创建的节点即可。
    如果 Leader 所在进程意外宕机,其与 Zookeeper 间的 Session 结束,由于其创建的节点为Ephemeral类型,故该节点自动被删除。

    感知领导权的放弃
    与LeaderLatch方式不同,每个 Follower 并非都 Watch 由 Leader 创建出来的节点,而是 Watch 序号刚好比自己序号小的节点。

    在上图中,总共有 1、2、3 共三个节点,因此Client 2 Watch /zkroot/leader1,Client 3 Watch /zkroot/leader2。(注:序号应该是10位数字,而非一位数字,序号最大为int最大值)。
    一旦Leader弃权或者宕机,/zkroot/leader1被删除,Client2可得到通知。此时Client3由于 Watch 的是/zkroot/leader2,故不会得到通知。

    重新选举
    Client2得到/zkroot/leader1被删除的通知后,不会立即成为新的 Leader 。而是先判断自己的序号2是不是当前最小的序号。在该场景下,其序号确为最小。因此Client 2成为新的 Leader 。

    这里要注意,如果在Client1放弃领导权之前,Client2就宕机了,Client3会收到通知。此时Client3不会立即成为Leader,而是要先判断自己的序号3是否为当前最小序号。很显然,由于Client1创建的/zkroot/leader1还在,因此Client 3不会成为新的 Leader ,并向Client2序号2 前面的序号,也即 1 创建 Watch。该过程如下图所示。

    LeaderElection模式总结

    扩展性好,每个客户端都只Watch 一个节点且每次节点被删除只须通知一个客户端
    旧 Leader 放弃领导权时,其它客户端根据竞选的先后顺序(也即节点序号)成为新 Leader,这也是公平模式的由来。
    延迟相对非公平模式要高,因为它必须等待特定节点得到通知才能选出新的 Leader。

    使用过程

    相关的类
    LeaderSelector
    LeaderSelectorListener
    LeaderSelectorListenerAdapter
    CancelLeadershipException

    使用方法 创建 LeaderSelector

    public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
    public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
    public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);

    启动

    leaderSelector.start();

    一旦启动,如果获取了leadership的话,takeLeadership()会被调用,只有当leader释放了leadership的时候,takeLeadership()才会返回。

    释放
    调用close()释放 leadership

    leaderSelector.close();

    示例代码
    LeaderSelectorListener的实现类
    实现LeaderSelectorListener 或者 继承LeaderSelectorListenerAdapter,用于定义获取领导权后的业务逻辑:

    public class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    
        /** 客户端名称 */
        private String name;
        /** leaderSelector */
        private LeaderSelector leaderSelector;
        /** 原子性的 用来记录获取 leader的次数 */
        public AtomicInteger leaderCount = new AtomicInteger(1);
    
        public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name ){
            this.name = name;
            this.leaderSelector = new LeaderSelector(client, path, this);
    
            /**
             * 自动重新排队
             * 该方法的调用可以确保此实例在释放领导权后还可能获得领导权
             */
            leaderSelector.autoRequeue();
        }
    
        /**
         * 启动  调用leaderSelector.start()
         * @throws IOException
         */
        public void start() throws IOException {
            leaderSelector.start();
        }
    
        /**
         * 获取领导权之后执行的业务逻辑,执行完自动放弃领导权
         * @param client
         * @throws Exception
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            final int waitSeconds = 2;
            System.out.println(name + "成为当前leader" + " 共成为leader的次数:" + leaderCount.getAndIncrement() + "次");
            try{
                //模拟业务逻辑执行2秒
                Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
            }catch ( InterruptedException e ){
                System.err.println(name + "已被中断");
                Thread.currentThread().interrupt();
            }finally{
                System.out.println(name + "放弃领导权");
            }
        }
    
        @Override
        public void close() throws IOException {
            leaderSelector.close();
        }
    }

    多个客户端测试

    public class TestLeaderElection {
    
      private static final String PATH = "/demo/leader";
      /** 3个客户端 */
      private static final Integer CLIENT_COUNT = 3;
    
      public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
    
        for (int i = 0; i < CLIENT_COUNT; i++) {
          final int index = i;
          service.submit(new Runnable() {
            @Override
            public void run() {
              try {
                new TestLeaderElection().schedule(index);
              } catch (Exception e) {
                e.printStackTrace();
              }
            }
          });
        }
    
        Thread.sleep(30 * 1000);
        service.shutdownNow();
      }
    
      private void schedule(final int thread) throws Exception {
        CuratorFramework client = this.getClient(thread);
        CustomLeaderSelectorListenerAdapter leaderSelectorListener =
            new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + thread);
        leaderSelectorListener.start();
      }
    
      private CuratorFramework getClient(final int thread) {
        RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
        // Fluent风格创建
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.58.42:2181")
            .sessionTimeoutMs(1000000)
            .connectionTimeoutMs(3000)
            .retryPolicy(rp)
            .build();
        client.start();
        System.out.println("Client [" + thread + "] Server connected...");
        return client;
      }
    
    }

    运行程序,输出以下内容:

    Client [0] Server connected…
    Client [1] Server connected…
    Client [2] Server connected…
    Client #2成为当前leader 共成为leader的次数:1次
    Client #2放弃领导权
    Client #0成为当前leader 共成为leader的次数:1次
    Client #0放弃领导权
    Client #1成为当前leader 共成为leader的次数:1次
    Client #1放弃领导权
    Client #2成为当前leader 共成为leader的次数:2次
    Client #2放弃领导权
    Client #0成为当前leader 共成为leader的次数:2次
    Client #0放弃领导权
    Client #1成为当前leader 共成为leader的次数:2次
    Client #1放弃领导权
    Client #2成为当前leader 共成为leader的次数:3次
    Client #2放弃领导权
    Client #0成为当前leader 共成为leader的次数:3次
    Client #0放弃领导权
    Client #1成为当前leader 共成为leader的次数:3次
    Client #1放弃领导权
    Client #2成为当前leader 共成为leader的次数:4次
    Client #2放弃领导权
    Client #0成为当前leader 共成为leader的次数:4次
    Client #0放弃领导权
    Client #1成为当前leader 共成为leader的次数:4次
    Client #1放弃领导权

    上面只是简单测试代码,并没有关闭client等操作,每个实例在获取领导权后,如果 takeLeadership(CuratorFramework client) 方法执行结束,将会释放其领导权。而且获取领导权 也是按照 Client #2, Client #0 ,Client #1 顺序来的,正好验证了它的公平性。

    LeaderSelectorListener类继承了ConnectionStateListener。一旦LeaderSelector启动,它会向curator客户端添加监听器。 使用LeaderSelector必须时刻注意连接的变化。一旦出现连接问题如SUSPENDED,curator实例必须确保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出现,curator实例不再是leader并且其takeLeadership()应该直接退出。

    推荐的做法是,如果发生SUSPENDED或者LOST连接问题,最好直接抛CancelLeadershipException,此时,leaderSelector实例会尝试中断并且取消正在执行takeLeadership()方法的线程。
    建议扩展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已经提供了推荐的处理方式 。

    欢迎关注微信公众号:大数据从业者
  • 相关阅读:
    86. Partition List
    2. Add Two Numbers
    55. Jump Game
    70. Climbing Stairs
    53. Maximum Subarray
    64. Minimum Path Sum
    122. Best Time to Buy and Sell Stock II
    以场景为中心的产品设计方法
    那些产品经理犯过最大的错
    Axure教程:如何使用动态面板?动态面板功能详解
  • 原文地址:https://www.cnblogs.com/felixzh/p/15688123.html
Copyright © 2011-2022 走看看