zoukankan      html  css  js  c++  java
  • zookeper

    package com.zkdemo.zkdemo.client;
    
    import com.zkdemo.zkdemo.client.MyZkWatcher;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 描述:zookeeper增删改查
     * 作者:七脉
     * 重点:删除、修改、查询所用的version版本号,分布式事务锁的实现方式,乐观锁。ACL权限:所有人、限定人、限定IP等
     */
    public class Test {
    
        private static Logger log = LogManager.getLogger(Test.class);
    
        //集群节点
        public static final String zkServerClusterConnect = "192.168.159.129:2181,192.168.159.129:2182,192.168.159.129:2183";
    
        //单一节点
        public static final String zkServerSingleConnect = "192.168.1.90:2181";
    
        //超时毫秒数
        public static final int timeout = 3000;
    
        public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
            //建立连接
            ZooKeeper zk = connect();
            //zk.close();//关闭后不支持重连
            log.info("zk 状态:" + zk.getState());
    
            /**恢复会话连接**/
            //long sessionId = zk.getSessionId();
            //byte[] sessionPasswd = zk.getSessionPasswd();
            //zk2会话重连后,zk会话将失效,不再支持做增删改查操作。
            //ZooKeeper zk2 = reconnect(sessionId, sessionPasswd);
    
            /**创建节点**/
            //create(zk, "/myzk", "myzk");
    
            /**查询节点Data**/
            queryData(zk, "/myzk");
    
            /**修改节点data**/
            //update(zk, "/myzk", "myzk-update");
    
            /**删除节点**/
            //delete(zk, "/myzk");
        }
    
    
        /**
         * 描述:建立连接
         * 作者:七脉
         *
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        public static ZooKeeper connect() throws IOException, InterruptedException {
            //可重入锁
            CountDownLatch cdl = new CountDownLatch(1);
            log.info("准备建立zk服务");
            ZooKeeper zk = new ZooKeeper(zkServerSingleConnect, timeout, new MyZkWatcher(cdl, "建立连接"));
            log.info("完成建立zk服务");
            //这里为了等待wather监听事件结束
            cdl.await();
            return zk;
        }
    
        /**
         * 描述:重新连接服务
         * 作者:七脉
         *
         * @param sessionId     现有会话ID
         * @param sessionPasswd 现有会话密码
         * @return
         * @throws IOException
         * @throws InterruptedException 重点:关闭后的会话连接,不支持重连。重连后,前会话连接将会失效。
         */
        public static ZooKeeper reconnect(long sessionId, byte[] sessionPasswd) throws IOException, InterruptedException {
            //可重入锁
            CountDownLatch cdl = new CountDownLatch(1);
            log.info("准备重新连接zk服务");
            ZooKeeper zk = new ZooKeeper(zkServerSingleConnect, timeout, new MyZkWatcher(cdl, "重新连接"), sessionId, sessionPasswd);
            log.info("完成重新连接zk服务");
            //这里为了等待wather监听事件结束
            cdl.await();
            return zk;
        }
    
        /**
         * 描述:创建节点
         * 作者:七脉
         *
         * @param zk
         * @param nodePath
         * @param nodeData
         * @throws KeeperException
         * @throws InterruptedException
         */
        public static void create(ZooKeeper zk, String nodePath, String nodeData) throws KeeperException, InterruptedException {
            log.info("开始创建节点:{}, 数据:{}", nodePath, nodeData);
            List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
            //创建持久节点
            CreateMode createMode = CreateMode.PERSISTENT;
            String result = zk.create(nodePath, nodeData.getBytes(), acl, createMode);
            //创建节点有两种,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。
            log.info("创建节点返回结果:{}", result);
            log.info("完成创建节点:{}, 数据:{}", nodePath, nodeData);
        }
    
        /**
         * 描述:查询节点结构信息
         * 作者:七脉
         *
         * @param zk
         * @param nodePath
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        public static Stat queryStat(ZooKeeper zk, String nodePath) throws KeeperException, InterruptedException {
            log.info("准备查询节点Stat,path:{}", nodePath);
            Stat stat = zk.exists(nodePath, false);
            log.info("结束查询节点Stat,path:{},version:{}", nodePath, stat.getVersion());
            return stat;
        }
    
        /**
         * 描述:查询节点Data值信息
         * 作者:七脉
         *
         * @param zk
         * @param nodePath
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        public static String queryData(ZooKeeper zk, String nodePath) throws KeeperException, InterruptedException {
            log.info("准备查询节点Data,path:{}", nodePath);
            String data = new String(zk.getData(nodePath, false, queryStat(zk, nodePath)));
            log.info("结束查询节点Data,path:{},Data:{}", nodePath, data);
            return data;
        }
    
    
        /**
         * 描述:修改节点
         * 作者:七脉
         *
         * @param zk
         * @param nodePath
         * @param nodeData
         * @throws KeeperException
         * @throws InterruptedException 重点:每次修改节点的version版本号都会变更,所以每次修改都需要传递节点原版本号,以确保数据的安全性。
         */
        public static Stat update(ZooKeeper zk, String nodePath, String nodeData) throws KeeperException, InterruptedException {
            //修改节点前先查询该节点信息
            Stat stat = queryStat(zk, nodePath);
            log.info("准备修改节点,path:{},data:{},原version:{}", nodePath, nodeData, stat.getVersion());
            Stat newStat = zk.setData(nodePath, nodeData.getBytes(), stat.getVersion());
            //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。
            //zk.setData(path, data, version, cb, ctx);
            log.info("完成修改节点,path:{},data:{},现version:{}", nodePath, nodeData, newStat.getVersion());
            return stat;
        }
    
        /**
         * 描述:删除节点
         * 作者:七脉
         *
         * @param zk
         * @param nodePath
         * @throws InterruptedException
         * @throws KeeperException
         */
        public static void delete(ZooKeeper zk, String nodePath) throws InterruptedException, KeeperException {
            //删除节点前先查询该节点信息
            Stat stat = queryStat(zk, nodePath);
            log.info("准备删除节点,path:{},原version:{}", nodePath, stat.getVersion());
            zk.delete(nodePath, stat.getVersion());
            //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。
            //zk.delete(path, version, cb, ctx);
            log.info("完成删除节点,path:{}", nodePath);
        }
    
    }
    

      

    package com.zkdemo.zkdemo.client;
    
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 描述:ZK服务观察者事件
     * 作者:七脉
     */
    public class MyZkWatcher implements Watcher{
    
        private static final Logger log = LoggerFactory.getLogger(MyZkWatcher.class);
    
        //异步锁
        private CountDownLatch cdl;
    
        //标记
        private String mark;
    
        public MyZkWatcher(CountDownLatch cdl,String mark) {
            this.cdl = cdl;
            this.mark = mark;
        }
    
        //监听事件处理方法
        public void process(WatchedEvent event) {
            log.info(mark+" watcher监听事件:{}",event);
            cdl.countDown();
        }
    
    }
    

      学习参考地址:https://www.cnblogs.com/liuwei6/p/6736335.html

           Apache Curator https://www.cnblogs.com/erbing/p/9799098.html

  • 相关阅读:
    Linux异步IO
    基本数据类型总结--
    总结
    字典魔法二
    字典及其魔法
    元祖的魔法
    列表的特点
    运算符
    while ……else……和while……continue……和 while…………break…………
    作业---写一个程序,用户名 、密码输入错误3次 错误
  • 原文地址:https://www.cnblogs.com/java-le/p/12201166.html
Copyright © 2011-2022 走看看