zoukankan      html  css  js  c++  java
  • 【Zookeeper】Re02 CuratorAPI

    Curator,提供给Java操作ZK的API组件:

    需要的组件依赖:

            <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>5.2.0</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>5.2.0</version>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.32</version>
            </dependency>
    
    
            <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.32</version>
                <scope>test</scope>
            </dependency>

    Curator要求配置日志组件,当然不配也是可以的

    Log4J还要求配置log4j.properties,配置文件是随便找的

    # priority  :debug<info<warn<error
    #you cannot specify every priority with different file for log4j
    log4j.rootLogger=debug,stdout,info,debug,warn,error 
    
    #console
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
    log4j.appender.stdout.layout.ConversionPattern= [%d{yyyy-MM-dd HH:mm:ss a}]:%p %l%m%n
    #info log
    log4j.logger.info=info
    log4j.appender.info=org.apache.log4j.DailyRollingFileAppender 
    log4j.appender.info.DatePattern='_'yyyy-MM-dd'.log'
    log4j.appender.info.File=./src/com/hp/log/info.log
    log4j.appender.info.Append=true
    log4j.appender.info.Threshold=INFO
    log4j.appender.info.layout=org.apache.log4j.PatternLayout 
    log4j.appender.info.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n
    #debug log
    log4j.logger.debug=debug
    log4j.appender.debug=org.apache.log4j.DailyRollingFileAppender 
    log4j.appender.debug.DatePattern='_'yyyy-MM-dd'.log'
    log4j.appender.debug.File=./src/com/hp/log/debug.log
    log4j.appender.debug.Append=true
    log4j.appender.debug.Threshold=DEBUG
    log4j.appender.debug.layout=org.apache.log4j.PatternLayout 
    log4j.appender.debug.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n
    #warn log
    log4j.logger.warn=warn
    log4j.appender.warn=org.apache.log4j.DailyRollingFileAppender 
    log4j.appender.warn.DatePattern='_'yyyy-MM-dd'.log'
    log4j.appender.warn.File=./src/com/hp/log/warn.log
    log4j.appender.warn.Append=true
    log4j.appender.warn.Threshold=WARN
    log4j.appender.warn.layout=org.apache.log4j.PatternLayout 
    log4j.appender.warn.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n
    #error
    log4j.logger.error=error
    log4j.appender.error = org.apache.log4j.DailyRollingFileAppender
    log4j.appender.error.DatePattern='_'yyyy-MM-dd'.log'
    log4j.appender.error.File = ./src/com/hp/log/error.log 
    log4j.appender.error.Append = true
    log4j.appender.error.Threshold = ERROR 
    log4j.appender.error.layout = org.apache.log4j.PatternLayout
    log4j.appender.error.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n

    API操作:

    创建连接:

    第一种,根据构造参数创建客户端对象

    第二种,使用构建工厂管理创建客户端对象

        /**
         * 创建节点一
         *
         */
        @Test
        public void establishConnection1() {
            /**
             * 方式一
             * String connectString,
             *  单例参数 "192.168.242.101:2181"
             *  集群参数 "192.168.242.101:2181, 192.168.242.102:2181, 192.168.242.103:2181"
             * int sessionTimeoutMs,
             *  回话超时限制
             * int connectionTimeoutMs,
             *  连接超时限制
             * RetryPolicy retryPolicy, ZKClientConfig zkClientConfig
             *  重新连接策略
             *
             */
            ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 10);// 3秒 10次
    
            CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
                    "192.168.242.101:2181",
                    1000 * 60, // 1分钟
                    1000 * 15, // 15秒
                    exponentialBackoffRetry
            );
    
            curatorFramework.start(); // 建立连接
        }
    
        /**
         * 创建节点二
         *
         *
         */
        @Test
        public void establishConnection2() {
            CuratorFramework curatorFramework = CuratorFrameworkFactory
                    .builder()
                    .sessionTimeoutMs(1000 * 60)
                    .connectionTimeoutMs(1000 * 15)
                    .connectString("192.168.242.101:2181")
                    .retryPolicy(new ExponentialBackoffRetry(3000, 10))
                    .namespace("nmNode") // 指定在一个节点下进行操作
                    .build();
    
            curatorFramework.start();
        }

    为便于操作Zookeeper,使用Junit的前置调用和后置调用注解:

    创建资源和释放资源

        CuratorFramework zookeeperClient;
    @Before
    public void prepareConnection() { zookeeperClient = CuratorFrameworkFactory .builder() .sessionTimeoutMs(1000 * 60) .connectionTimeoutMs(1000 * 15) .connectString("192.168.242.101:2181") .retryPolicy(new ExponentialBackoffRetry(3000, 10)) .namespace("nmNode") // 指定在一个节点下进行操作 .build(); zookeeperClient.start(); } @After public void afterSettleConnection() { if (zookeeperClient != null) zookeeperClient.close(); }

    创建ZK节点:

        /**
         * 创建节点操作
         *
         */
        @Test
        public void createNodeAPI() {
            String forPath = null;
            try {
                // 纯创建节点
                forPath = zookeeperClient.create().forPath("/node1");
                System.out.println(forPath);
    
                // 创建带数据的节点
                forPath = zookeeperClient.create().forPath("/node2", "node with data".getBytes(StandardCharsets.UTF_8));
                System.out.println(forPath);
                /**
                 * 指定模式
                 * CreateMode
                 *  PERSISTENT(0, false, false, false, false),
                 *     PERSISTENT_SEQUENTIAL    持久化顺序模式
                 *     EPHEMERAL    临时模式
                 *     EPHEMERAL_SEQUENTIAL     临时顺序模式
                 *     CONTAINER    容器模式
                 *     PERSISTENT_WITH_TTL  持久化超时模式
                 *     PERSISTENT_SEQUENTIAL_WITH_TTL   持久化顺序超时模式
                 */
                forPath = zookeeperClient
                        .create()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath("/node2", "node with data".getBytes(StandardCharsets.UTF_8));
                System.out.println(forPath);
    
                /**
                 * 创建多级节点
                 *  .creatingParentsIfNeeded()
                 */
                forPath = zookeeperClient
                        .create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath("/node2/subNode1/subSubNode1", "node with data2".getBytes(StandardCharsets.UTF_8));
                System.out.println(forPath);
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

    查询ZK节点:

        /**
         * 查询节点
         */
        @Test
        public void queryNodes() {
            byte[] data = null;
            try {
                // 获取该节点下的数据
                data = zookeeperClient.getData().forPath("/node2");
                System.out.println(new String(data, StandardCharsets.UTF_8));
    
                /**
                 * 获取该节点下的子节点
                 *
                 * 注意使用/根节点表示时,namespace("nmNode")的影响
                 * zookeeperClient.getChildren().forPath("/")
                 */
                List<String> strings = zookeeperClient.getChildren().forPath("/node1");
                System.out.println(strings);
    
    
                /**
                 * ls -s 节点
                 * 查询节点状态
                 *
                 * stat空参数表示 0 0 0 0 0 初始状态
                 */
                Stat stat = new Stat();
    
                // 携带构造后会进行赋值操作
                zookeeperClient.getData().storingStatIn(stat).forPath("/node2");
                System.out.println(stat);  // 52,52,1635080231927,1635080231927,0,4,0,0,14,4,73
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    修改节点:

        /**
         * 修改节点
         */
        @Test
        public void updateNode() {
            try {
                /**
                 * 简单修改节点值
                 */
                Stat stat = zookeeperClient.setData().forPath("/node1/subNode1", "changes Data".getBytes(StandardCharsets.UTF_8));
    
                /**
                 * 乐观锁控制
                 * getVersion
                 */
                String subNode2 = "/node1/subNode1";
                Stat subStat = new Stat();
                zookeeperClient.getData().storingStatIn(subStat);
                zookeeperClient.setData().withVersion(subStat.getVersion()).forPath(subNode2, "changes Data2".getBytes(StandardCharsets.UTF_8));
    
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }

    删除ZK节点

        /**
         * 删除节点
         *
         *
         */
        @Test
        public void deleteNode() {
    
            try {
    
                /**
                 * 普通删除 /node1/subNode1
                 *
                 * 该节点带有子节点则报错,不能被删除
                 * KeeperErrorCode = Directory not empty for /nmNode/node1
                 * */
                // Void unused = zookeeperClient.delete().forPath("/node1");
    
                /**
                 * 带节点全部删除
                 *
                 */
                // zookeeperClient.delete().deletingChildrenIfNeeded().forPath("/node2");
    
                // 带保证的删除? 防止网络抖动断开连接,会调用重试删除
                zookeeperClient.delete().guaranteed().forPath("delete success");
    
                // 删除成功的回调处理
                zookeeperClient.delete().guaranteed().inBackground(new BackgroundCallback() {
                    // 操作成功时将执行回调
                    @Override
                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                        System.out.println("delete success");
    
                    }
                }).forPath("/node1");
    
            } catch (Exception exception) {
                exception.printStackTrace();
            }
    
        }

     

     

    监听节点:

        @Test
        @SuppressWarnings("deprecation")
        public void listeningFeature() {
    
            // nodeCache 被标注为已过时方法
            NodeCache nodeCache = new NodeCache(zookeeperClient, "/node1");
    
    
            // Listenable<使用这个泛型只能声明一种监听类型> listenable = nodeCache.getListenable();
            Listenable listenable = nodeCache.getListenable();
    
            // 绑定监听事件
            listenable.addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    String path = nodeCache.getPath();
                    System.out.println("this node changed: " + path);
    
    
                    // 获取变化后的数据
                    byte[] data = nodeCache.getCurrentData().getData();
                    System.out.println("after change, data is " + new String(data, StandardCharsets.UTF_8));
                }
            });
    
            // 绑定子节点监听事件
            listenable.addListener(new PathChildrenCacheListener() {
    
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    System.out.println("subNode changed, event -> " + pathChildrenCacheEvent);
    
                    // 获取子节点事件类型
                    PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
    
                    // 如果是修改事件触发,则开始执行XXX。。。
                    if (PathChildrenCacheEvent.Type.CHILD_UPDATED .equals(type)) {
                        byte[] data = pathChildrenCacheEvent.getData().getData();
                        System.out.println("new Data -> " + new String(data, StandardCharsets.UTF_8));
                    }
                }
            });
    
            // 绑定树节点监听事件 一般直接监听当前命名空间的全部节点
            TreeCache treeCache = new TreeCache(zookeeperClient, "/");
            Listenable<TreeCacheListener> listenable1 = treeCache.getListenable();
            listenable1.addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                    System.out.println("changed");
                    System.out.println(treeCacheEvent);
                }
            });
        }
  • 相关阅读:
    springboot ssm propertis 如何搭建多数据源动态切换
    发送验证码
    二维码生成
    文件上传 下载
    git拉代码报错
    通过url 下载文件
    原生JS实现挡板小球游戏
    深入浅出解析AJAX
    深入解析CSS3圆周运动
    JS递归原理
  • 原文地址:https://www.cnblogs.com/mindzone/p/15464356.html
Copyright © 2011-2022 走看看