zoukankan      html  css  js  c++  java
  • 3. ZooKeeper客户端(一)

    ZooKeeper常用客户端有三种:原生客户端、zkClient、curator

    项目中使用前,需要导入相关依赖

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
    </dependencies>

    原生客户端

    创建会话

    不使用监听

    public class TestCreateSession {
        /*服务地址*/
        private static final String ZK_SERVER = "127.0.0.1:2181";
        @Test
        public void createSession2() throws IOException {
            ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, null);
            System.out.println("zk.getState() = " + zk.getState());
        }
    }

    zk.getState() = CONNECTING 

    通过之前的学习可以知道,CONNECTING标志客户端正在连接,并不能确保已经连接上zk服务。可能发生还没有连接到zk服务就进行对zk访问的情况

    使用监听

    public class TestCreateSession {
        /*服务地址*/
        private static final String ZK_SERVER = "127.0.0.1:2181";
        /*倒计时器*/
        private CountDownLatch latch = new CountDownLatch(1);
        @Test
        public void createSession() throws IOException, InterruptedException {
            ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected){/*确保zk已连接*/
                        latch.countDown();
                    }
                }
            });
            latch.await();
            System.out.println("zk.getState() = " + zk.getState());
        }
    }

    zk.getState() = CONNECTED

    使用监听机制可以确保在ZooKeeper初始化完成前进行等待,初始化完成再进行后续操作

    客户端基本操作

      1 public class TestJavaApi implements Watcher {
      2     /*zk服务地址*/
      3     private static final String ZK_SERVER = "127.0.0.1:2181";
      4     /*会话连接超时时间*/
      5     private static final int SESSION_TIMEOUT = 50000;
      6     /*指定目录【节点】*/
      7     private static final String ZK_PATH = "/zkDir";
      8     /*客户端连接会话*/
      9     private ZooKeeper zk = null;
     10 
     11     /*倒计时器*/
     12     private CountDownLatch latch = new CountDownLatch(1);
     13     /**
     14      * 事件被触发时的动作
     15      * @param event 事件
     16      */
     17     @Override
     18     public void process(WatchedEvent event) {
     19         System.out.println("收到事件通知:" + zk.getState() +"
    ");
     20         if (event.getState() == Event.KeeperState.SyncConnected){
     21             latch.countDown();
     22         }
     23     }
     24 
     25     /**
     26      * 创建zk会话连接
     27      * @param connectString     zk服务器地址列表,可以是"地址1,地址2,...."
     28      * @param sessionTimeout    Session超时时间
     29      */
     30     public void createZkSession(String connectString, int sessionTimeout){
     31         try {
     32             zk = new ZooKeeper(connectString,sessionTimeout,this);
     33             latch.await();
     34             System.out.println("zk.getState() = " + zk.getState());
     35         } catch (IOException|InterruptedException e) {
     36             System.out.println("连接创建失败");
     37             e.printStackTrace();
     38         }
     39     }
     40 
     41     /**
     42      * 关闭zk会话
     43      */
     44     public void releaseSession(){
     45         try {
     46             zk.close();
     47         } catch (InterruptedException e) {
     48             e.printStackTrace();
     49         }
     50     }
     51 
     52     /**
     53      * 创建节点【目录、文件】
     54      * @param path  节点
     55      * @param data  节点数据
     56      * @return
     57      */
     58     public boolean createNode(String path,String data){
     59         try {
     60             String node = zk.create(path/*节点path*/,
     61                     data.getBytes()/*节点数据*/,
     62                     ZooDefs.Ids.OPEN_ACL_UNSAFE/*权限控制  OPEN_ACL_UNSAFE相当于world:anyone*/,
     63                     CreateMode.EPHEMERAL)/*临时节点*/;
     64             System.out.println("节点创建成功,node = " + node);
     65             return true;
     66         } catch (KeeperException|InterruptedException e) {
     67             System.out.println("节点创建失败");
     68             e.printStackTrace();
     69         }
     70         return false;
     71     }
     72 
     73     /**
     74      * 获取节点数据
     75      * @param path  节点路径
     76      * @return
     77      */
     78     public String readNode(String path){
     79         try {
     80             byte[] data = zk.getData(path, true, null);
     81             String nodeData = new String(data,"utf-8");
     82             //System.out.println("获取"+path+"节点数据:"+nodeData);
     83             return nodeData;
     84         } catch (KeeperException | InterruptedException | UnsupportedEncodingException e) {
     85             e.printStackTrace();
     86             return null;
     87         }
     88     }
     89 
     90     /**
     91      * 修改节点数据
     92      * @param path      节点path
     93      * @param newData   节点新数据
     94      * @return
     95      */
     96     public boolean writeNode(String path,String newData){
     97         try {
     98             Stat stat = zk.setData(path, newData.getBytes(), -1);
     99             System.out.println("节点["+path+"]修改成功");
    100             return true;
    101         } catch (KeeperException|InterruptedException e) {
    102             e.printStackTrace();
    103         }
    104         return false;
    105     }
    106 
    107     /**
    108      * 删除指定节点
    109      * @param path  节点path
    110      */
    111     public void deleteNode(String path){
    112         try {
    113             zk.delete(path,-1);
    114             System.out.println("节点["+path+"]删除成功");
    115         } catch (InterruptedException|KeeperException e) {
    116             System.out.println("节点["+path+"]删除失败");
    117             e.printStackTrace();
    118         }
    119     }
    120 
    121     public static void main(String[] args) {
    122         TestJavaApi api = new TestJavaApi();
    123         api.createZkSession(ZK_SERVER,SESSION_TIMEOUT);
    124         if(api.createNode(ZK_PATH,"初始节点内容")){
    125             System.out.println("第一次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH));
    126             api.writeNode(ZK_PATH,"修改ZK_PATH节点数据");
    127             System.out.println("第二次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH));
    128             api.deleteNode(ZK_PATH);
    129         }
    130         api.releaseSession();
    131     }
    132 }
    133 /**
    134 ************输出结果***********
    135     收到事件通知:CONNECTED
    136 
    137     zk.getState() = CONNECTED
    138     节点创建成功,node = /zkDir
    139     第一次读/zkDir节点数据:初始节点内容
    140     收到事件通知:CONNECTED
    141 
    142     节点[/zkDir]修改成功
    143     第二次读/zkDir节点数据:修改ZK_PATH节点数据
    144     收到事件通知:CONNECTED
    145 
    146     节点[/zkDir]删除成功
    147 */
    View Code

    watch机制

      1 public class ZkWatcher implements Watcher {
      2     private static final String ZK_SERVER = "127.0.0.1:2181";
      3     private static final int SESSION_TIMEOUT = 15000;
      4     private static final String PARENT_PATH ="/testWatcher";
      5     private static final String CHILDREN_PATH = "/testWatcher/children";
      6     private ZooKeeper zk = null;
      7     /*定义原子变量,用于计算进入监听的次数*/
      8     private static AtomicInteger seq = new AtomicInteger();
      9     /*会话进入标志*/
     10     private static final String LOG_PREFIX_OF_MAIN = "【main】";
     11 
     12     /*倒计时器*/
     13     private CountDownLatch latch = new CountDownLatch(1);
     14     @Override
     15     public void process(WatchedEvent event) {
     16         System.out.println("**************进入process方法**************");
     17         System.out.println("event = " + event);
     18         /*模拟业务连接初始化工作*/
     19         TimeUtils.threadSleep(200);
     20         if (event == null) { return; }
     21         /*连接状态*/
     22         Event.KeeperState eventState = event.getState();
     23         /*事件类型*/
     24         Event.EventType eventType = event.getType();
     25         /*受影响的路径*/
     26         String eventPath = event.getPath();
     27         /*进入监听标志*/
     28         String logPreFix = "【watcher-"+seq.incrementAndGet()+"】";
     29         System.out.println(logPreFix + "收到watcher通知");
     30         System.out.println(logPreFix + "连接状态:	"+eventState.toString());
     31         System.out.println(logPreFix + "事件类型:	"+eventType.toString());
     32 
     33         if(Event.KeeperState.SyncConnected == eventState){
     34             if (Event.EventType.None == eventType){/*成功连接上ZK服务器*/
     35                 System.out.println(logPreFix + "成功连接上ZK服务器");
     36                 latch.countDown();
     37             }else if (Event.EventType.NodeCreated == eventType){/*创建节点*/
     38                 System.out.println(logPreFix + "创建节点");
     39                 TimeUtils.threadSleep(100);
     40                 /*使用监听*/
     41                 exist(eventPath,true);
     42             }else if (Event.EventType.NodeChildrenChanged == eventType){
     43                 System.out.println(logPreFix + "子节点变更");
     44                 TimeUtils.threadSleep(1000);
     45                 System.out.println(logPreFix + "子节点列表:" + getChildren(eventPath,true));
     46             }else if (Event.EventType.NodeDataChanged == eventType){
     47                 System.out.println(logPreFix + "修改节点数据");
     48                 TimeUtils.threadSleep(100);
     49                 System.out.println(logPreFix + "修改后节点内容:" + readNode(eventPath, true));
     50             }else if (Event.EventType.NodeDeleted == eventType){
     51                 System.out.println(logPreFix + "删除节点");
     52                 System.out.println(logPreFix + "节点 " + eventPath + " 被删除");
     53             }
     54         }else if(Event.KeeperState.Disconnected == eventState){
     55             System.out.println(logPreFix + "与zk服务器断开连接");
     56         }else if(Event.KeeperState.AuthFailed == eventState){
     57             System.out.println(logPreFix + "验证失败");
     58         }else if(Event.KeeperState.Expired == eventState){
     59             System.out.println(logPreFix + "会话超时");
     60         }
     61         System.out.println("----------------------------------------");
     62     }
     63     /**
     64      * 创建ZK连接
     65      * @param connectAddr ZK服务器地址列表
     66      * @param sessionTimeout Session超时时间
     67      */
     68     public void createConnection(String connectAddr, int sessionTimeout) {
     69         this.releaseConnection();
     70         try {
     71             zk = new ZooKeeper(connectAddr, sessionTimeout, this);
     72             System.out.println(LOG_PREFIX_OF_MAIN + "开始连接zk服务器");
     73             latch.await();
     74         } catch (Exception e) {
     75             e.printStackTrace();
     76         }
     77     }
     78 
     79     /**
     80      * 关闭ZK连接
     81      */
     82     public void releaseConnection() {
     83         if (this.zk != null) {
     84             try {
     85                 this.zk.close();
     86             } catch (InterruptedException e) {
     87                 e.printStackTrace();
     88             }
     89         }
     90     }
     91 
     92     /**
     93      * 创建节点
     94      * @param path 节点路径
     95      * @param data 数据内容
     96      * @return
     97      */
     98     public boolean createPath(String path, String data) {
     99         try {/*设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)*/
    100             zk.exists(path, true);
    101             System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +
    102                     this.zk.create(    /*路径*/
    103                             path,/*数据*/
    104                             data.getBytes(),/*所有可见*/
    105                             ZooDefs.Ids.OPEN_ACL_UNSAFE,/*永久存储*/
    106                             CreateMode.PERSISTENT ) +
    107                     ", content: " + data);
    108         } catch (Exception e) {
    109             e.printStackTrace();
    110             return false;
    111         }
    112         return true;
    113     }
    114 
    115     /**
    116      * 删除所有节点
    117      */
    118     public void deleteAllTestPath() {
    119         if(this.exist(CHILDREN_PATH, false) != null){
    120             this.deleteNode(CHILDREN_PATH);
    121         }
    122         if(this.exist(PARENT_PATH, false) != null){
    123             this.deleteNode(PARENT_PATH);
    124         }
    125     }
    126 
    127     /**
    128      * 删除指定节点
    129      * @param path
    130      */
    131     public void deleteNode(String path) {
    132         try {
    133             zk.delete(path,-1);
    134             System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
    135         } catch (InterruptedException|KeeperException e) {
    136             e.printStackTrace();
    137         }
    138     }
    139 
    140     /**
    141      * 获取节点内容
    142      * @param path
    143      * @param needWatch
    144      * @return
    145      */
    146     public String readNode(String path, boolean needWatch) {
    147         try {
    148             byte[] data = zk.getData(path, needWatch, null);
    149             return new String(data,"utf-8");
    150         } catch (KeeperException|InterruptedException|UnsupportedEncodingException e) {
    151             e.printStackTrace();
    152             return null;
    153         }
    154     }
    155 
    156     /**
    157      * 获取指定节点的子节点列表
    158      * @param path
    159      * @param needWatch
    160      * @return
    161      */
    162     public List<String> getChildren(String path, boolean needWatch) {
    163         try {
    164             return this.zk.getChildren(path, needWatch);
    165         } catch (KeeperException|InterruptedException e) {
    166             e.printStackTrace();
    167             return null;
    168         }
    169     }
    170     /**
    171      * 更新指定节点数据内容
    172      * @param path 节点路径
    173      * @param data 数据内容
    174      * @return
    175      */
    176     public boolean writeNode(String path, String data) {
    177         try {
    178             System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
    179                     this.zk.setData(path, data.getBytes(), -1));
    180         } catch (Exception e) {
    181             e.printStackTrace();
    182         }
    183         return false;
    184     }
    185     /**
    186      * path节点是否存在
    187      * @param path
    188      * @param needWatch
    189      * @return
    190      */
    191     public Stat exist(String path, boolean needWatch) {
    192         try {
    193             return zk.exists(path,needWatch);
    194         } catch (KeeperException|InterruptedException e) {
    195             e.printStackTrace();
    196             return null;
    197         }
    198     }
    199 
    200     public static void main(String[] args) throws Exception {
    201         //建立watcher
    202         ZkWatcher watcher = new ZkWatcher();
    203         //创建连接
    204         watcher.createConnection(ZK_SERVER, SESSION_TIMEOUT);
    205         //System.out.println(zkWatch.zk.toString());
    206         Thread.sleep(1000);
    207         // 清理节点
    208         watcher.deleteAllTestPath();
    209         if (watcher.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
    210             System.out.println("---------------------- read parent ----------------------------");
    211             /*
    212             读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。
    213             watch是一次性的,也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。
    214             */
    215             watcher.readNode(PARENT_PATH, true);
    216             watcher.writeNode(PARENT_PATH, System.currentTimeMillis() + "");
    217             System.out.println("---------------------- read children path ----------------------------");
    218             /*
    219             读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,
    220             而不会输出NodeChildrenChanged,也就是说创建子节点时没有watch。
    221             如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在
    222             创建c1时watch,输出c1的NodeChildrenChanged,而不会输出创建c2时的NodeChildrenChanged,
    223             如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,
    224             其中path="/p/c1"
    225              */
    226             watcher.getChildren(PARENT_PATH, true);
    227             Thread.sleep(1000);
    228             // 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true)
    229             watcher.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
    230             Thread.sleep(1000);
    231             watcher.readNode(CHILDREN_PATH, true);
    232             watcher.writeNode(CHILDREN_PATH, System.currentTimeMillis() + "");
    233         }
    234         Thread.sleep(20000);
    235         // 清理节点
    236         watcher.deleteAllTestPath();
    237         Thread.sleep(1000);
    238         watcher.releaseConnection();
    239     }
    240 }
    241 
    242 class TimeUtils{
    243     public static void threadSleep(long mills){
    244         try {
    245             Thread.sleep(mills);
    246         } catch (InterruptedException e) {
    247             e.printStackTrace();
    248         }
    249     }
    250 }
    251 
    252 /*
    253 *********输出结果********
    254 【main】开始连接zk服务器
    255 **************进入process方法**************
    256 event = WatchedEvent state:SyncConnected type:None path:null
    257 【watcher-1】收到watcher通知
    258 【watcher-1】连接状态:    SyncConnected
    259 【watcher-1】事件类型:    None
    260 【watcher-1】成功连接上ZK服务器
    261 ----------------------------------------
    262 **************进入process方法**************
    263 event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher
    264 【main】节点创建成功, Path: /testWatcher, content: 1567510219582
    265 ---------------------- read parent ----------------------------
    266 【main】更新数据成功,path:/testWatcher, stat: 223,224,1567510219588,1567510219598,1,0,0,0,13,0,223
    267 
    268 ---------------------- read children path ----------------------------
    269 【watcher-2】收到watcher通知
    270 【watcher-2】连接状态:    SyncConnected
    271 【watcher-2】事件类型:    NodeCreated
    272 【watcher-2】创建节点
    273 ----------------------------------------
    274 **************进入process方法**************
    275 event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher
    276 【watcher-3】收到watcher通知
    277 【watcher-3】连接状态:    SyncConnected
    278 【watcher-3】事件类型:    NodeDataChanged
    279 【watcher-3】修改节点数据
    280 【watcher-3】修改后节点内容:1567510219598
    281 ----------------------------------------
    282 **************进入process方法**************
    283 event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher/children
    284 【main】节点创建成功, Path: /testWatcher/children, content: 1567510220605
    285 【watcher-4】收到watcher通知
    286 【watcher-4】连接状态:    SyncConnected
    287 【watcher-4】事件类型:    NodeCreated
    288 【watcher-4】创建节点
    289 ----------------------------------------
    290 **************进入process方法**************
    291 event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher
    292 【watcher-5】收到watcher通知
    293 【watcher-5】连接状态:    SyncConnected
    294 【watcher-5】事件类型:    NodeChildrenChanged
    295 【watcher-5】子节点变更
    296 【main】更新数据成功,path:/testWatcher/children, stat: 225,226,1567510220606,1567510221615,1,0,0,0,13,0,225
    297 
    298 【watcher-5】子节点列表:[children]
    299 ----------------------------------------
    300 **************进入process方法**************
    301 event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher/children
    302 【watcher-6】收到watcher通知
    303 【watcher-6】连接状态:    SyncConnected
    304 【watcher-6】事件类型:    NodeDataChanged
    305 【watcher-6】修改节点数据
    306 【watcher-6】修改后节点内容:1567510221615
    307 ----------------------------------------
    308 **************进入process方法**************
    309 event = WatchedEvent state:SyncConnected type:NodeDeleted path:/testWatcher/children
    310 【main】删除节点成功,path:/testWatcher/children
    311 【main】删除节点成功,path:/testWatcher
    312 【watcher-7】收到watcher通知
    313 【watcher-7】连接状态:    SyncConnected
    314 【watcher-7】事件类型:    NodeDeleted
    315 【watcher-7】删除节点
    316 【watcher-7】节点 /testWatcher/children 被删除
    317 ----------------------------------------
    318 **************进入process方法**************
    319 event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher
    320 【watcher-8】收到watcher通知
    321 【watcher-8】连接状态:    SyncConnected
    322 【watcher-8】事件类型:    NodeChildrenChanged
    323 【watcher-8】子节点变更
    324 
    325 */
    View Code

    ZooKeeper认证机制

  • 相关阅读:
    NSSelectorFromString 使用示例
    NSClassFromString 实例话静态库中的类
    iOS Simulator hang up ( Xcode4.6.3)
    RabbitMQ 相关概念和方法详解
    Python pika, TypeError: exchange_declare() got an unexpected keyword argument 'type' 问题修复
    巧用 git rebase 将某一部分 commit 复制到另一个分支
    巧用 git rebase 合并多个 commit。
    分享常用的GoLang包工具
    Laradock使用教程(新手版)
    PHP中抽象类与接口的区别
  • 原文地址:https://www.cnblogs.com/qf123/p/11454686.html
Copyright © 2011-2022 走看看