zoukankan      html  css  js  c++  java
  • Curator典型应用场景之Master选举

    分布式锁和Master选举相似点
    分布式锁和 Master选举有几种相似点,实际上其实现机制也相近:

    同一时刻只有一个获取锁 / 只能有一个leader
    对于分布式排他锁来说,任意时刻,只能有一个进程(对于单进程内的锁是单线程)可以获得锁。
    对于领导选举来说,任意时刻,只能有一个成功当选为leader。否则就会出现脑裂。

    锁重入 / 确认自己是leader
    对于分布式锁,需要保证获得锁的进程在释放锁之前可再次获得锁,即锁的可重入性。
    对于领导选举,Leader需要能够确认自己已经获得领导权,即确认自己是Leader。

    释放锁 / 放弃领导权
    锁的获得者应该能够正确释放已经获得的锁,并且当获得锁的进程宕机时,锁应该自动释放,从而使得其它竞争方可以获得该锁,从而避免出现死锁的状态。
    领导应该可以主动放弃领导权,并且当领导所在进程宕机时,领导权应该自动释放,从而使得其它参与者可重新竞争领导而避免进入无主状态。

    感知锁释放 / 感知领导权释放
    当获得锁的一方释放锁时,其它对于锁的竞争方需要能够感知到锁的释放,并再次尝试获取锁。
    原来的Leader放弃领导权时,其它参与方应该能够感知该事件,并重新发起选举流程。

    Curator中选举分为两种:

    Leader Latch和Leader Election

    Leader Latch
    LeaderLatch方式就是以一种抢占方式来决定选主,是一种非公平的领导选举,谁抢到就是谁,会随机从候选者中选择一台作为leader, 选中后除非leader自己 调用close()释放leadership,否则其他的候选者不能成为leader。

    选主过程
    假设现在有三个zookeeper的客户端,如下图所示,同时竞争leader。这三个客户端同时向zookeeper集群注册Ephemeral且Non-sequence类型的节点,路径都为/zkroot/leader。

    如上图所示,由于是Non-sequence节点,这三个客户端只会有一个创建成功,其它节点均创建失败。此时,创建成功的客户端(即上图中的Client 1)即成功竞选为 Leader 。其它客户端(即上图中的Client 2和Client 3)此时匀为 Follower。

    放弃领导权
    如果Leader打算主动放弃领导权,直接删除/zkroot/leader节点即可。
    如果Leader进程意外宕机,其与Zookeeper间的Session也结束,该节点由于是Ephemeral类型的节点,因此也会自动被删除。
    此时/zkroot/leader节点不复存在,对于其它参与竞选的客户端而言,之前的Leader已经放弃了领导权。

    感知领导权的放弃
    由上图可见,创建节点失败的节点,除了成为 Follower 以外,还会向/zkroot/leader注册一个 Watch ,一旦 Leader 放弃领导权,也即该节点被删除,所有的 Follower 会收到通知。

    重新选举
    感知到旧 Leader 放弃领导权后,所有的 Follower 可以再次发起新一轮的领导选举,如下图所示。

    从上图中可见
    新一轮的领导选举方法与最初的领导选举方法完全一样,都是发起节点创建请求,创建成功即为Leader,否则为Follower,且Follower会Watch该节点。
    新一轮的选举结果,无法预测,与它们在第一轮选举中的顺序无关。这也是该方案被称为非公平模式的原因。

    Leader Latch模式总结

    1. Leader Latch实现很简单,每一轮的选举算法都一样。
    2. 非公平模式,每一次选举都是随机,谁抢到就是谁的,假如是第二次选举,每个 Follower 通过 Watch 感知到节点被删除的时间不完全一样,只要有一个 Follower 得到通知即发起竞选。
    3. 给zookeeper造成的负载大,假如有上万个客户端都参与竞选,意味着同时会有上万个写请求发送给 Zookeper。同时一旦 Leader 放弃领导权,Zookeeper 需要同时通知上万个 Follower,负载较大。

    使用过程

    相关的类
    LeaderLatch
    构造LeaderLatch ,构造方法如下:

    public LeaderLatch(CuratorFramework client, String latchPath);
    public LeaderLatch(CuratorFramework client, String latchPath, String id);

    启动
    通过start()方法启动之后,再等待几秒钟后,Curator会自动从中选举出Leader。

    public void start() throws Exception;

    可以调用实例的hasLeadership()判断该实例是否为leader。

    public boolean hasLeadership();

    尝试获取leadership
    调用await()方法会使线程一直阻塞到获得leadership为止。

    public void await() throws InterruptedException, EOFException;
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    释放leadership
    只能通过close()释放leadership, 只有leader将leadership释放时,其他的候选者才有机会被选为leader

    public void close() throws IOException;
    public synchronized void close(CloseMode closeMode) throws IOException;

    示例代码

    public class TestLeaderLatch {
    
      private static final String PATH = "/demo/leader";
      /** 5个客户端 */
      private static final Integer CLIENT_COUNT = 5;
    
      public static void main(String[] args) throws Exception {
        //5个线程,5个客户端
        ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
        for (int i = 0; i < CLIENT_COUNT ; i++) {
          final int index = i;
          service.submit(new Runnable() {
            @Override
            public void run() {
              try {
                new TestLeaderLatch().schedule(index);
              } catch (Exception e) {
                e.printStackTrace();
              }
            }
          });
        }
        //休眠50秒之后结束main方法
        Thread.sleep(30 * 1000);
        service.shutdownNow();
      }
    
      private void schedule(int thread) throws Exception {
    
        //获取一个client
        CuratorFramework client = this.getClient(thread);
        //获取一个latch
        LeaderLatch latch = new LeaderLatch(client, PATH,String.valueOf(thread));
    
        //给latch添加监听,在
        latch.addListener(new LeaderLatchListener() {
    
          @Override
          public void notLeader() {
            //如果不是leader
            System.out.println("Client [" + thread + "] I am the follower !");
          }
    
          @Override
          public void isLeader() {
            //如果是leader
            System.out.println("Client [" + thread + "] I am the leader !");
          }
        });
    
        //开始选取 leader
        latch.start();
    
        //每个线程 休眠时间不一样,但是最大不能超过 main方法中的那个休眠时间,那个是50秒 到时候main方法结束 会中断休眠时间
        Thread.sleep(2 * (thread + 5) * 1000);
        if (latch != null) {
          //释放leadership
          //CloseMode.NOTIFY_LEADER 节点状态改变时,通知LeaderLatchListener
          latch.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
        }
        if (client != null) {
          client.close();
        }
        System.out.println("Client [" + latch.getId() + "] Server closed...");
      }
    
      private CuratorFramework getClient(final int thread) {
        RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
        // Fluent风格创建
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.58.42:2181")
            .sessionTimeoutMs(1000000)
            .connectionTimeoutMs(3000)
            .retryPolicy(rp)
            .build();
        client.start();
        System.out.println("Client [" + thread + "] Server connected...");
        return client;
      }
    
    }

    程序运行,输出以下结果:

    Client [3] Server connected…
    Client [2] Server connected…
    Client [4] Server connected…
    Client [0] Server connected…
    Client [1] Server connected…
    Client [1] I am the leader !
    Client [0] Server closed…
    Client [1] I am the follower !
    Client [1] Server closed…
    Client [2] I am the leader !
    Client [2] I am the follower !
    Client [2] Server closed…
    Client [4] I am the leader !
    Client [3] Server closed…
    Client [4] I am the follower !
    Client [4] Server closed…

    在上面的程序中,启动了5个zookeeper客户端,程序会随机选中其中一个作为leader。通过注册监听的方式来判断自己是否成为leader。调用close()方法释放当前领导权。有可能优先close的并不是leader节点,但是当leader节点close的时候,可以继续在已有的节点中重新选举leader节点。

    LeaderElection
    上面讲了怎么使用LeaderLatch方式进行master选举,Curator提供了两种选举,一种是LeaderLatch,提供的另一种Leader选举策略是Leader Election。

    跟LeaderLatch选举策略相比,LeaderElection选举策略不同之处在于每个实例都能公平获取领导权,而且当获取领导权的实例在释放领导权之后,该实例还有机会再次获取领导权。

    另外,选举出来的leader不会一直占有领导权,当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权。LeaderElection属于公平的选举方式,通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。

    选主过程
    如下图所示,LeaderElection选举中,各客户端均创建/zkroot/leader节点,且其类型为Ephemeral与Sequence。

    由于是Sequence类型节点,故上图中三个客户端均创建成功,只是序号不一样。此时,每个客户端都会判断自己创建成功的节点的序号是不是当前最小的。如果是,则该客户端为 Leader,否则即为 Follower。

    在上图中,Client1 创建的节点序号为1 ,Client2创建的节点序号为2,Client3创建的节点序号为3。由于最小序号为 1 ,且该节点由Client1创建,故Client 1为 Leader 。

    放弃领导权
    Leader 如果主动放弃领导权,直接删除其创建的节点即可。
    如果 Leader 所在进程意外宕机,其与 Zookeeper 间的 Session 结束,由于其创建的节点为Ephemeral类型,故该节点自动被删除。

    感知领导权的放弃
    与LeaderLatch方式不同,每个 Follower 并非都 Watch 由 Leader 创建出来的节点,而是 Watch 序号刚好比自己序号小的节点。

    在上图中,总共有 1、2、3 共三个节点,因此Client 2 Watch /zkroot/leader1,Client 3 Watch /zkroot/leader2。(注:序号应该是10位数字,而非一位数字,序号最大为int最大值)。
    一旦Leader弃权或者宕机,/zkroot/leader1被删除,Client2可得到通知。此时Client3由于 Watch 的是/zkroot/leader2,故不会得到通知。

    重新选举
    Client2得到/zkroot/leader1被删除的通知后,不会立即成为新的 Leader 。而是先判断自己的序号2是不是当前最小的序号。在该场景下,其序号确为最小。因此Client 2成为新的 Leader 。

    这里要注意,如果在Client1放弃领导权之前,Client2就宕机了,Client3会收到通知。此时Client3不会立即成为Leader,而是要先判断自己的序号3是否为当前最小序号。很显然,由于Client1创建的/zkroot/leader1还在,因此Client 3不会成为新的 Leader ,并向Client2序号2 前面的序号,也即 1 创建 Watch。该过程如下图所示。

    LeaderElection模式总结

    扩展性好,每个客户端都只Watch 一个节点且每次节点被删除只须通知一个客户端
    旧 Leader 放弃领导权时,其它客户端根据竞选的先后顺序(也即节点序号)成为新 Leader,这也是公平模式的由来。
    延迟相对非公平模式要高,因为它必须等待特定节点得到通知才能选出新的 Leader。

    使用过程

    相关的类
    LeaderSelector
    LeaderSelectorListener
    LeaderSelectorListenerAdapter
    CancelLeadershipException

    使用方法 创建 LeaderSelector

    public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
    public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
    public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);

    启动

    leaderSelector.start();

    一旦启动,如果获取了leadership的话,takeLeadership()会被调用,只有当leader释放了leadership的时候,takeLeadership()才会返回。

    释放
    调用close()释放 leadership

    leaderSelector.close();

    示例代码
    LeaderSelectorListener的实现类
    实现LeaderSelectorListener 或者 继承LeaderSelectorListenerAdapter,用于定义获取领导权后的业务逻辑:

    public class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    
        /** 客户端名称 */
        private String name;
        /** leaderSelector */
        private LeaderSelector leaderSelector;
        /** 原子性的 用来记录获取 leader的次数 */
        public AtomicInteger leaderCount = new AtomicInteger(1);
    
        public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name ){
            this.name = name;
            this.leaderSelector = new LeaderSelector(client, path, this);
    
            /**
             * 自动重新排队
             * 该方法的调用可以确保此实例在释放领导权后还可能获得领导权
             */
            leaderSelector.autoRequeue();
        }
    
        /**
         * 启动  调用leaderSelector.start()
         * @throws IOException
         */
        public void start() throws IOException {
            leaderSelector.start();
        }
    
        /**
         * 获取领导权之后执行的业务逻辑,执行完自动放弃领导权
         * @param client
         * @throws Exception
         */
        @Override
        public void takeLeadership(CuratorFramework client) throws Exception {
            final int waitSeconds = 2;
            System.out.println(name + "成为当前leader" + " 共成为leader的次数:" + leaderCount.getAndIncrement() + "次");
            try{
                //模拟业务逻辑执行2秒
                Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
            }catch ( InterruptedException e ){
                System.err.println(name + "已被中断");
                Thread.currentThread().interrupt();
            }finally{
                System.out.println(name + "放弃领导权");
            }
        }
    
        @Override
        public void close() throws IOException {
            leaderSelector.close();
        }
    }

    多个客户端测试

    public class TestLeaderElection {
    
      private static final String PATH = "/demo/leader";
      /** 3个客户端 */
      private static final Integer CLIENT_COUNT = 3;
    
      public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
    
        for (int i = 0; i < CLIENT_COUNT; i++) {
          final int index = i;
          service.submit(new Runnable() {
            @Override
            public void run() {
              try {
                new TestLeaderElection().schedule(index);
              } catch (Exception e) {
                e.printStackTrace();
              }
            }
          });
        }
    
        Thread.sleep(30 * 1000);
        service.shutdownNow();
      }
    
      private void schedule(final int thread) throws Exception {
        CuratorFramework client = this.getClient(thread);
        CustomLeaderSelectorListenerAdapter leaderSelectorListener =
            new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + thread);
        leaderSelectorListener.start();
      }
    
      private CuratorFramework getClient(final int thread) {
        RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
        // Fluent风格创建
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.58.42:2181")
            .sessionTimeoutMs(1000000)
            .connectionTimeoutMs(3000)
            .retryPolicy(rp)
            .build();
        client.start();
        System.out.println("Client [" + thread + "] Server connected...");
        return client;
      }
    
    }

    运行程序,输出以下内容:

    Client [0] Server connected…
    Client [1] Server connected…
    Client [2] Server connected…
    Client #2成为当前leader 共成为leader的次数:1次
    Client #2放弃领导权
    Client #0成为当前leader 共成为leader的次数:1次
    Client #0放弃领导权
    Client #1成为当前leader 共成为leader的次数:1次
    Client #1放弃领导权
    Client #2成为当前leader 共成为leader的次数:2次
    Client #2放弃领导权
    Client #0成为当前leader 共成为leader的次数:2次
    Client #0放弃领导权
    Client #1成为当前leader 共成为leader的次数:2次
    Client #1放弃领导权
    Client #2成为当前leader 共成为leader的次数:3次
    Client #2放弃领导权
    Client #0成为当前leader 共成为leader的次数:3次
    Client #0放弃领导权
    Client #1成为当前leader 共成为leader的次数:3次
    Client #1放弃领导权
    Client #2成为当前leader 共成为leader的次数:4次
    Client #2放弃领导权
    Client #0成为当前leader 共成为leader的次数:4次
    Client #0放弃领导权
    Client #1成为当前leader 共成为leader的次数:4次
    Client #1放弃领导权

    上面只是简单测试代码,并没有关闭client等操作,每个实例在获取领导权后,如果 takeLeadership(CuratorFramework client) 方法执行结束,将会释放其领导权。而且获取领导权 也是按照 Client #2, Client #0 ,Client #1 顺序来的,正好验证了它的公平性。

    LeaderSelectorListener类继承了ConnectionStateListener。一旦LeaderSelector启动,它会向curator客户端添加监听器。 使用LeaderSelector必须时刻注意连接的变化。一旦出现连接问题如SUSPENDED,curator实例必须确保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出现,curator实例不再是leader并且其takeLeadership()应该直接退出。

    推荐的做法是,如果发生SUSPENDED或者LOST连接问题,最好直接抛CancelLeadershipException,此时,leaderSelector实例会尝试中断并且取消正在执行takeLeadership()方法的线程。
    建议扩展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已经提供了推荐的处理方式 。

    欢迎关注微信公众号:大数据从业者
  • 相关阅读:
    mongoDB在windows下安装和配置.
    node 中的定时器, nextTick()和setImmediate()的使用
    node 通过指令创建一个package.json文件
    node 中 npm报错 Error: ENOENT, stat 'C:UsersAdministratorAppDataRoaming pm'
    canvas之太阳系效果
    canvas之抒写文字
    canvas之绘制一张图片
    canvas之画一个三角形
    canvas之旋转一条线段
    unity3d 2d游戏制作的模式
  • 原文地址:https://www.cnblogs.com/felixzh/p/15688123.html
Copyright © 2011-2022 走看看