zoukankan      html  css  js  c++  java
  • 六、zookeeper 事件监听机制

    一、watcher概念

      zookeeper提供了数据的发布/订阅功能多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者

      zookeeper采用了Watcher机制实现数据的发布/订阅功能该机制在被订阅对象发生变化时会异步通知客户端因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力

      watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式。

    二、watcher架构

      Watcher实现由三个部分组成:

      • Zookeeper服务端

      • Zookeeper客户端

      • 客户端的ZKWatchManager对象

      客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的Watch管理器会触发相关Watcher来回相应处理逻辑,从而完成整体的数据发布/订阅流程。

    三、watcher特性

    特性说明
    一次性 watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册
    客户端顺序回调 watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行
    轻量级 WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容;
    时效性 watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知;

    四、watcher接口设计

      Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举类:KeeperState、EventType

    • Watcher通知状态(KeeperState)

      KeeperState是客户端与服务端连接状态发生变化时对应的通知类型。路径为org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,其枚举属性如下:

    枚举属性说明
    SyncConnected 客户端与服务器正常连接时
    Disconnected 客户端与服务器断开连接时
    Expired 会话session失效时
    AuthFailed 身份认证失败时
    • Watcher事件类型(EventType)

      EventType是数据节点(znode)发生变化时对应的通知类型。EventType变化时KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时,EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType,是一个枚举类,枚举属性如下:

    枚举属性说明
    None
    NodeCreated Watcher监听的数据节点被创建时
    NodeDeleted Watcher监听的数据节点被删除时
    NodeDataChanged Watcher监听的数据节点内容发生变更时(无论内容数据 是否变化)
    NodeChildrenChanged Watcher监听的数据节点的子节点列表发生变更时

      注:客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取;

    五、捕获相应的事件

      上面讲到zookeeper客户端连接的状态和zookeeper对znode节点监听的事件类型,下面我们来讲解如何建立zookeeper的watcher监听。在zookeeper中采用zk.getChildren(path, watch)、zk.exists(path, watch)、zk.getData(path, watcher, stat)这样的方式为某个znode注册监听

      下表以node-x节点为例,说明调用的注册方法和可监听事件间的关系:

    注册方式CreatedChildrenChangedChangedDeleted
    zk.exists(“/node- x”,watcher) 可监控   可监控 可监控
    zk.getData(“/node- x”,watcher)     可监控 可监控
    zk.getChildren(“/node- x”,watcher)   可监控   可监控

     六、注册watcher的方法

    6.1、客服端与服务器的连接状态

    KeeperState 通知状态
    SyncConnected:客户端与服务器正常连接时
    Disconnected:客户端与服务器断开连接时
    Expired:会话session失效时
    AuthFailed:身份认证失败时
    事件类型为:None
    public class ZKConnectionWatcher implements Watcher {
        private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
        private static final int TIME_OUT = 5000;
        // 计数器对象
        static CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接对象
        static ZooKeeper zooKeeper;
    
        @Override
        public void process(WatchedEvent event) {
            try {
                // 事件类型
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("连接创建成功!");
                        countDownLatch.countDown();
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("断开连接!");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("会话超时!");
                        zooKeeper = new ZooKeeper("192.168.60.130:2181", 5000, new ZKConnectionWatcher());
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("认证失败!");
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            try {
                zooKeeper = new ZooKeeper(IP, TIME_OUT, new ZKConnectionWatcher());
                // 阻塞线程等待连接的创建
                countDownLatch.await();
                // 会话id
                System.out.println(zooKeeper.getSessionId());
                // 添加授权用户
                zooKeeper.addAuthInfo("digest", "wss:12345".getBytes());
                byte[] bs = zooKeeper.getData("/hello", false, null);
                System.out.println(new String(bs));
                Thread.sleep(50000);
                zooKeeper.close();
                System.out.println("结束");
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

     6.2、检查节点是否存在

    // 使用连接对象的监视器
    exists(String path, boolean b)
    // 自定义监视器
    exists(String path, Watcher w)
       
    // NodeCreated:节点创建
    // NodeDeleted:节点删除
    // NodeDataChanged:节点内容发生变化
    • path- znode路径。

    • b- 是否使用连接对象中注册的监视器。

    • w-监视器对象。

    public class ZKWatcherExists {
        private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
        private static final int TIME_OUT = 5000;
        ZooKeeper zooKeeper = null;
    
        @Before
        public void before() throws IOException, InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            // 连接zookeeper客户端
            zooKeeper = new ZooKeeper(IP, TIME_OUT, (event)-> {
                    System.out.println("连接对象的参数!");
                    // 连接成功
                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        countDownLatch.countDown();
                    }
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
            });
            countDownLatch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zooKeeper.close();
        }
    
        @Test
        public void watcherExists1() throws KeeperException, InterruptedException {
            // arg1:节点的路径
            // arg2:使用连接对象中的watcher
            Stat exists = zooKeeper.exists("/hello", true);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherExists2() throws KeeperException, InterruptedException {
            // arg1:节点的路径
            // arg2:自定义watcher对象
            zooKeeper.exists("/watcher1", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("自定义watcher");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                }
            });
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherExists3() throws KeeperException, InterruptedException {
            // watcher一次性
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("自定义watcher");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                        zooKeeper.exists("/watcher1", this);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            };
            zooKeeper.exists("/watcher1", watcher);
            Thread.sleep(80000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherExists4() throws KeeperException, InterruptedException {
            // 注册多个监听器对象
            zooKeeper.exists("/watcher1", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("1");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                }
            });
            zooKeeper.exists("/watcher1", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("2");
                    System.out.println("path=" + event.getPath());
                    System.out.println("eventType=" + event.getType());
                }
            });
            Thread.sleep(80000);
            System.out.println("结束");
        }
    }

     6.3、查看节点

    // 使用连接对象的监视器
    getData(String path, boolean b, Stat stat)
    // 自定义监视器
    getData(String path, Watcher w, Stat stat)
       
    // NodeDeleted:节点删除
    // NodeDataChanged:节点内容发生变化
    • path- znode路径。

    • b- 是否使用连接对象中注册的监视器。

    • w-监视器对象。

    • stat- 返回znode的元数据。'

    public class ZKWatcherGetData {
        private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
        private static final int TIME_OUT = 5000;
        ZooKeeper zooKeeper = null;
    
        @Before
        public void before() throws IOException, InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            // 连接zookeeper客户端
            zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
                System.out.println("连接对象的参数!");
                // 连接成功
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            });
            countDownLatch.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zooKeeper.close();
        }
    
        @Test
        public void watcherGetData1() throws KeeperException, InterruptedException {
            // arg1:节点的路径
            // arg2:使用连接对象中的watcher
            zooKeeper.getData("/hello", true, null);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherGetData2() throws KeeperException, InterruptedException {
            // arg1:节点的路径
            // arg2:自定义watcher对象
            zooKeeper.getData("/hello", (event) -> {
                System.out.println("自定义watcher");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            }, null);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherGetData3() throws KeeperException, InterruptedException {
            // 一次性
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("自定义watcher");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                        if (event.getType() == Event.EventType.NodeDataChanged) {
                            zooKeeper.getData("/hello", this, null);
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            };
            zooKeeper.getData("/hello", watcher, null);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherGetData4() throws KeeperException, InterruptedException {
            // 注册多个监听器对象
            zooKeeper.getData("/hello", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("1");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                        if (event.getType() == Event.EventType.NodeDataChanged) {
                            zooKeeper.getData("/hello", this, null);
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }, null);
    
            zooKeeper.getData("/hello", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("2");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                        if (event.getType() == Event.EventType.NodeDataChanged) {
                            zooKeeper.getData("/hello", this, null);
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }, null);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    }

     6.4、查看子节点

    // 使用连接对象的监视器
    getChildren(String path, boolean b)
    // 自定义监视器
    getChildren(String path, Watcher w)
       
    // NodeChildrenChanged:子节点发生变化
    // NodeDeleted:节点删除
    • path- znode路径。

    • b- 是否使用连接对象中注册的监视器。

    • w-监视器对象。

    public class ZKWatcherGetChild {
        private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
        private static final int TIME_OUT = 5000;
        ZooKeeper zooKeeper = null;
    
        @Before
        public void before() throws IOException, InterruptedException {
            CountDownLatch connectedSemaphore = new CountDownLatch(1);
            // 连接zookeeper客户端
            zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
                System.out.println("连接对象的参数!");
                // 连接成功
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connectedSemaphore.countDown();
                }
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            });
            connectedSemaphore.await();
        }
    
        @After
        public void after() throws InterruptedException {
            zooKeeper.close();
        }
    
        @Test
        public void watcherGetChild1() throws KeeperException, InterruptedException {
            // arg1:节点的路径
            // arg2:使用连接对象中的watcher
            zooKeeper.getChildren("/hello", true);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherGetChild2() throws KeeperException, InterruptedException {
            // arg1:节点的路径
            // arg2:自定义watcher
            zooKeeper.getChildren("/hello", (event) -> {
                System.out.println("自定义watcher");
                System.out.println("path=" + event.getPath());
                System.out.println("eventType=" + event.getType());
            });
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherGetChild3() throws KeeperException, InterruptedException {
            // 一次性
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("自定义watcher");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                        if (event.getType() ==
                                Event.EventType.NodeChildrenChanged) {
                            zooKeeper.getChildren("/hello", this);
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            };
            zooKeeper.getChildren("/hello", watcher);
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
        @Test
        public void watcherGetChild4() throws KeeperException, InterruptedException {
            // 多个监视器对象
            zooKeeper.getChildren("/hello", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("1");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                        if (event.getType() == Event.EventType.NodeChildrenChanged) {
                            zooKeeper.getChildren("/hello", this);
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            });
            zooKeeper.getChildren("/hello", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        System.out.println("2");
                        System.out.println("path=" + event.getPath());
                        System.out.println("eventType=" + event.getType());
                        if (event.getType() ==
                                Event.EventType.NodeChildrenChanged) {
                            zooKeeper.getChildren("/hello", this);
                        }
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            });
            Thread.sleep(50000);
            System.out.println("结束");
        }
    
    }

     七、配置中心案例

      工作中有这样的一个场景: 数据库用户名和密码信息放在一个配置文件中,应用读取该配置文件,配置文件信息放入缓存。

      若数据库的用户名和密码改变时候,还需要重新加载缓存,比较麻烦,通过ZooKeeper可以轻松完成,当数据库发生变化时自动完成缓存同步。

      设计思路:

      1. 连接zookeeper服务器

      2. 读取zookeeper中的配置信息,注册watcher监听器,存入本地变量

      3. 当zookeeper中的配置信息发生变化时,通过watcher的回调方法捕获数据变化事件

      4. 重新获取配置信息

    public class MyConfigCenter implements Watcher {
        private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
        private static final int TIME_OUT = 5000;
    
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接对象
        static ZooKeeper zooKeeper;
        // 用于本地化存储配置信息
        private String url;
        private String username;
        private String password;
    
        // 构造方法
        public MyConfigCenter() {
            initValue();
        }
    
        @Override
        public void process(WatchedEvent event) {
            try {
                // 捕获事件状态
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("连接成功");
                        countDownLatch.countDown();
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("连接断开!");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("连接超时!");
                        // 超时后服务器端已经将连接释放,需要重新连接服务器端
                        zooKeeper = new ZooKeeper(IP, TIME_OUT, new ZKConnectionWatcher());
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("验证失败!");
                    }
                    // 当配置信息发生变化时
                } else if (event.getType() == Event.EventType.NodeDataChanged) {
                    initValue();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        private void initValue() {
            try {
                // 创建连接对象
                zooKeeper = new ZooKeeper(IP, TIME_OUT, this);
                // 阻塞线程,等待连接的创建成功
                countDownLatch.await();
                // 读取配置信息
                this.url = new String(zooKeeper.getData("/config/url", true, null));
                this.username = new String(zooKeeper.getData("/config/username", true, null));
                this.password = new String(zooKeeper.getData("/config/password", true, null));
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            try {
                MyConfigCenter myConfigCenter = new MyConfigCenter();
                for (int i = 1; i <= 20; i++) {
                    Thread.sleep(5000);
                    System.out.println("url:" + myConfigCenter.getUrl());
                    System.out.println("username:" + myConfigCenter.getUsername());
                    System.out.println("password:" + myConfigCenter.getPassword());
                    System.out.println("########################################");
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        public String getUrl() {
            return url;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        public String getPassword() {
            return password;
        }
    
        public void setPassword(String password) {
            this.password = password;
        }
    
    }

     八、生成分布式唯一ID

      在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。但是分库分表后,就无法在依靠数据库的auto_increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID。

      设计思路:

      1. 连接zookeeper服务器

      2. 指定路径生成临时有序节点

      3. 取序列号及为分布式环境下的唯一ID

    public class GloballyUniqueId implements Watcher {
        private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
        private static final int TIME_OUT = 5000;
        //  计数器对象
        CountDownLatch countDownLatch = new CountDownLatch(1);
        //  用户生成序号的节点
        String defaultPath = "/uniqueId";
        //  连接对象
        ZooKeeper zooKeeper;
    
        @Override
        public void process(WatchedEvent event) {
            try {
                // 捕获事件状态
                if (event.getType() == Watcher.Event.EventType.None) {
                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        System.out.println("连接成功");
                        countDownLatch.countDown();
                    } else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
                        System.out.println("连接断开!");
                    } else if (event.getState() == Watcher.Event.KeeperState.Expired) {
                        System.out.println("连接超时!");
                        // 超时后服务器端已经将连接释放,需要重新连接服务器端
                        zooKeeper = new ZooKeeper(IP, TIME_OUT,new ZKConnectionWatcher());
                    } else if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
                        System.out.println("验证失败!");
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        // 构造方法
        public GloballyUniqueId() {
            try {
                //打开连接
                zooKeeper = new ZooKeeper(IP, TIME_OUT, this);
                // 阻塞线程,等待连接的创建成功
                countDownLatch.await();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        // 生成id的方法
        public String getUniqueId() {
            String path = "";
            try {
                //创建临时有序节点
                path = zooKeeper.create(defaultPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            // /uniqueId0000000001
            return path.substring(9);
        }
    
        public static void main(String[] args) {
            GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
            for (int i = 1; i <= 5; i++) {
                String id = globallyUniqueId.getUniqueId();
                System.out.println(id);
            }
        }
    }

     九、分布式锁

      分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具ZooKeeper,当然也有着标准的实现方式。下面介绍在zookeeper中如何实现排他锁。

      设计思路:

      1. 每个客户端往/Locks下创建临时有序节点/Locks/Lock 000000001

      2. 客户端取得/Locks下子节点,并进行排序,判断排在最前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功

      3. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点Lock 000000001

      4. 当前一位锁节点(Lock000000002)的逻辑

      5. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁

    public class MyLock {
        private static final String IP = "192.168.126.141:2181,192.168.126.142:2181,192.168.126.143:2181";
        private static final int TIME_OUT = 5000;
        //  计数器对象
        CountDownLatch countDownLatch = new CountDownLatch(1);
        //ZooKeeper配置信息
        ZooKeeper zooKeeper;
        private static final String LOCK_ROOT_PATH = "/Locks";
        private static final String LOCK_NODE_NAME = "Lock_";
        private String lockPath;
    
        // 打开zookeeper连接
        public MyLock() {
            try {
                zooKeeper = new ZooKeeper(IP, TIME_OUT, (event) -> {
                    if (event.getType() == Watcher.Event.EventType.None) {
                        if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                            System.out.println("连接成功!");
                            countDownLatch.countDown();
                        }
                    }
                });
                countDownLatch.await();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    
        //获取锁
        public void acquireLock() throws Exception {
            //创建锁节点
            createLock();
            //尝试获取锁
            attemptLock();
        }
    
        //创建锁节点
        private void createLock() throws Exception {
            //判断Locks是否存在,不存在创建
            Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
            if (stat == null) {
                zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            // 创建临时有序节点
            lockPath = zooKeeper.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("节点创建成功:" + lockPath);
        }
    
        //监视器对象,监视上一个节点是否被删除
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDeleted) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
            }
        };
    
        //尝试获取锁
        private void attemptLock() throws Exception {
            // 获取Locks节点下的所有子节点
            List<String> list = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
            // 对子节点进行排序
            Collections.sort(list);
            // /Locks/Lock_000000001
            int index = list.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
            if (index == 0) {
                System.out.println("获取锁成功!");
                return;
            } else {
                // 上一个节点的路径
                String path = list.get(index - 1);
                Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
                if (stat == null) {
                    attemptLock();
                } else {
                    synchronized (watcher) {
                        watcher.wait();
                    }
                    attemptLock();
                }
            }
        }
    
        //释放锁
        public void releaseLock() throws Exception {
            //删除临时有序节点
            zooKeeper.delete(this.lockPath, -1);
            zooKeeper.close();
            System.out.println("锁已经释放:" + this.lockPath);
        }
    
        public static void main(String[] args) {
            try {
                MyLock myLock = new MyLock();
                myLock.createLock();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
    public class TicketSeller {
        private void sell() {
            System.out.println("售票开始");
            // 线程随机休眠数毫秒,模拟现实中的费时操作
            int sleepMillis = 5000;
            try {
                //代表复杂逻辑执行了一段时间
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("售票结束");
        }
    
        public void sellTicketWithLock() throws Exception {
            MyLock lock = new MyLock();
            // 获取锁
            lock.acquireLock();
            sell();
            //释放锁
            lock.releaseLock();
        }
    
        public static void main(String[] args) throws Exception {
            TicketSeller ticketSeller = new TicketSeller();
            for (int i = 0; i < 10; i++) {
                ticketSeller.sellTicketWithLock();
            }
        }
    }
  • 相关阅读:
    c++深拷贝与浅拷贝
    c++构造函数的explicit
    c++虚函数和虚函数表
    c++重载、重写、隐藏(重定义)
    c++传值、传指针、传引用
    ASP.Net Core API 学习の中间件
    WPF中String Format的用法
    ASP.Net Core API 全局处理异常
    989. Add to Array-Form of Integer
    1014. Best Sightseeing Pair
  • 原文地址:https://www.cnblogs.com/jdy1022/p/14822872.html
Copyright © 2011-2022 走看看