zoukankan      html  css  js  c++  java
  • zookeeper记录4(Apache Curator客户端的使用)

    录:

    1、常用的zk java客户端
    2、搭建maven工程,建立curator与zkserver的连接
    3、zk命名空间以及创建节点
    4、修改节点数据以及删除节点
    5、读取节点数据、节点下面子节点列表、判断节点是否存在
    6、一次性监听--curator之usingWatcher
    7、curator之nodeCache一次注册N次监听
    8、curator之PathChildrenCache子节点监听
    9、curator之acl权限操作与认证授权

    1、常用的zk java客户端    <--返回目录

    1)zk原生api
        不足之处:超时重连不支持自动,需要手动操作:watch注册一次后会失效;不支持递归创建节点;
    2)zkclient
    3)apache curator
        apache的开源项目
        解决watcher注册一次就失效的
        api更加简单易用
        提供更多解决方案并且实现简单,比如分布式锁
        提供常用的zookeeper工具类

    2、搭建maven工程,建立curator与zkserver的连接    <--返回目录

      依赖

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.11</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>

      CuratorOperator

    package com.oy.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class CuratorOperator {
        private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
        private CuratorFramework client = null;
        private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
    
        public CuratorOperator() {
            // 参数1 重试次数; 参数2 每次重试间隔的时间
            RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerPath).sessionTimeoutMs(20000)
                    .retryPolicy(retryPolicy).build();
            client.start();
        }
    
        /**
         * 测试客户端连接
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            CuratorOperator curatorOperator = new CuratorOperator();
            boolean started = curatorOperator.client.isStarted();
            log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    
            new Thread().sleep(5000);
            curatorOperator.closeZKClient();
            boolean started1 = curatorOperator.client.isStarted();
            log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
        }
    
        /**
         * 关闭zk客户端连接
         */
        public void closeZKClient() {
            if (client != null) client.close();
        }
    }

    3、zk命名空间以及创建节点    <--返回目录

    package com.oy.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class CuratorOperator {
        private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
        private CuratorFramework client = null;
        private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
    
        public CuratorOperator() {
            // 参数1 重试次数; 参数2 每次重试间隔的时间
            RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerPath).sessionTimeoutMs(20000)
                    .retryPolicy(retryPolicy).namespace("workspace").build();
            client.start();
        }
    
        /**
         * 测试客户端连接
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            CuratorOperator curatorOperator = new CuratorOperator();
            boolean started = curatorOperator.client.isStarted();
            log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    
            // 创建节点
            String nodePath = "/super/son1";
            byte[] data = "testnode".getBytes();
            curatorOperator.client.create().creatingParentContainersIfNeeded()
                    .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(nodePath, data);
    
            new Thread().sleep(5000);
            curatorOperator.closeZKClient();
            boolean started1 = curatorOperator.client.isStarted();
            log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
        }
    
        /**
         * 关闭zk客户端连接
         */
        public void closeZKClient() {
            if (client != null) client.close();
        }
    }

    4、修改节点数据以及删除节点    <--返回目录

      更新节点数据

    public static void main(String[] args) throws Exception {
        CuratorOperator curatorOperator = new CuratorOperator();
        boolean started = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    
        // 创建节点
        String nodePath = "/super/son1";
    //        byte[] data = "testnode".getBytes();
    //        curatorOperator.client.create().creatingParentContainersIfNeeded()
    //                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    //                .forPath(nodePath, data);
    
        // 更新节点数据
        byte[] newData = "newtestnode".getBytes();
        curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);
    
        new Thread().sleep(5000);
        curatorOperator.closeZKClient();
        boolean started1 = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    }

      删除节点

    public static void main(String[] args) throws Exception {
        CuratorOperator curatorOperator = new CuratorOperator();
        boolean started = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    
        // 创建节点
        String nodePath = "/super/son1";
    //        byte[] data = "testnode".getBytes();
    //        curatorOperator.client.create().creatingParentContainersIfNeeded()
    //                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    //                .forPath(nodePath, data);
    
        // 更新节点数据
    //    byte[] newData = "newtestnode".getBytes();
    //    curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);
    
        // 删除节点
        curatorOperator.client.delete()
                .guaranteed() // 如果删除失败,那么在后台还是继续删除,直到成功
                .deletingChildrenIfNeeded() // 如果有子节点,也删除
                .forPath(nodePath);
    
        new Thread().sleep(5000);
        curatorOperator.closeZKClient();
        boolean started1 = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    }

      测试删除前,在/workspace/super/son1下面创建子节点,删除son1的时候,下面的子节点也被删除了

    5、读取节点数据、节点下面子节点列表、判断节点是否存在    <--返回目录

    package com.oy.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    
    public class CuratorOperator {
        private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
        private CuratorFramework client = null;
        private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
    
        public CuratorOperator() {
            // 参数1 重试次数; 参数2 每次重试间隔的时间
            RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerPath).sessionTimeoutMs(20000)
                    .retryPolicy(retryPolicy).namespace("workspace").build();
            client.start();
        }
    
        /**
         * 测试客户端连接
         * @param args
         * @throws Exception
         */
    public static void main(String[] args) throws Exception {
        CuratorOperator curatorOperator = new CuratorOperator();
        boolean started = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    
        // 创建节点
        String nodePath = "/super/son1";
    //        byte[] data = "testnode".getBytes();
    //        curatorOperator.client.create().creatingParentContainersIfNeeded()
    //                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
    //                .forPath(nodePath, data);
    
        // 更新节点数据
    //    byte[] newData = "newtestnode".getBytes();
    //    curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);
    
        // 删除节点
    //    curatorOperator.client.delete()
    //            .guaranteed() // 如果删除失败,那么在后台还是继续删除,直到成功
    //            .deletingChildrenIfNeeded() // 如果有子节点,也删除
    //            .forPath(nodePath);
    
        // 读取节点数据
        Stat stat = new Stat();
        byte[] nodeData = curatorOperator.client.getData().storingStatIn(stat).forPath(nodePath);
        log.warn(nodePath + "节点数据: {}, 版本: {}", new String(nodeData), stat.getVersion());
    
        // 查询节点下面的子节点列表
        List<String> childNodes = curatorOperator.client.getChildren().forPath(nodePath);
        for (String child : childNodes) {
            log.warn(child);
        }
    
        // 判断节点是否存在,如果不存在则为空
        Stat stat1 = curatorOperator.client.checkExists().forPath(nodePath + "/xxx");
        log.warn("stat1: {}", stat1);
    
        new Thread().sleep(5000);
        curatorOperator.closeZKClient();
        boolean started1 = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    }
    
        /**
         * 关闭zk客户端连接
         */
        public void closeZKClient() {
            if (client != null) client.close();
        }
    }

    6、一次性监听--curator之usingWatcher    <--返回目录

    public static void main(String[] args) throws Exception {
        CuratorOperator curatorOperator = new CuratorOperator();
        boolean started = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    
        String nodePath = "/super/son1";
    
        // watcher事件,当使用usingWatcher时,监听只会触发一次,监听完毕后就销毁
        curatorOperator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
        //curatorOperator.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
    
        new Thread().sleep(50000);
        curatorOperator.closeZKClient();
        boolean started1 = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    }

      测试:通过客户端zkCli.sh 多次修改 set /workspace/super/son1 bbb, 控制台只打印一次watcher监听结果。

    7、curator之nodeCache一次注册N次监听    <--返回目录

    package com.oy.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.NodeCache;
    import org.apache.curator.framework.recipes.cache.NodeCacheListener;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    
    public class CuratorOperator {
        private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
        private CuratorFramework client = null;
        private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
    
        public CuratorOperator() {
            // 参数1 重试次数; 参数2 每次重试间隔的时间
            RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerPath).sessionTimeoutMs(20000)
                    .retryPolicy(retryPolicy).namespace("workspace").build();
            client.start();
        }
    
        /**
         * 测试客户端连接
         * @param args
         * @throws Exception
         */
    public static void main(String[] args) throws Exception {
        CuratorOperator curatorOperator = new CuratorOperator();
        boolean started = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    
        String nodePath = "/super/son1";
    
        // watcher事件,当使用usingWatcher时,监听只会触发一次,监听完毕后就销毁
        //curatorOperator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
        //curatorOperator.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
    
        // NodeCache: 监听数据节点的变化,会触发事件
        final NodeCache nodeCache = new NodeCache(curatorOperator.client, nodePath);
        // buildInital: 为true则在初始化时获取node的值并缓存
        nodeCache.start(true);
        if (nodeCache.getCurrentData() != null) {
           log.warn("节点初始化数据为:{}", new String(nodeCache.getCurrentData().getData()));
        } else {
            log.warn("节点初始化数据为空");
        }
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                // 注意:删除节点时下面代码nodeCache.getCurrentData()==null
                String data = new String(nodeCache.getCurrentData().getData());
                log.warn("节点路径{}的数据:{}", nodeCache.getPath(), data);
            }
        });
    
        new Thread().sleep(50000);
        curatorOperator.closeZKClient();
        boolean started1 = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    }
    
        /**
         * 关闭zk客户端连接
         */
        public void closeZKClient() {
            if (client != null) client.close();
        }
    }

      测试:

      1)启动main方法

      2)客户端修改/workspace/super/son1的值 set /workspace/super/son1 eee/fff/ggg

    8、curator之PathChildrenCache子节点监听    <--返回目录

    package com.oy.curator;
    
    import javafx.scene.shape.Path;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.*;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    
    public class CuratorOperator {
        private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
        private CuratorFramework client = null;
        private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
    
        public CuratorOperator() {
            // 参数1 重试次数; 参数2 每次重试间隔的时间
            RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
            client = CuratorFrameworkFactory.builder()
                    .connectString(zkServerPath).sessionTimeoutMs(20000)
                    .retryPolicy(retryPolicy).namespace("workspace").build();
            client.start();
        }
    
        /**
         * 测试客户端连接
         * @param args
         * @throws Exception
         */
    public static void main(String[] args) throws Exception {
        CuratorOperator curatorOperator = new CuratorOperator();
        boolean started = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
    
        String nodePath = "/super/son1";
        // 参数3 cacheData 设置缓存节点的数据状态
        PathChildrenCache childrenCache = new PathChildrenCache(curatorOperator.client, nodePath, true);
        // StartMode 初始化方式
        // POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件
        // NORMAL: 异步初始化,初始化后不触发事件; BUILD_INITIAL_CACHE: 同步初始化
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    
        // 只有同步初始化,下面才能获取子节点数据
        List<ChildData> childDataList = childrenCache.getCurrentData();
        log.warn("当前数据节点的子节点数据列表:");
        for (ChildData cd : childDataList) {
            log.warn(new String(cd.getData()));
        }
    
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
                    log.warn("字节点初始化完成"); // childrenCache.start(POST_INITIALIZED_EVENT) 异步初始化完成后触发调用
                }
                else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { // 添加子节点。初始化完成时,有几个子节点,这个CHILD_ADDED就会触发几次
                    log.warn("添加子节点:{}", event.getData().getPath());
                    log.warn("子节点数据:{}", new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { // 修改子节点数据
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { // 删除子节点
                }
            }
        });
    
        new Thread().sleep(50000);
        curatorOperator.closeZKClient();
        boolean started1 = curatorOperator.client.isStarted();
        log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
    }
    
        /**
         * 关闭zk客户端连接
         */
        public void closeZKClient() {
            if (client != null) client.close();
        }
    }

     9、curator之acl权限操作与认证授权    <--返回目录

    // 参数1 重试次数; 参数2 每次重试间隔的时间
    RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
    CuratorFramework client = CuratorFrameworkFactory.builder().authorization("digest", "user:password".getBytes())
            .connectString(zkServerPath).sessionTimeoutMs(20000)
            .retryPolicy(retryPolicy).namespace("workspace").build();
    client.start();
    // 自定义用户认证访问
    ArrayList<ACL> acls = new ArrayList<>();
    Id userPwd1 = new Id("digest", AclUtils.getDigestUserPwd("zhangsan1:123"));
    Id userPwd2 = new Id("digest", AclUtils.getDigestUserPwd("zhangsan2:123"));
    acls.add(new ACL(ZooDefs.Perms.ALL, userPwd1));
    acls.add(new ACL(ZooDefs.Perms.READ, userPwd2));
    acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.DELETE, userPwd2));
    
    // 创建节点, 使用自定义的权限列表
    String nodePath = "/super/son1";
    byte[] data = "testnode".getBytes();
    curatorOperator.client.create().creatingParentContainersIfNeeded()
            .withMode(CreateMode.PERSISTENT).withACL(acls)
            .forPath(nodePath, data);

    ---

  • 相关阅读:
    鼠标向下滑动加载div
    选择排序
    插入排序法
    Android问题-Delphi XE5 常用功具与下载
    Android问题-DelphiXE5编义时提示找不到“连接器(arm-linux-androideabi-ld.exe)"
    VisualStudio2010中创建ASP.Net WebService
    delphi调用webservice (.NET C#版)
    delphi 完全控制Excel 文件
    EXCEL 建立工作薄与工作表
    Delphi给窗体镶边-为控件加边框,描边,改变边框颜色
  • 原文地址:https://www.cnblogs.com/xy-ouyang/p/14927222.html
Copyright © 2011-2022 走看看