zoukankan      html  css  js  c++  java
  • 06_zookeeper原生Java API使用

    【Zookeeper构造方法概述】

    /**
         * 客户端和zk服务端的连接是一个异步的过程
         * 当连接成功后,客户端会收到一个watch通知
         *
         * ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
         *          long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
         * 参数介绍
         * connectString:连接服务器的ip字符串
         *      比如:"192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
         *      可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
         *      也可以在ip后加路径
         * sessionTimeout:超时时间,心跳收不到了,那就超时
         * watcher:通知事件,如果有对应的事件触发,则会收到一个通知:如果不需要,那就设为null
         * sessionId:会话的id
         * sessionPasswd:会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
         * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
         *                此时数据被读取到的可能是旧数据,一般设置为false,不推荐使用
         *
         */
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

    【Zookeeper API 客户端连接服务端例子】

    package com.zk.demo;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    /**
     * Created by HigginCui on 2018/9/20.
     */
    public class ZkConnect implements Watcher{
    
        public static final String zkServerPath = "127.0.0.1:2181";
    
        public static final Integer timeout = 5000;
    
        /**
         * 客户端和zk服务端的连接是一个异步的过程
         * 当连接成功后,客户端会收到一个watch通知
         */
        public static void main(String[] args) throws Exception{
            ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZkConnect());
    
            for (int i=0;i<20;i++) {
                Thread.sleep(10);  //休眠10ms,在这个过程中,连接状态会从CONNECTING--->CONNECTED
                System.out.println(i+"---"+zk.getState());
            }
    
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.err.println("收到zk的watch通知----" );
        }
    }

    【运行结果】

    【使用CountDownLatch优化zk连接过程】 

    package com.zk.demo;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.util.concurrent.CountDownLatch;
    
    public class ZkConnect implements Watcher{
    
        public static final String zkServerPath = "127.0.0.1:2181";
    
        public static final Integer timeout = 5000;
    
        private static CountDownLatch latch = new CountDownLatch(1);
    
        public static void main(String[] args) throws Exception{
            ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZkConnect());
    
            System.out.println("连接状态---" + zk.getState());
            latch.await();
            System.out.println("连接状态---" + zk.getState());
    
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.err.println("收到zk的watch通知----" );
            latch.countDown();
        }
    }

    【运行结果】

    【创建节点】

    /**
     * 
     * String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
     * 
     * 同步或异步创建节点,都不支持子节点的递归操作,异步有一个callBack方法
     * 参数
     * path:创建的路径
     * data:存储的数据,byte[]类型
     * acl:权限控制策略
     *      Ids.OPEN_ACL_UNSAFE ---> world:anyone:crdwa
     *      CREATE_ALL_ACL ---> auth:user:password:cdrwa
     * createMode:节点类型,是一个枚举
     *      CreateMode.PERSISTENT:           持久节点
     *      CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点
     *      CreateMode.EPHEMERAL:            临时节点
     *      CreateMode.EPHEMERAL_SEQUENTIAL: 临时顺序节点
     */

     【创建一个临时节点】

    package com.zk.demo;
    
    import org.apache.zookeeper.*;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Created by HigginCui on 2018/9/21.
     */
    public class ZkNodeOperator implements Watcher{
    
    
        private static ZooKeeper zooKeeper = null;
    
        public static final String zkServerPath = "127.0.0.1:2181";
    
        public static final Integer timeout = 5000;
    
        private static CountDownLatch latch = new CountDownLatch(1);
    
        public ZkNodeOperator() {
        }
    
        private static void init()  {
            try{
                zooKeeper = new ZooKeeper(zkServerPath,timeout,new ZkNodeOperator());
                latch.await();
            }catch (Exception e){
                e.printStackTrace();
                if(zooKeeper!=null){
                    try {
                        zooKeeper.close();
                    }catch (InterruptedException e1){
                        e1.printStackTrace();
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception{
            init();
          //创建/testnode临时节点,包含数据为haha,权限是OPEN_ACL_UNSAFE,类型为临时节点 zooKeeper.create(
    "/testnode","haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); } /** * 观察者 */ @Override public void process(WatchedEvent event) { System.err.println("收到zk的watch通知----" +event.getPath()+"---"+event.getState()); latch.countDown(); } }

    【运行结果 直接看打开zkCli.sh连接】

     【创建持久节点,并且产生一个回调通知】

    package com.zk.demo;
    
    import org.apache.zookeeper.*;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Created by HigginCui on 2018/9/21.
     */
    public class ZkNodeOperator implements Watcher{
    
    
        private static ZooKeeper zooKeeper = null;
    
        public static final String zkServerPath = "127.0.0.1:2181";
    
        public static final Integer timeout = 5000;
    
        private static CountDownLatch latch = new CountDownLatch(1);
    
        public ZkNodeOperator() {
        }
    
        private static void init()  {
            try{
                zooKeeper = new ZooKeeper(zkServerPath,timeout,new ZkNodeOperator());
                latch.await();
            }catch (Exception e){
                e.printStackTrace();
                if(zooKeeper!=null){
                    try {
                        zooKeeper.close();
                    }catch (InterruptedException e1){
                        e1.printStackTrace();
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception{
            init();
            //create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
            String ctx = "{'create':'success'}";
            zooKeeper.create("/testnode","haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new CreateNodeCallBack(),ctx);
    
            System.in.read();  //线程停在这里,等待回调线程的打印结果
        }
    
    
        /**
         * 观察者
         */
        @Override
        public void process(WatchedEvent event) {
            System.err.println("收到zk的watch通知----" +event.getPath()+"---"+event.getState());
            latch.countDown();
    
        }
    }
    
    /**
     * 创建节点回调通知类
     */
    class CreateNodeCallBack implements AsyncCallback.StringCallback{
        @Override
        public void processResult(int i, String path, Object ctx, String s1) {
            System.out.println("【回调方法】:创建节点的路径:"+path);
            System.out.println("【回调方法】:ctx==="+(String)ctx);
        }
    }

    【运行结果——控制台打印】

    【运行结果——zk客户端连接】

    【修改节点的数据】 

    /**
    * path:节点路径
    * data:修改后的数据
    * version:版本号,这里的版本号必须是正要修改的节点数据的版本号!!
    */
    Stat setData(String path, byte[] data, int version)

    修改节点代码示例

    Stat stat = zooKeeper.setData("/testnode", "Higgin".getBytes(), 1);
    System.out.println("修改后数据的版本号为---"+stat.getVersion());

    【运行结果】

    先看下对应的数据版本号信息

    看下控制台运行结果

    修改下代码,将版本号改为0,与zk上要修改的数据的版本号保持一致

    Stat stat = zooKeeper.setData("/testnode", "Higgin".getBytes(), 0);
    System.out.println("修改后数据的版本号为---"+stat.getVersion());

    【查看运行结果】

     【删除节点】

    /**
    * 删除节点
    * 注意:version版本号必须和要删除的节点数据的版本号一致
    */
    public void delete(String path, int version)

    【删除实例】

    zooKeeper.delete("/testnode",1);

    【运行结果】

    删除前,先看下对应节点数据

    执行删除代码后,可以看到节点已经不存在了

    【获取节点数据】

    public byte[] getData(String path, boolean watch, Stat stat) 
    public byte[] getData(String path, Watcher watcher, Stat stat)
    public void getData(String path, Watcher watcher, DataCallback cb, Object ctx)
    public void getData(String path, boolean watch, DataCallback cb, Object ctx)

    [获取节点数据实例]

    byte[] dateBytes = zooKeeper.getData("/testnode", true, null);
    String str = new String(dateBytes);
    System.err.println("获取的数据为==="+str);

    [运行结果]

    【获取子节点的列表数据】

    public List<String> getChildren(String path, Watcher watcher)
    public List<String> getChildren(String path, boolean watch, Stat stat)
    public List<String> getChildren(String path, boolean watch, Stat stat) 
    public void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx)
    ......

    [获取子节点列表实例]

    List<String> childList = zooKeeper.getChildren("/testnode", null);
    for (String child : childList) {
        System.err.println("子节点=="+child);
    }

    [ 运行结果 ]

    先看下zk上对应的数据

    [ 控制台运行结果 ]

  • 相关阅读:
    Coding.net进阶,使用Git管理代码
    经典算法问题
    浅谈三款常用软件
    Coding.net简单使用指南
    湖北宜化总结
    天顺风能经验总结
    Vue中watch的高级用法
    html 锚点三种实现方法
    【机器学习】EM算法详细推导和讲解
    【机器学习】BP神经网络实现手写数字识别
  • 原文地址:https://www.cnblogs.com/HigginCui/p/9678505.html
Copyright © 2011-2022 走看看