zoukankan      html  css  js  c++  java
  • Zookeeper Curator 事件监听

    疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -25【 博客园 总入口


    写在前面

    ​ 大家好,我是作者尼恩。目前和几个小伙伴一起,组织了一个高并发的实战社群【疯狂创客圈】。正在开始高并发、亿级流程的 IM 聊天程序 学习和实战

    ​ 前面,已经完成一个高性能的 Java 聊天程序的四件大事:

    接下来,需要进入到分布式开发的环节了。 分布式的中间件,疯狂创客圈的小伙伴们,一致的选择了zookeeper,不仅仅是由于其在大数据领域,太有名了。更重要的是,很多的著名框架,都使用了zk。

    本篇介绍 ZK Curator 的事件监听

    1.1. Curator 事件监听

    Curator 事件有两种模式,一种是标准的观察模式,一种是缓存监听模式。标准的监听模式是使用Watcher 监听器。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。

    Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。

    Watcher 监听器比较简单,只有一种。Cache事件监听的种类有3种Path Cache,Node Cache,Tree Cache。

    1.1.1. Watcher 标准的事件处理器

    在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型。

    Watcher接口定义了事件的回调方法:process(WatchedEvent event)。定义一个Watcher的实例很简单,代码如下:

    Watcher w = new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            log.info("监听器watchedEvent:" + watchedEvent);
        }
    };
    
    

    使用Watcher监听器实例的方式也很简单,在Curator的调用链上,加上usingWatcher方法即可,代码如下:

    byte[] content = client.getData()
            .usingWatcher(w).forPath(workerPath);
    

    一个Watcher监听器在向服务端完成注册后,当服务端的一些事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知,来实现分布式的通知功能。客户收到服务器的通知后,Curator 会封装一个WatchedEvent 事件实例,传递给监听器的回调方法process(WatchedEvent event)。

    WatchedEvent包含了三个基本属性:

    (1)通知状态(keeperState)

    (2)事件类型(EventType)

    (3)节点路径(path)

    注意,WatchedEvent并不是直接从ZooKeeper集群直接传递过来的事件实例,而是Curator 封装过的事件实例。WatchedEvent类型没有实现序列化接口java.io.Serializable,因此不能用于网络传输。ZooKeeper集群直接网络传输传递过来的事件实例是啥呢? 是一个WatcherEvent类型的实例,这个传输实例和Curator 封装过的WatchedEvent实例,在名称上有一个字母之差,而且功能也是一样的,都表示的是同一个事物,都是对一个服务端事件的封装。

    因此,这里只讲Curator 封装过的WatchedEvent实例。下边列举了ZooKeeper中最常见的几个通知状态和事件类型。

    KeeperState EventType 触发条件 说明
    None (-1) 客户端与服务端成功建立连接
    SyncConnected (0) NodeCreated (1) Watcher监听的对应数据节点被创建
    NodeDeleted (2) Watcher监听的对应数据节点被删除 此时客户端和服务器处于连接状态
    NodeDataChanged (3) Watcher监听的对应数据节点的数据内容发生变更
    NodeChildChanged (4) Wather监听的对应数据节点的子节点列表发生变更
    Disconnected (0) None (-1) 客户端与ZooKeeper服务器断开连接 此时客户端和服务器处于断开连接状态
    Expired (-112) Node (-1) 会话超时 此时客户端会话失效,通常同时也会受到SessionExpiredException异常
    AuthFailed (4) None (-1) 通常有两种情况,1:使用错误的schema进行权限检查 2:SASL权限检查失败 通常同时也会收到AuthFailedException异常

    利用Watcher来对节点进行监听操作,但此监听操作只能监听一次。来看一个简单的实例程序:

    @Slf4j
    
    @Data
    
    public class ZkWatcherDemo {
    
     
    
        private String workerPath = "/test/listener/node";
        private String subWorkerPath = "/test/listener/node/id-";
    
     
        @Test
        public void testWatcher() {
            CuratorFramework client = ZKclient.instance.getClient();
    
            //检查节点是否存在,没有则创建
            boolean isExist = ZKclient.instance.isNodeExist(workerPath);
            if (!isExist) {
                ZKclient.instance.createNode(workerPath, null);
            }
    
            try {
    
                Watcher w = new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监听到的变化 watchedEvent = " + watchedEvent);
                    }
                };
    
                byte[] content = client.getData()
                        .usingWatcher(w).forPath(workerPath);
    
                log.info("监听节点内容:" + new String(content));
    
                // 第一次变更节点数据
                client.setData().forPath(workerPath, "第1次更改内容".getBytes());
    
                // 第二次变更节点数据
                client.setData().forPath(workerPath, "第2次更改内容".getBytes());
    
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    		//....
    
    }
    
    

    运行代码,输出的结果如下:

    监听到的变化 watchedEvent = WatchedEvent state:SyncConnected type:NodeDataChanged path:/test/listener/node
    

    程序中,对节点路径 “/test/listener/node”注册一个Watcher监听器实例,随后调用setData方法两次改变节点内容,但是,监听器仅仅监听到了一个事件。也就是说,当第二次改变节点内容时,监听已经失效,无法再次获得节点变动事件。

    也就是说,Watcher监听器是一次性的,如果要反复使用,就需要反复的使用usingWatcher提前注册。

    所以,Watcher监听器不能应用于节点的数据变动或者节点变动这样的一般业务场景。而是适用于一些特殊的,比如会话超时、授权失败等这样的特殊场景。

    既然Watcher监听器是一次性的,在开发过程中需要反复注册Watcher,比较繁琐。Curator引入了Cache来监听ZooKeeper服务端的事件。Cache对ZooKeeper事件监听进行了封装,能够自动处理反复注册监听。

    1.1.2. NodeCache 节点缓存的监听

    Curator引入的Cache缓存实现,是一个系列,包括了Node Cache 、Path Cache、Tree Cache三组类。其中Node Cache节点缓存可以用于ZNode节点的监听,Path Cache子节点缓存用于ZNode的子节点的监听,而Tree Cache树缓存是Path Cache的增强,不光能监听子节点,也能监听ZNode节点自身。

    Node Cache,可以用于监控本节点的新增,删除,更新。

    Node Cache使用的第一步,就是构造一个NodeCache缓存实例。

    有两个构造方法,具体如下:

    NodeCache(CuratorFramework client, String path) 
    
    NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) 
    
    

    第一个参数就是传入创建的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数dataIsCompressed 表示是否对数据进行压缩。

    NodeCache使用的第二步,就是构造一个NodeCacheListener监听器实例。该接口的定义如下:

    package org.apache.curator.framework.recipes.cache;
    
    public interface NodeCacheListener {
    
        void nodeChanged() throws Exception;
    
    }
    
    

    NodeCacheListener监听器接口,只定义了一个简单的方法 nodeChanged,当节点变化时,这个方法就会被回调到。

    在创建完NodeCacheListener的实例之后,需要将这个实例注册到NodeCache缓存实例,使用缓存实例的addListener方法。 然后使用缓存实例nodeCache的start方法,启动节点的事件监听。

    nodeCache.getListenable().addListener(l);
    
    nodeCache.start(); 
    
    

    强调下,需要调用nodeCache的start方法能进行缓存和事件监听,这个方法有两个版本:

    void    start()//Start the cache.
    
    void    start(boolean buildInitial)  //true代表缓存当前节点
    

    唯一的一个参数buildInitial代表着是否将该节点的数据立即进行缓存。如果设置为true的话,在start启动时立即调用NodeCache的getCurrentData方法就能够得到对应节点的信息ChildData类,如果设置为false的就得不到对应的信息。

    使用NodeCache来监听节点的事件,完整的实例代码如下:

        @Test
        public void testNodeCache() {
    
            //检查节点是否存在,没有则创建
            boolean isExist = ZKclient.instance.isNodeExist(workerPath);
            if (!isExist) {
                ZKclient.instance.createNode(workerPath, null);
            }
    
            CuratorFramework client = ZKclient.instance.getClient();
            try {
                NodeCache nodeCache =
                        new NodeCache(client, workerPath, false);
                NodeCacheListener l = new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        ChildData childData = nodeCache.getCurrentData();
                        log.info("ZNode节点状态改变, path={}", childData.getPath());
                        log.info("ZNode节点状态改变, data={}", new String(childData.getData(), "Utf-8"));
                        log.info("ZNode节点状态改变, stat={}", childData.getStat());
                    }
                };
                nodeCache.getListenable().addListener(l);
                nodeCache.start();
    
                // 第1次变更节点数据
                client.setData().forPath(workerPath, "第1次更改内容".getBytes());
                Thread.sleep(1000);
    
                // 第2次变更节点数据
                client.setData().forPath(workerPath, "第2次更改内容".getBytes());
    
                Thread.sleep(1000);
    
                // 第3次变更节点数据
                client.setData().forPath(workerPath, "第3次更改内容".getBytes());
                Thread.sleep(1000);
    
                // 第4次变更节点数据
    //            client.delete().forPath(workerPath);
                Thread.sleep(Integer.MAX_VALUE);
            } catch (Exception e) {
                log.error("创建NodeCache监听失败, path={}", workerPath);
            }
        }
    
    

    运行的结果是,NodeCashe节点缓存能够重复的进行事件节点。代码中的第三次监听的输出节选如下:

    - ZNode节点状态改变, path=/test/listener/node
    
    - ZNode节点状态改变, data=第3次更改内容
    
    - ZNode节点状态改变, stat=17179869191,...
    

    最后说明一下,如果NodeCache监听的节点为空(也就是说传入的路径不存在)。那么如果我们后面创建了对应的节点,也是会触发事件从而回调nodeChanged方法。

    1.1.3. PathChildrenCache 子节点监听

    PathChildrenCache子节点缓存用于子节点的监听,监控本节点的子节点被创建、更新或者删除。需要强调两点:

    (1)只能监听子节点,监听不到当前节点

    (2)不能递归监听,子节点下的子节点不能递归监控

    PathChildrenCache子节点缓存使用的第一步,就是构造一个缓存实例。

    有多个重载版本的构造方法,选择4个进行说明,具体如下:

    public PathChildrenCache(CuratorFramework client, String path,boolean cacheData)
    
    public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, 
             boolean dataIsCompressed,final ExecutorService executorService)
    
    public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
             boolean dataIsCompressed,ThreadFactory threadFactory)
    
    public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
             ThreadFactory threadFactory)
    
    

    所有的构造方法,前三个参数,都是一样的。

    第一个参数就是传入创建的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数cacheData表示是否把节点内容缓存起来。如果cacheData为true,那么接收到节点列表变更事件的同时,会将获得节点内容。

    dataIsCompressed参数(如果有),表示是否对节点数据进行压缩。

    executorService 和threadFactory参数差不多,表示通过传入的线程池或者线程工厂,来异步处理监听事件。

    threadFactory参数(如果有)表示线程池工厂,当PathChildrenCache内部需要开启新的线程执行时,使用该线程池工厂来创建线程。

    PathChildrenCache子节点缓存使用的第二步,就是构造一个子节点缓存监听器PathChildrenCacheListener实例。该接口的定义如下:

    package org.apache.curator.framework.recipes.cache;
    
    import org.apache.curator.framework.CuratorFramework;
     
    public interface PathChildrenCacheListener {
    
       void childEvent(CuratorFramework client, PathChildrenCacheEvent e) throws Exception;
    
    }
    

    PathChildrenCacheListener监听器接口中,也只定义了一个简单的方法 childEvent,当子节点有变化时,这个方法就会被回调到。

    在创建完PathChildrenCacheListener的实例之后,需要将这个实例注册到PathChildrenCache缓存实例,使用缓存实例的addListener方法。 然后使用缓存实例nodeCache的start方法,启动节点的事件监听。

    这里的start方法,需要传入启动的模式。可以传入三种模式,也就是API列表中看到的StartMode,其中定义了下面三种枚举:

    (1)NORMAL——异步初始化cache

    (2)BUILD_INITIAL_CACHE——同步初始化cache

    (3)POST_INITIALIZED_EVENT——异步初始化cache,并触发完成事件

    对于start模式的三种启动方式,详细的说明如下:

    BUILD_INITIAL_CACHE:启动时,同步初始化cache,以及创建cache后,就从服务器拉取对应的数据。

    POST_INITIALIZED_EVENT:启动时,异步初始化cache,初始化完成触发PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener会收到该事件的通知。

    最后是第一个枚举常量,NORMAL:启动时,异步初始化cache,完成后不会发出通知。

    使用PathChildrenCache来监听节点的事件,完整的实例代码如下:

    
        @Test
        public void testPathChildrenCache() {
    
            //检查节点是否存在,没有则创建
            boolean isExist = ZKclient.instance.isNodeExist(workerPath);
            if (!isExist) {
                ZKclient.instance.createNode(workerPath, null);
            }
    
            CuratorFramework client = ZKclient.instance.getClient();
    
            try {
                PathChildrenCache cache =
                        new PathChildrenCache(client, workerPath, true);
                PathChildrenCacheListener l =
                        new PathChildrenCacheListener() {
                            @Override
                            public void childEvent(CuratorFramework client,
                                                   PathChildrenCacheEvent event) {
                                try {
                                    ChildData data = event.getData();
                                    switch (event.getType()) {
                                        case CHILD_ADDED:
    
                                            log.info("子节点增加, path={}, data={}",
                                                    data.getPath(), new String(data.getData(), "UTF-8"));
    
                                            break;
                                        case CHILD_UPDATED:
                                            log.info("子节点更新, path={}, data={}",
                                                    data.getPath(), new String(data.getData(), "UTF-8"));
                                            break;
                                        case CHILD_REMOVED:
                                            log.info("子节点删除, path={}, data={}",
                                                    data.getPath(), new String(data.getData(), "UTF-8"));
                                            break;
                                        default:
                                            break;
                                    }
    
                                } catch (
                                        UnsupportedEncodingException e) {
                                    e.printStackTrace();
                                }
                            }
                        };
                cache.getListenable().addListener(l);
                cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                Thread.sleep(1000);
                for (int i = 0; i < 3; i++) {
                    ZKclient.instance.createNode(subWorkerPath + i, null);
                }
    
                Thread.sleep(1000);
                for (int i = 0; i < 3; i++) {
                    ZKclient.instance.deleteNode(subWorkerPath + i);
                }
    
                 } catch (Exception e) {
                log.error("PathCache监听失败, path=", workerPath);
            }
    
        }
    
    

    运行的结果如下:

    - 子节点增加, path=/test/listener/node/id-0, data=to set content
    
    - 子节点增加, path=/test/listener/node/id-2, data=to set content
    
    - 子节点增加, path=/test/listener/node/id-1, data=to set content
    
    ......
    
    - 子节点删除, path=/test/listener/node/id-2, data=to set content
    
    - 子节点删除, path=/test/listener/node/id-0, data=to set content
    
    - 子节点删除, path=/test/listener/node/id-1, data=to set content
    
    

    可以看到,PathChildrenCache 能够反复的监听到节点的新增和删除。

    简单说下Curator的监听原理,无论是PathChildrenCache,还是TreeCache,所谓的监听,都是进行Curator本地缓存视图和ZooKeeper服务器远程的数据节点的对比。

    在什么场景下触发事件呢?

    以节点增加事件NODE_ADDED为例,所在本地缓存视图开始的时候,本地视图为空,在数据同步的时候,本地的监听器就能监听到NODE_ADDED事件。这是因为,刚开始本地缓存并没有内容,然后本地缓存和服务器缓存进行对比,发现ZooKeeper服务器有节点而本地缓存没有,这才将服务器的节点缓存到本地,就会触发本地缓存的NODE_ADDED事件。

    1.1.4. Tree Cache 节点树缓存

    前面已经讲完了两个系列的缓存监听。简单回顾一下:

    Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。Node Cache是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。

    Path Cache子节点缓存用来观察ZNode的子节点、并缓存子节点的状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。Path Cache是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。

    最后的一个系列,是Tree Cache。Tree Cache可以看做是上两种的合体,Tree Cache观察的是当前ZNode节点的所有数据。而TreeCache节点树缓存是PathChildrenCache的增强,不光能监听子节点,也能监听节点自身。

    Tree Cache使用的第一步,就是构造一个TreeCache缓存实例。

    有两个构造方法,具体如下:

    TreeCache(CuratorFramework client, String path) 
     
    
    TreeCache(CuratorFramework client, String path,
              boolean cacheData, boolean dataIsCompressed, int maxDepth, 
    		 ExecutorService executorService, boolean createParentNodes,
    		 TreeCacheSelector selector) 
    

    第一个参数就是传入创建的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数dataIsCompressed 表示是否对数据进行压缩。maxDepth表示缓存的层次深度,默认为整数最大值。executorService 表示监听的的执行线程池,默认会创建一个单一线程的线程池。createParentNodes 表示是否创建父亲节点,默认为false。

    一般情况下,使用第一个构造函数即可。

    TreeCache使用的第二步,就是构造一个TreeCacheListener监听器实例。该接口的定义如下:

    package org.apache.curator.framework.recipes.cache;
    
     import org.apache.curator.framework.CuratorFramework;
    
    public interface TreeCacheListener {
        void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception;
    
    }
    

    TreeCacheListener 监听器接口中,也只定义了一个简单的方法 childEvent,当子节点有变化时,这个方法就会被回调到。

    在创建完TreeCacheListener 的实例之后,使用缓存实例的addListener方法,将TreeCacheListener 监听器实例注册到TreeCache 缓存实例。 然后使用缓存实例nodeCache的start方法,启动节点的事件监听。

    整个实例的代码如下:

     @Test
        public void testTreeCache() {
    
            //检查节点是否存在,没有则创建
            boolean isExist = ZKclient.instance.isNodeExist(workerPath);
            if (!isExist) {
                ZKclient.instance.createNode(workerPath, null);
            }
    
            CuratorFramework client = ZKclient.instance.getClient();
    
            try {
                TreeCache treeCache  =
                        new TreeCache(client, workerPath);
                TreeCacheListener l =
                        new TreeCacheListener() {
                            @Override
                            public void childEvent(CuratorFramework client,
                                                   TreeCacheEvent event) {
                                try {
                                    ChildData data = event.getData();
                                    if(data==null)
                                    {
                                        log.info("数据为空");
                                        return;
                                    }
                                    switch (event.getType()) {
                                        case NODE_ADDED:
    
                                            log.info("[TreeCache]节点增加, path={}, data={}",
                                                    data.getPath(), new String(data.getData(), "UTF-8"));
    
                                            break;
                                        case NODE_UPDATED:
                                            log.info("[TreeCache]节点更新, path={}, data={}",
                                                    data.getPath(), new String(data.getData(), "UTF-8"));
                                            break;
                                        case NODE_REMOVED:
                                            log.info("[TreeCache]节点删除, path={}, data={}",
                                                    data.getPath(), new String(data.getData(), "UTF-8"));
                                            break;
                                        default:
                                            break;
                                    }
    
                                } catch (
                                        UnsupportedEncodingException e) {
                                    e.printStackTrace();
                                }
                            }
                        };
                treeCache.getListenable().addListener(l);
                treeCache.start();
                Thread.sleep(1000);
                for (int i = 0; i < 3; i++) {
                    ZKclient.instance.createNode(subWorkerPath + i, null);
                }
    
                Thread.sleep(1000);
                for (int i = 0; i < 3; i++) {
                    ZKclient.instance.deleteNode(subWorkerPath + i);
                }
                Thread.sleep(1000);
    
                ZKclient.instance.deleteNode(workerPath);
    
                Thread.sleep(Integer.MAX_VALUE);
    
            } catch (Exception e) {
                log.error("PathCache监听失败, path=", workerPath);
            }
    
        }
    
    

    运行的结果如下:

    - [TreeCache]节点增加, path=/test/listener/node, data=to set content
    
     
    
    - [TreeCache]节点增加, path=/test/listener/node/id-0, data=to set content
    
    - [TreeCache]节点增加, path=/test/listener/node/id-1, data=to set content
    
    - [TreeCache]节点增加, path=/test/listener/node/id-2, data=to set content
    
     
    
    - [TreeCache]节点删除, path=/test/listener/node/id-2, data=to set content
    
    - [TreeCache]节点删除, path=/test/listener/node/id-1, data=to set content
    
    - [TreeCache]节点删除, path=/test/listener/node/id-0, data=to set content
    
     
    
    - [TreeCache]节点删除, path=/test/listener/node, data=to set content
    
    

    最后,说明下事件的类型,对应于节点的增加、修改、删除,TreeCache 的事件类型为:

    (1)NODE_ADDED

    (2)NODE_UPDATED

    (3)NODE_REMOVED

    这一点,与Path Cache 的事件类型不同,与Path Cache 的事件类型为:

    (1)CHILD_ADDED

    (2)CHILD_UPDATED

    (3)CHILD_REMOVED

    写在最后

    ​ 下一篇:基于zk,实现分布式锁。


    疯狂创客圈 亿级流量 高并发IM 实战 系列

    • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战

    
    
    
    
  • 相关阅读:
    C++Primer第5版学习笔记(三)
    C++Primer第5版学习笔记(二)
    C++Primer第5版学习笔记(一)
    A*寻路算法的探寻与改良(三)
    A*寻路算法的探寻与改良(二)
    A*寻路算法的探寻与改良(一)
    html5页面js判断是否安装app,以及判断是否在app内部打开html5页面
    结合prototype和xmlhttprequest封装ajax请求
    前端常见的性能优化和浏览器兼容性问题
    常见的HTTP状态码
  • 原文地址:https://www.cnblogs.com/crazymakercircle/p/10228385.html
Copyright © 2011-2022 走看看