zoukankan      html  css  js  c++  java
  • Zookeeper原生客户端

    1.1.1.1. 客户端基本操作

    package cn.enjoy.javaapi;

    import org.apache.zookeeper.*;

    import java.io.IOException;

    import java.util.concurrent.CountDownLatch;

    public class TestJavaApi implements Watcher {

        private static final int SESSION_TIMEOUT = 10000;

        private static final String CONNECTION_STRING = "192.168.30.10:2181";

        private static final String ZK_PATH = "/leader";

        private ZooKeeper zk = null;

        private CountDownLatch connectedSemaphore = new CountDownLatch(1);

        /**

         * 创建ZK连接

         *

         * @param connectString  ZK服务器地址列表

         * @param sessionTimeout Session超时时间

         */

        public void createConnection(String connectString, int sessionTimeout) {

            this.releaseConnection();

            try {

                zk = new ZooKeeper(connectString, sessionTimeout, this);

                connectedSemaphore.await();

            } catch (InterruptedException e) {

                System.out.println("连接创建失败,发生 InterruptedException");

                e.printStackTrace();

            } catch (IOException e) {

                System.out.println("连接创建失败,发生 IOException");

                e.printStackTrace();

            }

        }

        /**

         * 关闭ZK连接

         */

        public void releaseConnection() {

            if (null != this.zk) {

                try {

                    this.zk.close();

                } catch (InterruptedException e) {

                    // ignore

                    e.printStackTrace();

                }

            }

        }

        /**

         * 创建节点

         *

         * @param path 节点path

         * @param data 初始数据内容

         * @return

         */

        public boolean createPath(String path, String data) {

            try {

                System.out.println("节点创建成功, Path: "

                        + this.zk.create(path, // 节点路径

                        data.getBytes(), // 节点内容

                        ZooDefs.Ids.OPEN_ACL_UNSAFE, //节点权限

                        CreateMode.EPHEMERAL) //节点类型

                        + ", content: " + data);

            } catch (KeeperException e) {

                System.out.println("节点创建失败,发生KeeperException");

                e.printStackTrace();

            } catch (InterruptedException e) {

                System.out.println("节点创建失败,发生 InterruptedException");

                e.printStackTrace();

            }

            return true;

        }

        /**

         * 读取指定节点数据内容

         *

         * @param path 节点path

         * @return

         */

        public String readData(String path) {

            try {

                System.out.println("获取数据成功,path" + path);

                return new String(this.zk.getData(path, false, null));

            } catch (KeeperException e) {

                System.out.println("读取数据失败,发生KeeperExceptionpath: " + path);

                e.printStackTrace();

                return "";

            } catch (InterruptedException e) {

                System.out.println("读取数据失败,发生 InterruptedExceptionpath: " + path);

                e.printStackTrace();

                return "";

            }

        }

        /**

         * 更新指定节点数据内容

         *

         * @param path 节点path

         * @param data 数据内容

         * @return

         */

        public boolean writeData(String path, String data) {

            try {

                System.out.println("更新数据成功,path" + path + ", stat: " +

                        this.zk.setData(path, data.getBytes(), -1));

            } catch (KeeperException e) {

                System.out.println("更新数据失败,发生KeeperExceptionpath: " + path);

                e.printStackTrace();

            } catch (InterruptedException e) {

                System.out.println("更新数据失败,发生 InterruptedExceptionpath: " + path);

                e.printStackTrace();

            }

            return false;

        }

        /**

         * 删除指定节点

         *

         * @param path 节点path

         */

        public void deleteNode(String path) {

            try {

                this.zk.delete(path, -1);

                System.out.println("删除节点成功,path" + path);

            } catch (KeeperException e) {

                System.out.println("删除节点失败,发生KeeperExceptionpath: " + path);

                e.printStackTrace();

            } catch (InterruptedException e) {

                System.out.println("删除节点失败,发生 InterruptedExceptionpath: " + path);

                e.printStackTrace();

            }

        }

        public static void main(String[] args) {

            TestJavaApi sample = new TestJavaApi();

            sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);

            if (sample.createPath(ZK_PATH, "我是节点初始内容")) {

                System.out.println();

                System.out.println("数据内容: " + sample.readData(ZK_PATH) + " ");

                sample.writeData(ZK_PATH, "更新后的数据");

                System.out.println("数据内容: " + sample.readData(ZK_PATH) + " ");

                sample.deleteNode(ZK_PATH);

            }

            sample.releaseConnection();

        }

        /**

         * 收到来自ServerWatcher通知后的处理。

         */

        @Override

        public void process(WatchedEvent event) {

            System.out.println("收到事件通知:" + event.getState() + " ");

            if (Event.KeeperState.SyncConnected == event.getState()) {

                connectedSemaphore.countDown();

            }

        }

    }

    1.1.1.2. Watch机制

    package cn.enjoy.javaapi;

    import org.apache.zookeeper.*;

    import org.apache.zookeeper.data.Stat;

    import java.util.List;

    import java.util.concurrent.CountDownLatch;

    import java.util.concurrent.atomic.AtomicInteger;

    public class ZooKeeperWatcher implements Watcher  {

        /** 定义原子变量 */

        AtomicInteger seq = new AtomicInteger();

        /** 定义session失效时间 */

        private static final int SESSION_TIMEOUT = 10000;

        /** zookeeper服务器地址 */

        private static final String CONNECTION_ADDR = "192.168.30.10:2181";

        /** zk父路径设置 */

        private static final String PARENT_PATH = "/testWatch";

        /** zk子路径设置 */

        private static final String CHILDREN_PATH = "/testWatch/children";

        /** 进入标识 */

        private static final String LOG_PREFIX_OF_MAIN = "Main";

        /** zk变量 */

        private ZooKeeper zk = null;

        /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */

        private CountDownLatch connectedSemaphore = new CountDownLatch(1);

        /**

         * 创建ZK连接

         * @param connectAddr ZK服务器地址列表

         * @param sessionTimeout Session超时时间

         */

        public void createConnection(String connectAddr, int sessionTimeout) {

            this.releaseConnection();

            try {

                zk = new ZooKeeper(connectAddr, sessionTimeout, this);

                System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");

                connectedSemaphore.await();

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

        /**

         * 关闭ZK连接

         */

        public void releaseConnection() {

            if (this.zk != null) {

                try {

                    this.zk.close();

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

        /**

         * 创建节点

         * @param path 节点路径

         * @param data 数据内容

         * @return

         */

        public boolean createPath(String path, String data) {

            try {

                //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)

                this.zk.exists(path, true);

                System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +

                        this.zk.create( /**路径*/

                                path,

                                /**数据*/

                                data.getBytes(),

                                /**所有可见*/

                                ZooDefs.Ids.OPEN_ACL_UNSAFE,

                                /**永久存储*/

                                CreateMode.PERSISTENT ) +

                        ", content: " + data);

            } catch (Exception e) {

                e.printStackTrace();

                return false;

            }

            return true;

        }

        /**

         * 读取指定节点数据内容

         * @param path 节点路径

         * @return

         */

        public String readData(String path, boolean needWatch) {

            try {

                return new String(this.zk.getData(path, needWatch, null));

            } catch (Exception e) {

                e.printStackTrace();

                return "";

            }

        }

        /**

         * 更新指定节点数据内容

         * @param path 节点路径

         * @param data 数据内容

         * @return

         */

        public boolean writeData(String path, String data) {

            try {

                System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path" + path + ", stat: " +

                        this.zk.setData(path, data.getBytes(), -1));

            } catch (Exception e) {

                e.printStackTrace();

            }

            return false;

        }

        /**

         * 删除指定节点

         *

         * @param path

         *            节点path

         */

        public void deleteNode(String path) {

            try {

                this.zk.delete(path, -1);

                System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path" + path);

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

        /**

         * 判断指定节点是否存在

         * @param path 节点路径

         */

        public Stat exists(String path, boolean needWatch) {

            try {

                return this.zk.exists(path, needWatch);

            } catch (Exception e) {

                e.printStackTrace();

                return null;

            }

        }

        /**

         * 获取子节点

         * @param path 节点路径

         */

        private List<String> getChildren(String path, boolean needWatch) {

            try {

                return this.zk.getChildren(path, needWatch);

            } catch (Exception e) {

                e.printStackTrace();

                return null;

            }

        }

        /**

         * 删除所有节点

         */

        public void deleteAllTestPath() {

            if(this.exists(CHILDREN_PATH, false) != null){

                this.deleteNode(CHILDREN_PATH);

            }

            if(this.exists(PARENT_PATH, false) != null){

                this.deleteNode(PARENT_PATH);

            }

        }

        /**

         * 收到来自ServerWatcher通知后的处理。

         */

        @Override

        public void process(WatchedEvent event) {

            System.out.println("进入 process 。。。。。event = " + event);

            try {

                Thread.sleep(200);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            if (event == null) {

                return;

            }

            // 连接状态

            Watcher.Event.KeeperState keeperState = event.getState();

            // 事件类型

            Watcher.Event.EventType eventType = event.getType();

            // 受影响的path

            String path = event.getPath();

            String logPrefix = "Watcher-" + this.seq.incrementAndGet() + "";

            System.out.println(logPrefix + "收到Watcher通知");

            System.out.println(logPrefix + "连接状态: " + keeperState.toString());

            System.out.println(logPrefix + "事件类型: " + eventType.toString());

            if (Event.KeeperState.SyncConnected == keeperState) {

                // 成功连接上ZK服务器

                if (Event.EventType.None == eventType) {

                    System.out.println(logPrefix + "成功连接上ZK服务器");

                    connectedSemaphore.countDown();

                }

                //创建节点

                else if (Event.EventType.NodeCreated == eventType) {

                    System.out.println(logPrefix + "节点创建");

                    try {

                        Thread.sleep(100);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                    this.exists(path, true);

                }

                //更新节点

                else if (Event.EventType.NodeDataChanged == eventType) {

                    System.out.println(logPrefix + "节点数据更新");

                    try {

                        Thread.sleep(100);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                    System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));

                }

                //更新子节点

                else if (Event.EventType.NodeChildrenChanged == eventType) {

                    System.out.println(logPrefix + "子节点变更");

                    try {

                        Thread.sleep(3000);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                    System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));

                }

                //删除节点

                else if (Event.EventType.NodeDeleted == eventType) {

                    System.out.println(logPrefix + "节点 " + path + " 被删除");

                }

            }

            else if (Watcher.Event.KeeperState.Disconnected == keeperState) {

                System.out.println(logPrefix + "ZK服务器断开连接");

            }

            else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {

                System.out.println(logPrefix + "权限检查失败");

            }

            else if (Watcher.Event.KeeperState.Expired == keeperState) {

                System.out.println(logPrefix + "会话失效");

            }

            System.out.println("--------------------------------------------");

        }

        public static void main(String[] args) throws Exception {

            //建立watcher

            ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();

            //创建连接

            zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);

            //System.out.println(zkWatch.zk.toString());

            Thread.sleep(1000);

            // 清理节点

            zkWatch.deleteAllTestPath();

            if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {

                // 读取数据,在操作节点数据之前先调用zookeepergetData()方法是为了可以watch到对节点的操作。watch是一次性的,

                // 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。

                 System.out.println("---------------------- read parent ----------------------------");

                zkWatch.readData(PARENT_PATH, true);

                // 更新数据

               zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

                /** 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged

                 也就是说创建子节点时没有watch

                 如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1watch,输出c1NodeChildrenChanged

                 而不会输出创建c2时的NodeChildrenChanged,如果watchc2NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,

                 其中path="/p/c1"

                 */

                System.out.println("---------------------- read children path ----------------------------");

                zkWatch.getChildren(PARENT_PATH, true);

               Thread.sleep(1000);

                // 创建子节点,同理如果想要watchNodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true)

                zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

                Thread.sleep(1000);

                zkWatch.readData(CHILDREN_PATH, true);

                zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");

            }

            Thread.sleep(50000);

            // 清理节点

            zkWatch.deleteAllTestPath();

            Thread.sleep(1000);

            zkWatch.releaseConnection();

        }

    }

    1.1.1.3. ZK认证机制

    package cn.enjoy.javaapi;

    import org.apache.zookeeper.Watcher;

    import org.apache.zookeeper.*;

    import org.apache.zookeeper.data.ACL;

    import org.apache.zookeeper.data.Stat;

    import java.util.ArrayList;

    import java.util.List;

    import java.util.concurrent.CountDownLatch;

    import java.util.concurrent.atomic.AtomicInteger;

    public class TestZookeeperAuth implements Watcher {

        /** 连接地址 */

        final static String CONNECT_ADDR = "192.168.30.10:2181";

        /** 测试路径 */

        final static String PATH = "/testAuth";

        final static String PATH_DEL = "/testAuth/delNode";

        /** 认证类型 */

        final static String authentication_type = "digest";

        /** 认证正确方法 */

        final static String correctAuthentication = "123456";

        /** 认证错误方法 */

        final static String badAuthentication = "654321";

        static ZooKeeper zk = null;

        /** 计时器 */

        AtomicInteger seq = new AtomicInteger();

        /** 标识 */

        private static final String LOG_PREFIX_OF_MAIN = "Main";

        private CountDownLatch connectedSemaphore = new CountDownLatch(1);

        @Override

        public void process(WatchedEvent event) {

            try {

                Thread.sleep(200);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            if (event==null) {

                return;

            }

            // 连接状态

            Event.KeeperState keeperState = event.getState();

            // 事件类型

            Event.EventType eventType = event.getType();

            // 受影响的path

            String path = event.getPath();

            String logPrefix = "Watcher-" + this.seq.incrementAndGet() + "";

            System.out.println(logPrefix + "收到Watcher通知");

            System.out.println(logPrefix + "连接状态: " + keeperState.toString());

            System.out.println(logPrefix + "事件类型: " + eventType.toString());

            if (Event.KeeperState.SyncConnected == keeperState) {

                // 成功连接上ZK服务器

                if (Event.EventType.None == eventType) {

                    System.out.println(logPrefix + "成功连接上ZK服务器");

                    connectedSemaphore.countDown();

                }

            } else if (Event.KeeperState.Disconnected == keeperState) {

                System.out.println(logPrefix + "ZK服务器断开连接");

            } else if (Event.KeeperState.AuthFailed == keeperState) {

                System.out.println(logPrefix + "权限检查失败");

            } else if (Event.KeeperState.Expired == keeperState) {

                System.out.println(logPrefix + "会话失效");

            }

            System.out.println("--------------------------------------------");

        }

        /**

         * 创建ZK连接

         *

         * @param connectString

         *            ZK服务器地址列表

         * @param sessionTimeout

         *            Session超时时间

         */

        public void createConnection(String connectString, int sessionTimeout) {

            this.releaseConnection();

            try {

                zk = new ZooKeeper(connectString, sessionTimeout, this);

                //添加节点授权

                zk.addAuthInfo(authentication_type,correctAuthentication.getBytes());

                System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");

                //倒数等待

                connectedSemaphore.await();

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

        /**

         * 关闭ZK连接

         */

        public void releaseConnection() {

            if (this.zk!=null) {

                try {

                    this.zk.close();

                } catch (InterruptedException e) {

                }

            }

        }

        /**

         *

         * @param args

         * @throws Exception

         */

        public static void main(String[] args) throws Exception {

            TestZookeeperAuth testAuth = new TestZookeeperAuth();

            testAuth.createConnection(CONNECT_ADDR,2000);

            List<ACL> acls = new ArrayList<ACL>(1);

            for (ACL ids_acl : ZooDefs.Ids.CREATOR_ALL_ACL) {

                acls.add(ids_acl);

            }

            try {

                zk.create(PATH, "init content".getBytes(), acls, CreateMode.PERSISTENT);

                System.out.println("使用授权key" + correctAuthentication + "创建节点:"+ PATH + ", 初始内容是: init content");

            } catch (Exception e) {

                e.printStackTrace();

            }

            try {

                zk.create(PATH_DEL, "will be deleted! ".getBytes(), acls, CreateMode.PERSISTENT);

                System.out.println("使用授权key" + correctAuthentication + "创建节点:"+ PATH_DEL + ", 初始内容是: init content");

            } catch (Exception e) {

                e.printStackTrace();

            }

            // 获取数据

            getDataByNoAuthentication();

            getDataByBadAuthentication();

            getDataByCorrectAuthentication();

            // 更新数据

            updateDataByNoAuthentication();

            updateDataByBadAuthentication();

            updateDataByCorrectAuthentication();

            // 删除数据

            deleteNodeByBadAuthentication();

            deleteNodeByNoAuthentication();

            deleteNodeByCorrectAuthentication();

            //

            Thread.sleep(1000);

            deleteParent();

            //释放连接

            testAuth.releaseConnection();

        }

        /** 获取数据:采用错误的密码 */

        static void getDataByBadAuthentication() {

            String prefix = "[使用错误的授权信息]";

            try {

                ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

                //授权

                badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

                Thread.sleep(2000);

                System.out.println(prefix + "获取数据:" + PATH);

                System.out.println(prefix + "成功获取数据:" + badzk.getData(PATH, false, null));

            } catch (Exception e) {

                System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());

            }

        }

        /** 获取数据:不采用密码 */

        static void getDataByNoAuthentication() {

            String prefix = "[不使用任何授权信息]";

            try {

                System.out.println(prefix + "获取数据:" + PATH);

                ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

                Thread.sleep(2000);

                System.out.println(prefix + "成功获取数据:" + nozk.getData(PATH, false, null));

            } catch (Exception e) {

                System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());

            }

        }

        /** 采用正确的密码 */

        static void getDataByCorrectAuthentication() {

            String prefix = "[使用正确的授权信息]";

            try {

                System.out.println(prefix + "获取数据:" + PATH);

                System.out.println(prefix + "成功获取数据:" + zk.getData(PATH, false, null));

            } catch (Exception e) {

                System.out.println(prefix + "获取数据失败,原因:" + e.getMessage());

            }

        }

        /**

         * 更新数据:不采用密码

         */

        static void updateDataByNoAuthentication() {

            String prefix = "[不使用任何授权信息]";

            System.out.println(prefix + "更新数据: " + PATH);

            try {

                ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

                Thread.sleep(2000);

                Stat stat = nozk.exists(PATH, false);

                if (stat!=null) {

                    nozk.setData(PATH, prefix.getBytes(), -1);

                    System.out.println(prefix + "更新成功");

                }

            } catch (Exception e) {

                System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

            }

        }

        /**

         * 更新数据:采用错误的密码

         */

        static void updateDataByBadAuthentication() {

            String prefix = "[使用错误的授权信息]";

            System.out.println(prefix + "更新数据:" + PATH);

            try {

                ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

                //授权

                badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

                Thread.sleep(2000);

                Stat stat = badzk.exists(PATH, false);

                if (stat!=null) {

                    badzk.setData(PATH, prefix.getBytes(), -1);

                    System.out.println(prefix + "更新成功");

                }

            } catch (Exception e) {

                System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

            }

        }

        /**

         * 更新数据:采用正确的密码

         */

        static void updateDataByCorrectAuthentication() {

            String prefix = "[使用正确的授权信息]";

            System.out.println(prefix + "更新数据:" + PATH);

            try {

                Stat stat = zk.exists(PATH, false);

                if (stat!=null) {

                    zk.setData(PATH, prefix.getBytes(), -1);

                    System.out.println(prefix + "更新成功");

                }

            } catch (Exception e) {

                System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

            }

        }

        /**

         * 不使用密码 删除节点

         */

        static void deleteNodeByNoAuthentication() throws Exception {

            String prefix = "[不使用任何授权信息]";

            try {

                System.out.println(prefix + "删除节点:" + PATH_DEL);

                ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

                Thread.sleep(2000);

                Stat stat = nozk.exists(PATH_DEL, false);

                if (stat!=null) {

                    nozk.delete(PATH_DEL,-1);

                    System.out.println(prefix + "删除成功");

                }

            } catch (Exception e) {

                System.err.println(prefix + "删除失败,原因是:" + e.getMessage());

            }

        }

        /**

         * 采用错误的密码删除节点

         */

        static void deleteNodeByBadAuthentication() throws Exception {

            String prefix = "[使用错误的授权信息]";

            try {

                System.out.println(prefix + "删除节点:" + PATH_DEL);

                ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

                //授权

                badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

                Thread.sleep(2000);

                Stat stat = badzk.exists(PATH_DEL, false);

                if (stat!=null) {

                    badzk.delete(PATH_DEL, -1);

                    System.out.println(prefix + "删除成功");

                }

            } catch (Exception e) {

                System.err.println(prefix + "删除失败,原因是:" + e.getMessage());

            }

        }

        /**

         * 使用正确的密码删除节点

         */

        static void deleteNodeByCorrectAuthentication() throws Exception {

            String prefix = "[使用正确的授权信息]";

            try {

                System.out.println(prefix + "删除节点:" + PATH_DEL);

                Stat stat = zk.exists(PATH_DEL, false);

                if (stat!=null) {

                    zk.delete(PATH_DEL, -1);

                    System.out.println(prefix + "删除成功");

                }

            } catch (Exception e) {

                System.out.println(prefix + "删除失败,原因是:" + e.getMessage());

            }

        }

        /**

         * 使用正确的密码删除节点

         */

        static void deleteParent() throws Exception {

            try {

                Stat stat = zk.exists(PATH_DEL, false);

                if (stat == null) {

                    zk.delete(PATH, -1);

                }

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

    }

    1.1.1. ZkClient

    1.1.1.1. 基本操作

     package cn.enjoy.zkclient;

    import org.I0Itec.zkclient.ZkClient;

    import org.I0Itec.zkclient.ZkConnection;

    import java.util.List;

    /**

     * Created by VULCAN on 2018/11/7.

     */

    public class ZkClientOperator {

        /** zookeeper地址 */

        static final String CONNECT_ADDR = "192.168.30.10:2181";

        /** session超时时间 */

        static final int SESSION_OUTTIME = 10000;//ms

        public static void main(String[] args) throws Exception {

           // ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

            ZkClient zkc = new ZkClient(CONNECT_ADDR, SESSION_OUTTIME);

            //1. create and delete方法

            zkc.createEphemeral("/temp");

            zkc.createPersistent("/super/c1", true);

            Thread.sleep(10000);

            zkc.delete("/temp");

            zkc.deleteRecursive("/super");

            //2. 设置pathdata 并且读取子节点和每个节点的内容

            zkc.createPersistent("/super", "1234");

            zkc.createPersistent("/super/c1", "c1内容");

            zkc.createPersistent("/super/c2", "c2内容");

            List<String> list = zkc.getChildren("/super");

            for(String p : list){

                System.out.println(p);

                String rp = "/super/" + p;

                String data = zkc.readData(rp);

                System.out.println("节点为:" + rp + ",内容为: " + data);

            }

            //3. 更新和判断节点是否存在

            zkc.writeData("/super/c1", "新内容");

            System.out.println(zkc.readData("/super/c1").toString());

            System.out.println(zkc.exists("/super/c1"));

    // 4.递归删除/super内容

            zkc.deleteRecursive("/super");

        }

    }

    1.1.1.2. 监听机制

    package cn.enjoy.zkclient;

    import org.I0Itec.zkclient.IZkChildListener;

    import org.I0Itec.zkclient.IZkDataListener;

    import org.I0Itec.zkclient.ZkClient;

    import org.I0Itec.zkclient.ZkConnection;

    import org.junit.Test;

    import java.util.List;

    public class TestZkClientWatcher {

        /** zookeeper地址 */

        static final String CONNECT_ADDR = "192.168.30.10:2181";

        /** session超时时间 */

        static final int SESSION_OUTTIME = 10000;//ms

        @Test

        /**

         * subscribeChildChanges方法 订阅子节点变化

         */

        public  void testZkClientWatcher1() throws Exception {

            ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

            //对父节点添加监听子节点变化。

            zkc.subscribeChildChanges("/super", new IZkChildListener() {

                @Override

                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {

                    System.out.println("parentPath: " + parentPath);

                    System.out.println("currentChilds: " + currentChilds);

                }

            });

            Thread.sleep(3000);

            zkc.createPersistent("/super");

            Thread.sleep(1000);

            zkc.createPersistent("/super" + "/" + "c1", "c1内容");

            Thread.sleep(1000);

            zkc.createPersistent("/super" + "/" + "c2", "c2内容");

            Thread.sleep(1000);

            zkc.delete("/super/c2");

            Thread.sleep(1000);

            zkc.deleteRecursive("/super");

            Thread.sleep(Integer.MAX_VALUE);

        }

        @Test

        /**

         * subscribeDataChanges 订阅内容变化

         */

        public void testZkClientWatcher2() throws Exception {

            ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

            zkc.createPersistent("/super", "1234");

            //对父节点添加监听子节点变化。

            zkc.subscribeDataChanges("/super", new IZkDataListener() {

                @Override

                public void handleDataDeleted(String path) throws Exception {

                    System.out.println("删除的节点为:" + path);

                }

                @Override

                public void handleDataChange(String path, Object data) throws Exception {

                    System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);

                }

            });

            Thread.sleep(3000);

            zkc.writeData("/super", "456", -1);

            Thread.sleep(1000);

            zkc.delete("/super");

            Thread.sleep(Integer.MAX_VALUE);

        }

    }

    1.1.2. Curator

    1.1.2.1.  基本操作

    package cn.enjoy.curator;

    import org.apache.curator.RetryPolicy;

    import org.apache.curator.framework.CuratorFramework;

    import org.apache.curator.framework.CuratorFrameworkFactory;

    import org.apache.curator.framework.api.BackgroundCallback;

    import org.apache.curator.framework.api.CuratorEvent;

    import org.apache.curator.framework.api.CuratorListener;

    import org.apache.curator.framework.api.transaction.CuratorOp;

    import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

    import org.apache.curator.retry.ExponentialBackoffRetry;

    import org.apache.zookeeper.CreateMode;

    import org.apache.zookeeper.data.Stat;

    import org.junit.Before;

    import org.junit.Test;

    import java.util.List;

    import static com.sun.xml.internal.ws.dump.LoggingDumpTube.Position.Before;

    /**

     *  测试Apache Curator框架的基本用法

     */

    public class OperatorTest {

        //ZooKeeper服务地址

        private static final String SERVER = "192.168.30.10:2181";

        //会话超时时间

        private final int SESSION_TIMEOUT = 30000;

        //连接超时时间

        private final int CONNECTION_TIMEOUT = 5000;

        //创建连接实例

        private CuratorFramework client = null;

        /**

         * baseSleepTimeMs:初始的重试等待时间

         * maxRetries:最多重试次数

         *

         *

         * ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增

         * RetryNTimes:重试N次

         * RetryOneTime:重试一次

         * RetryUntilElapsed:重试一定时间

         */

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

    @org.junit.Before

        public void init(){

            //创建 CuratorFrameworkImpl实例

            client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);

            //启动

            client.start();

        }

        /**

         * 测试创建节点

         * @throws Exception

         */

        @Test

        public void testCreate() throws Exception{

            //创建永久节点

            client.create().forPath("/curator","/curator data".getBytes());

            //创建永久有序节点

            client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes());

            //创建临时节点

            client.create().withMode(CreateMode.EPHEMERAL)

                    .forPath("/curator/ephemeral","/curator/ephemeral data".getBytes());

            //创建临时有序节点

            client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

                    .forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes());

        }

        /**

         * 测试检查某个节点是否存在

         * @throws Exception

         */

        @Test

        public void testCheck() throws Exception{

            Stat stat1 = client.checkExists().forPath("/curator");

            Stat stat2 = client.checkExists().forPath("/curator2");

            System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false));

            System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false));

        }

        /**

         * 测试异步设置节点数据

         * @throws Exception

         */

        @Test

        public void testSetDataAsync() throws Exception{

            //创建监听器

            CuratorListener listener = new CuratorListener() {

                @Override

                public void eventReceived(CuratorFramework client, CuratorEvent event)

                        throws Exception {

                    System.out.println(event.getPath());

                }

            };

            //添加监听器

            client.getCuratorListenable().addListener(listener);

            //异步设置某个节点数据

            client.setData().inBackground().forPath("/curator","sync".getBytes());

            //为了防止单元测试结束从而看不到异步执行结果,因此暂停10

            Thread.sleep(10000);

        }

        /**

         * 测试另一种异步执行获取通知的方式

         * @throws Exception

         */

        @Test

        public void testSetDataAsyncWithCallback() throws Exception{

            BackgroundCallback callback = new BackgroundCallback() {

                @Override

                public void processResult(CuratorFramework client, CuratorEvent event)

                        throws Exception {

                    System.out.println(event.getPath());

                }

            };

            //异步设置某个节点数据

            client.setData().inBackground(callback).forPath("/curator","/curator modified data with Callback".getBytes());

            //为了防止单元测试结束从而看不到异步执行结果,因此暂停10

            Thread.sleep(10000);

        }

        /**

         * 测试删除节点

         * @throws Exception

         */

        @Test

        public void testDelete() throws Exception{

            //创建测试节点

            client.create().orSetData().creatingParentsIfNeeded()

                    .forPath("/curator/del_key1","/curator/del_key1 data".getBytes());

            client.create().orSetData().creatingParentsIfNeeded()

                    .forPath("/curator/del_key2","/curator/del_key2 data".getBytes());

            client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes());

            //删除该节点

            client.delete().forPath("/curator/del_key1");

            //级联删除子节点

            client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");

        }

        /*

         * 测试事务管理:碰到异常,事务会回滚

         * @throws Exception

         */

        @Test

        public void testTransaction() throws Exception{

            //定义几个基本操作

            CuratorOp createOp = client.transactionOp().create()

                    .forPath("/curator/one_path","some data".getBytes());

            CuratorOp setDataOp = client.transactionOp().setData()

                    .forPath("/curator","other data".getBytes());

            CuratorOp deleteOp = client.transactionOp().delete()

                    .forPath("/curator");

            //事务执行结果

            List<CuratorTransactionResult> results = client.transaction()

                    .forOperations(createOp,setDataOp,deleteOp);

            //遍历输出结果

            for(CuratorTransactionResult result : results){

                System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());

            }

        }

    }

    1.1.2.2. 监听机制

    package cn.enjoy.curator;

    import org.apache.curator.RetryPolicy;

    import org.apache.curator.framework.CuratorFramework;

    import org.apache.curator.framework.CuratorFrameworkFactory;

    import org.apache.curator.framework.api.CuratorEvent;

    import org.apache.curator.framework.api.CuratorListener;

    import org.apache.curator.framework.recipes.cache.*;

    import org.apache.curator.retry.ExponentialBackoffRetry;

    import org.apache.zookeeper.CreateMode;

    import org.apache.zookeeper.WatchedEvent;

    import org.apache.zookeeper.Watcher;

    import org.junit.Test;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    public class EventTest {

        //ZooKeeper服务地址

        private static final String SERVER = "192.168.30.10:2181";

        //会话超时时间

        private final int SESSION_TIMEOUT = 30000;

        //连接超时时间

        private final int CONNECTION_TIMEOUT = 5000;

        //创建连接实例

        private CuratorFramework client = null;

        /**

         * baseSleepTimeMs:初始的重试等待时间

         * maxRetries:最多重试次数

         *

         *

         * ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增

         * RetryNTimes:重试N次

         * RetryOneTime:重试一次

         * RetryUntilElapsed:重试一定时间

         */

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        @org.junit.Before

        public void init(){

            //创建 CuratorFrameworkImpl实例

            client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);

            //启动

            client.start();

        }

        /**

         *

         * @描述:第一种监听器的添加方式: 对指定的节点进行添加操作

         * 仅仅能监控指定的本节点的数据修改,删除 操作 并且只能监听一次 --->不好

         */

        @Test

        public  void TestListenterOne() throws Exception{

            client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

            // 注册观察者,当节点变动时触发

            byte[] data = client.getData().usingWatcher(new Watcher() {

                @Override

                public void process(WatchedEvent event) {

                    System.out.println("获取 test 节点 监听器 : " + event);

                }

            }).forPath("/test");

            client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

            Thread.sleep(1000);

            client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

            Thread.sleep(1000);

            System.out.println("节点数据: "+ new String(data));

            Thread.sleep(10000);

        }

        /**

         *

         * @描述:第二种监听器的添加方式: Cache 的三种实现

         *   Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。

         *                  产生的事件会传递给注册的PathChildrenCacheListener

         *  Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

         *  Tree Cache:Path Cache 和 Node Cache 的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

         */

        //1.path Cache  连接  路径  是否获取数据

        //能监听所有的字节点 且是无限监听的模式 但是 指定目录下节点的子节点不再监听

        @Test

        public void setListenterTwoOne() throws Exception{

            ExecutorService pool = Executors.newCachedThreadPool();

            PathChildrenCache childrenCache = new PathChildrenCache(client, "/test", true);

            PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {

                @Override

                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

                    System.out.println("开始进行事件分析:-----");

                    ChildData data = event.getData();

                    switch (event.getType()) {

                        case CHILD_ADDED:

                            System.out.println("CHILD_ADDED : "+ data.getPath() +"  数据:"+ data.getData());

                            break;

                        case CHILD_REMOVED:

                            System.out.println("CHILD_REMOVED : "+ data.getPath() +"  数据:"+ data.getData());

                            break;

                        case CHILD_UPDATED:

                            System.out.println("CHILD_UPDATED : "+ data.getPath() +"  数据:"+ data.getData());

                            break;

                        case INITIALIZED:

                            System.out.println("INITIALIZED : "+ data.getPath() +"  数据:"+ data.getData());

                            break;

                        default:

                            break;

                    }

                }

            };

            childrenCache.getListenable().addListener(childrenCacheListener);

            System.out.println("Register zk watcher successfully!");

            childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

            //创建一个节点

            client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

            client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());

            Thread.sleep(1000);

            client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","deer".getBytes());

            Thread.sleep(1000);

            client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","demo".getBytes());

            Thread.sleep(1000);

            client.delete().forPath("/test/node02");

            Thread.sleep(10000);

        }

        //2.Node Cache  监控本节点的变化情况   连接 目录 是否压缩

        //监听本节点的变化  节点可以进行修改操作  删除节点后会再次创建(空节点)

        @Test

        public void setListenterTwoTwo() throws Exception{

            ExecutorService pool = Executors.newCachedThreadPool();

            //设置节点的cache

            final NodeCache nodeCache = new NodeCache(client, "/test", false);

            nodeCache.getListenable().addListener(new NodeCacheListener() {

                @Override

                public void nodeChanged() throws Exception {

                    System.out.println("the test node is change and result is :");

                    System.out.println("path : "+nodeCache.getCurrentData().getPath());

                    System.out.println("data : "+new String(nodeCache.getCurrentData().getData()));

                    System.out.println("stat : "+nodeCache.getCurrentData().getStat());

                }

            });

            nodeCache.start();

            client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

            Thread.sleep(1000);

            client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","enjoy".getBytes());

            Thread.sleep(10000);

        }

        //3.Tree Cache

        // 监控 指定节点和节点下的所有的节点的变化--无限监听  可以进行本节点的删除(不在创建)

        @Test

        public void TestListenterTwoThree() throws Exception{

            ExecutorService pool = Executors.newCachedThreadPool();

            //设置节点的cache

            TreeCache treeCache = new TreeCache(client, "/test");

            //设置监听器和处理过程

            treeCache.getListenable().addListener(new TreeCacheListener() {

                @Override

                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {

                    ChildData data = event.getData();

                    if(data !=null){

                        switch (event.getType()) {

                            case NODE_ADDED:

                                System.out.println("NODE_ADDED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

                                break;

                            case NODE_REMOVED:

                                System.out.println("NODE_REMOVED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

                                break;

                            case NODE_UPDATED:

                                System.out.println("NODE_UPDATED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

                                break;

                            default:

                                break;

                        }

                    }else{

                        System.out.println( "data is null : "+ event.getType());

                    }

                }

            });

            //开始监听

            treeCache.start();

            //创建一个节点

            client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

            Thread.sleep(1000);

            client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());

            Thread.sleep(1000);

            client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","deer".getBytes());

            Thread.sleep(1000);

            client.create().orSetData().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test/node02/node02_2","deer".getBytes());

            Thread.sleep(10000);

        }

    }

  • 相关阅读:
    yablog: calculate cosine with python numpy
    HDF
    numarray 1.5.1
    Angles between two ndimensional vectors in Python Stack Overflow
    3D stem plot
    linq to sql一定要注意的地方!
    将IRepository接口进行抽象,使它成为数据基类的一个对象,这样每个子类都可以有自己的最基础的CURD了
    (SQL)比较一个集合是否在另一个集合里存在的方法
    linq to sql统一更新方法,直接返回更新的对象(解决更新后再刷新数据错误显示问题)
    LINQ TO SQL数据实体应该这样设计(解决多表关联问题)
  • 原文地址:https://www.cnblogs.com/Soy-technology/p/11391701.html
Copyright © 2011-2022 走看看