zoukankan      html  css  js  c++  java
  • Zookeeper入门实战(3)-Curator操作Zookeeper

    Apache Curator是用于Apache ZooKeeper的一个Java客户端库;它包括一个高级API框架和实用程序,使使用Apache ZooKeeper更加容易和可靠。Curator之于ZooKeeper就像Guava之于Java。

    本文主要介绍使用Curator操作Zookeeper,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、JUnit 4.13。

    1、引入依赖

    <!-- ZooKeeper library, pinned to 3.6.0 (the ZooKeeper version this article states it uses). -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.6.0</version>
    </dependency>
    <!-- Curator recipes artifact. The org.apache.curator.framework.* classes used by the examples
         presumably arrive transitively through it (verify with mvn dependency:tree). -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.3.0</version>
    </dependency>
    <!-- JUnit 4, which supplies the @Before / @After / @Test annotations used by the example classes.
         NOTE(review): consider <scope>test</scope> if the examples live under src/test. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>

    2、基本操作

    package com.inspur.demo.general.zookeeper;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates basic ZooKeeper operations through Apache Curator: creating nodes
     * (synchronously and in the background), reading and writing node data, deleting
     * nodes and listing children.
     *
     * <p>Every test talks to the ZooKeeper server configured in {@code CONNECT_STRING};
     * a fresh client is started before each test and closed afterwards. All node payloads
     * are encoded and decoded as UTF-8 so that non-ASCII content round-trips regardless of
     * the platform default charset.
     */
    public class CuratorCase {
        // ZooKeeper address; for a cluster, separate the addresses with commas, e.g.
        // 10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
        private static final String CONNECT_STRING = "10.49.196.10:2181";
        private static final int SESSION_TIMEOUT_MS = 20 * 1000;
        private static final int CONNECTION_TIMEOUT_MS = 10 * 1000;
        // Upper bound for waiting on background callbacks, so that a callback which never
        // fires fails the test instead of hanging it forever.
        private static final long CALLBACK_TIMEOUT_SECONDS = 30;

        private CuratorFramework cf;

        /**
         * Builds and starts a Curator client with an exponential-backoff retry policy
         * (base sleep 1000 ms, at most 3 retries).
         */
        @Before
        public void before() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_STRING)
                    .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                    .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                    .retryPolicy(retryPolicy)
                    .build();
            cf.start();
        }

        /**
         * Closes the client created in {@link #before()}.
         *
         * @throws Exception if closing the client fails
         */
        @After
        public void after() throws Exception {
            // before() may have failed before the client was assigned.
            if (cf != null) {
                cf.close();
            }
        }

        /**
         * Creates nodes synchronously and in the background.
         *
         * @throws Exception if a create operation fails (for example because a node already
         *                   exists) or the background callbacks do not arrive in time
         */
        @Test
        public void create() throws Exception {
            /*
             * Synchronous creation.
             * 1. Unless a create mode is given, the node is persistent.
             * 2. Ephemeral nodes cannot have children, so with recursive creation only the
             *    last (data) node gets the requested mode; its parents are persistent.
             */
            // Create a node with empty content. The empty payload is passed explicitly because
            // forPath(path) alone stores the client's default data (the local address unless
            // configured otherwise). Parents are created on demand so the test also works on a
            // server where /curator does not exist yet.
            cf.create().creatingParentsIfNeeded().forPath("/curator/node1", new byte[0]);
            // Create a node whose content is "aaa".
            cf.create().forPath("/curator/node2", "aaa".getBytes(StandardCharsets.UTF_8));
            // Create an ephemeral node.
            cf.create().withMode(CreateMode.EPHEMERAL).forPath("/curator/node3");
            // Recursive creation; only the last node is ephemeral.
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/curator/node4/a/b");
            // Create a node with the ACL digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
            List<ACL> digestAcl = Collections.singletonList(
                    new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg=")));
            cf.create().withACL(digestAcl).forPath("/curator/node5");

            /*
             * Background (asynchronous) creation.
             * An executor may be supplied for the callback; without one the callback is
             * processed serially on ZooKeeper's EventThread.
             */
            CountDownLatch counter = new CountDownLatch(2);
            BackgroundCallback printAndCount = (client, event) -> {
                System.out.println(event);
                counter.countDown();
            };
            ExecutorService callbackPool = Executors.newFixedThreadPool(1);
            try {
                cf.create().inBackground(printAndCount, callbackPool).forPath("/curator/node6");
                cf.create().inBackground(printAndCount).forPath("/curator/node7");
                awaitCallbacks(counter);
            } finally {
                // The pool's worker is a non-daemon thread; release it even when the wait fails.
                callbackPool.shutdown();
            }
        }

        /**
         * Reads a node's content and stat, first synchronously and then in the background.
         *
         * @throws Exception if the read fails or the background callback does not arrive in time
         */
        @Test
        public void getData() throws Exception {
            Stat stat = new Stat();
            byte[] bytes = cf.getData()
                    .storingStatIn(stat) // also fetch the node's stat; optional
                    .forPath("/curator/node2");
            System.out.println("状态信息:" + stat);
            System.out.println("内容:" + asText(bytes));

            // Read the data in the background.
            CountDownLatch counter = new CountDownLatch(1);
            BackgroundCallback printData = (client, event) -> {
                try {
                    System.out.println("event:" + event);
                    // event.getData() is null when the read did not succeed (e.g. node missing).
                    System.out.println("内容:" + asText(event.getData()));
                } finally {
                    // Always release the waiting test thread, even if printing fails.
                    counter.countDown();
                }
            };
            cf.getData().inBackground(printData).forPath("/curator/node2");
            awaitCallbacks(counter);
        }

        /**
         * Overwrites the content of a node.
         *
         * @throws Exception if the update fails, e.g. because the node's version is not 0
         */
        @Test
        public void setData() throws Exception {
            cf.setData()
                    .withVersion(0) // only update if the node is at this version; optional
                    .forPath("/curator/node2", "测试修改".getBytes(StandardCharsets.UTF_8));
        }

        /**
         * Deletes a node together with its children.
         *
         * @throws Exception if the delete fails, e.g. because the node's version is not 0
         */
        @Test
        public void delete() throws Exception {
            cf.delete()
                    .guaranteed() // keep retrying a failed delete for as long as the session is valid
                    .deletingChildrenIfNeeded() // also delete child nodes; optional
                    .withVersion(0) // only delete if the node is at this version; optional
                    .forPath("/curator/node4");
        }

        /**
         * Lists the children of {@code /curator}.
         *
         * @throws Exception if the children cannot be read
         */
        @Test
        public void getChildren() throws Exception {
            List<String> list = cf.getChildren().forPath("/curator");
            System.out.println("子节点:" + list);
        }

        /** Decodes a node payload as UTF-8 text; returns {@code null} for a {@code null} payload. */
        private static String asText(byte[] data) {
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        }

        /** Waits for all pending background callbacks, failing if they do not arrive in time. */
        private static void awaitCallbacks(CountDownLatch latch) throws InterruptedException {
            if (!latch.await(CALLBACK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                throw new AssertionError("Background callback(s) not received within "
                        + CALLBACK_TIMEOUT_SECONDS + " seconds; still pending: " + latch.getCount());
            }
        }
    }

    3、监控数据变化

    package com.inspur.demo.general.zookeeper;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.*;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.concurrent.*;
    
    /**
     * Demonstrates watching ZooKeeper data with Curator's cache recipes: {@code NodeCache}
     * for a single node and {@code PathChildrenCache} for the children of a node.
     *
     * <p>Both watch tests block on a latch that the sample listeners never release, so they
     * keep printing events until the test is interrupted. Count the latch down from the
     * listener to end a watch under some condition.
     */
    public class CuratorWatchCase {
        // ZooKeeper address; for a cluster, separate the addresses with commas, e.g.
        // 10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
        private static final String CONNECT_STRING = "10.49.196.10:2181";
        private static final int SESSION_TIMEOUT_MS = 20 * 1000;
        private static final int CONNECTION_TIMEOUT_MS = 10 * 1000;

        private CuratorFramework cf;

        /**
         * Builds and starts a Curator client with an exponential-backoff retry policy
         * (base sleep 1000 ms, at most 3 retries).
         */
        @Before
        public void before() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_STRING)
                    .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                    .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                    .retryPolicy(retryPolicy)
                    .build();
            cf.start();
        }

        /**
         * Closes the client created in {@link #before()}.
         *
         * @throws Exception if closing the client fails
         */
        @After
        public void after() throws Exception {
            // before() may have failed before the client was assigned.
            if (cf != null) {
                cf.close();
            }
        }

        /**
         * Watches a single node and prints its path, data and stat on every change.
         *
         * @throws Exception if the cache cannot be started or the wait is interrupted
         */
        @Test
        public void watchNode() throws Exception {
            CountDownLatch counter = new CountDownLatch(1);

            // try-with-resources closes the cache even when the wait is interrupted.
            try (NodeCache cache = new NodeCache(cf, "/curator/node2", false)) {
                // Register the listener before starting so that no early event is missed.
                cache.getListenable().addListener(() -> {
                    // Read the snapshot once: it is null when the node does not exist (e.g. it
                    // was just deleted) and may change between successive calls.
                    ChildData current = cache.getCurrentData();
                    if (current == null) {
                        System.out.println("节点不存在或已被删除");
                        return;
                    }
                    byte[] data = current.getData();
                    System.out.println("路径为:" + current.getPath());
                    System.out.println("数据为:" + (data == null
                            ? null
                            : new String(data, java.nio.charset.StandardCharsets.UTF_8)));
                    System.out.println("状态为:" + current.getStat());

                    // To stop watching under some condition, call counter.countDown() here.
                });
                cache.start();

                counter.await();
            }
        }

        /**
         * Watches the children of a node and prints every child event; listener callbacks run
         * on a dedicated single-thread pool.
         *
         * @throws Exception if the cache cannot be started or the wait is interrupted
         */
        @Test
        public void watchChildren() throws Exception {
            // Custom thread pool on which the listener is invoked.
            ExecutorService threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(32), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
            CountDownLatch counter = new CountDownLatch(1);

            // The cache is closed first (try-with-resources), then the pool is shut down.
            try (PathChildrenCache cache = new PathChildrenCache(cf, "/curator/node2", true)) {
                // Register the listener before starting so that no early event is missed.
                cache.getListenable().addListener((client, event) -> {
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            System.out.println("CHILD_ADDED");
                            break;
                        case CHILD_UPDATED:
                            System.out.println("CHILD_UPDATED");
                            break;
                        case CHILD_REMOVED:
                            System.out.println("CHILD_REMOVED");
                            break;
                        default:
                            System.out.println(event.getType());
                    }
                    System.out.println("子节点信息:" + event.getData());

                    // To stop watching under some condition, call counter.countDown() here.
                }, threadPool);
                cache.start();

                counter.await();
            } finally {
                threadPool.shutdownNow();
            }
        }
    }

    可以看到不管是基本的增删改查还是监控数据变化,Curator都比原生的API好用很多。

  • 相关阅读:
    (转)很简短,但读完你会感触良多!
    (转)让 win8 快速通过认证的5个提示
    WPF 资源路径解析
    47、SimpleOrientationSensor
    45、SplashScreen
    让IE6也支持position:fixed
    utf8编码引起js输出中文乱码的解决办法(实用)
    javascript的currying函数
    sicily 1036. Crypto Columns
    sicily 6774. Buying Mortadella
  • 原文地址:https://www.cnblogs.com/wuyongyin/p/12600743.html
Copyright © 2011-2022 走看看