zoukankan      html  css  js  c++  java
  • zookeeper分布式锁

    一、使用zookeeper原生api实现分布式锁

    1、思路一:利用Zookeeper不能重复创建一个节点的特性来实现一个分布式锁

    流程:

    1. 需要获得锁时创建锁node节点。
    2. 如果创建失败,则表示该锁已经被别人占用,watch该节点状态,等待锁。
    3. 如果创建成功,则表示获得锁。
    4. 主动释放锁时删除对应的node节点即可。
    5. 获得锁的session超时或断开,由于锁node为临时节点则该节点也会删除。
    6. 节点删除时watch该节点的线程重新争抢锁。

    代码:

    public class ZookeeperLock1 implements Lock {
    
        private final String zookeeperLockRootNode = "zookeeperLock";
    
        private final ZooKeeper zooKeeper;
        private final String lockName;
    
        private class LockWatcher implements Watcher {
    
            // 该watch对应的需要获得锁的线程
            private final Thread thread;
    
            private LockWatcher(Thread thread) {
                this.thread = thread;
            }
    
            @Override
            public void process(WatchedEvent event) {
                if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
                    synchronized (thread) {
                        thread.notifyAll();
                    }
                }
            }
        }
    
        public ZookeeperLock1(ZooKeeper zooKeeper, String lockName) throws KeeperException, InterruptedException {
            this.lockName = lockName;
            this.zooKeeper = zooKeeper;
            // 检查所有锁的根节点,如果没有则创建。
            try {
                zooKeeper.create("/" + zookeeperLockRootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException e) {
                if (!e.code().equals(KeeperException.Code.NODEEXISTS)) {
                    throw e;
                }
            }
        }
    
        @Override
        public void getLock() throws KeeperException, InterruptedException {
            String zookeeperLockNodeName = "/" + zookeeperLockRootNode + "/" + lockName;
            while (true) {
                // 直接创建锁节点,创建成功则表示拿到锁则return。返回节点存在异常则表示获取锁失败则等待锁。
                try {
                    zooKeeper.create(zookeeperLockNodeName, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    break;
                } catch (KeeperException e) {
                    if (!e.code().equals(KeeperException.Code.NODEEXISTS)) {
                        throw e;
                    }
                }
                // 添加节点监控
                zooKeeper.exists(zookeeperLockNodeName, new LockWatcher(Thread.currentThread()));
                synchronized (Thread.currentThread()) {
                    // 线程等待锁,只有在删除节点的watch中才会重新激活线程
                    Thread.currentThread().wait();
                }
            }
        }
    
        @Override
        public void freeLock() throws KeeperException, InterruptedException {
            String zookeeperLockNodeName = "/" + zookeeperLockRootNode + "/" + lockName;
            zooKeeper.delete(zookeeperLockNodeName, -1);
        }
    }

    测试:

    public class ZookeeperLock1Test {
    
        @Test
        public void testLock() throws IOException, InterruptedException {
            final String zookeeperHost = "10.5.31.155";
            final String zookeeperPort = "2181";
            final String lockName = "testLock1";
            final int threadCnt = 9;
            final CountDownLatch countDownLatchConnect = new CountDownLatch(1);
            final ZooKeeper zooKeeper = new ZooKeeper(zookeeperHost + ":" + zookeeperPort, 60000, event -> {
                if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                    countDownLatchConnect.countDown();
                }
            });
            countDownLatchConnect.await();
    
            final CountDownLatch countDownLatchThread = new CountDownLatch(threadCnt);
            for (int i = 1; i <= threadCnt; i++) {
                Runnable runnable = () -> {
                    try {
                        ZookeeperLock1 zookeeperLock1 = new ZookeeperLock1(zooKeeper, lockName);
                        zookeeperLock1.getLock();
                        System.out.println(Thread.currentThread().getName() + ":获得锁");
                        Thread.sleep(2000);
                        zookeeperLock1.freeLock();
                        System.out.println(Thread.currentThread().getName() + ":释放锁");
                    } catch (InterruptedException | KeeperException e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatchThread.countDown();
                    }
                };
                Thread thread = new Thread(runnable, "线程" + i);
                thread.start();
            }
            countDownLatchThread.await();
        }
    
    }

    优点:

    1. 实现比较简单,可拿来即用
    2. 有通知机制,能提供较快的响应
    3. 通过临时节点机制,保证节点能及时删掉

    缺点:

    有惊群效应。一个节点删除的时候,大量对这个节点的删除动作有订阅Watcher的线程会进行回调。

    2、思路二:利用Zookeeper顺序节点特性来实现一个分布式锁

    由于思路一实现的分布式锁有惊群效应,所以我们可以利用zookeeper顺序节点特性避免比效果。流程如下:

    1. 需要获得锁时创建顺序临时节点。
    2. 查看该节点是否为最小节点,如果是则表示获得锁,如果否则表示锁已经被别人占用,watch该节点上一个顺序节点,等待锁。
    3. 主动释放锁时删除最小节点即可。
    4. 获得锁的session超时或断开,由于锁node为临时节点则该节点也会删除。
    5. 节点删除时watch该节点的下一个节点会重新判断自己是否为最小节点,执行第2步。

    代码:

    public class ZookeeperLock2 implements Lock{
    
        private final String zookeeperLockRootNode = "zookeeperLock";
        private final String zookeeperLockPrefix = "lock_";
    
        private final ZooKeeper zooKeeper;
        private final String lockName;
    
    
        private class LockWatcher implements Watcher {
    
            // 该watch对应的需要获得锁的线程
            private final Thread thread;
    
            private LockWatcher(Thread thread) {
                this.thread = thread;
            }
    
            @Override
            public void process(WatchedEvent event) {
                if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
                    synchronized (thread) {
                        thread.notifyAll();
                    }
                }
            }
        }
    
        public ZookeeperLock2(ZooKeeper zooKeeper, String lockName) throws InterruptedException, KeeperException {
            this.zooKeeper = zooKeeper;
            this.lockName = lockName;
            // 检查所有锁的根节点,如果没有则创建。
            try {
                zooKeeper.create("/" + zookeeperLockRootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException e) {
                if (!e.code().equals(KeeperException.Code.NODEEXISTS)) {
                    throw e;
                }
            }
        }
    
    
        @Override
        public void getLock() throws InterruptedException, KeeperException {
            String zookeeperLockNodeName = "/" + zookeeperLockRootNode + "/" + lockName;
            try {
                zooKeeper.create(zookeeperLockNodeName, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException e) {
                if (!e.code().equals(KeeperException.Code.NODEEXISTS)) {
                    throw e;
                }
            }
            // 创建临时顺序节点
            String zookeeperLockSubNodeName = zooKeeper.create(zookeeperLockNodeName + "/" + zookeeperLockPrefix, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                // 获取该锁下所有子节点并排序
                List<String> zookeeperLockNodeChildren = zooKeeper.getChildren(zookeeperLockNodeName, false);
                Collections.sort(zookeeperLockNodeChildren);
                // 判断刚刚创建的节点是否为所有子节点中最小的那个,如果是则表示获得锁,如果否则表示等待锁
                if (zookeeperLockSubNodeName.equals(zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(0))) {
                    break;
                } else {
                    // 获取刚刚创建节点的上一个顺序节点
                    String zookeeperLockPriorNodeName = zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(0);
                    for (int i = 0; i < zookeeperLockNodeChildren.size(); i++) {
                        if (zookeeperLockSubNodeName.equals(zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(i))) {
                            break;
                        } else {
                            zookeeperLockPriorNodeName = zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(i);
                        }
                    }
                    // 监视刚刚创建节点的上一个顺序节点
                    zooKeeper.exists(zookeeperLockPriorNodeName, new LockWatcher(Thread.currentThread()));
                    synchronized (Thread.currentThread()) {
                        // 线程等待锁,只有在删除节点的watch中才会重新激活线程
                        Thread.currentThread().wait();
                    }
                }
            }
        }
    
        @Override
        public void freeLock() throws KeeperException, InterruptedException {
            String zookeeperLockNodeName = "/" + zookeeperLockRootNode + "/" + lockName;
            // 获取该锁下所有子节点并排序
            List<String> zookeeperLockNodeChildren = zooKeeper.getChildren(zookeeperLockNodeName, false);
            Collections.sort(zookeeperLockNodeChildren);
            // 删除节点
            zooKeeper.delete(zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(0), -1);
        }
    }

    测试:

    public class ZookeeperLock2Test {
    
        @Test
        public void testLock() throws IOException, InterruptedException {
            final String zookeeperHost = "10.5.31.155";
            final String zookeeperPort = "2181";
            final String lockName = "testLock2";
            final int threadCnt = 9;
            final CountDownLatch countDownLatchConnect = new CountDownLatch(1);
            final ZooKeeper zooKeeper = new ZooKeeper(zookeeperHost + ":" + zookeeperPort, 60000, event -> {
                if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                    countDownLatchConnect.countDown();
                }
            });
            countDownLatchConnect.await();
    
            final CountDownLatch countDownLatchThread = new CountDownLatch(threadCnt);
            for (int i = 1; i <= threadCnt; i++) {
                Runnable runnable = () -> {
                    try {
                        ZookeeperLock2 zookeeperLock2 = new ZookeeperLock2(zooKeeper, lockName);
                        zookeeperLock2.getLock();
                        System.out.println(Thread.currentThread().getName() + ":获得锁");
                        Thread.sleep(2000);
                        zookeeperLock2.freeLock();
                        System.out.println(Thread.currentThread().getName() + ":释放锁");
                    } catch (InterruptedException | KeeperException e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatchThread.countDown();
                    }
                };
                Thread thread = new Thread(runnable, "线程" + i);
                thread.start();
            }
            countDownLatchThread.await();
        }
    
    }

    二、使用Curator实现分布式锁

    其实上面写了这么多,也不过是重复造轮子罢了。Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。用Curator实现zookeeper的分布式锁非常简单。

    public class ZookeeperLock3Test {
    
        @Test
        public void testLock() throws InterruptedException {
            final String zookeeperHost = "10.5.31.155";
            final String zookeeperPort = "2181";
            final int threadCnt = 9;
    
            final CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zookeeperHost + ":" + zookeeperPort, new RetryNTimes(3, 5000));
            zkClient.start();
    
            CountDownLatch countDownLatch = new CountDownLatch(threadCnt);
            for (int i = 1; i <= threadCnt; i++) {
                Runnable runnable = () -> {
                    InterProcessMutex zkLock = new InterProcessMutex(zkClient, "/zookeeperLock/testLock3");
                    try {
                        zkLock.acquire();
                        System.out.println(Thread.currentThread().getName() + ":获得锁");
                        Thread.sleep(2000);
                        zkLock.release();
                        System.out.println(Thread.currentThread().getName() + ":释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                };
                Thread thread = new Thread(runnable, "线程" + i);
                thread.start();
            }
            countDownLatch.await();
    
            zkClient.close();
        }
    }
  • 相关阅读:
    POJ 3126 Prime Path
    POJ 2429 GCD & LCM Inverse
    POJ 2395 Out of Hay
    【Codeforces 105D】 Bag of mice
    【POJ 3071】 Football
    【POJ 2096】 Collecting Bugs
    【CQOI 2009】 余数之和
    【Codeforces 258E】 Devu and Flowers
    【SDOI 2010】 古代猪文
    【BZOJ 2982】 combination
  • 原文地址:https://www.cnblogs.com/LOVE0612/p/9714163.html
Copyright © 2011-2022 走看看