zoukankan      html  css  js  c++  java
  • Zookeeper中的watcher监听和leader选举机制

    watcher监听

    什么是watcher接口

    同一个事件类型在不同的通知状态中代表的含义有所不同,下图列举了常见的通知状态和事件类型。

     Watcher通知状态与事件类型一览

    上图列举了ZooKeeper中最常见的几个通知状态和事件类型。
    回调方法process()
    process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。process方法的定义如下:
    abstract public void process(WatchedEvent event);
    这个回调方法的定义非常简单,我们重点看下方法的参数定义:WatchedEvent。
    WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path),其数据结构如图7-5所示。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。
    提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。
    服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。
    需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。举个例子来说,当/zk-book这个节点的数据发生变更时,服务端会发送给客户端一个“ZNode数据内容变更”事件,客户端只能够接收到信息。

    代码

    public class ZkWatcher implements Watcher {
        //连接地址
        private static final String ZK_ADDRESS="0.0.0.0:2181";
        //超时时间
        private static final Integer ZK_SESSION_TIMEOUT=2000;
    
    
        private ZooKeeper zooKeeper;
    
        public ZkWatcher() {
            openConnection(ZK_ADDRESS,ZK_SESSION_TIMEOUT);
        }
    
        /**
         * 回调方法,监听连接,监听增删改节点
         * @param event
         */
        @Override
        public void process(WatchedEvent event) {
            //获取当前的状态
            Event.KeeperState keeperState = event.getState();
            //获取通知类型
            Event.EventType eventType = event.getType();
            //获取操作节点的路径
            String path = event.getPath();
    
            System.out.println("当前状态为:"+keeperState+"	通知类型为:"+eventType+"	操作的节点路径:"+path);
            //已经成功连接
            if(Event.KeeperState.SyncConnected==keeperState){
                //连接状态
                if(Event.EventType.None==eventType){
                    System.out.println("========================连接事件回调========================");
                }
                //创建节点
                if (Event.EventType.NodeCreated==eventType){
                    System.out.println("========================创建节点事件回调========================");
                }
                //修改节点
                if(Event.EventType.NodeDataChanged==eventType){
                    System.out.println("========================修改节点事件回调========================");
                }
                //删除节点
                if(Event.EventType.NodeDeleted==eventType){
                    System.out.println("========================删除节点事件回调========================");
                }
            }
        }
        /**
         * 连接zk方法
         */
        public void openConnection(String zk_address,Integer zk_sessionout){
            try {
                zooKeeper = new ZooKeeper(zk_address, zk_sessionout, this);
                System.out.println("!!!!!连接zk成功!!!!!!!");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 关闭zk连接
         *
         */
         public void closeConnection(){
             try {
                 if(zooKeeper!=null){
                     zooKeeper.close();
                 }
    
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
    
        /**
         * 添加节点
         * @param
         */
        public void createNode(String path,String data){
            try {
                //启动监听
                zooKeeper.exists(path,true);
                String result = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("创建节点成功:"+result);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 修改节点
         */
        public void setNode(String path,String data){
    
            try {
                //启动监听
                zooKeeper.exists(path,true);
                zooKeeper.setData(path, data.getBytes(), -1);
                System.out.println("修改节点成功");
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 删除节点
         */
        public void deleteNode(String path){
    
            try {
                //启动监听
                zooKeeper.exists(path,true);
                zooKeeper.delete(path, -1);
                System.out.println("删除节点成功");
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) {
            ZkWatcher zkWatcher=new ZkWatcher();
            //zkWatcher.createNode("/zkNode","zkValue");
    
            zkWatcher.setNode("/zkNode","zkValueSet");
    
            zkWatcher.deleteNode("/zkNode");
    
            zkWatcher.closeConnection();
        }
    }

     leader选举机制

    Zookeeper Server三种角色:Leader,Follower,Observer。
    Leader是Zookeeper 集群工作机制的核心,主要工作:
    1.调度者:集群内部各个服务节点的调度者
    2.事务请求:事务请求的唯一调度和处理者,保证集群事务处理的顺序性
    Follower主要职责:
    1.非事务请求:Follower 直接处理非事务请求,对于事务请求,转发给 Leader
    2.Proposal 投票:Leader 上执行事务时,需要 Follower 投票,Leader 才真正执行
    3.Leader 选举投票
    Observer主要职责:
    1.非事务请求:Follower 直接处理非事务请求,对于事务请求,转发给 Leader
    Observer 跟 Follower的区别:
    1.Follower 参与投票:Leader 选举、Proposal 提议投票(事务执行确认)
    2.Observer 不参与投票:只用于提供非事务请求的处理

    选择机制中的概念
    serverId(服务器ID 既 myid)
    比如有三台服务器,编号分别是1,2,3。
    编号越大在选择算法中的权重越大。
    zxid(最新的事物ID 既 LastLoggedZxid)
    服务器中存放的最大数据ID。
    ID值越大说明数据越新,在选举算法中数据越新权重越大。
    epoch (逻辑时钟 既 PeerEpoch)
    每个服务器都会给自己投票,或者叫投票次数,同一轮投票过程中的逻辑时钟值是相同的。
    每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比。
    如果收到低于当前轮次的投票结果,该投票无效,需更新到当前轮次和当前的投票结果。

    选举状态
    LOOKING,竞选状态。
    FOLLOWING,随从状态,同步leader状态,参与投票。
    OBSERVING,观察状态,同步leader状态,不参与投票。
    LEADING,领导者状态。

    选举算法
    通过 zoo.cfg 配置文件中的 electionAlg 属性指定 (1-3),要理解算法,需要一些paxos算法的理论基础。
    1 对应:LeaderElection 算法。
    2 对应:AuthFastLeaderElection 算法。
    3 对应:FastLeaderElection 默认的算法。

    投票内容
    选举人ID
    选举人数据ID
    选举人选举轮数
    选举人选举状态
    推举人ID
    推举人选举轮数

  • 相关阅读:
    设计模式-状态模式
    设计模式-策略模式
    Spring MVC 梳理
    Spring MVC 梳理
    设计模式-总结
    Spring boot 梳理
    Spring boot 梳理
    [DP题]最长上升子序列
    NOIP2013 Day1
    20171025日程
  • 原文地址:https://www.cnblogs.com/dabrk/p/11936057.html
Copyright © 2011-2022 走看看