zoukankan      html  css  js  c++  java
  • Curator操作ZooKeeper

    Curator极大简化了ZooKeeper的使用,增加了针对ZooKeeper集群中connection的管理。

    节点的创建和删除

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooKeeper.States;
    import org.apache.zookeeper.data.Stat;
     
    public class CuratorBase {
        
        static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
        static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms 
        static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  
        
        public static void main(String[] args) throws Exception {
            
            //重试策略:初试时间为10s,最大重试次数为20
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);
            //创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                        .connectString(CONNECT_ADDR)
                        .sessionTimeoutMs(SESSION_TIMEOUT)
                        .retryPolicy(retryPolicy)
                        .build();
            //开启连接
            cf.start();
            
            //建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());
            
            Thread.sleep(30000);
     
            //删除节点
            cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent");
     
            cf.close();
        }
    }

    run as--java application

    线程休眠30s后,执行节点删除操作

    节点内容的修改

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooKeeper.States;
    import org.apache.zookeeper.data.Stat;
     
    public class CuratorBase {
        
        static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
        static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms 
        static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  
        
        public static void main(String[] args) throws Exception {
            
            //重试策略:初试时间为10s,最大重试次数为20
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);
            //创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                        .connectString(CONNECT_ADDR)
                        .sessionTimeoutMs(SESSION_TIMEOUT)
                        .retryPolicy(retryPolicy)
                        .build();
            //开启连接
            cf.start();
            
            //创建节点
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());
            //cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p2","p2 value".getBytes());
            //读取节点
            String ret1 = new String(cf.getData().forPath("/persistent/p1"));
            System.out.println(ret1);
            //修改节点
            cf.setData().forPath("/persistent/p1", "new p1 value".getBytes());
            String ret2 = new String(cf.getData().forPath("/persistent/p1"));
            System.out.println(ret2);
     
            cf.close();
        }
    }

    Eclipse的console输出

    Eclipse的ZooKeeper Explorer内容

    节点操作的回调函数

    节点的新增、修改、删除,都可以设置其回调函数。该回调函数可以输出服务器的状态码、服务器事件类型等内容。还可以加入一个线程池进行优化操作。在批量节点操作的时候,可以用线程池去规划callback,可以将很多的任务放到队列中,使用线程池中的线程将队列中的任务进行处理。线程池中线程的个数可以根据具体的机器配置而定。

    下面代码中,节点的创建操作是一个异步的过程,不会阻塞主线程main的执行,代码中将主线程main休眠,子线程在执行完节点的创建操作后执行回调函数并输出相关内容。若不添加主线程休眠的代码,则主线程执行完代码后结束,此时节点创建的子线程还没有完成节点的创建,因main线程的结束子线程也结束,进而就不能完成节点创建和回调函数的执行。

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooKeeper.States;
    import org.apache.zookeeper.data.Stat;
     
    public class CuratorBase {
        
        static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
        static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms 
        static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  
        
        public static void main(String[] args) throws Exception {
            
            //重试策略:初试时间为10s,最大重试次数为20
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);
            //创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                        .connectString(CONNECT_ADDR)
                        .sessionTimeoutMs(SESSION_TIMEOUT)
                        .retryPolicy(retryPolicy)
                        .build();
            //开启连接
            cf.start();
            
            // 绑定回调函数
            ExecutorService pool = Executors.newCachedThreadPool();
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
                    System.out.println("code:" + ce.getResultCode());
                    System.out.println("type:" + ce.getType());
                    System.out.println("线程为:" + Thread.currentThread().getName());
                }
            }, pool)
            .forPath("/persistent/p2","p2 value".getBytes());
            
            System.out.println("主线程:"+Thread.currentThread().getName());
            
            Thread.sleep(Integer.MAX_VALUE);
     
            cf.close();
        }
    }

    Eclipse中console输出

    ZooKeeper Explorer中内容



    获取子节点

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooKeeper.States;
    import org.apache.zookeeper.data.Stat;
     
    public class CuratorBase {
        
        static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";
        static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms 
        static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  
        
        public static void main(String[] args) throws Exception {
            
            //重试策略:初试时间为10s,最大重试次数为20
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);
            //创建连接
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                        .connectString(CONNECT_ADDR)
                        .sessionTimeoutMs(SESSION_TIMEOUT)
                        .retryPolicy(retryPolicy)
                        .build();
            //开启连接
            cf.start();
            
            // 绑定回调函数
            ExecutorService pool = Executors.newCachedThreadPool();
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
                    System.out.println("code:" + ce.getResultCode());
                    System.out.println("type:" + ce.getType());
                    System.out.println("线程为:" + Thread.currentThread().getName());
                }
            }, pool)
            .forPath("/persistent/p2","p2 value".getBytes());
            
            System.out.println("主线程:"+Thread.currentThread().getName());
     
            Thread.sleep(20000);//主线程休眠20s,等待节点创建完毕
            
            // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法
            List<String> list = cf.getChildren().forPath("/persistent");
            for(String p : list){
                System.out.println(p);
            }
            
            Stat stat_p1 = cf.checkExists().forPath("/persistent/p1");
            System.out.println(stat_p1);
            Stat stat_p2 = cf.checkExists().forPath("/persistent/p2");
            System.out.println(stat_p2);
     
            cf.close();
        }
    }

    Eclipse的console输出

    若上面代码将Thread.sleep(20000);删除,有时会出现下面的异常,原因是节点创建和main主线程的执行是异步的。

  • 相关阅读:
    封装、继承、多态的意义等。
    php中类与对象的区别和关系是什么?
    MySQL数据库中InnoDB和MyISAM两种数据引擎的差别
    什么是索引? 索引的定义与用法等。
    什么是正则表达式?
    字符串—strcpy
    排序
    虚函数
    优先队列(堆)
    因子个数_错排公式
  • 原文地址:https://www.cnblogs.com/cat520/p/9412815.html
Copyright © 2011-2022 走看看