zoukankan      html  css  js  c++  java
  • 五、Zookeeper基于API操作Node节点

    安装zookeeper :linux下安装Zookeeper 3.4.14

    zookeeper 分为5个包:

    1. org.apache.zookeeper //客户端主要类文件

    2. org.apache.zookeeper.data //

    3. org.apache.zookeeper.server

    4. org.apache.zookeeper.server.quorum

    5. org.apache.zookeeper.server.upgrade

    建立会话

    引入依赖:

    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
    </dependency>
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class CreateSession implements Watcher {
    
        private  static CountDownLatch countDownLatch = new CountDownLatch(1);
    
    
        /*
          建立会话
         */
        public static void main(String[] args) throws IOException, InterruptedException {
    
         /*
            客户端可以通过创建一个zk实例来连接zk服务器
            new Zookeeper(connectString,sesssionTimeOut,Wather)
            connectString: 连接地址:IP:端口
            sesssionTimeOut:会话超时时间:单位毫秒
            Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
         */
    
            ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
            System.out.println(zooKeeper.getState());
    
            // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
            countDownLatch.await();
    
            System.out.println("客户端与服务端会话真正建立了");
    
        }
    
        /*
            回调方法:处理来自服务器端的watcher通知
         */
        public void process(WatchedEvent watchedEvent) {
            // SyncConnected
            if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
    
                //解除主程序在CountDownLatch上的等待阻塞
                System.out.println("process方法执行了...");
                countDownLatch.countDown();
    
            }
    
    
        }
    }

    注意:ZooKeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话,在会话的生命周期中处于“ CONNECTING的状态。当该会话真正创建完毕后 ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话

    创建节点

    节点介绍:

    /**
    * path :节点创建的路径
    * data[] :节点创建要保存的数据,是个byte类型的
    * acl :节点创建的权限信息(4种类型)
    *         ANYONE_ID_UNSAFE : 表示任何人
    *         AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
    *         OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
    *         CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
    * createMode :创建节点的类型(4种类型)
    *        PERSISTENT:持久节点
    *        PERSISTENT_SEQUENTIAL:持久顺序节点
    *        EPHEMERAL:临时节点
    *        EPHEMERAL_SEQUENTIAL:临时顺序节点
    String node = zookeeper.create(path,data,acl,createMode);
    */

    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class CreateNote implements Watcher {
    
        private  static CountDownLatch countDownLatch = new CountDownLatch(1);
    
        private static ZooKeeper zooKeeper;
    
    
        /*
          建立会话
         */
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    
         /*
            客户端可以通过创建一个zk实例来连接zk服务器
            new Zookeeper(connectString,sesssionTimeOut,Wather)
            connectString: 连接地址:IP:端口
            sesssionTimeOut:会话超时时间:单位毫秒
            Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
         */
    
             zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());
            System.out.println(zooKeeper.getState());
    
            // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
            //countDownLatch.await();
            Thread.sleep(Integer.MAX_VALUE);
    
        }
    
    
    
        /*
            回调方法:处理来自服务器端的watcher通知
         */
        public void process(WatchedEvent watchedEvent) {
            // SyncConnected
            if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
    
                //解除主程序在CountDownLatch上的等待阻塞
                System.out.println("process方法执行了...");
                // 创建节点
                try {
                    createNoteSync();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
    
    
        }
    
    
        /*
           创建节点的方法
       */
        private static void createNoteSync() throws KeeperException, InterruptedException {
    
            /**
             *  path        :节点创建的路径
             *  data[]      :节点创建要保存的数据,是个byte类型的
             *  acl         :节点创建的权限信息(4种类型)
             *                 ANYONE_ID_UNSAFE    : 表示任何人
             *                 AUTH_IDS    :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
             *                 OPEN_ACL_UNSAFE    :这是一个完全开放的ACL(常用)--> world:anyone
             *                 CREATOR_ALL_ACL  :此ACL授予创建者身份验证ID的所有权限
             *  createMode    :创建节点的类型(4种类型)
             *                  PERSISTENT:持久节点
             *                    PERSISTENT_SEQUENTIAL:持久顺序节点
             *                  EPHEMERAL:临时节点
             *                  EPHEMERAL_SEQUENTIAL:临时顺序节点
             String node = zookeeper.create(path,data,acl,createMode);
             */
    
            // 持久节点
            String note_persistent = zooKeeper.create("/g-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            // 临时节点
            String note_ephemeral = zooKeeper.create("/g-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    
            // 持久顺序节点
            String note_persistent_sequential = zooKeeper.create("/g-persistent_sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    
            System.out.println("创建的持久节点" + note_persistent);
            System.out.println("创建的临时节点" + note_ephemeral);
            System.out.println("创建的持久顺序节点" + note_persistent_sequential);
    
        }
    }

    读取节点:

    public class GetNoteData implements Watcher {
    
        private  static CountDownLatch countDownLatch = new CountDownLatch(1);
    
        private static ZooKeeper zooKeeper;
    
    
        /*
          建立会话
         */
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    
         /*
            客户端可以通过创建一个zk实例来连接zk服务器
            new Zookeeper(connectString,sesssionTimeOut,Wather)
            connectString: 连接地址:IP:端口
            sesssionTimeOut:会话超时时间:单位毫秒
            Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
         */
    
             zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData());
            System.out.println(zooKeeper.getState());
    
            // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
            //countDownLatch.await();
            Thread.sleep(Integer.MAX_VALUE);
    
        }
    
    
    
        /*
            回调方法:处理来自服务器端的watcher通知
         */
        public void process(WatchedEvent watchedEvent) {
    
            /*
                子节点列表发生改变时,服务器端会发生noteChildrenChanged事件通知
                要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听
             */
            if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
    
                List<String> children = null;
                try {
                    children = zooKeeper.getChildren("/g-persistent", true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(children);
    
            }
    
    
    
            // SyncConnected
            if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
    
                //解除主程序在CountDownLatch上的等待阻塞
                System.out.println("process方法执行了...");
    
                // 获取节点数据的方法
                try {
                    getNoteData();
    
                    // 获取节点的子节点列表方法
                    getChildrens();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
    
            }
    
    
        }
    
        /*
            获取某个节点的内容
         */
        private void getNoteData() throws KeeperException, InterruptedException {
    
            /**
             * path    : 获取数据的路径
             * watch    : 是否开启监听
             * stat    : 节点状态信息
             *        null: 表示获取最新版本的数据
             *  zk.getData(path, watch, stat);
             */
            byte[] data = zooKeeper.getData("/g-persistent", false, null);
            System.out.println(new String(data));
    
    
        }
    
    
        /*
            获取某个节点的子节点列表方法
         */
        public static void getChildrens() throws KeeperException, InterruptedException {
    
            /*
                path:路径
                watch:是否要启动监听,当子节点列表发生变化,会触发监听
                zooKeeper.getChildren(path, watch);
             */
            List<String> children = zooKeeper.getChildren("/g-persistent", true);
            System.out.println(children);
    
    
    
        }
    
    
    
    }

    删除节点

    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class DeleteNote implements Watcher {
    
        private  static CountDownLatch countDownLatch = new CountDownLatch(1);
    
        private static ZooKeeper zooKeeper;
    
    
        /*
          建立会话
         */
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    
         /*
            客户端可以通过创建一个zk实例来连接zk服务器
            new Zookeeper(connectString,sesssionTimeOut,Wather)
            connectString: 连接地址:IP:端口
            sesssionTimeOut:会话超时时间:单位毫秒
            Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
         */
    
             zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote());
            System.out.println(zooKeeper.getState());
    
            // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
            //countDownLatch.await();
            Thread.sleep(Integer.MAX_VALUE);
    
        }
    
    
    
        /*
            回调方法:处理来自服务器端的watcher通知
         */
        public void process(WatchedEvent watchedEvent) {
            // SyncConnected
            if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
    
                //解除主程序在CountDownLatch上的等待阻塞
                System.out.println("process方法执行了...");
                // 删除节点
                try {
                    deleteNoteSync();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
    
            }
    
    
        }
    
        /*
            删除节点的方法
         */
        private void deleteNoteSync() throws KeeperException, InterruptedException {
    
            /*
              zooKeeper.exists(path,watch) :判断节点是否存在
              zookeeper.delete(path,version) : 删除节点
          */
    
            Stat stat = zooKeeper.exists("/g-persistent/c1", false);
            System.out.println(stat == null ? "该节点不存在":"该节点存在");
    
            if(stat != null){
                zooKeeper.delete("/g-persistent/c1",-1);
            }
    
            Stat stat2 = zooKeeper.exists("/g-persistent/c1", false);
            System.out.println(stat2 == null ? "该节点不存在":"该节点存在");
    
    
    
        }
    
    
    }

    更新节点

     /*
            更新数据节点内容的方法
         */
        private void updateNoteSync() throws KeeperException, InterruptedException {
    
             /*
                path:路径
                data:要修改的内容 byte[]
                version:为-1,表示对最新版本的数据进行修改
                zooKeeper.setData(path, data,version);
             */
    
    
            byte[] data = zooKeeper.getData("/g-persistent", false, null);
            System.out.println("修改前的值:" + new String(data));
    
            //修改/lg-persistent 的数据 stat: 状态信息对象
            Stat stat = zooKeeper.setData("/g-persistent", "客户端修改了节点数据".getBytes(), -1);
    
            byte[] data2 = zooKeeper.getData("/g-persistent", false, null);
            System.out.println("修改后的值:" + new String(data2));
    
        }
  • 相关阅读:
    SQL Server未找到或无法訪问server问题解决
    cmd 控制台 提示:请求的操作须要提升!
    网站反爬虫
    辨异 —— 近义词(词组)
    推理集 —— 现象与观察
    推理集 —— 现象与观察
    生活中的物理、化学(四)
    生活中的物理、化学(四)
    中医我还是持赞成的态度
    文言的训练和积累
  • 原文地址:https://www.cnblogs.com/aGboke/p/12991755.html
Copyright © 2011-2022 走看看