zoukankan      html  css  js  c++  java
  • zookeeper02

    1 zookeeper的事件监听机制

    1.1 watcher概念

    • zookeeper提供了数据的发布/订阅功能,多个订阅者可以同时监听某一个特定主题对象,当该主题对象的自身状态发生改变的时候,例如:节点内容改变、节点下的子节点列表发生改变等等,会实时、主动通知所有订阅者。
    • zookeeper采用了watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生改变的时候会异步通知客户端,因此客户端不必在watcher注册后轮询阻塞,从而减轻了客户端的压力。
    • watcher机制实际上和观察者模式类似,也可以看作是一种观察者模式在分布式场景下的实现方式。

    1.2 watcher架构

    • watcher实现由三部分组成:
      • 1️⃣zookeeper客户端。
      • 2️⃣zookeeper服务端。
      • 3️⃣客户端的ZKWatchManger对象。
    • 客户单首先将watcher注册到服务器端,同时将watcher对象保存到客户端的watch管理器中。当zookeeper服务器端监听的数据状态发生改变的时候,服务器端会主动通知客户端,接着客户端的watch管理器会触发相关的watcher来回调相应的处理逻辑,从而完成整体的数据发布/订阅流程。

    watcher架构

    1.3 watcher特性

    特性 说明
    一次性 watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册
    客户端顺序回调 watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响到别的watcher的执行。
    轻量级 WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉 数据节点变化前后的具体内容。
    时效性 watcher只有在当前session彻底失效时才会生效,如果在session有效期内快速重连成功,则watcher依然存在,依然可以接受到通知。

    1.4 watcher接口设计

    • Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举:KeeperState和EventType。

    watcher接口设计

    1.4.1 Watcher通知状态(KeeperState)

    • KeeperState是客户端和服务端连接状态发生变化时对应的通知类型。路径为:org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,其枚举属性如下:
    枚举属性 说明
    SyncConnected 客户端和服务器端正常连接时
    Disconnected 客户端和服务器端断开连接时
    Expired 会话session失效时
    AuthFailed 身份认证失败时

    1.4.2 Watcher事件类型(EventType)

    • EventType是数据节点(Znode)发生变化时对应的通知类型。EventType变化时KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时,EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType,是一个枚举类,枚举类属性如下:
    枚举属性 说明
    None
    NodeCreated Watcher监听的数据节点被创建时
    NodeDeleted Watcher监听的数据节点被删除时
    NodeDataChanged Watcher监听的数据节点内容发生变更时(无论内容数据是否变化)
    NodeChildrenChanged Watcher监听的数据节点的子节点列表发生变更时

    注意:客户端接收到的相关事件通知中只包含状态、类型等信息,不包含节点变化前后的具体内容,变化前的数据需要业务自身存储,变化后的数据需要调用get等方法重新获取。

    1.5 捕获相应的事件

    • 在zookeeper中采用zk.getChildren(path,watch)、zk.exists(path,watch)、zk.getData(path,watcher,stat)这样的方式为某个znode注册监听。
    • 下面以node-x节点为例,说明调用的注册方式和可监听事件间的关系。
    注册方式 Created ChildrenChanged DataChanged Deleted
    zk.exists(node-x,watch) 可以监控 可以监控 可以监控
    zk.getData(node-x,watcher,stat) 可以监控 可以监控
    zk.getChildren(path,watch) 可以监控 可以监控

    1.6 注册watcher的方法

    1.6.1 客户端和服务器端的连接状态

    事件类型为:None。

    KeeperState:通知状态。

    • SyncConnected:客户端和服务器端正常连接时。
    • Disconnected:客户端和服务器端断开连接时。
    • Expired: 会话session失效时。
    • AuthFailed:身份认证失败时。
    • 示例:
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @Test
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    //事件类型
                    if (event.getType().equals(Event.EventType.None)) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            System.out.println("连接创建成功");
                            countDownLatch.countDown();
                        }else if(event.getState() == Event.KeeperState.Disconnected){
                            System.out.println("断开连接");
                        }else if(event.getState() == Event.KeeperState.Expired){
                            System.out.println("会话超时");
                            //一般而言,zookeeper会在我们设置的会话超时时间内重新连接服务器端了,一般超过了我们自己设置的会话超时时间,那么zookeeper就不会再重新连接服务器端了。
                            //通常而言,我们一般在会话超时的时候,让其重新连接zookeeper服务器
                            //代码略
                        }else if(event.getState() == Event.KeeperState.AuthFailed){
                            System.out.println("身份认证失败");
                        }
                    }
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
    
            Thread.sleep(50000);
    
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
    
            zooKeeper.close();
    
        }
    }
    

    1.6.2 检查节点是否存在

    //watch为true时,使用连接对象的监视器 NodeCreated、NodeDeleted、NodeDataChanged 
    public Stat exists(String path, boolean watch){}
    
    //自定义监视器  NodeCreated、NodeDeleted、NodeDataChanged  
    public Stat exists(final String path, Watcher watcher){}
    
    • 示例:使用连接对象的监视器
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests2 {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @BeforeEach
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("连接对象的参数");
                    //事件类型
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    
    
        @Test
        public void test() throws KeeperException, InterruptedException {
            //节点的路径
            String path = "/watcher";
            //使用连接对象的watcher
            boolean watch = true;
            zooKeeper.exists(path, watch);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    }
    
    • 示例:自定义监视器
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests2 {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @BeforeEach
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("连接对象的参数");
                    //事件类型
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    
    
        @Test
        public void test() throws KeeperException, InterruptedException {
            //节点的路径
            String path = "/watcher";
            //自定义watcher对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("自定义watcher对象");
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
    
                }
            };
            zooKeeper.exists(path, watcher);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    }
    
    • 示例:修改watcher机制的一次性,实现watcher机制的一直监听
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests2 {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @BeforeEach
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("连接对象的参数");
                    //事件类型
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    
    
        @Test
        public void test() throws KeeperException, InterruptedException {
            //节点的路径
            String path = "/watcher";
            //自定义watcher对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("自定义watcher对象");
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                    try {
                        zooKeeper.exists(path,this);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            zooKeeper.exists(path, watcher);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    }
    
    • 示例:注册多个监听器对象
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests2 {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @BeforeEach
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("连接对象的参数");
                    //事件类型
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    
    
        /**
         * 注册多个监听器对象
         *
         * @throws KeeperException
         * @throws InterruptedException
         */
        @Test
        public void test() throws KeeperException, InterruptedException {
            //节点的路径
            String path = "/watcher";
            //自定义watcher对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("自定义watcher对象");
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                    try {
                        zooKeeper.exists(path, this);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            };
            //注册第一个监听器对象
            zooKeeper.exists(path, watcher);
            //注册第二个监听器对象
            zooKeeper.exists(path, watcher);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    }
    

    1.6.3 查看节点

    //使用连接对象的监视器  NodeDeleted NodeDataChanged
    public byte[] getData(String path, boolean watch, Stat stat)
                throws KeeperException, InterruptedException {}
    
    //自定义监视器
    public byte[] getData(final String path, Watcher watcher, Stat stat){}
    
    • 示例:使用连接对象的监视器
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests3 {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @BeforeEach
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("连接对象的参数");
                    //事件类型
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    
    
        @Test
        public void test() throws KeeperException, InterruptedException {
            //节点的路径
            String path = "/watcher";
            //使用连接对象中的watcher
            boolean watch = true;
            byte[] data = zooKeeper.getData(path, watch, null);
            System.out.println("data = " + new String(data));
            Thread.sleep(50000);
            System.out.println("结束");
    
        }
    }
    
    • 示例:自定义监视器
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests3 {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @BeforeEach
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("连接对象的参数");
                    //事件类型
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    
    
        @Test
        public void test() throws KeeperException, InterruptedException {
            //节点的路径
            String path = "/watcher";
            //自定义监视器
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper.getData(path, watcher, null);
            Thread.sleep(50000);
            System.out.println("结束");
    
        }
    }
    

    1.6.4 查看子节点

    //使用连接对象的监视器 NodeDeleted NodeChildrenChanged
    public List<String> getChildren(String path, boolean watch)
        throws KeeperException, InterruptedException {}
    
    public List<String> getChildren(final String path, Watcher watcher)
        throws KeeperException, InterruptedException{}
    
    • 示例:使用连接对象的监视器
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests4 {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @BeforeEach
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("连接对象的参数");
                    //事件类型
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    
    
        @Test
        public void test() throws KeeperException, InterruptedException {
            String path = "/hadoop";
            boolean watch = true;
            List<String> children = zooKeeper.getChildren(path, watch);
            System.out.println("children = " + children);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    }
    
    • 示例:自定义监视器
    package com.sunxiaping.zookeeper02;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperApplicationTests4 {
    
        ZooKeeper zooKeeper;
    
        @AfterEach
        public void after() throws InterruptedException {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }
    
    
        /**
         * 连接zookeeper
         *
         * @throws IOException
         * @throws InterruptedException
         */
        @BeforeEach
        public void testZookeeperConnection() throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口
            String connectString = "192.168.179.100:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("连接对象的参数");
                    //事件类型
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    
    
        @Test
        public void test() throws KeeperException, InterruptedException {
            String path = "/hadoop";
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    String path = event.getPath();
                    System.out.println("path = " + path);
                    Event.EventType type = event.getType();
                    System.out.println("type = " + type);
                }
            };
            List<String> children = zooKeeper.getChildren(path, watcher);
            System.out.println("children = " + children);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    }
    

    1.7 配置中心案例

    • 工作中有这样的一个场景:数据库用户名和密码等信息放在一个配置文件中,应用读取配置文件,配置文件信息放入缓存。

    • 如果数据库的用户名和密码等改变的时候,还需要重新加载缓存,比较麻烦,通过zookeeper可以轻松完成,当数据发生变化的时候自动完成缓存同步。

    • 设计思路:

      • 1️⃣连接zookeeper服务器。
      • 2️⃣读取zookeeper中的配置信息,注册watcher监视器,存入本地变量。
      • 3️⃣当zookeeper中的配置信息发生变化的时候,通过watcher的回调方法捕获数据变化事件。
      • 4️⃣重新获取配置信息。
    • 示例:

    • 1️⃣在zookeeper中创建数据库所需要的配置信息:

    create /config "config"
    create /config/url "192.168.179.100:3306"
    create /config/username "root"
    create /config/password "123456"
    

    在zookeeper中创建数据库所需要的配置信息

    • 2️⃣使用watcher机制实现当zookeeper服务器中的配置信息发生改变时,客户端重新获取配置信息:
    package com.sunxiaping.zookeeper03;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 配置中心
     */
    public class MyConfigCenter implements Watcher {
        //zookeeper服务器的ip和端口
        private String connectString = "192.168.179.100:2181";
        //计数器对象
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        //连接对象
        private ZooKeeper zooKeeper;
        //会话超时时间
        private int timeout = 60000;
    
        //用于本地化存储配置信息
        private String url;
        private String username;
        private String password;
    
        //构造方法
        public MyConfigCenter() {
            initValue();
        }
    
    
        @Override
        public void process(WatchedEvent event) {
            try {
                //捕获事件
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("连接成功");
                        countDownLatch.countDown();
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("连接断开");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("会话超时");
                        zooKeeper = new ZooKeeper(connectString, timeout, this);
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("验证失败");
                    }
                } else if (event.getType() == Event.EventType.NodeDataChanged) { //当节点的数据发生变化的时候
                    initValue();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        //连接zookeeper服务器,读取配置信息
        public void initValue() {
            try {
                //创建连接对象
                zooKeeper = new ZooKeeper(connectString, timeout, this);
                //阻塞线程,等待连接的创建成功
                countDownLatch.await();
                //读取配置信息
                this.url = new String(zooKeeper.getData("/config/url", true, null));
                this.username = new String(zooKeeper.getData("/config/username", true, null));
                this.password = new String(zooKeeper.getData("/config/password", true, null));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        public String getUrl() {
            return url;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        public String getPassword() {
            return password;
        }
    
        public void setPassword(String password) {
            this.password = password;
        }
    
        //模拟客户端
        public static void main(String[] args) {
            try {
                MyConfigCenter configCenter = new MyConfigCenter();
    
                //每5秒去服务器查询一次
                for (int i = 0; i < 30; i++) {
                    Thread.sleep(5000);
                    System.out.println("url:" + configCenter.getUrl());
                    System.out.println("username:" + configCenter.getUsername());
                    System.out.println("password:" + configCenter.getPassword());
                    System.out.println("+++++++++++++++++++++++++++++++++++++++");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }
    

    1.8 生成分布式唯一ID

    • 在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。但是当分库分表后,就无法再依靠数据库的auto_increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID。

    • 设计思路:

      • 1️⃣连接zookeeper服务器。
      • 2️⃣指定路径生成临时有序节点。
      • 3️⃣取序列号,即为分布式环境下的唯一ID。
    • 示例:

    package com.sunxiaping.zookeeper03;
    
    import org.apache.zookeeper.*;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 全局唯一ID
     */
    public class GloballyUniqueID implements Watcher {
    
        //zookeeper服务器的ip和端口
        private String connectString = "192.168.179.100:2181";
        //计数器对象
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        //连接对象
        private ZooKeeper zooKeeper;
        //会话超时时间
        private int timeout = 60000;
        //用户生成序号的节点
        String defaultPath = "/uniqueId";
    
        @Override
        public void process(WatchedEvent event) {
            try {
                //捕获事件
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("连接成功");
                        countDownLatch.countDown();
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("连接断开");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("会话超时");
    
                        zooKeeper = new ZooKeeper(connectString, timeout, this);
    
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("验证失败");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public GloballyUniqueID() {
            try {
                zooKeeper = new ZooKeeper(connectString, timeout, this);
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 生成唯一ID
         *
         * @return
         */
        public String generateUniqueId() {
    
            String path = "";
    
            try {
                //创建临时有序节点
                path = zooKeeper.create(defaultPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
                return path.substring(defaultPath.length() + 1);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            return null;
        }
    
    
        public static void main(String[] args) {
            GloballyUniqueID globallyUniqueID = new GloballyUniqueID();
            for (int i = 0; i < 100; i++) {
                String uniqueId = globallyUniqueID.generateUniqueId();
                System.out.println("uniqueId = " + uniqueId);
            }
        }
    
    }
    

    1.9 分布式锁

    • 分布式锁有多种实现方式,比如通过数据库、redis都可以实现。作为分布式协同工具Zookeeper,当然也有着标准的实现方式。下面介绍在zookeeper中如下实现排他锁。

    • 设计思路:

      • 1️⃣每个客户端往/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock_000000001。
      • 2️⃣客户端获取/Locks下的子节点,并进行排序,判断排在最前面的是否为自己。如果自己的锁节点在第一位,则说明取锁成功。
      • 3️⃣如果自己的锁节点不是第一位,则监听自己前一位的锁节点。例如,自己的锁节点/Locks/Lock_000000002,那么则监听/Locks/Lock_000000001。
      • 4️⃣当前一位锁节点(/Locks/Lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(/Locks/Lock_000000002)的逻辑。
      • 5️⃣监听客户端重新执行第2️⃣步的逻辑,判断自己是否获取了锁。
    • 示例:

    package com.sunxiaping.zookeeper03;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class LockDemo {
        //zookeeper服务器的ip和端口
        private String connectString = "192.168.179.100:2181";
        //计数器对象
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        //连接对象
        private ZooKeeper zooKeeper;
        //会话超时时间
        private int timeout = 60000;
    
        private static final String LOCK_ROOT_PATH = "/Locks";
        private static final String LOCK_NODE_PATH = "/Lock_";
    
        private String lockPath;
    
        public LockDemo() {
            try {
                zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        try {
                            //捕获事件
                            if (event.getType() == Event.EventType.None) {
                                if (event.getState() == Event.KeeperState.SyncConnected) {
                                    System.out.println("连接成功");
                                    countDownLatch.countDown();
                                } else if (event.getState() == Event.KeeperState.Disconnected) {
                                    System.out.println("连接断开");
                                } else if (event.getState() == Event.KeeperState.Expired) {
                                    System.out.println("会话超时");
    
                                } else if (event.getState() == Event.KeeperState.AuthFailed) {
                                    System.out.println("验证失败");
                                }
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        //获取锁
        public void acquireLock() throws KeeperException, InterruptedException {
            //创建锁
            createLock();
            //尝试获取锁
            attemptLock();
        }
    
    
        //创建锁
        public void createLock() throws KeeperException, InterruptedException {
            //判断/Locks是否存在
            Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
            if (stat == null) {
                zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            //创建临时有序节点
            lockPath = zooKeeper.create(LOCK_ROOT_PATH + LOCK_NODE_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("lockPath = " + lockPath);
        }
    
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                //监视上一个节点是否被删除
                if (event.getType() == Event.EventType.NodeDeleted) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
            }
        };
    
        //尝试获取锁
        public void attemptLock() throws KeeperException, InterruptedException {
            //获取/Locks下的子节点  /Lock_xxxx
            List<String> lockList = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
            //对子节点进行排序
            Collections.sort(lockList);
            //    /Locks/Lock_xxx
            int index = lockList.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
            if (0 == index) { //说明该临时有序节点在锁列表的第一位
                System.out.println("获取锁成功");
                return;
            } else {
                //获取上一个节点的路径
                String previousPath = lockList.get(index - 1);
                Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + previousPath, watcher);
                if (null == stat) {
                    attemptLock();
                } else {
                    synchronized (watcher) {
                        watcher.wait();
                    }
                    attemptLock();
                }
            }
        }
    
        //释放锁
        public void releaseLock() throws KeeperException, InterruptedException {
            //删除临时有序节点
            zooKeeper.delete(lockPath, -1);
            if (zooKeeper != null) {
                zooKeeper.close();
            }
            System.out.println("锁已经释放");
        }
    }
    
    package com.sunxiaping.zookeeper03;
    
    import org.apache.zookeeper.KeeperException;
    
    public class SellTicketDemo {
        private void sell() {
            System.out.println("售票开始");
            int sleepMillis = 5000;
            try {
                Thread.sleep(sleepMillis);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("售票结束");
        }
    
        public void sellTicketWithLock() throws KeeperException, InterruptedException {
            LockDemo lockDemo = new LockDemo();
            //获取锁
            lockDemo.acquireLock();
            sell();
            //释放锁
            lockDemo.releaseLock();
        }
    
        public static void main(String[] args) throws KeeperException, InterruptedException {
            SellTicketDemo ticketDemo = new SellTicketDemo();
            for (int i = 0; i < 10; i++) {
                ticketDemo.sellTicketWithLock();
            }
        }
    }
    

    2 zookeeper集群搭建

    2.1 准备工作

    • 准备三台虚拟机(都装有CentOS7系统),IP分别为192.168.179.101、192.168.179.102和192.168.179.103。
    • 分别在这三台虚拟机装单独的JDK(jdk-8u131-linux-x64.tar.gz)和Zookeeper(zookeeper-3.4.10.tar.gz)。

    2.2 zookeeper集群的搭建

    • 1️⃣关闭三台虚拟机的防火墙,防止不能通信:
    # 192.168.107.101
    systemctl stop firewalld
    systemctl disable firewalld
    
    # 192.168.107.102
    systemctl stop firewalld
    systemctl disable firewalld
    
    # 192.168.107.103
    systemctl stop firewalld
    systemctl disable firewalld
    

    关闭防火墙

    • 2️⃣在三台虚拟机中的/usr/local目录中安装JDK和Zookeeper。

    在三台虚拟机中同样的目录下分别安装JDK和Zookeeper.png

    • 3️⃣分别在192.168.179.101、192.168.179.102和192.168.179.103三台机器上的zookeeper目录中新建data文件夹。
    mkdir -pv data
    

    • 4️⃣修改192.168.179.101、192.168.179.102、192.168.179.103机器上的zookeeper的conf目录中的zoo.cfg文件:
    # 服务器对应的端口号
    clientPort=2181
    # 数据快照文件所在路径
    dataDir=/usr/local/zookeeper-3.4.10/data
    # 集群配置信息
    # server.A=B:C:D
    # A:是一个数字,表示当前服务器的编号
    # B:当前服务器的IP地址
    # C:Leader选举的端口
    # D:Zookeeper服务器之间通信的端口
    server.1=192.168.179.101:2888:3888
    server.2=192.168.179.102:2888:3888
    server.3=192.168.179.103:2888:3888
    

    在三台虚拟机上分别配置zoo.cfg配置文件

    • 5️⃣在上一步dataDir指定的目录下,创建myid文件,然后在该文件中添加上一步server配置的编号数字:
    # 192.168.107.101对应的数字是1
    # /usr/local/zookeeper-3.4.10/data/
    echo "1" > myid
    
    # 192.168.107.102对应的数字是1
    # /usr/local/zookeeper-3.4.10/data/
    echo "2" > myid
    
    # 192.168.107.103对应的数字是1
    # /usr/local/zookeeper-3.4.10/data/
    echo "3" > myid
    

    配置服务器的编号

    • 6️⃣分别启动三台虚拟机中的zookeeper,检查集群状态:
    #192.168.179.101
    ./zkServer.sh start
    
    #192.168.179.102
    ./zkServer.sh start
    
    #192.168.179.103
    ./zkServer.sh start
    

    分别启动三台虚拟机中的zookeeper

    3 一致性协议:ZAB协议

    • ZAB协议的全称是Zookeeper Atomic BroadCast(zookeeper原子广播)。zookeeper是通过ZAB协议来保证分布式事务的最终一致性。
    • 基于ZAB协议,zookeeper集群中的角色主要有以下的三类。如下表所示:

    zookeeper集群中的角色

    • ZAB广播模式工作原理,通过类似于两阶段提交协议的方式解决数据一致性。

    ZAB协议的工作原理

    • 1️⃣Leader从客户端收到一个写请求。
    • 2️⃣Leader生成一个新的事务并为这个事务生成一个唯一的Zxid。
    • 3️⃣Leader将这个事务提议(propose)发送给所有的follower节点。
    • 4️⃣follower节点将收到的事务请求加入到历史队列(history queue)中,并发送ack到Leader。
    • 5️⃣当Leader收到大多数的follower(半数以上节点)的ack消息,Leader会发送commit请求。
    • 6️⃣当follower收到commit请求的时候,从历史队列中将事务请求commit。

    4 zookeeper的leader选举

    4.1 服务器状态

    • looking:寻找Leader状态。当服务器处于该状态的时候,它会认为当前集群中没有Leader,因此需要进入Leader选举状态。
    • leading:领导者状态。表明当前服务器的角色是Leader。
    • following:跟随者状态。表明当前服务器的角色是follower。
    • observing:观察者状态。表明当前服务器的角色是observer。

    4.2 服务器启动时期的Leader选举

    • 在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成Leader的选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都视图找到Leader,于是进入Leader选举状态。选举过程如下:
    • 1️⃣每个server发出一个投票。由于是初始情况,server1和server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用(myid,zxid)来表示,此时server1的投票为(1,0),server2的投票为(2,0),然后各自将这个投票发给集群中的其他机器。
    • 2️⃣集群中的每台服务器接收到来自集群中的各个服务器的投票。
    • 3️⃣处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下:
      • 优先检查zxid。zxid比较的服务器优先作为Leader。
      • 如果zxid相同,那么就比较myid。myid较大的服务器最为Leader服务器。

    对于server1而言,它的投票是(1,0),接受server2的投票为(2,0),首先会比较两者的zxid,均为0,再比较myid,此时server2的myid最大,于是更新自己的投票为(2,0),然后重新投票,对于server2而言,其无需更新自己的投票,只是再次向集群中的所有机器发出上一次投票信息即可。

    • 4️⃣统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接收到相同的投票新,对于server1和server2而言,都统计出集群中已经有两台机器接收了(2,0)的投票信息,此时便认为已经选出了Leader。
    • 5️⃣改变服务器状态。一旦确定了Leader,每个服务器都会更新自己的状态,如果是follower,那么就变更为following,如果是Leader,就变更为leading。

    4.3 服务器运行时期的Leader选举

    • 在zookeeper运行期间,Leader和follower服务器都各司其职,即便有非Leader服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂掉,那么整个集群将暂停对外提供服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。
    • 假设正在运行的有server1、server2和server3三台服务器,假设当前Leader是server2,如果某一时刻Leader挂了,此时便开始Leader选举。选举过程如下:
    • 1️⃣变更状态。Leader挂后,余下的服务器都会将自己的状态变为looking,然后开始进入Leader选举过程。
    • 2️⃣每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3都会投自己一票,产生(1,122)和(3,122),然后各自将投票发给集群中的所有机器。
    • 3️⃣接收来自各个服务器的投票。和启动时过程相同。
    • 4️⃣处理投票。和启动时过程相同,此时Leader3将会成为Leader。
    • 5️⃣统计投票。和启动时过程相同。
    • 6️⃣改变服务器的状态。和启动时过程相同。

    5 observer角色及其配置

    • observer角色特点:
      • 1️⃣不参与集群的Leader的选举。
      • 2️⃣不参与集群中写数据的ack反馈。
    • 为了使用observer角色,在任何想要变成observer角色的配置文件中加入如下配置:
    peerType=observer
    
    • 并在所有sever的配置文件中,配置成observer模式的server的哪行配置追加:observer,例如:
    server.3=192.168.179.103:2888:3888:observer
    

    6 zookeeper API连接集群

    /**
    * @param connectString
    *            comma separated host:port pairs, each corresponding to a zk
    *            server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If
    *            the optional chroot suffix is used the example would look
    *            like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
    *            where the client would be rooted at "/app/a" and all paths
    *            would be relative to this root - ie getting/setting/etc...
    *            "/foo/bar" would result in operations being run on
    *            "/app/a/foo/bar" (from the server perspective).
    * @param sessionTimeout
    *            session timeout in milliseconds
    * @param watcher
    *            a watcher object which will be notified of state changes, may
    *            also be notified for node events
    *
    * @throws IOException
    *             in cases of network failure
    * @throws IllegalArgumentException
    *             if an invalid chroot path is specified
    */
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
        throws IOException
    {
        this(connectString, sessionTimeout, watcher, false);
    }
    
    • 示例:
    package com.sunxiaping.zookeeper04;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperClusterDemo {
    
        public static void main(String[] args) throws IOException, InterruptedException {
            //计数器对象
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //服务器的ip地址和端口  集群
            String connectString = "192.168.179.101:2181,192.168.179.102:2181,192.168.179.103:2181";
            //客户端和服务器之间的会话超时时间(以毫秒为单位)
            int sessionTimeout = 5000;
            //监视器对象
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    //客户端和服务器连接创建成功
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("event = " + event);
                        System.out.println("连接创建成功");
                        countDownLatch.countDown();
                    }
                }
            };
            ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
    
            //主线程阻塞等待连接对象的创建成功
            countDownLatch.await();
    
            System.out.println("zooKeeper = " + zooKeeper.getSessionId());
        }
    }
    
  • 相关阅读:
    事务
    排序算法
    二维数组中的查找
    在Linux中安装Matlab
    null和“”的区别
    【学习笔记】〖九度OJ〗题目1433:FatMouse
    【学习笔记】〖九度OJ〗题目1464:Hello World for U
    year:2017 month:8 day:1
    year:2017 month:07 day:31
    year:2017 month:7 day:27
  • 原文地址:https://www.cnblogs.com/xuweiweiwoaini/p/13669263.html
Copyright © 2011-2022 走看看