zoukankan      html  css  js  c++  java
  • watcher

    watcher架构:

      Watcher实现由三个部分组成:

        Zookeeper服务端

        Zookeeper客户端

        客户端的ZKWatchManager对象

      客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当Zookeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的Watch管理器会触发相关Watcher来回调响应的处理逻辑,从而完成整体的数据发布/订阅流程。

      

    watcher特性:

      

    watcher接口设计:

      Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚类:KeeperState、EventType

        

       

      

     捕获相应的事件:

      建立zookeeper的watcher监听:在zookeeper中采用zk.getChildren(path,watch)、zk.exists(path,watch)、zk.getData(path,watcher,stat)这样的方式为某个znode注册监听。

      下表以node-x节点为例,说明调用的注册方法和可监听事件间的关系:

        

     注册watcher的方法:

      1.客户端与服务器的连接状态

        

    public class ZKConnectionWatcher implements Watcher {
        // 计数器对象
        static CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接对象
        static ZooKeeper zooKeeper;
    
        @Override
        public void process(WatchedEvent event) {
            try {
                // 事件类型
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("连接创建成功");
                        countDownLatch.countDown(); // 通知线程可以继续往下执行了
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("断开连接");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("会话超时");
                        // 会话超时,重新创建
                        zooKeeper = new ZooKeeper("192.168.43.182:2181", 5000, new ZKConnectionWatcher());
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("认证失败");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            try {
                zooKeeper = new ZooKeeper("192.168.43.182:2181", 5000, new ZKConnectionWatcher());
                countDownLatch.await(); // 阻塞线程,等待连接的创建
                // 会话id
                System.out.println(zooKeeper.getSessionId());
    
                // 添加授权用户
                zooKeeper.addAuthInfo("digest", "fan:123456".getBytes());
                byte[] bytes = zooKeeper.getData("/node1", false, null);
                System.out.println(new String(bytes));
    
                Thread.sleep(5000);
                System.out.println("结束");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (zooKeeper != null) {
                    try {
                        zooKeeper.close();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    View Code

      2.检查节点是否存在(watcher是一次性的,下面演示创建节点、更新节点、删除节点,每次都需要重新启动测试类,启动一次连续测试的话只有第一次编辑节点可以监听到)

    public class ZKWatcherExists {
    
        private String IP = "192.168.43.182:2181";
        private ZooKeeper zooKeeper;
    
        @Before
        public void before() {
            try {
                // 计数器对象
                CountDownLatch countDownLatch = new CountDownLatch(1);
    
                // 参数1:服务器的ip和端口    参数2:客户端和服务器之间的会话超时时间,以毫秒为单位 参数3:监视器对象
                zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            System.out.println("连接创建成功!");
                            countDownLatch.countDown();
                        }
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                    }
                });
                // 主线程阻塞等待连接对象的创建成功
                countDownLatch.await();
                // 会话编号
                System.out.println(zooKeeper.getSessionId());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @After
        public void after() {
            if (zooKeeper != null) {
                try {
                    zooKeeper.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Test
        public void watcherExists1 () throws Exception {
            // 参数1:节点的路径 参数2:是否使用连接对象中注册的监视器
            zooKeeper.exists("/watcher1", true);
            Thread.sleep(50000);
            System.out.println("结束");
            /**
             * 启动测试类后,在zookeeper的客户端创建节点(create /watcher1 "aaaa")
             * 可以看到控制台输出:
             *          path=/watcher1
             *          eventType=NodeCreated
             *
             * 同样的方式可以测试节点数据的改变:
             *          path=/watcher1
             *          eventType=NodeDataChanged
             *
             * 节点的删除:
             *          path=/watcher1
             *          eventType=NodeDeleted
             */
        }
    
        @Test
        public void watcherExists2 () throws Exception {
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("自定义watcher");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
    
                        // watcher的创建是一次性的,如果需要连续监听:
                        zooKeeper.exists("/watcher1", this);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            // 自定义watcher对象
            zooKeeper.exists("/watcher1", watcher);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        // 注册多个监听器对象,每个监听器都能监听到
        @Test
        public void watcherExists3 () throws Exception {
            zooKeeper.exists("/watcher1", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("1");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            zooKeeper.exists("/watcher1", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("2");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            Thread.sleep(50000);
            System.out.println("结束");
        }
    }
    View Code

      3.查看节点

        @Test
        public void watcherGetData1() throws Exception {
            zooKeeper.getData("/watcher2", true, null);
            Thread.sleep(50000);
            System.out.println("结束");
            /**
             * 修改节点数据,可以看到控制台输出:
             *          path=/watcher2
             *          eventType=NodeDataChanged
             *
             * 节点的删除:
             *          path=/watcher2
             *          eventType=NodeDeleted
             */
        }
    
        @Test
        public void watcherGetData2() throws Exception {
            zooKeeper.getData("/watcher2", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("自定义watcher");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                }
            }, null);
            Thread.sleep(50000);
            System.out.println("结束");
    
            /**watcher连续监听(需要判断如果是删除节点,则不需要再继续监听)、注册多个监听器对象的用法与检查节点是否存在的用法一样**/
        }
    View Code

      4.查看子节点

        @Test
        public void watcherGetChildren1() throws Exception {
            zooKeeper.getChildren("/watcher3", true);
            Thread.sleep(50000);
            System.out.println("结束");
            /**
             * 创建子节点、删除子节点、递归删除所有节点(rmr watcher3),都可以看到控制台输出:
             *          path=/watcher3
             *          eventType=NodeChildrenChanged
             *
             * 删除节点watcher3:
             *          path=/watcher3
             *          eventType=NodeDeleted
             */
        }
    
        @Test
        public void watcherGetChildren2() throws Exception {
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("自定义watcher");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                        // 如果是根节点被删除,没必须再继续监听
                        if (event.getType() == Event.EventType.NodeChildrenChanged) {
                            zooKeeper.getChildren("/watcher3", this);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            zooKeeper.getChildren("/watcher3", watcher);
            Thread.sleep(50000);
            System.out.println("结束");
    
            /**注册多个监听器对象的用法与前面的例子一样**/
        }
    View Code
  • 相关阅读:
    Android 5.0以下系统支持TLS 1.1/1.2协议版本
    Java & Android未捕获异常处理机制
    Oppo Reno2 不允许安装非正式签名应用
    Android ADB 实用总结
    Android Studio中的非项目文件及项目目录下的全局搜索
    Android开发中网络代理设置实用总结
    基于时间偏差思路下的时间周期度量
    nodejs anywhere 搭建本地静态文件服务
    Android 支持库迁移到AndroidX
    项目Gradle版本从4.4升级到4.6
  • 原文地址:https://www.cnblogs.com/roadlandscape/p/12982751.html
Copyright © 2011-2022 走看看