zoukankan      html  css  js  c++  java
  • ZooKeeper(五)-- Curator使用

    前言

      Curator是Netflix开源的一套ZooKeeper客户端框架:

        1.封装ZooKeeper client与ZooKeeper server之间的连接处理; 

        2.提供了一套Fluent风格的操作API; 

        3.提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装。

      Curator几个组成部分:

        Client:是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法

        Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理 

        Recipes:实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上 

        Utilities:各种ZooKeeper的工具类

        Errors: 异常处理, 连接, 恢复等

        Extensions: recipe扩展 

      Curator内部实现的几种重试策略:

        ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加

        RetryNTimes:指定最大重试次数的重试策略

        RetryOneTime:仅重试一次

        RetryUntilElapsed:一直重试直到达到规定的时间

    正文

      1.项目使用maven工程,在pom.xml中添加依赖

      <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
       </dependency>
       <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
           <version>2.5.0</version>
       </dependency>

      2.下面代码从增删改查、事务、事件订阅/监听器来实现的。

    package om.xbq.demo;
    
    import java.util.Collection;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.CuratorWatcher;
    import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.data.Stat;
    
    public class CuratorDemo {
    
        // 此demo使用的集群,所以有多个ip和端口
        private static String CONNECT_SERVER = "192.168.242.129:2181,192.168.242.129:2182,192.168.242.129:2183";
        private static int SESSION_TIMEOUT = 3000;
        private static int CONNECTION_TIMEOUT = 3000;
        
        public static void main(String[] args) {
            // 连接 ZooKeeper 
            CuratorFramework framework = CuratorFrameworkFactory.
                    newClient(CONNECT_SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, new ExponentialBackoffRetry(1000,10));
            // 启动
            framework.start();
            
            Stat stat = ifExists(framework);
            
            if(stat != null){
    //            update(framework);
    //            delete(framework);
    //            query(framework);
                
    //            监听事件,只监听一次,不推荐
    //            listener1(framework);
            }else {
    //            add(framework);
            }
            
    //        事务
    //        transaction(framework);
            
    //        持久监听,推荐使用
            listener2(framework);
        }
        
        /**
         * 判断节点是否存在
         * @param cf
         * @return
         */
        public static Stat ifExists(CuratorFramework cf){
            Stat stat = null;
            try {
                stat = cf.checkExists().forPath("/node_curator/test");;
                System.out.println(stat);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return stat;
        }
        
        /**
         * @Title: add 
         * @Description: TODO(增加节点 , 可以增加 多级节点) 
         * @param @param cf    设定文件 
         * @return void    返回类型 
         * @throws
         */
        public static void add(CuratorFramework cf){
            try {
                String rs = cf.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT).forPath("/node_curator/test","xbq".getBytes());
                System.out.println(rs);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cf.close();
            }
        }
        
        /**
         * @Title: update 
         * @Description: TODO(修改指定节点的值) 
         * @param @param cf    设定文件 
         * @return void    返回类型 
         * @throws
         */
        public static void update(CuratorFramework cf){
            try {
                Stat stat = cf.setData().forPath("/node_curator/test", "javaCoder".getBytes());
                System.out.println(stat);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cf.close();
            }
        }
        
        /**
         * @Title: delete 
         * @Description: TODO(删除节点或者删除包括子节点在内的父节点) 
         * @param @param cf    设定文件 
         * @return void    返回类型 
         * @throws
         */
        public static void delete(CuratorFramework cf){
            try {
                // 递归删除的话,则输入父节点
                cf.delete().deletingChildrenIfNeeded().forPath("/node_curator");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cf.close();
            }
        }
        
        /**
         * @Title: query 
         * @Description: TODO(查询节点的值) 
         * @param @param cf    设定文件 
         * @return void    返回类型 
         * @throws
         */
        public static void query(CuratorFramework cf){
            try {
                byte[] value = cf.getData().forPath("/node_curator/test");
                System.out.println(new String(value));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cf.close();
            }
        }
        
        /**
         * @Title: transaction 
         * @Description: TODO(一组crud操作同生同灭) 
         * @param @param cf    设定文件 
         * @return void    返回类型 
         * @throws
         */
        public static void transaction(CuratorFramework cf){
            try {
                // 事务处理, 事务会自动回滚
                Collection<CuratorTransactionResult> results = cf.inTransaction()
                        .create().withMode(CreateMode.PERSISTENT).forPath("/node_xbq1").and()
                        .create().withMode(CreateMode.PERSISTENT).forPath("/node_xbq2").and().commit();
                // 遍历
                for(CuratorTransactionResult result:results){
                    System.out.println(result.getResultStat() + "->" + result.getForPath() + "->" + result.getType());
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cf.close();
            }
        }
        
        /**
        * @Title: listener1 
        * @Description: TODO(监听 事件    --  通过 usingWatcher 方法) 
        *  注意:通过CuratorWatcher 去监听指定节点的事件, 只监听一次
        * @param @param cf    设定文件 
        * @return void    返回类型 
        * @throws
         */
        public static void listener1(CuratorFramework cf){
            try {
                cf.getData().usingWatcher(new CuratorWatcher() {
                    @Override
                    public void process(WatchedEvent event) throws Exception {
                        System.out.println("触发事件:" + event.getType());
                    }
                }).forPath("/javaCoder");
                
                System.in.read(); // 挂起,在控制台上输入 才停止
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cf.close();
            }
        }
        
        /**
        * @Title: listener2 
        * @Description: TODO(监听 子节点的事件,不监听 自己    --  通过 PathChildrenCacheListener 方法,推荐使用) 
        * @param @param cf    设定文件 
        * @return void    返回类型 
        * @throws
         */
        public static void listener2(CuratorFramework cf) {
            // 节点node_xbq不存在 会新增
            PathChildrenCache cache = new PathChildrenCache(cf, "/node_xbq", true);
            try {
                cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                cache.getListenable().addListener(new PathChildrenCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                        System.out.println("触发事件:" + event.getType());
                    }
                });
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cf.close();
            }
        }
    }

     源码下载

       https://gitee.com/xbq168/CuratorDemo

      

  • 相关阅读:
    分布式事务的MQ实现
    zipkin 介绍入门
    线程5问?
    微服务分布式系统架构,转载,备份
    微服务,分布式架构
    史上最全 40 道 Dubbo 面试题及答案,看完碾压面试官!
    Tomcat优化
    windows 下安装kafka
    经典台词
    分布式锁3种实现
  • 原文地址:https://www.cnblogs.com/xbq8080/p/6623738.html
Copyright © 2011-2022 走看看