zoukankan      html  css  js  c++  java
  • 05_zookeeper_原生API使用1(更新)

    1. java方式操作远端zookeeper集群概述

    步骤:下载zookeeper压缩包并解压, 创建java工程,导入zookeeper相关jar包

    (1)下载zookeeper压缩包

    http://archive.apache.org/dist/zookeeper/, 下载tar.gz源码包,  并进行解压

    (2)创建Java工程

    Eclipse ->File->New->Java Project,  输入工程名称,一路Next结束

    (3)导入zookeeper相关jar包

    * 选中新建的工程并右键,选择 “Build Path”-> “Configure Build Path”

    * 切换到”Libraries”,  选择”Add  External Jars”

    * 先添加zookeeper-3.4.5.jar

    * 然后添加zookeeper所依赖的jar包 (lib目录下的所有jar)

    * 导入jar包结束

    2. 通过Java API连接zookeeper

    (1)   创建测试类CreateSession

    (2)  连接zk集群

     1 import org.apache.zookeeper.WatchedEvent;
     2 import org.apache.zookeeper.Watcher;
     3 import org.apache.zookeeper.ZooKeeper;
     4 import java.io.IOException;
     5 
     6 public class CreateSession { //建立和zookeeper集群的连接,并自动周期性发送心跳维护连接
     7     private static ZooKeeper zk;
     8 
     9     public static void main(String args[]) throws IOException, InterruptedException {
    10         //zk will connect to zk server, asynchronized
    11         zk = new ZooKeeper("192.168.179.101:2181", 5000, new myWatcher()); //myWatcher中覆盖process方法,定义Client端对Server发来的哪些event进行处理以及如何处理
    12         Thread.sleep(Integer.MAX_VALUE);
    16       }
    17     }
    18 
    19 class myWatcher implements Watcher{
    20     @Override
    21     public void process(WatchedEvent event) {
    22         //handle connected event
    23         if ( event.getState()== Event.KeeperState.SyncConnected ) {  //连接建立事件的处理
    24             System.out.println("Event: " +  event);
    25             System.out.println("=======Client Connected to zookeeper======");
    26         }
    28     }
    29 }

     核心API分析:   zk = new ZooKeeper("192.168.179.101:2181", 5000, new myWatcher());

      1)Zookeeper是API提供的1个类,我们连接zk集群,进行相应的znode操作,都是通过Zookeeper类的实例进行,这个实例就是zk client, 和命令行客户端是同样的角色

      2)ZooKeeper实例的创建需要传递3个参数

      参数1:connectString

      *String类型变量,代表要连接的zk集群服务器,通过逗号分隔,"192.168.179.100:2181,192.168.179.101:2181,192.168.179.102:2181"

      *ZooKeeper实例连接zk集群服务器时,将在给定的服务器中随机选择,并不存在特定的顺序

      参数2:sessionTimeout

     *int型变量,表示Zookeeper实例和zkserver间的超时时间,单位为毫秒

     *连接正常连接后,ZooKeeper实例将自动和zkserver间通过心跳信息来维持连接,不需要我们介入

     参数3:watcher

     *Watcher类实例,通常需要我们自己定义一个类,实现框架提供的Watcher接口中的process方法

     *process方法,本质是一个回调函数,先解释什么是回调函数

     *回调函数理解:打个比方,有一家旅馆提供叫醒服务,但是要求旅客自己决定叫醒的方法。可以是打客房电话,也可以是派服务员去敲门,睡得死怕耽误事的,还可以要求往自己头上浇盆水。“叫醒”这个行为是旅馆提供的,但是叫醒的方式是由旅客决定并告诉旅馆的,也就是回调函数。而旅客告诉旅馆怎么叫醒自己的动作,通常在登记入住的时候完成,称为登记回调函数(to register a callback function)

    *再看new myWatcher和process函数:当创建1个ZooKeeper实例时,我们传入了1个myWatcher实例,myWatcher类的内部实现了process方法。本质上就是:我们在 “登记入住”(创建ZooKeeper实例)时,在zk集群这家旅馆 “登记” 1个“通知” 服务(process方法),  并且告诉旅馆,在出现某些特定事情的时候才进行通知,并且我带了一个小弟(myWatcher实例),通知给他就行,他收到通知后会进行相应的处理(myWatcher实例调用process方法)

    ZooKeeper实例创建中的联动操作

    ZooKeeper实例在创建的过程中,会随机挑选1个zkserver创建连接,但这个动作是异步的

    也就是说new ZooKeeper()这个函数,并不是在和zkserver建立好连接后,才结束函数;大多数情况下,函数返回后,和zk集群的连接并没有建立完成

    这也是Thread.sleep(Integer.MAX_VALUE)出现的原因:new完了,就让当前这个运行的线程休息,一直等待;当连接真正建立的时候,这个session的连接状态会变化为SyncConnected;

    zkserver此时会向对应的Client发送1个连接变化事件, 事件的处理则自动由myWatcher实例这个小弟去调用process方法来搞定

    3. 通过Java API创建节点(同步方式)

    (1)   创建znode节点

     1 import org.apache.zookeeper.*;
     2 import java.io.IOException;
     3 
     4 public class CreateNode implements Watcher {
     5     private static ZooKeeper zk;
     6 
     7     public static void main(String args[]) throws IOException, InterruptedException {
     8         //zk will connect to zk server, asynchronized
     9         zk = new ZooKeeper("192.168.179.101:2181", 5000, new CreateNode());
    10         Thread.sleep(Integer.MAX_VALUE);
    12     }
    15 @Override 16 public void process(WatchedEvent event) {//Client端处理连接建立事件,处理动作为添加1个永久节点 17 // create persistent node if connected 18 if (event.getState() == Event.KeeperState.SyncConnected) { 19 //创建znode节点 20 try { 21 createNodeSync(); 22 } catch (KeeperException e) { 23 e.printStackTrace(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 29 } 30 31 //create node, synchronized 32 private void createNodeSync() throws KeeperException, InterruptedException { 33 System.out.println("Create node with Sync mode"); 34 String path = zk.create("/node_by_java", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 35 System.out.println("New Node added: " + path); 36 } 38 }

    运行该java类,通过终端查看结果:

    在zookeeper集群上,通过zkCli.sh客户端连接zookeeper集群,验证节点是否已经添加

    核心API分析:  path = zk.create("/node_by_java",   "123".getBytes(),  ZooDefs.Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT );

    public String create(String path,
                         byte[] data,
                         List<ACL> acl,
                         CreateMode createMode)
                  throws KeeperException,
                         InterruptedException

    参数1 path

    String类型,表示要在zk集群上创建的znode的绝对路径

    参数2 data

    byte数组类型,表示要创建的znode中写入的数据,Java字符串提供getBytes()方法,可以直接将字符串转化为1个byte数组

    参数3 acl:

    List<ACL>类型,List可以理解为升级版的数组,并且数组中的元素为ACL类型变量,本质上这里指定的是要创建的znode的访问权限

    OPEN_ACL_UNSAFE  = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));  本质上是创建了1个允许任何人进行操作的权限

    参数4 createMode:

    指定要创建的znode类型

    CreateMode.PERSISTENT  永久节点

    CreateMode.EPHEMERAL   临时节点

    CreateMode.PERSISTENT_SEQUENTIAL  永久顺序节点

    CreateMode.EPHEMERAL_SEQUENTIAL   临时顺序节点

    返回值:path

    String类型,被创建的znode节点的实际路径

    需要注意的是,这里的create方式是同步方式,也就意味着:当znode创建完成或者创建中出现异常时,函数才会返回

    4. 通过Java API查询子节点列表1(同步方式)

     1 import org.apache.zookeeper.*;
     2 import java.io.IOException;
     3 import java.util.List;
     4 
     5 public class GetChildrenSync implements Watcher {
     6     private static ZooKeeper zk;
     7 
     8     public static void main(String args[]) throws IOException, InterruptedException {
     9         //zk will connect to zk server, asynchronized
    zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetChildrenSync()); //类实例,该类要实现Watcher类的process函数,定义Client处理哪些zkserver发来的事件event 11 Thread.sleep(Integer.MAX_VALUE); 12 13 } 14 15 @Override 16 public void process(WatchedEvent event) { //框架定义的接口,我们要实现Client处理哪些event,如何处理这些event 17 // 只在连接建立后,查询/的子节点列表 18 if (event.getState() == Event.KeeperState.SyncConnected) { 19 //查询子节点列表 20 try { 21 getChildrenSync(); 22 } catch (KeeperException e) { 23 e.printStackTrace(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 } 28 29 } 30 31 //get children , synchronized 32 private void getChildrenSync() throws KeeperException, InterruptedException { 33 System.out.println("Get Children in sync mode"); 34 //false, 不关注子节点列表的变更事件(不注册watcher) 35 List<String> children = zk.getChildren("/", false); 36 System.out.println("Children list of / :" + children); 37 } 38 39 }

     运行java类,通过终端查看结果:

    核心API分析:   List<String> children = zk.getChildren("/", false);

    public List<String> getChildren(String path,
                                    boolean watch)
                             throws KeeperException,
                                    InterruptedException

    参数1:path
    String类型,指明要查询哪个znode的子节点列表
    参数2:watch
    boolean类型,false表示只是查询,并不需要zkserver在检测到子节点列表发生变化时,进行事件通知(不关注子节点发生变化的Event)
    返回值:List<String>
    返回子节点列表,每个子节点通过字符串表示,构成一个“数组”

    5. 通过Java API查询子节点列表 (同步 + 设置子节点列表变更的watcher)

    import org.apache.zookeeper.*;
    import java.io.IOException;
    import java.util.List;
    
    public class GetChildrenSync implements Watcher {
        private static ZooKeeper zk;
        private String path;
    
        public static void main(String args[]) throws IOException, InterruptedException {
            //zk will connect to zk server, asynchronized
            zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetChildrenSync());
            Thread.sleep(Integer.MAX_VALUE);
    
        }
    
        @Override
        public void process(WatchedEvent event) {
            // “子节点列表发生变化” event的处理
            if(event.getType() == Event.EventType.NodeChildrenChanged) {
                //再次获取子节点列表
                try {
                    List<String> new_children = zk.getChildren(event.getPath(), true); //event.getPath()返回 哪个znode的子节点列表发生了变化
                    System.out.println(new_children);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            else {
                // 连接建立事件的处理
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    //查询子节点列表
                    try {
                        getChildrenSync(); //设置关注子节点列表
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        //get children , synchronized
        private void getChildrenSync() throws KeeperException, InterruptedException {
            System.out.println("Get Children in sync mode");
            //true, 关注子节点列表的变更(注册子节点变更watcher)
            List<String> children = zk.getChildren("/", true); //关注“/”子节点列表变化event
            System.out.println("Children list of / :" + children);
        }
    
    }

    分析:

    代码运行时,会首先建立和zkserver的连接,第一次会查询/的子节点列表,并设置了/路径子节点变更的watcher;

    通过命令行方式在/添加1个节点,此时zkserver会向java cliet发送子节点发生变化的事件,此时process函数被再次触发,并且执行再次获取子节点列表的操作

    要注意process中event的设计顺序,想想看如果检测Event的state是否为SyncConnected是process中首先出现的检测代码,会出现什么情况?(先挖坑)

    开始填坑:

    zookeeper框架中的WatchedEvent类型的event,  携带了3类信息

    1)当前连接的状态,   event.getState() 可以获取:已连接,已断开等

    2)事件类型 event.getType() 可以获取: 子节点列表发生变化,节点数据内容发生变化

    3)事件关联的znode节点 event.getPath()   可以获取该znode的绝对路径: 子节点列表发生变化,则关联的znode就是父节点

    当client和zk集群刚刚建立连接时,zk会向client发送1个连接建立事件,此时事件的连接状态为connected, 事件类型为EventType.None,  事件关联的节点为空(event.getPath==null)

    当子节点列表变化的事件发生时,该事件的连接状态也为connected, 事件类型为EventType.NodeChildrenChanged, 事件关联的节点为/

    如果将 event.getState() == Event.KeeperState.SyncConnected放在process函数的开始,则只会执行连接建立时的逻辑,并不会执行子节点变更的处理逻辑

    严格来说,连接刚刚建立时的逻辑处理应该进行修改,添加event.getType和event.getPath()来更加精确的描述 “连接刚刚建立”

            else {
                // 连接刚刚建立事件的处理
                if (event.getState() == Event.KeeperState.SyncConnected) {
              if(event.getType()==Event.EventType.None && event.getPath()==null){

    //查询子节点列表
                          try {
                                getChildrenSync(); //设置关注子节点列表
                          } catch (KeeperException e) {
                                 e.printStackTrace();
                          } catch (InterruptedException e) {
                                 e.printStackTrace();
                          }
                     }//
    }

     Event的状态,类型总结

    * 在 Watcher 接口里面,除了回调函数 process 以外,还包含 Event.KeeperState 和 Event.EventType 两个枚举类,分别代表了通知状态和事件类型

    6. 查询节点数据(同步方式)

     1 import org.apache.zookeeper.KeeperException;
     2 import org.apache.zookeeper.WatchedEvent;
     3 import org.apache.zookeeper.Watcher;
     4 import org.apache.zookeeper.ZooKeeper;
     5 import org.apache.zookeeper.data.Stat;
     6 
     7 import java.io.IOException;
     8 import java.util.List;
     9 
    10 public class GetDataSync implements Watcher {
    11     private static ZooKeeper zk;
    12     private String path;
    13 
    14     public static void main(String args[]) throws IOException, InterruptedException {
    15         //zk will connect to zk server, asynchronized
    16         zk = new ZooKeeper("192.168.179.101:2181", 5000, new GetDataSync());
    17         Thread.sleep(Integer.MAX_VALUE);
    18 
    19     }
    20 
    21     @Override
    22     public void process(WatchedEvent event) {
    23         // 连接建立后,获取给定节点数据
    24         if (event.getState() == Event.KeeperState.SyncConnected) {
    25             // 连接刚刚建立
    26             if (event.getType() == Event.EventType.None && event.getPath() == null) {//连接建立后,查询给定路径的znode的数据
    27                 //查询给定路径的znode数据
    28                 try {
    29                     getNodeData("/node_by_java");
    30                 } catch (KeeperException e) {
    31                     e.printStackTrace();
    32                 } catch (InterruptedException e) {
    33                     e.printStackTrace();
    34                 }
    35 
    36             }
    37             else if(event.getType()== Event.EventType.NodeDataChanged){
    38                 //节点数据发生变化事件
    39                 //获取节点的新数据,并再次关注节点数据发生变化的事件
    40                 Stat stat = new Stat();
    41                 byte[] data = new byte[0];
    42                 try {
    43                     data = zk.getData(event.getPath(), true, stat);
    44                 } catch (KeeperException e) {
    45                     e.printStackTrace();
    46                 } catch (InterruptedException e) {
    47                     e.printStackTrace();
    48                 }
    49                 System.out.println("Updated data is: " + new String(data));
    50             }
    51         }
    52 
    53     }//process
    54 
    55 
    56     //get node data
    57     private void getNodeData(String path) throws KeeperException, InterruptedException {
    58         System.out.println("Get Node data in sync mode");
    60         Stat stat = new Stat();    //创建1个空状态
    61         byte[] data = zk.getData(path, false, stat);    //stat会被更新为节点的最新状态, false表示不关注节点数据发生变更的事件
    //byte[] data = zk.getData(path, true, stat); //true表示关注节点数据发生变更的事件,注意一次性
    62 String data2string = new String(data); 63 System.out.println("Data of " + path + "is: " + data2string); 64 } 65 66 }

     首先不关注节点数据发生变更,看能够正常获取到znode数据

     然后修改为关注节点数据发生变更,通过命令行方式修改数据,查看是否再次获取到更新后的节点数据

     7. 删除节点(同步方式)

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.List;
    
    public class DeleteNodeSync implements Watcher {
        private static ZooKeeper zk;
        private String path;
        private int version;
    
        public static void main(String args[]) throws IOException, InterruptedException {
            //zk will connect to zk server, asynchronized
            zk = new ZooKeeper("192.168.179.101:2181", 5000, new DeleteNodeSync());
            Thread.sleep(Integer.MAX_VALUE);
    
        }
    
        @Override
        public void process(WatchedEvent event) {
            // 连接建立后,删除给定路径的znode
            if (event.getState() == Event.KeeperState.SyncConnected) {
                // 连接刚刚建立
                if (event.getType() == Event.EventType.None && event.getPath() == null) {
                    //查询子节点列表
                    try {
                        delNode("/node_by_java");
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            }
    
        }//process
    
    
        //get node data
        private void delNode(String path) throws KeeperException, InterruptedException {
            System.out.println("Delete Node in sync mode");
            //删除给定路径的znode
            zk.delete(path, -1);  //删除指定路径, 指定dataversion的znode, 如果version指定-1,则删除节点时不进行dataversion校验
            System.out.println("Node deleted: "+ path);
            //删除后再次查询/子节点列表
            List<String> children = zk.getChildren("/", false);
            System.out.println("Children list of / is" + children);
        }
    
    }

     

  • 相关阅读:
    超过经理收入的员工
    搜索插入位置
    整数反转
    俩数之和
    tar 命令参数解释
    【记录】Transaction rolled back because it has been marked as rollback-only
    【转载】BIO、NIO、AIO
    【转载】JDK自带的log工具
    【转载】java8中的Calendar日期对象(LocalDateTime)
    【对象属性复制】BeanUtils.copyProperties(obj1, obj2);
  • 原文地址:https://www.cnblogs.com/shay-zhangjin/p/7764630.html
Copyright © 2011-2022 走看看