zoukankan      html  css  js  c++  java
  • Kafka学习笔记(4)----Kafka的Leader Election

    1. Zookeeper的基本操作

      zookeeper中的节点可以持久化/有序的两个维度分为四种类型:

      PERSIST:持久化无序(保存在磁盘中)

      PERSIST_SEQUENTIAL:持久化有序递增

      EPHEMERAL:非持久化的无序的,保存在内存中,当客户端关闭后消失。

      EPHEMERAL_SEQUENTIAL:非持久有序递增,保存在内存中,当客户端关闭后消失

      每个节点都可以注册Watch操作,用于监听节点的变化,有四种事件类型如下:

      Created event: Enabled with a call to exists

      Deleted event: Enabled with a call to exists, getData, and getChildren

      Changed event: Enabled with a call to exists and getData

      Child event: Enabled with a call to getChildren

      Watch的基本特征是客户端先得到通知,然后才能得到数据,Watch被fire之后就立即取消了,不会再有Watch后续变化,想要监听只能重新注册;

    使用原生Zookeeper创建节点和监听节点变化代码如下:

      1. 引入依赖,pom.xml

     <dependency>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
          <version>3.4.13</version>
        </dependency>

      2. 客户端连接类

    package com.wangx.kafka.zk;
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    public class ZkDemo {
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            //创建链接,并监听连接状态
            ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("链接客户端");
                    System.out.println(watchedEvent.getState());
                }
            });
            //创建节点,/parent:节点路径, data.xx:数据,Ids:设置权限CreateNode.PERSISTENT:创建节点类型
            String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            //监听节点变化
            zooKeeper.exists("/testRoot", new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("state" + watchedEvent.getState());
                }
            });
           System.out.println(parent);
            Thread.sleep(10000000);
        }
    }

      运行创建一个持久化的节点。

      查看客户端可以看到:

      

      parent节点创建成功。

      删除parent节点,观察watche变化。

      控制台打印:

      

      表示监听了删除节点事件,此时再在客户端手动创建节点,观察变化

      

      控制台并没有打印任何创建信息,说明没有监听到,这就是我们说的一旦watche被fire之后就会被关闭,此时改造一下代码:

    package com.wangx.kafka.zk;
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    public class ZkDemo {
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            //创建链接,并监听连接状态
            final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("链接客户端");
                    System.out.println(watchedEvent.getState());
                }
            });
            //创建节点
            String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            //监听节点变化
            zooKeeper.exists("/parent", new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("state" + watchedEvent.getState());
                    try {
                       //重新注册监听事件
                        zooKeeper.exists("/parent", this);
                    } catch (KeeperException e) {
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    //        System.out.println(newNode);
            Thread.sleep(10000000);
        }
    }

      删除节点,再手动创建节点:

      

      控制台打印如下:

      

      这样创建节点的事件就又被重新注册并监听到了。

    2. 基于Zookeeper的Leader Election

      1. 抢注Leader节点——非公平模式

      编码流程:

      1. 创建Leader父节点,如/chroot,并将其设置为persist节点

      2. 各客户端通过在/chroot下创建Leader节点,如/chroot/leader,来竞争Leader。该节点应被设置为ephemeral

      3. 若某创建Leader节点成功,则该客户端成功竞选为Leader

      4. 若创建Leader节点失败,则竞选Leader失败,在/chroot/leader节点上注册exist的watch,一旦该节点被删除则获得通知

      5. Leader可通过删除Leader节点来放弃Leader

      6. 如果Leader宕机,由于Leader节点被设置为ephemeral,Leader节点会自行删除。而其它节点由于在Leader节点上注册了watch,故可得到通知,参与下一轮竞选,从而保证总有客户端以Leader角色工作。

      实现代码如下:

    package com.wangx.kafka.zk;
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    public class ZkDemo {
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            //创建链接,并监听连接状态
            final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("链接客户端");
                    System.out.println(watchedEvent.getState());
                }
            });
            //创建节点
            String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    
            //监听节点变化
            zooKeeper.exists("/parent", new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("state" + watchedEvent.getState());
                    try {
                        zooKeeper.exists("/parent", this);
                    } catch (KeeperException e) {
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
            String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
            String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
    //        System.out.println(newNode);
            Thread.sleep(10000000);
        }
    }

      当存在节点之后,会抛出异常,这样就会导致节点创建不成功,所以只有创建成功的node才能成为leader。使用watcher监听可以在节点被删除或宕机之后来抢占leader.

      2.  先到先得,后者监视前者——公平模式

      1. 创建Leader父节点,如/chroot,并将其设置为persist节点

      2. 各客户端通过在/chroot下创建Leader节点,如/chroot/leader,来竞争Leader。该节点应被设置为ephemeral_sequential

      3. 客户端通过getChildren方法获取/chroot/下所有子节点,如果其注册的节点的id在所有子节点中最小,则当前客户端竞选Leader成功

      4. 否则,在前面一个节点上注册watch,一旦前者被删除,则它得到通知,返回step 3(并不能直接认为自己成为新Leader,因为可能前面的节点只是宕机了)

      5. Leader节点可通过自行删除自己创建的节点以放弃Leader

      代码实现如下:

    package com.wangx.kafka.zk;
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    public class ZkDemo {
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            //创建链接,并监听连接状态
            final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("链接客户端");
                    System.out.println(watchedEvent.getState());
                }
            });
            //创建节点
            String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    
            //监听节点变化
            zooKeeper.exists("/parent", new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("state" + watchedEvent.getState());
                    try {
                        zooKeeper.exists("/parent", this);
                    } catch (KeeperException e) {
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
    //        System.out.println(newNode);
            Thread.sleep(10000000);
        }
    }

      可以看到zk中的parent下多出了三个节点:

      

      默认以node+十个十进制数命名节点名称,数据递增。

      当id在所有子节点中最小,选举成为leader.

    3. Leader Election在Curator中的实现

      手下引入Curator依赖,pom.xml如下:

     <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-framework</artifactId>
          <version>3.2.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-recipes</artifactId>
          <version>3.2.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-client</artifactId>
          <version>3.2.1</version>
        </dependency>

      1. Curator LeaderLatch特点及api的作用:

      1. 竞选为Leader后,不可自行放弃领导权

      2. 只能通过close方法放弃领导权

      3. 强烈建议增加ConnectionStateListener,当连接SUSPENDED或者LOST时视为丢失领导权

      4. 可通过await方法等待成功获取领导权,并可加入timeout

      5. 可通过hasLeadership方法判断是否为Leader

      6. 可通过getLeader方法获取当前Leader

      7. 可通过getParticipants方法获取当前竞选Leader的参与方

      简单实现:

    package com.wangx.kafka.zk;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderLatch;
    import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    public class CuratorLeaderLatch {
        public static void main(String[] args) throws Exception {
            //设置重试策略,这里是沉睡一秒后开始重试,重试五次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5);
            //通过工厂类获取curatorFramework
            CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy);
            //leader节点创建
            LeaderLatch leaderLatch = new LeaderLatch(curatorFramework,"/parent","node");
            //监听leader节点
            leaderLatch.addListener(new LeaderLatchListener() {
                //当前节点是leader时回调
                public void isLeader() {
                    System.out.println("I am a listener");
                }
                //不再是leader时回调
                public void notLeader() {
                    System.out.println("I am not a  listener");
                }
            });
            //启动
            curatorFramework.start();
            leaderLatch.start();
            Thread.sleep(100000000);
            leaderLatch.close();
            curatorFramework.close();
        }
    }

      2. Curator LeaderSelector特点及api的作用:

      1. 竞选Leader成功后回调takeLeadership方法

      2. 可在takeLeadership方法中实现业务逻辑

      3. 一旦takeLeadership方法返回,即视为放弃领导权

      4. 可通过autoRequeue方法循环获取领导权

      5. 可通过hasLeadership方法判断是否为Leader

      6. 可通过getLeader方法获取当前Leader

      7. 可通过getParticipants方法获取当前竞选Leader的参与方

    简单实现:

    package com.wangx.kafka.zk;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.*;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    public class CuratorLeaderSelector {
        public static void main(String[] args) throws Exception {
            //设置重试策略,这里是沉睡一秒后开始重试,重试五次
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5);
            //通过工厂类获取curatorFramework
            CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy);
            //leader节点创建,监听Leader状态,并在takeLeadership回调函数中做自己的业务逻辑
            LeaderSelector leaderSelector = new LeaderSelector(curatorFramework,"/node", new LeaderSelectorListenerAdapter() {
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    Thread.sleep(1000);
                    System.out.println("启动了 takeLeadership");
                }
            });
            leaderSelector.autoRequeue();
            leaderSelector.start();
            //启动
            curatorFramework.start();
            Thread.sleep(100000000);
            leaderSelector.close();
            curatorFramework.close();
        }
    }

      这里的LeaderSelectorListenerAdapter实现了LeaderSelectorListener接口,源码如下:

    //
    // Source code recreated from a .class file by IntelliJ IDEA
    // (powered by Fernflower decompiler)
    //
    
    package org.apache.curator.framework.recipes.leader;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.state.ConnectionState;
    
    public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
        public LeaderSelectorListenerAdapter() {
        }
        //当连接失败时,会抛出异常,这样就会中断takeLeadership方法,防止业务逻辑错误操作
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
                throw new CancelLeadershipException();
            }
        }
    }

    4. Kafka的Leader Election

      1. Kafka“各自为政”Leader Election

      每个Partition的多个Replica同时竞争Leader,这样做的好处是实现起来比较简单,但是同样出现的问题的就是Herd Effect(可能会有很多的leader节点),Zookeeper负载过重,Latency较大(可能会产生很多其他的问题)

      2. Kafka基于Controller的Leader Election

      原理是在整个集群中选举出一个Broker作为Controller,Controller为所有Topic的所有Partition指定Leader及Follower,Kafka通过在zookeeper上创建/controller临时节点来实现leader选举,并在该节点中写入当前broker的信息 {“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} 

      利用Zookeeper的强一致性特性,一个节点只能被一个客户端创建成功,创建成功的broker即为leader,即先到先得原则,leader也就是集群中的controller,负责集群中所有大小事务。 当leader和zookeeper失去连接时,临时节点会删除,而其他broker会监听该节点的变化,当节点删除时,其他broker会收到事件通知,重新发起leader选举。

      这样做极大缓解Herd Effect问题,减轻Zookeeper负载,Controller与Leader及Follower间通过RPC通信,高效且实时,但是由于引入Controller增加了复杂度,同时需要考虑Controller的Failover(容错)

      

    原文 Kafka学习笔记(4)----Kafka的Leader Election

  • 相关阅读:
    2017.10.25总结与回顾
    2017.10.24总结与回顾
    CSS盒子模型
    2017.10.23学习知识总结回顾及编写新网页
    JAVA与mysql之间的编码问题
    想写好代码,送你三个神器
    Vue+highligh.js实现语法高亮(转)
    Vue.JS实现复制到粘帖板功能
    npm install、npm install --save与npm install --save-dev区别(转)
    ES7与ES8新特性
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10867419.html
Copyright © 2011-2022 走看看