zoukankan      html  css  js  c++  java
  • Redisson分布式锁源码分析

    分布式锁的特点

    • 互斥性
    • 锁超时
    • 可重入性
    • 支持阻塞和非阻塞
    • 性能好

    Redisson实现分布式锁示例

    public class DistributedLockerImpl {
    
        private RedissonClient redissonClient;
    
        //加锁
        public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
            //获取锁对象
            RLock lock = redissonClient.getLock(lockKey);
            try {
                //最常用的使用方法
                //waitTime:获取锁的最大等待时间
                //leaseTime:获取锁成功的持有时间,过期自动解锁
                //unit:时间单位
                return lock.tryLock(waitTime, leaseTime, unit);
            } catch (InterruptedException e) {
                log.error("tryLock error", e);
                Thread.currentThread().interrupt();
                return false;
            }
        }
            
        //解锁
        public void unlock(String lockKey) {
            RLock lock = redissonClient.getLock(lockKey);
            if(lock != null) {
                lock.unlock();
            }
        }
    }
    
    

    加锁源码分析

        public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            long threadId = Thread.currentThread().getId();
            //加锁核心代码
            Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }
            // 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            
            /**
            订阅锁释放事件,并通过 await 方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
    	基于信息量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件
    	一旦锁释放会发消息通知待等待的线程进行竞争.
            */
            current = System.currentTimeMillis();
            RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
            if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                if (!subscribeFuture.cancel(false)) {
                    subscribeFuture.onComplete((res, e) -> {
                        if (e == null) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    });
                }
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
    
            try {
                //如果获取锁的耗时超过最大等待时间,加锁失败
                time -= System.currentTimeMillis() - current;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            	//在最大等待时间内循环获取锁
                while (true) {
                    long currentTime = System.currentTimeMillis();
                    //获取锁
                    ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        return true;
                    }
    				//判断时间
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(waitTime, unit, threadId);
                        return false;
                    }
    
                    // waiting for message,等待解锁消息
                    currentTime = System.currentTimeMillis();
                    if (ttl >= 0 && ttl < time) {
                        subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }
    				//更新等待时间
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(waitTime, unit, threadId);
                        return false;
                    }
                }
            } finally {
                //取消订阅消息
                unsubscribe(subscribeFuture, threadId);
            }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
        }
    

    如果tryAcquire()获取锁成功,返回null,ttl为已存在锁的过期时间。

    如果tryAcquire()获取锁失败,判断最大等待时间是否大于加锁耗时时间,如果小于,直接返回false加锁失败。否则,客户端的线程id通过redis的channel订阅锁释放的事件,超过最大等待时间,则加锁失败,如果等到了锁的释放事件通知,则进入一个不断获取锁的循环,尝试加锁。

    加锁核心代码:
    tryAcquireAsync():

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //如果leaseTime!=-1,加锁成功直接返回,否则启动一个Watch Dog线程,定期延长持有锁时间
        if (leaseTime != -1) {
            //加锁
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }
    		// 续锁逻辑
            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    

    tryLockInnerAsync():

        <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                            "end; " +
                            "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    

    debug结果:
    image

    lua 脚本含义:

    第一段 if 判断语句,就是用 exists myLock 命令判断一下,如果要加锁的key 不存在,就使用 hincrby 命令设置一个 hash 结构,接着会执行 pexpire myLock 1800000 命令,设置 myLock 这个锁 key 的生存时间是 1800 秒。到此为止,加锁完成。

    如果此时有第二个客户端请求加锁,就会进入第二个if判断语句,判断myLock 锁 key 的 hash 数据结构中,是否包含客户端 2 的 ID,如果不是,客户端 2 会执行:return redis.call('pttl', KEYS[1]);返回了myLock持有锁的时间。

    可重入锁
    如果myLock 锁 key 的 hash 数据结构中,包含客户端的 ID,就会执行hincrby myLock 5f431de4-a523-4bd7-965b-32b83fee3897:366 1 ,加锁次数加1,此时存储结果如下:
    image
    redis的key为锁名称(所示myLock),hash结构key为客户端ID,value为客户端加锁的次数

    解锁源码分析

    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        //释放锁
        RFuture<Boolean> future = unlockInnerAsync(threadId);
    	//取消watch dog机制
        future.onComplete((opStatus, e) -> {
            cancelExpirationRenewal(threadId);
    
            if (e != null) {
                result.tryFailure(e);
                return;
            }
    
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
    
            result.trySuccess(null);
        });
    
        return result;
    }
    

    unlockInnerAsync()

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }
    

    lua脚本含义:

    判断锁是否存在,如果存在,将客户端ID对应的value值递减,直到为0后删除缓存,然后向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息

    Redisson方案缺点

    1. 如果你对某个 Redis Master 实例完成了加锁,此时 Master 会异步复制给其对应的 slave 实例。在这期间,假如 Master 宕机,主备切换,slave 变为了 Master。此时客户端 2 来尝试加锁的时候,在新的 Master 上完成了加锁,而客户端 1 也以为自己成功加了锁,此时就会导致多个客户端对一个分布式锁完成了加锁,失去控制。这是Redis Master-Slave 架构的主从异步复制导致的 Redis 分布式锁的最大缺陷(在 Redis Master 实例宕机的时候,可能导致多个客户端同时完成加锁)。
  • 相关阅读:
    centos 安装 TortoiseSVN svn 客户端
    linux 定时任务 日志记录
    centos6.5 安装PHP7.0支持nginx
    linux root 用户 定时任务添加
    composer 一些使用说明
    laravel cookie写入
    laravel composer 安装指定版本以及基本的配置
    mysql 删除重复记录语句
    linux php redis 扩展安装
    linux php 安装 memcache 扩展
  • 原文地址:https://www.cnblogs.com/jiezai/p/15212146.html
Copyright © 2011-2022 走看看