zoukankan      html  css  js  c++  java
  • 分布式锁实现(二):Zookeeper

    前言

    紧跟上文的:分布式锁实现(一):Redis ,这篇我们用Zookeeper来设计和实现分布式锁,并且研究下开源客户端工具Curator的分布式锁源码

    设计实现

    一、基本算法

    1.在某父节点下创建临时有序节点
    2.判断创建的节点是否是当前父节点下所有子节点中序号最小的
    3.是序号最小的成功获取锁,否则监听比自己小的那个节点,进行watch,当该节点被删除的时候通知当前节点,重新获取锁
    4.解锁的时候删除当前节点
    

    二、关键点

    临时有序节点

    实现Zookeeper分布式锁关键就在于其[临时有序节点]的特性,在Zookeeper中有四种节点
    1.PERSISTENT 持久,若不手动删除就永久存在
    2.PERSISTENT_SEQUENTIAL 持久有序节点,zookeeper会为节点编号(保证有序)
    3.EPHEMERAL 临时,一个客户端会话断开后会自动删除
    4.EPHEMERAL_SEQUENTIAL 临时有序节点,zookeeper会为节点编号(保证有序)
    

    监听

    Zookeeper提供事件监听机制,通过对节点、节点数据、子节点都提供了监听,我们通过这种监听watcher机制实现锁的等待
    

    三、代码实现

    我们基于ZkClient这个客户端来实现,当然也可以用原生Zookeeper API,大致是一样的
    坐标如下:
      <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>
    

    代码如下:

    public class MyDistributedLock {
    
    
        private ZkClient zkClient;
        private String name;
        private String currentLockPath;
        private CountDownLatch countDownLatch;
    
        private static final String PARENT_LOCK_PATH = "/distribute_lock";
    
        public MyDistributedLock(ZkClient zkClient, String name) {
            this.zkClient = zkClient;
            this.name = name;
        }
    
    	//加锁
        public void lock() {
        	//判断父节点是否存在,不存在就创建
            if (!zkClient.exists(PARENT_LOCK_PATH)) {
                try {
                	//多个线程只会成功建立一次
                    zkClient.createPersistent(PARENT_LOCK_PATH);
                } catch (Exception ignored) {
                }
            }
            //创建当前目录下的临时有序节点
            currentLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis());
            //校验是否最小节点
            checkMinNode(currentLockPath);
        }
    
    	//解锁
        public void unlock() {
            System.out.println("delete : " + currentLockPath);
            zkClient.delete(currentLockPath);
        }
    
    
        private boolean checkMinNode(String lockPath) {
    		//获取当前目录下所有子节点
            List<String> children = zkClient.getChildren(PARENT_LOCK_PATH);
            Collections.sort(children);
            int index = children.indexOf(lockPath.substring(PARENT_LOCK_PATH.length() + 1));
            if (index == 0) {
                System.out.println(name + ":success");
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
                return true;
            } else {
                String waitPath = PARENT_LOCK_PATH + "/" + children.get(index - 1);
                //等待前一个节点释放的监听
                waitForLock(waitPath);
                return false;
            }
        }
    
    
        private void waitForLock(String prev) {
            System.out.println(name + " current path :" + currentLockPath + ":fail add listener" + " wait path :" + prev);
            countDownLatch = new CountDownLatch(1);
            zkClient.subscribeDataChanges(prev, new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {
    
                }
    
                @Override
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println("prev node is done");
                    checkMinNode(currentLockPath);
                }
            });
            if (!zkClient.exists(prev)) {
                return;
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch = null;
        }
    }
    

    加锁

    1. zkClient.exists先判断父节点是否存在,不存在就创建,zookeeper可以保证只会创建成功一次

    2. 在当前目录下zkClient.createEphemeralSequential创建临时有序节点,再判断当前目录下此节点是否为序号最小的,如果是,成功获取锁,否则的话拿比自己小的节点,并做监听

    3. waitForLock等待比自己小的节点,subscribeDataChanges监听一个节点的变化,handleDataDeleted里面再次做checkMinNode的判断

    4. 监听完毕后,再判断一次此节点是否存在,因为在监听的过程中有可能之前小的那个节点重新释放了锁,如果之前节点不存在的话,无需在这里等待,这里的等待是通过countDownLatch实现的

    解锁

    解锁就是通过zkClient的delete删除当前节点

    测试用例

    通过启动多个线程来测试lock、unlock的过程,查看是否有序

    public class MyDistributedLockTest {
    
    
        public static void main(String[] args) {
    
            ZkClient zk = new ZkClient("127.0.0.1:2181", 5 * 10000);
    
            for (int i = 0; i < 20; i++) {
    
                String name = "thread" + i;
                Thread thread = new Thread(() -> {
                    MyDistributedLock myDistributedLock = new MyDistributedLock(zk, name);
                    myDistributedLock.lock();
    //                try {
    //                    Thread.sleep(1 * 1000);
    //                } catch (InterruptedException e) {
    //                    e.printStackTrace();
    //                }
                    myDistributedLock.unlock();
                });
                thread.start();
            }
    
        }
    }
    

    执行结果如下,多线程情况下lock/unlock和监听一切正常:

    thread1 current path :/distribute_lock2/0000000007:fail add listener wait path :/distribute_lock2/0000000006
    thread6 current path :/distribute_lock2/0000000006:fail add listener wait path :/distribute_lock2/0000000005
    thread3:success
    delete : /distribute_lock2/0000000000
    thread2 current path :/distribute_lock2/0000000005:fail add listener wait path :/distribute_lock2/0000000004
    thread7 current path :/distribute_lock2/0000000004:fail add listener wait path :/distribute_lock2/0000000003
    thread9 current path :/distribute_lock2/0000000009:fail add listener wait path :/distribute_lock2/0000000008
    thread5 current path :/distribute_lock2/0000000008:fail add listener wait path :/distribute_lock2/0000000007
    thread0 current path :/distribute_lock2/0000000001:fail add listener wait path :/distribute_lock2/0000000000
    thread8 current path :/distribute_lock2/0000000002:fail add listener wait path :/distribute_lock2/0000000001
    thread4 current path :/distribute_lock2/0000000003:fail add listener wait path :/distribute_lock2/0000000002
    delete : /distribute_lock2/0000000001
    prev node is done
    thread8:success
    delete : /distribute_lock2/0000000002
    prev node is done
    thread4:success
    delete : /distribute_lock2/0000000003
    prev node is done
    thread7:success
    delete : /distribute_lock2/0000000004
    prev node is done
    thread2:success
    delete : /distribute_lock2/0000000005
    prev node is done
    thread6:success
    delete : /distribute_lock2/0000000006
    prev node is done
    thread1:success
    delete : /distribute_lock2/0000000007
    prev node is done
    thread5:success
    delete : /distribute_lock2/0000000008
    prev node is done
    thread9:success
    delete : /distribute_lock2/0000000009
    

    Curator源码分析

    一、基本使用

     		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
            client.start();
            InterProcessMutex lock2 = new InterProcessMutex(client, "/test");
    
            try {
                lock.acquire();
                //业务
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.release();
            }
    
    1. CuratorFrameworkFactory.newClient获取zookeeper的客户端,retryPolicy指定重试策略,开启客户端

    2. Curator本身提供了多种锁的实现,这里我们以InterProcessMutex可重入锁为例, lock.acquire()方法获取锁,lock.release()来释放锁,acquire方法也提供了重载的等待时间参数

    二、源码分析

    加锁

    acquire内部就直接internalLock方法,传了-1的等待时间

     public void acquire() throws Exception {
            if(!this.internalLock(-1L, (TimeUnit)null)) {
                throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
            }
        }
    

    internalLock方法首先判断是否是重入锁,通过ConcurrentMap维护线程和一个原子计数器,非重入锁的话,再通过attemptLock去获取锁

     private boolean internalLock(long time, TimeUnit unit) throws Exception
        {
            /*
               Note on concurrency: a given lockData instance
               can be only acted on by a single thread so locking isn't necessary
            */
    
            Thread currentThread = Thread.currentThread();
    
            LockData lockData = threadData.get(currentThread);
            if ( lockData != null )
            {
                // re-entering
                lockData.lockCount.incrementAndGet();
                return true;
            }
    
            String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
            if ( lockPath != null )
            {
                LockData newLockData = new LockData(currentThread, lockPath);
                threadData.put(currentThread, newLockData);
                return true;
            }
    
            return false;
        }
    

    attemptLock在这里进行循环等待,createsTheLock方法去创建节点,internalLockLoop去判断当前节点是否是最小节点

    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
        {
            final long      startMillis = System.currentTimeMillis();
            final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
            final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
            int             retryCount = 0;
    
            String          ourPath = null;
            boolean         hasTheLock = false;
            boolean         isDone = false;
            while ( !isDone )
            {
                isDone = true;
    
                try
                {
                    ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                    hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
                }
                catch ( KeeperException.NoNodeException e )
                {
                    // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                    // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                    if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                    {
                        isDone = false;
                    }
                    else
                    {
                        throw e;
                    }
                }
            }
    
            if ( hasTheLock )
            {
                return ourPath;
            }
    
            return null;
        }
    

    createsTheLock就是调用curator封装的api去创建临时有序节点

       public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
        {
            String ourPath;
            if ( lockNodeBytes != null )
            {
                ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
            }
            else
            {
                ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
            }
            return ourPath;
        }
    

    internalLockLoop锁判断,内部就是driver.getsTheLock去判断是否是当前目录下最小节点,如果是的话,返回获取锁成功,否则的话对previousSequencePath进行监听,监听动作完成后再对等待时间进行重新判断

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
        {
            boolean     haveTheLock = false;
            boolean     doDelete = false;
            try
            {
                if ( revocable.get() != null )
                {
                    client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
                }
    
                while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
                {
                    List<String>        children = getSortedChildren();
                    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
    
                    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    if ( predicateResults.getsTheLock() )
                    {
                        haveTheLock = true;
                    }
                    else
                    {
                        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
    
                        synchronized(this)
                        {
                            try 
                            {
                                // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                                if ( millisToWait != null )
                                {
                                    millisToWait -= (System.currentTimeMillis() - startMillis);
                                    startMillis = System.currentTimeMillis();
                                    if ( millisToWait <= 0 )
                                    {
                                        doDelete = true;    // timed out - delete our node
                                        break;
                                    }
    
                                    wait(millisToWait);
                                }
                                else
                                {
                                    wait();
                                }
                            }
                            catch ( KeeperException.NoNodeException e ) 
                            {
                                // it has been deleted (i.e. lock released). Try to acquire again
                            }
                        }
                    }
                }
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                doDelete = true;
                throw e;
            }
            finally
            {
                if ( doDelete )
                {
                    deleteOurPath(ourPath);
                }
            }
            return haveTheLock;
        }
    
    

    解锁

    release代码相对来说比较简单,就是先判断map里面是否存在当前线程的锁计数,不存在抛出异常,存在的话,进行原子减一操作,releaseLock内部就是删除节点操作,小于0的时候,从map里面移除

      public void release() throws Exception
        {
            /*
                Note on concurrency: a given lockData instance
                can be only acted on by a single thread so locking isn't necessary
             */
    
            Thread currentThread = Thread.currentThread();
            LockData lockData = threadData.get(currentThread);
            if ( lockData == null )
            {
                throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
            }
    
            int newLockCount = lockData.lockCount.decrementAndGet();
            if ( newLockCount > 0 )
            {
                return;
            }
            if ( newLockCount < 0 )
            {
                throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
            }
            try
            {
                internals.releaseLock(lockData.lockPath);
            }
            finally
            {
                threadData.remove(currentThread);
            }
        }
    

    后记

    分布式锁的实现目前主流比较常用的实现就是Redis和Zookeeper了,相比较自己的实现,Redission和Curator的设计实现更为优秀,也更值得我们借鉴和学习
    
    千里之行,积于跬步;万里之船,成于罗盘,共勉。
    
    千里之行,积于跬步;万里之船,成于罗盘,共勉。
  • 相关阅读:
    linux下的socket编程(3)--server端的简单示例
    高级I/O函数
    补充:memset透彻分析
    空间复杂度为0的数据交换
    排序算法一:直接插入排序
    [Github]watch和star的区别
    计算机网络总结(四)
    计算机网络总结(三)
    Java集合
    计算机网络总结(二)
  • 原文地址:https://www.cnblogs.com/Kelin-/p/9485954.html
Copyright © 2011-2022 走看看