zoukankan      html  css  js  c++  java
  • 监听ZK节点数据变化的几种方式

    一、原生watch方式

    1、引入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>Dubbo-Code</artifactId>
            <groupId>org.example</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>zk-client</artifactId>
    
        <dependencies>
            <!-- Dubbo -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>dubbo</artifactId>
            </dependency>
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.75</version>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.5.6</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    </project>

    2、代码

    package com.zk.demo;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    public class Demo {
        static ZooKeeper zooKeeper;
        private static final String parentPath="/node";
        //充当redis
        private static Map<String, Object> map = new LinkedHashMap<String, Object>();
    
        static {
            try {
                zooKeeper = new ZooKeeper("192.168.20.93:2181", 2000, null);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) throws Exception {
            //获取节点数据,new Watcher监听节点数据变化
            if (zooKeeper==null){
                System.out.println("zk为空");
            }
            Watcher watcher=new Watcher() {
                public void process(WatchedEvent event) {
                    //event.getPath()获取节点路径,event.getType()获取节点变化类型
                    System.out.println(event.getPath()+"节点数据发生了变化:"+event.getType());
                    getChlidren(parentPath,this);
                }
            };
            getChlidren(parentPath,watcher);
    
            Thread.sleep(Integer.MAX_VALUE);
            zooKeeper.close();
        }
    
        /**
         * 递归获取节点信息
         */
        private static void getChlidren(String path,Watcher watcher)  {
            try {
                if(path.equals(parentPath)) {
                    byte[] data = zooKeeper.getData(path, watcher, null);
                    String dataStr = new String(data);
                    map.put(path, dataStr);
                    System.out.println("[" + path + "]" + dataStr);
                }
    
                List<String> children = zooKeeper.getChildren(path, watcher);
                if(children.isEmpty() || children.size()==0){
                    return ;
                }
                for (String child : children) {
                    String key = path+"/"+child;
                    byte[] data1 = zooKeeper.getData(key, watcher, null);
                    String dataStr1=new String(data1);
                    map.put(path, dataStr1);
                    System.out.println("["+key+"]"+dataStr1);
                    getChlidren(key,watcher);
                }
            } catch ( Exception e) {
                System.err.println("获取节点信息异常");
                System.out.println(e.getMessage());
            }
            System.out.println("reloadCache : "+map.toString() );
        }
    }

     二、Curator

    Curator应用场景-Watch监听机制(NodeCache,PathChildrenCache,TreeCache)

    1、引用pom(注意Curator版本跟zk的版本有关系

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>Dubbo-Code</artifactId>
            <groupId>org.example</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>zk-client</artifactId>
    
        <dependencies>
            <!-- Dubbo -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>dubbo</artifactId>
                <version> 2.6.8</version>
            </dependency>
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version> 0.11</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.75</version>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.5.6</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.12.0</version>
            </dependency>
        </dependencies>
    </project>

    2、TreeCache案例代码

    package com.zk.cache;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.TreeCache;
    import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
    import org.apache.curator.framework.recipes.cache.TreeCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.data.Stat;
    
    public class TreeCacheListen {
    
        public static void main(String[] args) throws Exception {
            CuratorFramework zkClient = getZkClient();
            String path = "/node";
            byte[] initData = "initData".getBytes();
            Stat stat = zkClient.checkExists().forPath(path);
            if (stat == null)
            {
                //创建节点用于测试
                zkClient.create().forPath(path, initData);
            }
            TreeCache treeCache = new TreeCache(zkClient, path);
            //调用start方法开始监听
            treeCache.start();
            //添加TreeCacheListener监听器
            treeCache.getListenable().addListener(new TreeCacheListener() {
    
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    System.out.println("监听到节点数据变化,类型:"+event.getType()+",路径:"+event.getData().getPath()+",数据:"+new String(event.getData().getData()));
                }
            });
            Thread.sleep(1000);
            //更新父节点数据
            zkClient.setData().forPath(path, "222".getBytes());
            Thread.sleep(1000);
            String childNodePath = path + "/child";
            //创建子节点
            zkClient.create().forPath(childNodePath, "111".getBytes());
            Thread.sleep(1000);
            //更新子节点
            zkClient.setData().forPath(childNodePath, "222".getBytes());
            Thread.sleep(1000);
            //删除子节点
            zkClient.delete().forPath(childNodePath);
    
            Thread.sleep(Integer.MAX_VALUE);
        }
    
        private static CuratorFramework getZkClient() {
            String zkServerAddress = "192.168.20.93:2181";
            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
            CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                    .connectString(zkServerAddress)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
            zkClient.start();
            return zkClient;
        }
    }
  • 相关阅读:
    ES6(严格模式,let&const,箭头函数,解构赋值,字符串扩展方法,Symbol,Set和Map,生成器函数)
    动画实现-微信语音小喇叭样式
    JS与React分别实现倒计时(天时分秒)
    MacOS下如何设置hosts?
    原生JS实现‘点击元素切换背景及字体等’
    mysql数据库设计规范
    如何对 ElasticSearch 集群进行压力测试
    设计实现SAM--无服务器应用模型
    韩立刚计算机网络笔记-第11章 因特网上的音频视频-无线网络
    韩立刚计算机网络笔记-第10章 网络安全
  • 原文地址:https://www.cnblogs.com/raorao1994/p/14869879.html
Copyright © 2011-2022 走看看