zoukankan      html  css  js  c++  java
  • RedissonClient获取锁源码解析

    代码示例:

    public static boolean acquire(RedissonClient redisson, String lockKey, Long waitTime, Long leaseTime)  {
         // 实例化锁对象(此时未请求redis) RLock lock
    = redisson.getLock(lockKey); boolean lockResult; try { lock.lock();
           // 加锁 lockResult
    = lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); }catch (Exception e){ lockResult = false; } if (lockResult){ log.info("======lock success <"+lockKey+">======"+Thread.currentThread().getName()); }else { log.error("======lock error <"+lockKey+">======"+Thread.currentThread().getName()); } //加锁成功 return lockResult; }

    流程图:

    源码分析:

    Redisson实例化

    /************************Redisson************************/
    
    //创建RedissonClient实现类对象
    public static RedissonClient create(Config config) {
       Redisson redisson = new Redisson(config);
       if (config.isRedissonReferenceEnabled()) {
           redisson.enableRedissonReferenceSupport();
       }
       return redisson;
    }
    
    //实例化 new Redisson(config);
    //方法被protected修饰, 只能通过静态方法create(Config config)创建
    protected Redisson(Config config) {
       this.config = config;
       //配置赋值
       Config configCopy = new Config(config);
       //连接管理器 连接redis有多种模式(集群 托管 单例 主从 哨兵), 所以需要根据配置创建一种连接方式
       connectionManager = ConfigSupport.createConnectionManager(configCopy);
       //执行器 依赖连接管理器
       commandExecutor = new CommandSyncService(connectionManager);
       //回收管理器
       evictionScheduler = new EvictionScheduler(commandExecutor);
       //编解码器
       codecProvider = config.getCodecProvider();
       //解释器
       resolverProvider = config.getResolverProvider();
    }

    1 锁实例化(只是实例化 未请求redis) RLock lock = redisson.getLock(lockKey)

    简述获取锁的过程

    /**************************Redisson***********************/
    @Override
    public RLock getLock(String name) {
      //commandExecutor命令执行器 
      //id redisson唯一ID(一般一组redis只注入一个redisson)
       return new RedissonLock(commandExecutor, name, id);
    }
    
    
    /**************************RedissonLock***********************/
    protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
       super(commandExecutor, name);
       this.commandExecutor = commandExecutor;
       this.id = id;
    }
    
    /**************************RedissonExpirable***********************/
    RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
       super(connectionManager, name);
    }
    
    
    /**************************RedissonObject***********************/
    public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
       this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
    }
    public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
       this.codec = codec;
       this.name = name;
       this.commandExecutor = commandExecutor;
    }

    2 请求rediss获取锁  lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS)

    /************************RedissonLock************************/
    public boolean tryLock(long waitTime /*获取锁等待时间*/, long leaseTime /*锁过期时间*/, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        //当前线程ID 用户拼接key
        final long threadId = Thread.currentThread().getId();
        //尝试获取锁 如果ttl==null则获取到锁 ttl!=null则表示锁已存在,ttl为锁剩余时间(毫秒)
        //tryAcquire最终实现方法tryAcquireAsync在下面代码
        Long ttl = tryAcquire(leaseTime, unit);
        //获取锁成功
        if (ttl == null) {
            return true;
        }
        
        //time = time-上一次尝试获取锁之后所用时间 
        time -= (System.currentTimeMillis() - current);
        //这里有两种情况
        //1 若waitTime为负数(不失效缓存), 则返回false
        //2 若waitTime为正数 经过第一次尝试获取锁之后未获取成功 但已经超过等待时长 返回false
        if (time <= 0) {
            return false;
        }
        
        current = System.currentTimeMillis();
        //订阅消息
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //阻塞等待一段时间(time) 等待订阅结果
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            //尝试获取CountDownLatch锁失败时直接返回失败
            return false;
        }
    
        try {
            time -= (System.currentTimeMillis() - current);
            //超过waittime返回失败
            if (time <= 0) {
                return false;
            }
            //死循环 很简单 就是在等待时长内重复获取锁 直到获取成功或超时
            while (true) {
                long currentTime = System.currentTimeMillis();
                //尝试获取锁 tryAcquire最终实现方法tryAcquireAsync在下面代码
                ttl = tryAcquire(leaseTime, unit);
                // 过期时间为null 则获取成功
                if (ttl == null) {
                    return true;
                }
    
                time -= (System.currentTimeMillis() - currentTime);
                //依然判断超时
                if (time <= 0) {
                    return false;
                }
    
                currentTime = System.currentTimeMillis();
                //等待时间大于过期时间 那么就等已存在的过期之后再获取
                if (ttl >= 0 && ttl < time) {
                    //阻塞时长ttl
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } 
                //尝试获取锁
                else {
                    //阻塞时长time
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                //再次计算time>0 
                time -= (System.currentTimeMillis() - currentTime);
                //若是小于等于0则为超时 直接返回false 否则进入下一次循环( while (true) )
                if (time <= 0) {
                    return false;
                }
            }
        } finally {
            //最终取消订阅 订阅代码为 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId)
            unsubscribe(subscribeFuture, threadId);
        }
    }
    
    //尝试获取锁(同步)
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        //若过期时间为有限时间(leaseTime==-1为永不过期)
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //以下为永不过期
        //设置把永不过期时间改为30秒过期
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
        //注册监听器
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
    
            //获取锁操作结束触发下面操作
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                //如果为获取到锁 不做任何事情
                if (!future.isSuccess()) {
                    return;
                }
                Long ttlRemaining = future.getNow();
                // 获取到了锁
                if (ttlRemaining == null) {
                    //定期对锁进行延时 达到永不过期目的
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
    
    
    //尝试获取锁(同步方式)
    //看下面代码之前需要先了解Redis的Hash的数据结构, 下面脚本使用的就是Hash
    //String数据结构为 Map<Key, Value>, 通常根据Key取值
    //Hash数据结构为Map<Key, Map<Field, Value>>, Field
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        //时间转换
        internalLockLeaseTime = unit.toMillis(leaseTime);
        //执行调用redis脚本 
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //为了方便理解对下面数组进行解释 KEYS[1]为Hash:设置的lockKey, ARGV[1]过期时间:leaseTime, ARGV[2]为Hash的Field:当前线程ID和当前redisson拼接
                  //如果key不存在 则设置当前key(KEYS[1]) field(ARGV[2]) value(1), 设置key(KEYS[1])过期时间ARGV[1] 返回空值(nil)
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //如果根据key和field能查询到值, 则value在原来基础上加1
                  //这里可能会有误解: 上面已经判断key已经存在, 因为这里是独占锁, 
                  //若根据此锁不属于当前线程(redis.call('hexists', KEYS[1], ARGV[2]) != 1), 则肯定被其他线程占有,获取锁失败
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //返回当前key的剩余时间
                  "return redis.call('pttl', KEYS[1]);",
                    //关键代码
                    Collections.<Object>singletonList(getName()) /*KEYS*/, internalLockLeaseTime /*ARGV[1]*/, getLockName(threadId)/*ARGV[2]*/ );
    }

    额外拓展:

    redisson使用大量的异步操作(基于netty),代码比较难读,下面针对订阅lockKey的代码画出大致类结构图 RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId)

  • 相关阅读:
    Fibonacci数列--矩阵乘法优化
    没有上司的舞会--树形DP
    扩展欧几里德--解的个数
    洛谷 P1284 三角形牧场 题解(背包+海伦公式)
    2017-2018 ACM-ICPC Latin American Regional Programming Contest J
    求1-1e11内的素数个数(HDU 5901 Count primes )
    Educational Codeforces Round 96 (Rated for Div. 2) E. String Reversal 题解(思维+逆序对)
    HHKB Programming Contest 2020 D
    牛客练习赛71 数学考试 题解(dp)
    2019-2020 ICPC Asia Hong Kong Regional Contest J. Junior Mathematician 题解(数位dp)
  • 原文地址:https://www.cnblogs.com/xieyanke/p/12195916.html
Copyright © 2011-2022 走看看