zoukankan      html  css  js  c++  java
  • zookeeper应用场景

    配置中心:

      将配置信息存在zk中的一个节点中,同时给该节点注册一个数据节点变更的watcher监听,一旦节数据发生变更,所有的订阅该节点的客户端都可以获取数据变更通知。

      

      案例:

        

    public class ZKConnectionWatcher implements Watcher {
        // 计数器对象
        static CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接对象
        static ZooKeeper zooKeeper;
    
        @Override
        public void process(WatchedEvent event) {
            try {
                // 事件类型
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("连接创建成功");
                        countDownLatch.countDown(); // 通知线程可以继续往下执行了
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("断开连接");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("会话超时");
                        // 会话超时,重新创建
                        zooKeeper = new ZooKeeper("192.168.43.182:2181", 5000, new ZKConnectionWatcher());
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("认证失败");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            try {
                zooKeeper = new ZooKeeper("192.168.43.182:2181", 5000, new ZKConnectionWatcher());
                countDownLatch.await(); // 阻塞线程,等待连接的创建
                // 会话id
                System.out.println(zooKeeper.getSessionId());
    
                // 添加授权用户
                zooKeeper.addAuthInfo("digest", "fan:123456".getBytes());
                byte[] bytes = zooKeeper.getData("/node1", false, null);
                System.out.println(new String(bytes));
    
                Thread.sleep(5000);
                System.out.println("结束");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (zooKeeper != null) {
                    try {
                        zooKeeper.close();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    分布式唯一ID:

      

    public class GloballyUniqueId implements Watcher {
        String IP = "192.168.0.105:2181";
        // 计数器对象
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 用户生成序号的节点
        String defaultPath = "/uniqueId";
        // 连接对象
        static ZooKeeper zooKeeper;
    
        public GloballyUniqueId() {
            try {
                // 创建连接对象
                zooKeeper = new ZooKeeper(IP, 5000, this);
                // 阻塞线程,等待连接的创建成功
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void process(WatchedEvent event) {
            try {
                // 捕获事件状态
                if (event.getType() == Event.EventType.None) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("连接创建成功");
                        countDownLatch.countDown(); // 通知线程可以继续往下执行了
                    } else if (event.getState() == Event.KeeperState.Disconnected) {
                        System.out.println("断开连接");
                    } else if (event.getState() == Event.KeeperState.Expired) {
                        System.out.println("会话超时");
                        // 会话超时,重新创建
                        zooKeeper = new ZooKeeper(IP, 5000, new GloballyUniqueId());
                    } else if (event.getState() == Event.KeeperState.AuthFailed) {
                        System.out.println("认证失败");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        // 生成id的方法
        public String getUniqueId() {
            String path = "";
            try {
                // 创建临时有序节点
                path = zooKeeper.create(defaultPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                return path.substring(9);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return path;
        }
    
        public static void main(String[] args) {
            GloballyUniqueId globallyUniqueId = new GloballyUniqueId();
            for (int i = 0; i < 5; i++) {
                String id = globallyUniqueId.getUniqueId();
                System.out.println(id);
            }
        }
    }

    分布式锁:

      分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具ZooKeeper,有着标准的实现方式。

      

    public class MyLock  {
        String IP = "192.168.0.105:2181";
        // 计数器对象
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 连接对象
        static ZooKeeper zooKeeper;
    
        private static final String LOCK_ROOT_PATH = "/locks";
        private static final String LOCK_NODE_NAME = "/Lock_";
        private String lockPath;
    
        public MyLock() {
            try {
                // 创建连接对象
                zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        // 捕获事件状态
                        if (event.getType() == Event.EventType.None) {
                            if (event.getState() == Event.KeeperState.SyncConnected) {
                                System.out.println("连接创建成功");
                                countDownLatch.countDown(); // 通知线程可以继续往下执行了
                            }
                        }
                    }
                });
                // 阻塞线程,等待连接的创建成功
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        // 获取锁
        public void acquireLock() throws Exception {
            // 创建锁节点
            createLock();
            // 尝试获取锁
            attemptLock();
        }
    
        // 创建锁节点
        private void createLock() throws Exception {
            Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
            if (stat == null) {
                zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            // 创建临时有序节点
            lockPath = zooKeeper.create(LOCK_ROOT_PATH + LOCK_NODE_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("节点创建成功" + lockPath);
        }
    
        // 监视器对象,监视上一个节点是否被删除
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDeleted) {
                    synchronized (this) {
                        notifyAll();
                    }
                }
            }
        };
    
        // 尝试获取锁
        private void attemptLock() throws Exception {
            List<String> list = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
            // 对子节点进行排序
            Collections.sort(list);
            // /locks/Lock_0000000001 --> Lock_0000000001
            int index = list.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
            if(index == 0) {
                System.out.println("获取锁成功!");
                return;
            } else {
                // 上一个节点的路径
                String path = list.get(index - 1);
                Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + path, watcher);
                if (stat == null) {
                    attemptLock();
                } else {
                    synchronized (watcher) {
                        watcher.wait();
                    }
                    attemptLock();
                }
            }
        }
    
        // 释放锁
        public void releaseLock() throws Exception {
            zooKeeper.delete(this.lockPath, -1);
            zooKeeper.close();
            System.out.println("锁已经释放:" + this.lockPath);
        }
    
        public static void main(String[] args) {
            try {
                MyLock myLock = new MyLock();
                myLock.createLock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public class TicketSeller {
        private void sell() {
            System.out.println("售票开始");
            int sleepMillis = 5000;
            try {
                // 代表复杂逻辑执行了一段时间
                Thread.sleep(sleepMillis);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("售票结束");
        }
    
        public void sellTicketWithLock() throws Exception {
            MyLock lock = new MyLock();
            // 获取锁
            lock.acquireLock();
            sell();
            // 释放锁
            lock.releaseLock();
        }
    
        public static void main(String[] args) throws Exception {
            TicketSeller ticketSeller = new TicketSeller();
            for (int i = 0; i < 10; i++) {
                ticketSeller.sellTicketWithLock();
            }
        }
    }
  • 相关阅读:
    正则表达式$的一些用法
    基本类型数据转换(int,char,byte)
    IntelliJ Idea 2017 免费激活方法
    jrebel license server 激活方法
    linux 完全关闭tomcat
    spring data jpa自定义bean字段映射
    idea 启动调试模式总提示端口58346被占用问题
    java使用jsch连接linux
    福大软工 · BETA 版冲刺前准备(团队)
    Alpha 事后诸葛亮
  • 原文地址:https://www.cnblogs.com/roadlandscape/p/12993806.html
Copyright © 2011-2022 走看看