zoukankan      html  css  js  c++  java
  • Zookeeper 监控指定节点数据变化

    一、监控数据变化,并且只监控一次

    1、java代码

    public class ZkDemo {
        public static Logger logger = Logger.getLogger(ZkDemo.class);
    
        private static String connectString = "192.168.229.129:2181";
        private static int SESSION_TIME_OUT = 60 * 1000;
        private ZooKeeper zk = null;
        private String newValue = null;
    
        public String getOldValue() {
            return oldValue;
        }
    
        public void setOldValue(String oldValue) {
            this.oldValue = oldValue;
        }
    
        private String oldValue = null;
    
        public ZooKeeper getZk() {
            return zk;
        }
    
        public void setZk(ZooKeeper zk) {
            this.zk = zk;
        }
    
        public String getNewValue() {
            return newValue;
        }
    
        public void setNewValue(String newValue) {
            this.newValue = newValue;
        }
    
        /**
         * 启动并连接 Zookeeper
         */
        public ZooKeeper start() throws IOException {
            return new ZooKeeper(connectString, SESSION_TIME_OUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                }
            });
        }
    
        /**
         * 创建节点
         */
        public void createNode(String path, String data) throws IOException, KeeperException, InterruptedException {
            // 返回当前节点的名称
            String currentNode = zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            logger.info("create zookeeper node success,currentNode******" + currentNode);
        }
    
    
        /**
         * 获取当前节点的数据,并且设置观察者
         */
        public String getNodeData(String path) throws IOException, KeeperException, InterruptedException {
            byte[] oldData = zk.getData(path, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    try {
                        // 该节点的值是否发生变化的标识,true:发生变化,false:未发生变化
                        Boolean flag = triggerWathcher(path);
                        logger.info("*******flag:" + flag);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new Stat());
            // 获取当前节点的值,如果当前节点的值发生变化,将新值覆盖原先的值
            if (newValue != null && !oldValue.equals(newValue)) {
                oldValue = newValue;
            }
            oldValue = new String(oldData, "UTF-8");
            return oldValue;
        }
    
        /**
         * 触发观察者
         */
        public Boolean triggerWathcher(String path) throws IOException, KeeperException, InterruptedException {
            byte[] newData = zk.getData(path, false, new Stat());
            newValue = new String(newData, "UTF-8");
    
            if (oldValue.equals(newValue)) {
                logger.info("******this node value is no change");
                return false;
            } else {
                logger.info("******oldValue:" + oldValue + "******" + "newValue:" + newValue);
                return true;
            }
        }
    
        public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
            ZkDemo zkDemo = new ZkDemo();
            zkDemo.setZk(zkDemo.start());
            // 如果不存在 /xiaomao 这个节点,那么就创建该节点
            if (zkDemo.getZk().exists("/xiaomao", false) == null) {
                zkDemo.createNode("/xiaomao", "123");
            }
            zkDemo.getNodeData("/xiaomao");
            Thread.sleep(Long.MAX_VALUE);
        }
    }

    2、执行完 java 代码 zookeeper 客户端的变化情况

    // 查看根节点下所有的子节点,可以看到 /xiaomao 这个子节点已经创建出来了
    [zk: localhost:2181(CONNECTED) 75] ls /
    [zookeeper, xiaomao]
    // 获取节点 /xiaomao 的值,可以看到它的值就是我们设置的 123
    [zk: localhost:2181(CONNECTED) 76] get /xiaomao
    123
    cZxid = 0x143
    ctime = Mon Oct 12 08:00:54 CST 2020
    mZxid = 0x143
    mtime = Mon Oct 12 08:00:54 CST 2020
    pZxid = 0x143
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0

    3、手动在客户端变更节点 /xiaomao 的值

    // 手动将节点 /xiaomao 的值修改为456
    [zk: localhost:2181(CONNECTED) 77] set /xiaomao 456
    cZxid = 0x143
    ctime = Mon Oct 12 08:00:54 CST 2020
    mZxid = 0x144
    mtime = Mon Oct 12 08:01:18 CST 2020
    pZxid = 0x143
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
    // 再一次手动将节点 /xiaomao 的值修改为789
    [zk: localhost:2181(CONNECTED) 78] set /xiaomao 789
    cZxid = 0x143
    ctime = Mon Oct 12 08:00:54 CST 2020
    mZxid = 0x145
    mtime = Mon Oct 12 08:01:23 CST 2020
    pZxid = 0x143
    cversion = 0
    dataVersion = 2
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
    // 获取节点 /xiaomao 现在的值,可以看到值为789
    [zk: localhost:2181(CONNECTED) 79] get /xiaomao
    789
    cZxid = 0x143
    ctime = Mon Oct 12 08:00:54 CST 2020
    mZxid = 0x145
    mtime = Mon Oct 12 08:01:23 CST 2020
    pZxid = 0x143
    cversion = 0
    dataVersion = 2
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0

    4、控制台变化

    // 当前节点的名称
    2020-10-12 00:00:56,199 INFO [ZkDemo] - create zookeeper node success,currentNode******/xiaomao
    2020-10-12 00:00:56,203 DEBUG [org.apache.zookeeper.ClientCnxn] - Reading reply sessionid:0x100000365a40014, packet:: 
    2020-10-12 00:01:09,538 DEBUG [org.apache.zookeeper.ClientCnxn] - Got ping response for sessionid: 0x100000365a40014 after 2ms
    2020-10-12 00:01:20,361 DEBUG [org.apache.zookeeper.ClientCnxn] - Got notification sessionid:0x100000365a40014
    2020-10-12 00:01:20,362 DEBUG [org.apache.zookeeper.ClientCnxn] - Got WatchedEvent state:SyncConnected 
    2020-10-12 00:01:20,363 DEBUG [org.apache.zookeeper.ClientCnxn] - Got ping response for sessionid: 0x100000365a40014 after 2ms
    2020-10-12 00:01:20,364 DEBUG [org.apache.zookeeper.ClientCnxn] - Reading reply sessionid:0x100000365a40014, packet:: 
    // 虽然我们在 Zookeeper 的控制台设置了两次值,一次是设置为 456 ,另外一次设置为 789 ,但是由于我们是一次性的监控,所以控制台只有一次变化的值
    2020-10-12 00:01:20,364 INFO [ZkDemo] - ******oldValue:123******newValue:456
    2020-10-12 00:01:20,364 INFO [ZkDemo] - *******flag:true
    

      

    二、监控数据变化,动态监控(多次监控)

    1、java代码

    public class ZkDemo {
        public static Logger logger = Logger.getLogger(ZkDemo.class);
    
        private static String connectString = "192.168.229.129:2181";
        private static int SESSION_TIME_OUT = 60 * 1000;
        private ZooKeeper zk = null;
        private String newValue = null;
    
        public String getOldValue() {
            return oldValue;
        }
    
        public void setOldValue(String oldValue) {
            this.oldValue = oldValue;
        }
    
        private String oldValue = null;
    
        public ZooKeeper getZk() {
            return zk;
        }
    
        public void setZk(ZooKeeper zk) {
            this.zk = zk;
        }
    
        public String getNewValue() {
            return newValue;
        }
    
        public void setNewValue(String newValue) {
            this.newValue = newValue;
        }
    
        /**
         * 启动并连接 Zookeeper
         */
        public ZooKeeper start() throws IOException {
            return new ZooKeeper(connectString, SESSION_TIME_OUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                }
            });
        }
    
        /**
         * 创建节点
         */
        public void createNode(String path, String data) throws IOException, KeeperException, InterruptedException {
            // 返回当前节点的名称
            String currentNode = zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            logger.info("create zookeeper node success,currentNode******" + currentNode);
        }
    
    
        /**
         * 获取当前节点的数据,并且设置观察者
         */
        public String getNodeData(String path) throws IOException, KeeperException, InterruptedException {
            byte[] oldData = zk.getData(path, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    try {
                        // 该节点的值是否发生变化的标识,true:发生变化,false:未发生变化
                        Boolean flag = triggerWathcher(path);
                        logger.info("*******flag:" + flag);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new Stat());
            oldValue = new String(oldData, "UTF-8");
            return oldValue;
        }
    
        /**
         * 触发观察者
         */
        public Boolean triggerWathcher(String path) throws IOException, KeeperException, InterruptedException {
            byte[] newData = zk.getData(path, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    try {
                        // 回调自身,类似于递归,可以实现实时监控
                        triggerWathcher(path);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new Stat());
            newValue = new String(newData, "UTF-8");
    
            if (oldValue.equals(newValue)) {
                logger.info("******this node value is no change");
                return false;
            } else {
                logger.info("******oldValue:" + oldValue + "******" + "newValue:" + newValue);
                // 如果值发生了变化,将新值覆盖原来的值
                oldValue = newValue;
                return true;
            }
        }
    
        public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
            ZkDemo zkDemo = new ZkDemo();
            zkDemo.setZk(zkDemo.start());
            // 如果不存在 /xiaomao 这个节点,那么就创建该节点
            if (zkDemo.getZk().exists("/xiaomaomao", false) == null) {
                zkDemo.createNode("/xiaomaomao", "123");
            }
            zkDemo.getNodeData("/xiaomaomao");
            Thread.sleep(Long.MAX_VALUE);
        }
    }

    2、Zookeeper 客户端操作

    // 查看根节点下所有的子节点,可以看到 /xiaomaomao 这个子节点已经创建出来了
    [zk: localhost:2181(CONNECTED) 82] ls /
    [xiaomaomao, zookeeper]
    [zk: localhost:2181(CONNECTED) 83] get /xiaomaomao
    123
    cZxid = 0x149
    ctime = Mon Oct 12 08:10:14 CST 2020
    mZxid = 0x149
    mtime = Mon Oct 12 08:10:14 CST 2020
    pZxid = 0x149
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
    // 修改节点 /xiaomaomao 的值(原先是 123 ,设置一个相同的值,看一下效果)
    [zk: localhost:2181(CONNECTED) 84] set /xiaomaomao 123
    cZxid = 0x149
    ctime = Mon Oct 12 08:10:14 CST 2020
    mZxid = 0x14a
    mtime = Mon Oct 12 08:10:32 CST 2020
    pZxid = 0x149
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
    // 修改节点 /xiaomaomao 的值为 456
    [zk: localhost:2181(CONNECTED) 85] set /xiaomaomao 456
    cZxid = 0x149
    ctime = Mon Oct 12 08:10:14 CST 2020
    mZxid = 0x14b
    mtime = Mon Oct 12 08:10:38 CST 2020
    pZxid = 0x149
    cversion = 0
    dataVersion = 2
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
    // 修改节点 /xiaomaomao 的值为 789
    [zk: localhost:2181(CONNECTED) 86] set /xiaomaomao 789
    cZxid = 0x149
    ctime = Mon Oct 12 08:10:14 CST 2020
    mZxid = 0x14c
    mtime = Mon Oct 12 08:10:42 CST 2020
    pZxid = 0x149
    cversion = 0
    dataVersion = 3
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 0
    

    3、控制台变化

    // 设置一个相同的值时,控制台的变化情况
    2020-10-12 00:10:33,553 INFO [ZkDemo] - ******this node value is no change
    2020-10-12 00:10:33,553 INFO [ZkDemo] - *******flag:false
    2020-10-12 00:10:39,593 DEBUG [org.apache.zookeeper.ClientCnxn] - Got notification sessionid:0x100000365a40015
    2020-10-12 00:10:39,594 DEBUG [org.apache.zookeeper.ClientCnxn] - Got WatchedEvent state:SyncConnected 
    2020-10-12 00:10:39,595 DEBUG [org.apache.zookeeper.ClientCnxn] - Reading reply sessionid:0x100000365a40015, 
    // 将节点的值修改为 456
    2020-10-12 00:10:39,595 INFO [ZkDemo] - ******oldValue:123******newValue:456
    2020-10-12 00:10:43,993 DEBUG [org.apache.zookeeper.ClientCnxn] - Got notification sessionid:0x100000365a40015
    2020-10-12 00:10:43,993 DEBUG [org.apache.zookeeper.ClientCnxn] - Got WatchedEvent state:SyncConnected type:
    2020-10-12 00:10:43,995 DEBUG [org.apache.zookeeper.ClientCnxn] - Reading reply sessionid:0x100000365a40015, 
    // 将节点的值修改为 789
    2020-10-12 00:10:43,995 INFO [ZkDemo] - ******oldValue:456******newValue:789
    

      

     

  • 相关阅读:
    jupyterlab数据处理
    系统监测模块
    登录验证码的实现
    编码格式检测chardet模块
    图像处理pillow模块
    内存数据的读取
    力扣(LeetCode)728. 自除数
    力扣(LeetCode)709. 转换成小写字母
    Java 层序创建和遍历二叉树
    力扣(LeetCode) 849. 到最近的人的最大距离
  • 原文地址:https://www.cnblogs.com/xiaomaomao/p/13787639.html
Copyright © 2011-2022 走看看