zoukankan      html  css  js  c++  java
  • Redisson分布式锁以及其底层原理

    介绍与配置

    Redisson官方文档:https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D

    Springboot 自动配置类: RedissonAutoConfiguration 

    pom配置:

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-pool2</artifactId>
    </dependency>
    <dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson-spring-boot-starter</artifactId>
       <version>3.13.6</version>
    </dependency>

    如果什么都不配置的话,会默认使用单Redis节点模式,代码中直接就可以使用  RedissonClient 

    具体配置可参考官方文档:https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95

    分布式锁测试

    @Slf4j
    @SpringBootTest(classes = DemoWebApplication.class)
    public class RedissonTest {
    
      @Resource
        private RedissonClient redissonClient;
    
      @Test
        public void redissonTest() throws InterruptedException {
            log.info("===redissonTest====start===============");
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    lockTest();
                }).start();
            }
            Thread.sleep(30000);
            log.info("===redissonTest====end===============");
        }
      private void tryLockTest() {
        // 见下文
      }
      private void lockTest() {
        // 见下文
      }
    }
     
      private void tryLockTest() {
            String threadName = Thread.currentThread().getName();
            log.info("===Thread=={}===start===", threadName);
            RLock lock = redisson.getLock("DistributedRedisLockTest");
    
            // 尝试加锁,最多等待10秒,上锁以后30秒自动解锁
            boolean lockFlag = false;
            try {
                // 尝试去加锁,10秒没获取到锁,则返回false
                // res = lock.tryLock(10, TimeUnit.SECONDS);
                // 尝试去加锁,10秒没获取到锁,则返回false,获取到则返回true,获取到锁后30秒自动释放
                // 当waitTime设置为0时,就相当于setNx,获取不到锁直接退出
                lockFlag = lock.tryLock(5, 1, TimeUnit.SECONDS);
                if (!lockFlag) {
                    log.info("===Thread=={}==res={}==没有获取到锁,退出===", threadName, lockFlag);
                    return;
                }
    
                log.info("===Thread=={}============getLock===", threadName);
                // 模拟业务逻辑
                Thread.sleep(2000);
            } catch (Exception e) {
                log.error("执行异常,e:{}", ExceptionUtils.getStackTrace(e));
            } finally {
                log.info("===Thread=={}==========isHeldByCurrentThread={}", threadName, lock.isHeldByCurrentThread());
                // 释放锁也可能出现异常,比如业务代码没执行完,锁就过期,此时进行释放会抛异常,加个当前线程是否持有所的判断
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
            log.info("===Thread=={}==lockFlag={}=end===", threadName, lockFlag);
        }
        private void lockTest() {
            String threadName = Thread.currentThread().getName();
            log.info("===Thread=={}===start===", threadName);
            RLock lock = redisson.getLock("DistributedRedisLockTest");
    
            // lock表示去加锁,加锁成功,没有返回值,继续执行下面代码;加锁失败,它会一直阻塞,直到锁被释放,再继续往下执行
            // lock.lock();
            // 1秒自动释放时间,但是后续执行unlock操作时会报错(自己只能解锁自己的,第一个线程释放之后执行到unlock方法,但是此时锁已经是第二个线程的了)
            lock.lock(1, TimeUnit.SECONDS);
    
            log.info("===Thread=={}============getLock===", threadName);
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                log.error("执行异常,e:{}", ExceptionUtils.getStackTrace(e));
            } finally {
                log.info("===Thread=={}==========isHeldByCurrentThread={}", threadName, lock.isHeldByCurrentThread());
                // 释放锁也可能出现异常,比如业务代码没执行完,锁就过期,此时进行释放会抛异常,加个当前线程是否持有所的判断
                if (lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
            log.info("===Thread=={}===end===", threadName);
        }

    原理

    本文中Redisson版本为 redisson-spring-boot-starter 3.13.6

    先看下接口方法:

    public interface RRLock extends Lock, RLockAsync{
        //----------------------Lock接口方法-----------------------
    
        /**
         * 加锁 锁的有效期默认30秒
         */
        void lock();
        /**
         * tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
         */
        boolean tryLock();
        /**
         * tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,
         * 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
         *
         * @param time 等待时间
         * @param unit 时间单位 小时、分、秒、毫秒等
         */
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        /**
         * 解锁
         */
        void unlock();
        /**
         * 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过
         * Thread.currentThread().interrupt(); 方法真正中断该线程
         */
        void lockInterruptibly();
    
        //----------------------RLock接口方法-----------------------
        /**
         * 加锁 上面是默认30秒这里可以手动设置锁的有效时间
         *
         * @param leaseTime 锁有效时间
         * @param unit      时间单位 小时、分、秒、毫秒等
         */
        void lock(long leaseTime, TimeUnit unit);
        /**
         * 这里比上面多一个参数,多添加一个锁的有效时间
         *
         * @param waitTime  等待时间
         * @param leaseTime 锁有效时间
         * @param unit      时间单位 小时、分、秒、毫秒等
         */
        boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
        /**
         * 检验该锁是否被线程使用,如果被使用返回True
         */
        boolean isLocked();
        /**
         * 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)
         * 这个比上面那个实用
         */
        boolean isHeldByCurrentThread();
        /**
         * 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间
         * @param leaseTime  锁有效时间
         * @param unit       时间单位 小时、分、秒、毫秒等
         */
        void lockInterruptibly(long leaseTime, TimeUnit unit);  
    }
    • tryLock方法
    @Override
        public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            long threadId = Thread.currentThread().getId();
         // 1.尝试申请锁,返回还剩余的锁过期时间 Long ttl
    = tryAcquire(waitTime, leaseTime, unit, threadId); // 2.如果为空,表示申请锁成功, 返回true if (ttl == null) { return true; } // 3.申请锁的耗时如果大于等于最大等待时间,则申请锁失败 time -= System.currentTimeMillis() - current; if (time <= 0) {
           // 通过 promise.trySuccess 设置异步执行的结果为null
           // Promise从Uncompleted-->Completed ,通知 Future 异步执行已完成 acquireFailed(waitTime, unit, threadId);
    return false; } current = System.currentTimeMillis();
    /**
            * 4.订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
            * 基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争
            * 当 this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
            * 当 this.await返回true,进入循环尝试获取锁
            */
         RFuture
    <RedissonLockEntry> subscribeFuture = subscribe(threadId); // await 方法内部是用CountDownLatch来实现阻塞,获取subscribe异步执行的结果(应用了Netty 的 Future)
         if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false; } try {
           // 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败 time
    -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; }
           /**
               * 5.收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
               * 获取锁成功,则立马返回true,
               * 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回false结束循环
               */
    while (true) { long currentTime = System.currentTimeMillis();
              // 再次尝试申请锁 ttl
    = tryAcquire(waitTime, leaseTime, unit, threadId); // 成功获取锁则直接返回true结束循环 if (ttl == null) { return true; }           // 超过最大等待时间则返回false结束循环,获取锁失败 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } // 6.阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息): currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) {
                // 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。 subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); }
    else {
                // 则就在wait time 时间范围内等待可以通过信号量 subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); }           // 7.更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间) time
    -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(waitTime, unit, threadId); return false; } } } finally {
           // 8.无论是否获得锁,都要取消订阅解锁消息 unsubscribe(subscribeFuture, threadId); }
    }
        private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
         // 如果指定了失效时间,就按指定的失效时间执行,然后返回
    if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); }
         // 如果没有指定失效时间(leaseTime=-1),则默认配置30秒 (getLockWatchdogTimeOut()=30) RFuture
    <Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 加锁完毕之后,启动看门狗线程,定时的延期失效时间(定时任务为 internalLockLeaseTime / 3 毫秒之后执行)
         ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
           if (e != null) {
                    return;
                }
                if (ttlRemaining == null) {
              // 启动看门狗任务 scheduleExpirationRenewal(threadId); } });
    return ttlRemainingFuture; }
     
        <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
         // 通过lua脚本访问Redis,保证操作的原子性, 以及达到批量操作的效果,提升性能
         
    // KEYS[1] :表示分布式锁的key
         // ARGV[1] :锁的租约时间(持有锁的有效时间),默认30s;
         // ARGV[2] :获取锁时set的唯一值 value,即UUID:threadId。
         return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  // 如果缓存中的key不存在,则设置唯一标识和超时时间,初始化value=1
                  // 返回空值 nil ,表示获取锁成功

         "if (redis.call('exists', KEYS[1]) == 0) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                   "redis.call('pexpire', KEYS[1], ARGV[1]); " +
      "return nil; " +
                  "end; " +
                   // 如果key已经存在,并且value也匹配(重入情况),表示是当前线程持有的锁,则执行 hincrby 命令,重入次数加1,并且设置失效时间
                   "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +   "redis.call('pexpire', KEYS[1], ARGV[1]); " +   "return nil; " + "end; " +
                   // 如果key已经存在,但是value不匹配,说明锁已经被其他线程持有,通过 pttl 命令获取锁的剩余存活时间并返回,至此获取锁失败 "return redis.call('pttl', KEYS[1]);"
    ,
              // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2] Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

    • unlock方法

       调用关系:unlock —> unlockAsync —> unlockInnerAsyncunlockInnerAsync是解锁的核心代码

      @Override
        public RFuture<Void> unlockAsync(long threadId) {
            RPromise<Void> result = new RedissonPromise<Void>();
            RFuture<Boolean> future = unlockInnerAsync(threadId);
    
            future.onComplete((opStatus, e) -> {
           // 释放锁后取消刷新锁失效时间的调度任务 cancelExpirationRenewal(threadId);
    if (e != null) { result.tryFailure(e); return; }        // 非锁的持有者释放锁时抛出异常 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; }   // 通过 EVAL 命令执行 Lua 脚本获取锁,保证了原子性
      // KEYS[1] :需要加锁的key,这里需要是字符串类型。
      // KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”
      // ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
      // ARGV[2] :锁的超时时间,防止死锁
      // ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(UUID.randomUUID()) + “:” + threadId
      protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                   // 如果分布式锁存在,但是value不匹配,表示锁已经被其他线程占用,无权释放锁,那么直接返回空值(解铃还须系铃人)      
    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +   "return nil;" + "end; " +
                   // 如果value匹配,则就是当前线程占有分布式锁,那么将重入次数减1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                   // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只能更新失效时间,还不能删除 "if (counter > 0) then " +   "redis.call('pexpire', KEYS[1], ARGV[2]); " +   "return 0; " + "else " +
                   // 重入次数减1后的值如果为0,这时就可以删除这个KEY,并发布解锁消息,返回1   "redis.call('del', KEYS[1]); " +   "redis.call('publish', KEYS[2], ARGV[1]); " +   "return 1; " + "end; " + "return nil;"
    ,
              // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }

    注意这里有个实际开发过程中,容易出现很容易出现上面第二步异常,非锁的持有者释放锁时抛出异常。

    解锁消息通知

    之前加锁的时候源码里写过,如果没获取锁成功,就监听这个锁,监听它什么时候释放,所以解锁的时候,要发出这个消息通知,让其他想获取锁的客户端知道。

    public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
    
        public static final Long UNLOCK_MESSAGE = 0L;
        public static final Long READ_UNLOCK_MESSAGE = 1L;
    
        public LockPubSub(PublishSubscribeService service) {
            super(service);
        }
        
        @Override
        protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
            return new RedissonLockEntry(newPromise);
        }
    
        @Override
        protected void onMessage(RedissonLockEntry value, Long message) {
    
            /**
             * 判断是否是解锁消息
             */
            if (message.equals(UNLOCK_MESSAGE)) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                }
    
                /**
                 * 释放一个信号量,唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁
                 */
                value.getLatch().release();
            } else if (message.equals(READ_UNLOCK_MESSAGE)) {
                while (true) {
                    /**
                     * 如果还有其他Listeners回调,则也唤醒执行
                     */
                    Runnable runnableToExecute = value.getListeners().poll();
                    if (runnableToExecute == null) {
                        break;
                    }
                    runnableToExecute.run();
                }
    
                value.getLatch().release(value.getLatch().getQueueLength());
            }
        }
    }

    大体流程图:

    特点

    • 互斥性。在任意时刻,只有一个客户端能持有锁,也叫唯一性。

    • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

    • 防误删。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。(业务执行时间过长,超过锁失效时间,锁被释放,第二个线程获取锁,此时第一个线程执行到释放锁代码时,不能删除第二个线程的锁)

    • 看门狗机制。延长过期时间(没有设置过期时间的情况,leaseTime=-1,默认失效时间为30秒,启动看门狗线程,定时检查是否需要延长时间scheduleExpirationRenewal)。

    • 具有可用性、容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。

    • 可重入性。相同线程不需要在等待锁,而是可以直接进行相应操作
    • 锁种类多样。可重入锁、公平锁、联锁、红锁、读写锁
    • 可阻塞等待

    存在的问题

     分布式架构中的CAP理论,分布式系统只能同时满足两个: 一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)

    • Redisson分布式锁是AP模式,当锁存在的redis节点宕机,可能会被误判为锁失效,或者没有加锁。(Zookeeper实现的分布式锁,是CP理论)

     因为在工作中Redis都是集群部署的,所以要考虑集群节点挂掉的问题。给大家举个例子:

    1. A客户端请求主节点获取到了锁 
    2. 主节点挂掉了,但是还没把锁的信息同步给其他从节点
    3. 由于主节点挂了,这时候开始主从切换,从节点成为主节点继续工作,但是新的主节点上,没有A客户端的加锁信息 
    4. 这时候B客户端来加锁,因为目前是一个新的主节点,上面没有其他客户端加锁信息,所以B客户端获取锁成功
    5. 这时候就存在问题了,A和B两个客户端同时都持有锁,同时在执行代码,那么这时候分布式锁就失效了。

     这里大家会有疑问了,为啥官方给出一个分布式锁的实现,却不解决这个问题呢,因为发生这种情况的几率不大,而且解决这个问题的成本有点小高。

     

      -- 解决办法:

    1. tradoff,分布式锁的redis采用单机部署,分布式锁专用
    2. RedLock: RedLock算法思想,意思是不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁,n / 2 + 1,必须在大多数redis节点上都成功创建锁,才能算这个整体的RedLock加锁成功,避免说仅仅在一个redis实例上加锁而带来的问题。
    3. 如果对锁比较关注,一致性要求比较高,可以使用ZK实现的分布式锁

    与Zookeeper实现的分布式锁比较

     TODO

    总结

    • 如果考虑各种网络、宕机等原因,很多问题需要考虑,问题会变的复杂,其实分布式锁的应用场景不多,很多情况可以绕开分布式锁,使用其他方式解决,比如 队列,异步,响应式
    • 个人经验:分布式锁的场景,更多的应用是一个操作不能同时多处进行,不能短时间内重复执行,需要幂等操作等场景,比如:防止快速的重复提交,mq与定时任务双线更改状态,防止消息重复消费 等等。这些情况一般使用setNx即可解决
    • 所谓的减库存其实也用不到分布式锁

    参考:

    Redisson实现分布式锁(1)---原理

    Redisson实现分布式锁(2)—RedissonLock

    利用Redisson实现分布式锁及其底层原理解析

    分布式锁在面试中会被问到哪一些知识点?[视频]

    带你研究Redis分布式锁,源码走起

  • 相关阅读:
    Richardson成熟度模型:关于REST的不同风格
    领域驱动设计
    dubbo初探
    pom使用异常问题
    [转]解决BootStrap validator验证的图标错位问题
    万恶的360浏览器
    利用Barcode4j实现输出ean13条形码到文件、流的工具类
    使用hibernate中的hql进行分页设置setMaxResults(int a)出错,索引1越界,hql执行不了等等问题
    Mysql 5.6以上版本zip安装方法
    Hibernate5生成的映射文件导致findByExample无法正确查询到结果
  • 原文地址:https://www.cnblogs.com/dong320/p/13993553.html
Copyright © 2011-2022 走看看