zoukankan      html  css  js  c++  java
  • Zookeeper客户端Curator基本API

      在使用zookeper的时候一般不使用原生的API,Curator,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。

    Curator包含了几个包:

    • curator-framework:对zookeeper的底层api的一些封装
    • curator-client:提供一些客户端的操作,例如重试策略等
    • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

    Maven依赖:

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.12.0</version>
            </dependency>

     ====================基本的API==============

    1.创建会话

    (1)静态工厂创建会话

    源码如下:

        public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
        {
            return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
        }
        /**
         * Create a new client
         *
         * @param connectString       list of servers to connect to
         * @param sessionTimeoutMs    session timeout
         * @param connectionTimeoutMs connection timeout
         * @param retryPolicy         retry policy to use
         * @return client
         */
        public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
        {
            return builder().
                connectString(connectString).
                sessionTimeoutMs(sessionTimeoutMs).
                connectionTimeoutMs(connectionTimeoutMs).
                retryPolicy(retryPolicy).
                build();
        }

    测试代码:采用4个参数的方法

        private static CuratorFramework getClient() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            return CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        }

    参数解释:

    connectString: 链接 URL

    sessionTimeoutMs: 会话超时时间,单位毫秒,默认60000ms

    connectionTimeoutMs:    连接创建超时时间,单位毫秒,默认60000ms

    retryPolicy:  重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口

    1.RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) 
    以sleepMsBetweenRetries的间隔重连,直到超过maxElapsedTimeMs的时间设置
    2.RetryNTimes(int n, int sleepMsBetweenRetries) 
    指定重连次数
    3.RetryOneTime(int sleepMsBetweenRetry)
    重连一次,简单粗暴
    4.ExponentialBackoffRetry
    ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) 
    ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 
    时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))

    (2)第二种方法:

        private static CuratorFramework getClient() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(URL)
                    .sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();
            return client;
        }

    (3)创建包含隔离命名空间的会话(下面也基于这种方式)

      为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径。如果设置了该值,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。
        private static CuratorFramework getClient() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(URL).sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("curator").build();
            return client;
        }

    得到会话之后,调用   client.start();   即可启动客户端。

    原来zookeper数据结构如下:

    关闭会话的方法如下:(必须在开启之后在关闭,否则会报非法状态异常)

    client.close();

    或者:

    CloseableUtils.closeQuietly(client);

    2.创建节点

      这个可以递归创建父节点并不抛出异常;指定节点类型(临时、顺序、临时永久),默认是永久

            CuratorFramework client = getClient();
            client.start();
            // 创建普通节点(默认是持久节点),内容为空
            client.create().forPath("/t1");
            // 创建普通节点(默认是持久节点)
            client.create().forPath("/t2", "123456".getBytes());
            // 创建永久顺序节点
            client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/t3", "123456".getBytes());
            // 地柜创建,如果父节点不存在也会创建
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                    .forPath("/t4/t41/t411", "123456".getBytes());

     结果:

    3.删除节点

       也可以递归删除。当然下面的多个流式接口可以随意组合。

            CuratorFramework client = getClient();
            client.start();
            // 删除子节点,只能删除叶子节点
            client.delete().forPath("/t2");
            // 递归删除
            client.delete().deletingChildrenIfNeeded().forPath("/t4/t41");
            // 指定版本进行删除
            client.delete().withVersion(0).forPath("/t1");
            // 强制删除。guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
            client.delete().guaranteed().forPath("/t30000000002");

     结果:

    4.读取节点数据---可以注册监听

            // 读取数据不获取stat
            byte[] forPath = client.getData().forPath("/t4");
            System.out.println(new String(forPath, "UTF-8"));
    
            // 读取数据且获取stat
            Stat stat = new Stat();
            byte[] forPath2 = client.getData().storingStatIn(stat).forPath("/t4");
            System.out.println(new String(forPath2, "UTF-8"));
            System.out.println(stat);
    
            // 注册观察者,当节点变动时触发
            byte[] data = client.getData().usingWatcher(new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println(event.getType());
                }
            }).forPath("/t4");
            System.out.println("/t4: " + new String(data));

     5.更新数据节点数据

            // 更新数据,返回的是stat
            Stat forPath = client.setData().forPath("/t4", "data".getBytes());
    
            // 更新一个节点的数据内容,强制指定版本进行更新
            Stat stat = new Stat();
            client.getData().storingStatIn(stat).forPath("/t4");
            Stat forPath2 = client.setData().withVersion(stat.getVersion()).forPath("/t4", "data222".getBytes());

    6. 检查节点是否存在

            Stat forPath = client.checkExists().forPath("/t4");
            if (forPath != null) {
                System.out.println("exists");
            } else {
                System.out.println("not exists");
            }

    7. 获取某个节点的所有子节点路径--这个获取的是子节点的名称且不带/

            List<String> forPath = client.getChildren().forPath("/");
            System.out.println(forPath);

    结果:

    [t4]

    8.事务

      允许作为一个原子操作进行提交。

            // inTransaction( )方法开启一个ZooKeeper事务.可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交
            client.inTransaction().
                check().forPath("/t4").
                and().
                create().withMode(CreateMode.EPHEMERAL).forPath("/t3", "data".getBytes()).
                and().
                setData().forPath("/t3", "data2".getBytes()).
                and().
                commit();

    9.  异步接口

      上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。
      BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应码和节点的详细信息。
    CuratorEventType:

    响应码(#getResultCode())

    如下:

            client.create().inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                    System.out.println(curatorEvent.getType() + "   " + curatorEvent.getResultCode());
                }
            }, Executors.newFixedThreadPool(2)).forPath("/t2", "测试值".getBytes());

    注意:    如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。

     ====================监听机制==============

     

       Curator的监听实现是对zookeeper原生监听方法的高级封装,主要体现在两点:监听重复注册,事件发生信息。而且监听事件返回详细的信息,如变动的节点信息,节点的value等。

      Curator 提供了3个接口:PathChildrenCacheListener、 NodeCache、TreeCache。三个接口都可以对一个不存在的节点进行监听。

    1.   PathChildrenCache 

      对指定的路径节点的一级子目录进行监听,不对该节点的操作进行监听,对其子目录的节点进行增、删、改的操作监听。

      如果监听的节点不存在会创建节点,如果节点是多级目录会递归创建,节点删除之后监听事件会失效。

    如下:

        /**
         * 
         * @param client
         * @throws Exception
         */
        private static void setListenterThreeOne(CuratorFramework client) throws Exception {
            PathChildrenCache childrenCache = new PathChildrenCache(client, "/t4", true);
            PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    ChildData data = event.getData();
                    switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED : " + data.getPath() + "  数据:" + data.getData());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED : " + data.getPath() + "  数据:" + data.getData());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED : " + data.getPath() + "  数据:" + data.getData());
                        break;
                    default:
                        break;
                    }
                }
            };
    
            // 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理。如果不传会采用默认的线程池
            ExecutorService pool = Executors.newFixedThreadPool(2);
            // childrenCache.getListenable().addListener(childrenCacheListener);
            childrenCache.getListenable().addListener(childrenCacheListener, pool);
            // 设置监听模式
            childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
        }

     StartMode:初始化方式

      POST_INITIALIZED_EVENT:异步初始化。初始化后会触发事件。如果节点不存在会创建节点。如果节点下面有子节点会触发CHILD_ADDED事件。
      NORMAL:异步初始化。如果节点不存在会创建节点。如果节点下面有子节点会触发CHILD_ADDED事件。
      BUILD_INITIAL_CACHE:同步初始化。如果节点不存在会创建节点。如果节点下面有子节点会触发CHILD_ADDED事件。

    2.NodeCache  

      对一个节点进行监听,监听事件包括指定的路径节点的增、删、改的操作。

      可以对一个不存在的节点进行监控,当节点创建之后会触发对应事件;节点被删除并且重建之后事件也仍然。

        // Node Cache 监控本节点的变化情况 连接 目录 是否压缩
        // 监听本节点的变化 节点可以进行修改操作 删除节点后会再次创建(空节点)
        private static void setListenterThreeTwo(CuratorFramework client) throws Exception {
            ExecutorService pool = Executors.newCachedThreadPool();
            // 设置节点的cache
            final NodeCache nodeCache = new NodeCache(client, "/t5", false);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("the test node is change and result is :");
                    System.out.println("path : " + nodeCache.getCurrentData().getPath());
                    System.out.println("data : " + new String(nodeCache.getCurrentData().getData()));
                    System.out.println("stat : " + nodeCache.getCurrentData().getStat());
                }
            });
            nodeCache.start();
        }

    3.TreeCache

       监控指定节点和节点下的所有的节点的变化--无限监听 ,也就是可以监听子孙目录。

      可以对一个不存在的节点进行监控,当节点创建之后会触发对应事件;节点被删除并且重建之后事件也仍然。

        // 监控 指定节点和节点下的所有的节点的变化--无限监听 
        private static void setListenterThreeThree(CuratorFramework client) throws Exception {
            // 设置节点的cache
            TreeCache treeCache = new TreeCache(client, "/t5");
            // 设置监听器和处理过程
            treeCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    ChildData data = event.getData();
                    if (data != null) {
                        switch (event.getType()) {
                        case NODE_ADDED:
                            System.out.println("NODE_ADDED : " + data.getPath() + "  数据:" + new String(data.getData()));
                            break;
                        case NODE_REMOVED:
                            System.out.println("NODE_REMOVED : " + data.getPath() + "  数据:" + new String(data.getData()));
                            break;
                        case NODE_UPDATED:
                            System.out.println("NODE_UPDATED : " + data.getPath() + "  数据:" + new String(data.getData()));
                            break;
    
                        default:
                            break;
                        }
                    } else {
                        System.out.println("data is null : " + event.getType());
                    }
                }
            });
            // 开始监听
            treeCache.start();
        }

     补充:上面的三个接口在设置监听器的时候都可以传入自定义的线程池,也可不传,如下:

            // 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理。如果不传会采用默认的线程池
            ExecutorService pool = Executors.newFixedThreadPool(2);
            // childrenCache.getListenable().addListener(childrenCacheListener);
            childrenCache.getListenable().addListener(childrenCacheListener, pool);
  • 相关阅读:
    Oracle----oracle编程总结
    Oracle----oracle 事务总结
    Oracle----Oracle 11g XE release2安装与指导
    KMP之Z-function (扩展kmp)
    pkg-config
    对拍
    GPU并行编程小结
    DPHARD
    贪心/字符串好题
    树专题(伸展树 / 树链剖分 / 动态树 学习笔记)
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/10497092.html
Copyright © 2011-2022 走看看