zoukankan      html  css  js  c++  java
  • Redisson分布式锁源码分析

    分布式锁的特点

    • 互斥性
    • 锁超时
    • 可重入性
    • 支持阻塞和非阻塞
    • 性能好

    Redisson实现分布式锁示例

    public class DistributedLockerImpl {
    
        private RedissonClient redissonClient;
    
        //加锁
        public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
            //获取锁对象
            RLock lock = redissonClient.getLock(lockKey);
            try {
                //最常用的使用方法
                //waitTime:获取锁的最大等待时间
                //leaseTime:获取锁成功的持有时间,过期自动解锁
                //unit:时间单位
                return lock.tryLock(waitTime, leaseTime, unit);
            } catch (InterruptedException e) {
                log.error("tryLock error", e);
                Thread.currentThread().interrupt();
                return false;
            }
        }
            
        //解锁
        public void unlock(String lockKey) {
            RLock lock = redissonClient.getLock(lockKey);
            if(lock != null) {
                lock.unlock();
            }
        }
    }
    
    

    加锁源码分析

        public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            long threadId = Thread.currentThread().getId();
            //加锁核心代码
            Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }
            // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            
            /**
            订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
    	基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件
    	一旦锁释放会发消息通知待等待的线程进行竞争.
            */
            current = System.currentTimeMillis();
            RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
            if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                if (!subscribeFuture.cancel(false)) {
                    subscribeFuture.onComplete((res, e) -> {
                        if (e == null) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    });
                }
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
    
            try {
                //如果获取锁的耗时超过最大等待时间,加锁失败
                time -= System.currentTimeMillis() - current;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            	//在最大等待时间内循环获取锁
                while (true) {
                    long currentTime = System.currentTimeMillis();
                    //获取锁
                    ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        return true;
                    }
    				//判断时间
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(waitTime, unit, threadId);
                        return false;
                    }
    
                    // waiting for message,等待解锁消息
                    currentTime = System.currentTimeMillis();
                    if (ttl >= 0 && ttl < time) {
                        subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }
    				//更新等待时间
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(waitTime, unit, threadId);
                        return false;
                    }
                }
            } finally {
                //取消订阅消息
                unsubscribe(subscribeFuture, threadId);
            }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
        }
    

    如果tryAcquire()获取锁成功,返回null,ttl为已存在锁的过期时间。

    如果tryAcquire()获取锁失败,判断最大等待时间是否大于加锁耗时时间,如果小于,直接返回false加锁失败。否则,客户端的线程id通过redis的channel订阅锁释放的事件,超过最大等待时间,则加锁失败,如果等到了锁的释放事件通知,则进入一个不断获取锁的循环,尝试加锁。

    加锁核心代码:
    tryAcquireAsync():

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //如果leaseTime!=-1,加锁成功直接返回,否则启动一个Watch Dog线程,定期延长持有锁时间
        if (leaseTime != -1) {
            //加锁
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
    		// 续锁逻辑
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    

    tryLockInnerAsync():

        <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    

    debug结果:
    image

    lua 脚本含义:

    第一段 if 判断语句,就是用 exists myLock 命令判断一下,如果要加锁的key 不存在,就使用 hincrby 命令设置一个 hash 结构,接着会执行 pexpire myLock 1800000 命令,设置 myLock 这个锁 key 的生存时间是 1800 秒。到此为止,加锁完成。

    如果此时有第二个客户端请求加锁,就会进入第二个if判断语句,判断myLock 锁 key 的 hash 数据结构中,是否包含客户端 2 的 ID,如果不是,客户端 2 会执行:return redis.call('pttl', KEYS[1]);返回了myLock持有锁的时间。

    可重入锁
    如果myLock 锁 key 的 hash 数据结构中,包含客户端的 ID,就会执行hincrby myLock 5f431de4-a523-4bd7-965b-32b83fee3897:366 1 ,加锁次数加1,此时存储结果如下:
    image
    redis的key为锁名称(所示myLock),hash结构key为客户端ID,value为客户端加锁的次数

    解锁源码分析

    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        //释放锁
        RFuture<Boolean> future = unlockInnerAsync(threadId);
    	//取消watch dog机制
        future.onComplete((opStatus, e) -> {
            cancelExpirationRenewal(threadId);
    
            if (e != null) {
                result.tryFailure(e);
                return;
            }
    
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
    
            result.trySuccess(null);
        });
    
        return result;
    }
    

    unlockInnerAsync()

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
    

    lua脚本含义:

    判断锁是否存在,如果存在,将客户端ID对应的value值递减,直到为0后删除缓存,然后向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息

    Redisson方案缺点

    1. 如果你对某个 Redis Master 实例完成了加锁,此时 Master 会异步复制给其对应的 slave 实例。在这期间,假如 Master 宕机,主备切换,slave 变为了 Master。此时客户端 2 来尝试加锁的时候,在新的 Master 上完成了加锁,而客户端 1 也以为自己成功加了锁,此时就会导致多个客户端对一个分布式锁完成了加锁,失去控制。这是Redis Master-Slave 架构的主从异步复制导致的 Redis 分布式锁的最大缺陷(在 Redis Master 实例宕机的时候,可能导致多个客户端同时完成加锁)。
  • 相关阅读:
    Java IO包装流如何关闭?
    Validator关于js,jq赋值不触发验证解决办法
    4 统计量&抽样分布
    3 概率的基本概念&离散型随机变量
    2 数据的概括性度量
    1 数据 & 图表
    Python 闭包
    一个自动修正数据时间和补全缺失数据的MapReduce程序
    MapReduce原理
    Hadoop MapReduce常用输入输出格式
  • 原文地址:https://www.cnblogs.com/jiezai/p/15212146.html
Copyright © 2011-2022 走看看