zoukankan      html  css  js  c++  java
  • Zookeeper实战-分布式锁

    1. 简介

    我们在之前的博文中讲解了如何使用redis实现分布式锁,其实除了 redis 还有 zookeeper 也能实现分布式锁。

    废话不多说,直接上图。

    从整个流程中可以看出,zk实现分布式锁,主要是靠zk的临时顺序节点和watch机制实现的。

    2. quick start

    Curator 是 Netflix 公司开源的一套 zookeeper 客户端框架,解决了很多 Zookeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册 Watcher 和 NodeExistsException 异常等。

    curator-recipes:封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。

    2.1 引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.2.0</version>
    </dependency>
    

    curator-recipes中已经依赖了zookeepercurator-framework jar,所以这里不用额外的依赖其他jar。

    2.2 测试代码

    测试代码其实很简单,只需要几行代码而已,初始化客户端,创建锁对象,加锁 和 释放锁。

    这里先把加锁的代码注释掉,试下不加锁的情况。

    package com.ldx.zookeeper.controller;
    
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import javax.annotation.PostConstruct;
    
    /**
     * 分布式锁demo
     *
     * @author ludangxin
     * @date 2021/9/4
     */
    @Slf4j
    @RestController
    @RequestMapping("lock")
    @RequiredArgsConstructor
    public class LockDemoController {
       /**
        * 库存数
        */
       private Integer stock = 30;
       /**
        * zk client 
        */
       private static CuratorFramework CLIENT;
    
       /**
        * 初始化连接信息
        */
       @PostConstruct
       private void init() {
          RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
          CLIENT = CuratorFrameworkFactory.builder().connectString("localhost:2181").retryPolicy(retryPolicy).build();
          CLIENT.start();
       }
    
       @GetMapping("buy")
       public String buy() {
          // 可重入锁
          InterProcessMutex mutexLock = new InterProcessMutex(CLIENT, "/lock");
          try {
             // 加锁
    //         mutexLock.acquire();
             if(this.stock > 0) {
                Thread.sleep(500);
                this.stock--;
             }
             log.info("剩余库存==={}", this.stock);
          } catch(Exception e) {
             log.error(e.getMessage());
             return "no";
          }
          finally {
             try {
                // 释放锁
    //            mutexLock.release();
             } catch(Exception e) {
                log.error(e.getMessage());
             }
          }
          return "ok";
       }
    }
    

    2.3 启动测试

    这里我们使用jemter进行模拟并发请求,当然我们这里只启动了一个server,主要是为了节约文章篇幅(启动多个server还得连接db...),能说明问题即可。

    同一时刻发送一百个请求。

    测试结果部分日志如下:

    很明显出现了超卖了现象,并且请求是无序的(请求是非公平的)。

    此时我们将注释的加锁代码打开,再进行测试。

    测试结果部分日志如下:

    很明显没有出现超卖的现象。

    通过zk 客户端工具查看创建的部分临时节点如下:

    3. 源码解析

    3.1 加锁逻辑

    我们再通过查看Curator加锁源码来验证下我们的加锁逻辑。

    首先我们查看InterProcessMutex::acquire()方法,并且我们通过注释可以得知该方法加的锁是可重入锁。

    /**
     * Acquire the mutex - blocking until it's available. Note: the same thread
     * can call acquire re-entrantly. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
    @Override
    public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    

    查看internalLock方法如下。

    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        // 获取当前线程
        Thread currentThread = Thread.currentThread();
        // 在map中查看当前线程有没有请求过
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null) {
            // 请求过 则 +1 , 实现了锁的重入逻辑
            lockData.lockCount.incrementAndGet();
            return true;
        }
        // 尝试获取锁
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null) {
            // 创建锁对象
            LockData newLockData = new LockData(currentThread, lockPath);
            // 添加到map中
            threadData.put(currentThread, newLockData);
            return true;
        }
    
        return false;
    }
    

    我们继续查看LockInternals::attemptLock()尝试获取锁逻辑如下。

    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;
        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while(!isDone) {
            // 成功标识
            isDone = true;
    
            try {
                // 创建锁
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                // 判断是否加锁成功
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            } catch( KeeperException.NoNodeException e ) {
                // 当StandardLockInternalsDriver 找不到锁定节点时,它会抛出会话过期等情况。因此,如果重试允许,则继续循环
                if( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }
    
        if(hasTheLock) {
            return ourPath;
        }
    
        return null;
    }
    

    在这里先查看下创建锁的逻辑StandardLockInternalsDriver::createsTheLock(),如下。

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        // 判断有没有传znode data 我们这里为null
        if(lockNodeBytes != null) {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        } else {
          // 创建Container父节点且创建临时的顺序节点
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
    

    锁创建成功后我们再查看下程序是如何加锁的LockInternals::internalLockLoop()

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try {
            if(revocable.get() != null) {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
    	// 当客户端初始化好后 且 还没有获取到锁
            while((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
                // 获取所有的子节点 且 递增排序
                List<String>        children = getSortedChildren();
                // 获取当前节点 path
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1);
    	    // 获取当前锁
                // 1. 先判断当前节点是不是下标为0的节点,即是不是序列值最小的节点。
                // 2. 如果是则获取锁成功,返回成功标识。
                // 3. 如果不是则返回比它小的元素作为被监听的节点
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if(predicateResults.getsTheLock()) {
                    // 获取锁成功 返回成功标识
                    haveTheLock = true;
                } else {
                    // 索取锁失败,则获取比它小的上一个节点元素
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
    
                    synchronized(this) {
                        try {
                            // 监听比它小的上一个节点元素
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            // 如果设置了超时,则继续判断是否超时
                            if(millisToWait != null) {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if(millisToWait <= 0) {
                                    doDelete = true;    
                                    break;
                                }
    			    // 没有超时则 等待
                                wait(millisToWait);
                            } else {
    			    // 没有超时则 等待
                                wait();
                            }
                        } catch(KeeperException.NoNodeException e) {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        } catch(Exception e) {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        } finally {
            // 报错即删除该节点
            if(doDelete) {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
    

    最后 我们再看下上段代码中提到的很关键的方法driver.getsTheLock() 即 StandardLockInternalsDriver::getsTheLock()

    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        // 获取当前节点的下标 
        int             ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        // 这里的maxLeases == 1,即当前节点的下标是不是0
        boolean         getsTheLock = ourIndex < maxLeases;
        // 如果当前节点的下标为0,则不返回被监听的节点(因为自己已经是最小的节点了),如果不是则返回比自己小的节点作为被监听的节点。
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
        // 构造返回结果
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    

    3.2 小节

    其实加锁的源码还是比较清晰和易懂的,我们在这里再总结下。

    1. 执行InterProcessMutex::acquire()加锁方法。
    2. InterProcessMutex::internalLock()判断当前线程是加过锁,如果加过则加锁次数+1实现锁的重入,如果没有加过锁,则调用LockInternals::attemptLock()尝试获取锁。
    3. LockInternals::attemptLock()首先创建Container父节点再创建临时的顺序节点,然后执行加锁方法LockInternals::internalLockLoop()
    4. LockInternals::internalLockLoop()
      1. 先获取当前Container下的所有顺序子节点并且按照从小到大排序。
      2. 调用StandardLockInternalsDriver::getsTheLock()方法加锁,先判断当前节点是不是最小的顺序节点,如果是则加锁成功,如果不是则返回上一个比他小的节点,作为被监听的节点。
      3. 上一步加锁成功则返回true,如果失败则执行监听逻辑。

    3.3 释放锁逻辑

    @Override
    public void release() throws Exception {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */
        // 获取当前线程
        Thread currentThread = Thread.currentThread();
        // 查看当前线程有没有锁
        LockData lockData = threadData.get(currentThread);
        if(lockData == null) {
            // 没有锁 还释放,报错
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        // 有锁则 锁次数 -1
        int newLockCount = lockData.lockCount.decrementAndGet();
        // 如果锁的次数还大于0,说明还不能释放锁,因为重入的方法还未执行完
        if (newLockCount > 0) {
            return;
        }
        if (newLockCount < 0) {
            // 锁的次数小于0,报错
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
            // 删除节点
            internals.releaseLock(lockData.lockPath);
        }
        finally {
            // 从当前的map中移除
            threadData.remove(currentThread);
        }
    }
    
    final void releaseLock(String lockPath) throws Exception{
        client.removeWatchers();
        revocable.set(null);
        deleteOurPath(lockPath);
    }
    

    4. redis 和 zookeeper

    Zookeeper采用临时节点和事件监听机制可以实现分布式锁,Redis主要是通过setnx命令实现分布式锁。
    Redis需要不断的去尝试获取锁,比较消耗性能,Zookeeper是可以通过对锁的监听,自动获取到锁,所以性能开销较小。
    另外如果获取锁的jvm出现bug或者挂了,那么只能redis过期删除key或者超时删除key,Zookeeper则不存在这种情况,连接断开节点则会自动删除,这样会即时释放锁。

    这样一听感觉zk的优势还是很大的。

    但是要考虑一个情况在锁并发不高的情况下 zk没有问题 如果在并发很高的情况下 zk的数据同步 可能造成锁时延较长,在选举过程中需要接受一段时间zk不可用(因为ZK 是 CP 而 redis集群是AP)。

    所以说没有哪个技术是适用于任何场景的,具体用哪个技术,还是要结合当前的技术架构和业务场景做选型和取舍。

  • 相关阅读:
    IDEA与Eclipse
    解释器模式
    设计模式(十一)—— 策略模式
    设计模式(六)—— 装饰模式
    Java注解
    Spring源码阅读(二)—— AOP
    业务开发(八)—— Maven
    高性能MySQL笔记
    Java源码阅读(六)—— ReentrantLock
    业务开发(六)—— MyBatis框架
  • 原文地址:https://www.cnblogs.com/ludangxin/p/15236163.html
Copyright © 2011-2022 走看看