zoukankan      html  css  js  c++  java
  • Redisson分布式锁学习总结:可重入锁 RedissonLock#lock 获取锁源码分析

    原文:Redisson分布式锁学习总结:可重入锁 RedissonLock#lock 获取锁源码分析

    一、RedissonLock#lock 源码分析

    1、根据锁key计算出 slot,一个slot对应的是redis集群的一个节点

    redisson 支持分布式锁的功能,基本都是基于 lua 脚本来完成的,因为分布式锁肯定是具有比较复杂的判断逻辑,而lua脚本可以保证复杂判断和复杂操作的原子性。

    redisson 的 RedissonLock 执行lua脚本,需要先找到当前锁key需要存放到哪个slot,即在集群中哪个节点进行操作,后续不同客户端或不同线程再使用这个锁key进行上锁,也需要到对应的节点的slot中进行加锁操作。

    执行lua脚本的源码:

    org.redisson.command.CommandAsyncService#evalWriteAsync(java.lang.String, org.redisson.client.codec.Codec, org.redisson.client.protocol.RedisCommand<T>, java.lang.String, java.util.List<java.lang.Object>, java.lang.Object...)
    
    
    @Override
    public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
        // 根据锁key找到对应的redis节点
        NodeSource source = getNodeSource(key);
        return evalAsync(source, false, codec, evalCommandType, script, keys, params);
    }
    
    private NodeSource getNodeSource(String key) {
        // 计算锁key对应的slot
        int slot = connectionManager.calcSlot(key);
        return new NodeSource(slot);
    }
    

    计算 slot 分主从模式和集群模式,我们一般生产环境都是使用集群模式。
    在这里插入图片描述

    public static final int MAX_SLOT = 16384;
    
    @Override
    public int calcSlot(String key) {
        if (key == null) {
            return 0;
        }
    
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}');
            key = key.substring(start+1, end);
        }
        // 使用 CRC16 算法来计算 slot,其中 MAX_SLOT 就是 16384,redis集群规定最多有 16384 个slot。
        int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
        log.debug("slot {} for {}", result, key);
        return result;
    }
    

    2、RedissonLock 之 lua 脚本加锁

    RedissonLock#tryLockInnerAsync
    
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
    
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

    2.1、KEYS

    Collections.singletonList(getName())

    KEYS:["myLock"]

    2.2、ARGVS

    internalLockLeaseTime,getLockName(threadId)

    internalLockLeaseTime:其实就是 watchdog 的超时时间,默认是30000毫秒 Config#lockWatchdogTimeout。

    private long lockWatchdogTimeout = 30 * 1000;
    

    getLockName(threadId):客户端ID(UUID):线程ID(threadId)

    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }
    

    ARGVS:[30000,"UUID:threadId"]

    2.3、lua 脚本分析

    1、分支一:不存在加锁记录,获取锁成功

    lua脚本:

    "if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
    "end; " +
    

    分析:

    1. 利用 exists 命令判断 myLock 这个 key 是否存在

      exists myLock
      
    2. 如果不存在,则执行下面两个操作

      1. 执行一个map的操作,给指定key的值增加1

        hincrby myLock UUID:threadId
        

        执行后多了一个map数据结构:

        myLock:{
            "UUID:threadId":1
        }
        
      2. 给 myLock 设置过期时间为30000毫秒

        expire myLock 30000
        
    3. 最后返回nil,即null

    2、分支二:锁记录已存在,重复加锁

    lua脚本:

    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "return nil; " +
    "end; " +
    

    分析:

    1. 判断之前加锁的是否为当前客户端当前线程

      hexists myLock UUID:threadId
      
    2. 如果存在,则将加锁次数增加1

      hincrby myLock UUID:threadId 1
      

      增加1后,map集合内容为:

      myLock:{
          "UUID:threadId":2
      }
      

      利用map这个数据结构,存放加锁的客户端线程信息,从而支持可重入锁。

    3. 重新刷新 myLock 的过期时间为30000毫秒

      expire myLock 30000
      

    3、分支三:获取锁失败,直接返回锁剩余过期时间

    lua脚本:

    "return redis.call('pttl', KEYS[1]);"
    

    分析:

    1. 利用 pttl 命令获取锁剩余毫秒数
      pttl myLock
      
    2. 返回步骤1获取的毫秒数

    3、watchdog 不断为锁续命

    因为我们是利用 lock() 方法获取锁的,没有指定多久后释放,但是 redisson 不可能真的不设置锁key的过期时间。

    因为要考虑到一个场景:一个客户端成功获取锁,但是没有设置多久释放,如果redisson 在redis实例中设置锁的时候也没有设置过期时间,如果这个时候客户端所在的服务器挂掉了,那么他就不会执行到unlock() 方法去释放锁了,那么这个时候就会导致死锁,其他任何的客户端都获取不到锁。

    所以 redisson 会有一个 watchdog 的角色,每隔10_000毫秒就会为锁续命,详细可看看下面截图:
    在这里插入图片描述

    再看看定时任务详细的设计:

    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            // 一开始就是null,直接放入 EXPIRATION_RENEWAL_MAP 中
            entry.addThreadId(threadId);
            // 调用定时任务
            renewExpiration();
        }
    }
    
    private void renewExpiration() {
        // 上面已经传入,不为空
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        // 开启定时任务,时间是 internalLockLeaseTime / 3 毫秒后执行
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // 判断是否存在 ExpirationEntry,只要加锁了,肯定存在
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        // 循环调用
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }
    
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // 判断 myLock map 中是否存在当前客户端当前线程
                myLock:{
                    "UUID:threadId":1
                }
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        // 存在,刷新过期时间,30_000毫秒
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }
    

    4、死循环获取锁

    关于死循环获取锁,这里是抓大放小,没有深入研究里面比较细的点,只有自己大概的猜测。
    代码看下图:
    在这里插入图片描述

    如果获取锁失败,在进入死循环前,会订阅指定渠道:redisson_lock__channel:{myLock},然后进入死循环。

    在死循环里面,首先会先尝试再获取一遍锁,因为可能之前获取锁的客户端刚好释放锁了。如果获取失败,那么就进入等待状态,等待时间是获取锁失败时返回的锁key的ttl。

    订阅指定channel猜测:因为在客户端释放锁的时候,会往这个channel发送消息;因此可以利用此消息来提前让等待的线程被唤醒去尝试获取锁,因为此时锁已经被释放了。

    5、其他的加锁方式

    如果我们需要指定获取锁成功后持有锁的时长,可以执行下面方法,指定 leaseTime

    lock.lock(10, TimeUnit.SECONDS);
    

    如果指定了 leaseTime,watchdog就不会再启用了。

    如果不但需要指定持有锁的时长,还想避免锁获取失败时的死循环,可以同时指定 leaseTime 和 waitTime

    boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
    

    如果指定了 waitTime,只会在 waitTime 时间内循环尝试获取锁,超过 waitTime 如果还是获取失败,直接返回false。

    今天,你学习了吗
  • 相关阅读:
    UVA 1513
    《ArcGIS Runtime SDK for Android开发笔记》——问题集:.geodatabase创建,创建时内容缺失问题总结
    《ArcGIS Runtime SDK for Android开发笔记》——问题集:使用TextSymbol做标注显示乱码
    《ArcGIS Runtime SDK for Android开发笔记》——(7)、示例代码arcgis-runtime-samples-android的使用
    《ArcGIS Runtime SDK for Android开发笔记》——(6)、基于Android Studio的ArcGIS Android工程结构解析
    《ArcGIS Runtime SDK for Android开发笔记》——(5)、基于Android Studio构建ArcGIS Android开发环境(离线部署)
    《ArcGIS Runtime SDK for Android开发笔记》——(4)、基于Android Studio构建ArcGIS Android开发环境
    《ArcGIS Runtime SDK for Android开发笔记》——(3)、ArcGIS Runtime SDK概述
    《ArcGIS Runtime SDK for Android开发笔记》——(2)、Android Studio基本配置与使用
    《ArcGIS Runtime SDK for Android开发笔记》——(1)、Android Studio下载与安装
  • 原文地址:https://www.cnblogs.com/Howinfun/p/15755551.html
Copyright © 2011-2022 走看看