zoukankan      html  css  js  c++  java
  • 7、zookeeper应用场景-分布式锁

    分布式锁

      实现原理:有序节点+watch监听机制实现

    分布式锁有多种实现方式,比如通过数据库、redis都可实现。作为分布式协同工具Zookeeper,当然也有着标准的实现方式。下面介绍在zookeeper中如果实现排他锁

    设计思路

    1. 每个客户端往/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock_000000001

    2. 客户端取得/Locks下子节点,并进行排序,判断排在前面的是否为自己,如果自己的锁节点在第一位,代表获取锁成功

    3. 如果自己的锁节点不在第一位,则监听自己前一位的锁节点。例如,自己锁节点Lock_000000002,那么则监听Lock_000000001

    4. 当前一位锁节点(Lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(Lock_000000002)的逻辑

    5. 监听客户端重新执行第2步逻辑,判断自己是否获得了锁

    6. zookeeper是有工具包的(这里采用手写)
    // 线程测试类
    public class ThreadTest {
        public static void delayOperation(){
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        static interface Runable{
            void run();
        }
        public static void run(Runable runable,int threadNum){
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 30,
                    0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            for (int i = 0; i < threadNum; i++) {
                threadPoolExecutor.execute(runable::run);
            }
            threadPoolExecutor.shutdown();
        }
    
        public static void main(String[] args) {
    //        DistributedLock distributedLock = new DistributedLock();
    //        distributedLock.acquireLock();
    //        delayOperation();
    //        distributedLock.releaseLock();
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            // 每秒打印信息
            run(() -> {
                for (int i = 0; i < 999999999; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    String format = dateTimeFormatter.format(LocalDateTime.now());
                    System.out.println(format);
                }
            },1);
            // 线程测试
            run(() -> {
                DistributedLock distributedLock = new DistributedLock();
                distributedLock.acquireLock();
                delayOperation();
                distributedLock.releaseLock();
            },30);
        }
    }
    public class DistributedLock {
        private String IP = "192.168.133.133:2181";
        private final String ROOT_LOCK = "/Root_Lock";
        private final String LOCK_PREFIX = "/Lock_";
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        private final byte[] DATA = new byte[0];
    
        private ZooKeeper zookeeper;
        private String path;
    
        private void init(){
            // 初始化
            try {
                zookeeper = new ZooKeeper(IP, 200000, w -> {
                    if(w.getState() == Watcher.Event.KeeperState.SyncConnected){
                        System.out.println("连接成功");
                    }
                    countDownLatch.countDown();
                });
                countDownLatch.await();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        // 暴露的外部方法,主逻辑
        public void acquireLock(){
            init();
            createLock();
            attemptLock();
        }
    
        // 暴露的外部方法,主逻辑
        public void releaseLock(){
            try {
                zookeeper.delete(path,-1);
                System.out.println("锁释放了" + path);
            } catch (InterruptedException | KeeperException e) {
                e.printStackTrace();
            }
        }
    
        private void createLock(){
            try {
                // 创建一个目录节点
                Stat root = zookeeper.exists(ROOT_LOCK, false);
                if(root == null)
                    zookeeper.create(ROOT_LOCK, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                // 目录下创建子节点
                path = zookeeper.create(ROOT_LOCK + LOCK_PREFIX, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        private Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Event.EventType.NodeDeleted){
                    synchronized (this){
                        this.notifyAll();
                    }
                }
            }
        };
    
        private void attemptLock(){
            try {
                // 获取正在排队的节点,由于是zookeeper生成的临时节点,不会出错,这里不能加监视器
                // 因为添加了监视器后,任何子节点的变化都会触发监视器
                List<String> nodes = zookeeper.getChildren(ROOT_LOCK,false);
                nodes.sort(String::compareTo);
                // 获取自身节点的排名
                int ranking = nodes.indexOf(path.substring(ROOT_LOCK.length() + 1));
                // 已经是最靠前的节点了,获取锁
                if(ranking == 0){
                    return;
                }else {
                    // 并不是靠前的锁,监视自身节点的前一个节点
                    Stat status = zookeeper.exists(ROOT_LOCK+"/"+nodes.get(ranking - 1), watcher);
                    // 有可能这这个判断的瞬间,0号完成了操作(此时我们应该判断成功自旋才对),但是上面的status变量已经获取了值并且不为空,1号沉睡
                    // 但是,请注意自行测试,虽然1号表面上沉睡了,但是实际上watcher.wait()是瞬间唤醒的
                    if(status == null){
                        attemptLock();
                    }else {
                        synchronized (watcher){
                            watcher.wait();
                        }
                        attemptLock();
                    }
                }
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    [算法] 堆栈
    [刷题] PTA 02-线性结构3 Reversing Linked List
    java IO流 (八) RandomAccessFile的使用
    java IO流 (七) 对象流的使用
    java IO流 (六) 其它的流的使用
    java IO流 (五) 转换流的使用 以及编码集
    java IO流 (四) 缓冲流的使用
    java IO流 (三) 节点流(或文件流)
    java IO流 (二) IO流概述
    java IO流 (一) File类的使用
  • 原文地址:https://www.cnblogs.com/lemon-flm/p/14605388.html
Copyright © 2011-2022 走看看