zoukankan      html  css  js  c++  java
  • Zookeeper中的watcher监听和leader选举机制

    watcher监听

    什么是watcher接口

    同一个事件类型在不同的通知状态中代表的含义有所不同,下图列举了常见的通知状态和事件类型。

     Watcher通知状态与事件类型一览

    上图列举了ZooKeeper中最常见的几个通知状态和事件类型。
    回调方法process()
    process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。process方法的定义如下:
    abstract public void process(WatchedEvent event);
    这个回调方法的定义非常简单,我们重点看下方法的参数定义:WatchedEvent。
    WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path),其数据结构如图7-5所示。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。
    提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。
    服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。
    需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。举个例子来说,当/zk-book这个节点的数据发生变更时,服务端会发送给客户端一个“ZNode数据内容变更”事件,客户端只能够接收到信息。

    代码

    public class ZkWatcher implements Watcher {
        //连接地址
        private static final String ZK_ADDRESS="0.0.0.0:2181";
        //超时时间
        private static final Integer ZK_SESSION_TIMEOUT=2000;
    
    
        private ZooKeeper zooKeeper;
    
        public ZkWatcher() {
            openConnection(ZK_ADDRESS,ZK_SESSION_TIMEOUT);
        }
    
        /**
         * 回调方法,监听连接,监听增删改节点
         * @param event
         */
        @Override
        public void process(WatchedEvent event) {
            //获取当前的状态
            Event.KeeperState keeperState = event.getState();
            //获取通知类型
            Event.EventType eventType = event.getType();
            //获取操作节点的路径
            String path = event.getPath();
    
            System.out.println("当前状态为:"+keeperState+"	通知类型为:"+eventType+"	操作的节点路径:"+path);
            //已经成功连接
            if(Event.KeeperState.SyncConnected==keeperState){
                //连接状态
                if(Event.EventType.None==eventType){
                    System.out.println("========================连接事件回调========================");
                }
                //创建节点
                if (Event.EventType.NodeCreated==eventType){
                    System.out.println("========================创建节点事件回调========================");
                }
                //修改节点
                if(Event.EventType.NodeDataChanged==eventType){
                    System.out.println("========================修改节点事件回调========================");
                }
                //删除节点
                if(Event.EventType.NodeDeleted==eventType){
                    System.out.println("========================删除节点事件回调========================");
                }
            }
        }
        /**
         * 连接zk方法
         */
        public void openConnection(String zk_address,Integer zk_sessionout){
            try {
                zooKeeper = new ZooKeeper(zk_address, zk_sessionout, this);
                System.out.println("!!!!!连接zk成功!!!!!!!");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 关闭zk连接
         *
         */
         public void closeConnection(){
             try {
                 if(zooKeeper!=null){
                     zooKeeper.close();
                 }
    
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
    
        /**
         * 添加节点
         * @param
         */
        public void createNode(String path,String data){
            try {
                //启动监听
                zooKeeper.exists(path,true);
                String result = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("创建节点成功:"+result);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 修改节点
         */
        public void setNode(String path,String data){
    
            try {
                //启动监听
                zooKeeper.exists(path,true);
                zooKeeper.setData(path, data.getBytes(), -1);
                System.out.println("修改节点成功");
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 删除节点
         */
        public void deleteNode(String path){
    
            try {
                //启动监听
                zooKeeper.exists(path,true);
                zooKeeper.delete(path, -1);
                System.out.println("删除节点成功");
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) {
            ZkWatcher zkWatcher=new ZkWatcher();
            //zkWatcher.createNode("/zkNode","zkValue");
    
            zkWatcher.setNode("/zkNode","zkValueSet");
    
            zkWatcher.deleteNode("/zkNode");
    
            zkWatcher.closeConnection();
        }
    }

     leader选举机制

    Zookeeper Server三种角色:Leader,Follower,Observer。
    Leader是Zookeeper 集群工作机制的核心,主要工作:
    1.调度者:集群内部各个服务节点的调度者
    2.事务请求:事务请求的唯一调度和处理者,保证集群事务处理的顺序性
    Follower主要职责:
    1.非事务请求:Follower 直接处理非事务请求,对于事务请求,转发给 Leader
    2.Proposal 投票:Leader 上执行事务时,需要 Follower 投票,Leader 才真正执行
    3.Leader 选举投票
    Observer主要职责:
    1.非事务请求:Follower 直接处理非事务请求,对于事务请求,转发给 Leader
    Observer 跟 Follower的区别:
    1.Follower 参与投票:Leader 选举、Proposal 提议投票(事务执行确认)
    2.Observer 不参与投票:只用于提供非事务请求的处理

    选择机制中的概念
    serverId(服务器ID 既 myid)
    比如有三台服务器,编号分别是1,2,3。
    编号越大在选择算法中的权重越大。
    zxid(最新的事物ID 既 LastLoggedZxid)
    服务器中存放的最大数据ID。
    ID值越大说明数据越新,在选举算法中数据越新权重越大。
    epoch (逻辑时钟 既 PeerEpoch)
    每个服务器都会给自己投票,或者叫投票次数,同一轮投票过程中的逻辑时钟值是相同的。
    每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比。
    如果收到低于当前轮次的投票结果,该投票无效,需更新到当前轮次和当前的投票结果。

    选举状态
    LOOKING,竞选状态。
    FOLLOWING,随从状态,同步leader状态,参与投票。
    OBSERVING,观察状态,同步leader状态,不参与投票。
    LEADING,领导者状态。

    选举算法
    通过 zoo.cfg 配置文件中的 electionAlg 属性指定 (1-3),要理解算法,需要一些paxos算法的理论基础。
    1 对应:LeaderElection 算法。
    2 对应:AuthFastLeaderElection 算法。
    3 对应:FastLeaderElection 默认的算法。

    投票内容
    选举人ID
    选举人数据ID
    选举人选举轮数
    选举人选举状态
    推举人ID
    推举人选举轮数

  • 相关阅读:
    Saltstack module acl 详解
    Saltstack python client
    Saltstack简单使用
    P5488 差分与前缀和 NTT Lucas定理 多项式
    CF613D Kingdom and its Cities 虚树 树形dp 贪心
    7.1 NOI模拟赛 凸包套凸包 floyd 计算几何
    luogu P5633 最小度限制生成树 wqs二分
    7.1 NOI模拟赛 dp floyd
    springboot和springcloud
    springboot集成mybatis
  • 原文地址:https://www.cnblogs.com/dabrk/p/11936057.html
Copyright © 2011-2022 走看看