zoukankan      html  css  js  c++  java
  • 【分布式锁】04-使用Redisson实现ReadWriteLock原理

    前言

    关于读写锁,大家应该都了解JDK中的ReadWriteLock, 当然Redisson也有读写锁的实现。

    所谓读写锁,就是多个客户端同时加读锁,是不会互斥的,多个客户端可以同时加这个读锁,读锁和读锁是不互斥的

    Redisson中使用RedissonReadWriteLock来实现读写锁,它是RReadWriteLock的子类,具体实现读写锁的类分别是:RedissonReadLockRedissonWriteLock

    Redisson读写锁使用例子

    还是从官方文档中找的使用案例:

    RReadWriteLock rwlock = redisson.getReadWriteLock("tryLock");

    RLock lock = rwlock.readLock();
    // or
    RLock lock = rwlock.writeLock();

    // traditional lock method
    lock.lock();

    // or acquire lock and automatically unlock it after 10 seconds
    lock.lock(10, TimeUnit.SECONDS);

    // or wait for lock aquisition up to 100 seconds 
    // and automatically unlock it after 10 seconds
    boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
    if (res) {
       try {
         ...
       } finally {
           lock.unlock();
       }
    }

    Redisson加读锁逻辑原理

    public class RedissonReadLock extends RedissonLock implements RLock {
        @Override
        <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);

            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                    "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                                    "if (mode == false) then " +
                                      "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                      "redis.call('set', KEYS[2] .. ':1', 1); " +
                                      "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                      "return nil; " +
                                    "end; " +
                                    "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                                      "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                      "local key = KEYS[2] .. ':' .. ind;" +
                                      "redis.call('set', key, 1); " +
                                      "redis.call('pexpire', key, ARGV[1]); " +
                                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                      "return nil; " +
                                    "end;" +
                                    "return redis.call('pttl', KEYS[1]);",
                            Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                            internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
        }
    }

    客户端A(UUID_01:threadId_01)来加读锁

    注:
    以下文章中客户端A用:UUID_01:threadId_01标识
    客户端B用:UUID_02:threadId_02标识

    KEYS:

    • KEYS1: getName() = tryLock
    • KEYS[2]: getReadWriteTimeoutNamePrefix(threadId) = {anyLock}:UUID_01:threadId_01:rwlock_timeout

    ARGV:

    • ARGV1: internalLockLeaseTime = 30000毫秒
    • ARGV[2]: getLockName(threadId) = UUID_01:threadId_01
    • ARGV[3]: getWriteLockName(threadId) = UUID_01:threadId_01:write

    接着对代码中lua脚本一行行解读:

    1. hget anyLock mode 第一次加锁时是空的
    2. mode = false,进入if逻辑
    3. hset anyLock UUID_01:threadId_01 1
      anyLock是hash结构,设置hash的key、value
    4. set {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
      设置一个string类型的key value数据
    5. pexpire {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 30000
      设置key value的过期时间
    6. pexpire anyLock 30000
      设置anyLock的过期时间

    此时redis中存在的数据结构为:

    anyLock: {
      "mode": "read",
      "UUID_01:threadId_01": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1  1
    客户端A 第二次来加读锁

    继续分析,客户端A已经加过读锁,此时如果继续加读锁会怎样处理呢?

    1. hget anyLock mode 此时mode=read,会进入第二个if判断
    2. hincrby anyLock UUID_01:threadId_01 1 此时hash中的value会加1,变成2
    3. set {anyLock}:UUID_01:threadId_01:rwlock_timeout:2 1
      ind 为hincrby结果,hincrby返回是2
    4. pexpire anyLock 30000
    5. pexpire {anyLock}:UUID_01:threadId_01:rwlock_timeout:2 30000

    此时redis中存在的数据结构为:

    anyLock: {
      “mode”: “read”,
      “UUID_01:threadId_01”: 2
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1  1
    {anyLock}:UUID_01:threadId_01:rwlock_timeout:2  1

    客户端B (UUID_02:threadId_02)第一次来加读锁

    基本步骤和上面一直,加锁后redis中数据为:

    anyLock: {
      "mode": "read",
      "UUID_01:threadId_01": 2,
      "UUID_02:threadId_02": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1  1
    {anyLock}:UUID_01:threadId_01:rwlock_timeout:2  1
    {anyLock}:UUID_02:threadId_02:rwlock_timeout:1  1

    这里需要注意一下:
    为哈希表 key 中的域 field 的值加上增量 increment,如果 key 不存在,一个新的哈希表被创建并执行 HINCRBY 命令。

    Redisson加写锁逻辑原理

    Redisson中由RedissonWriteLock 来实现写锁,我们看下写锁的核心逻辑:

    public class RedissonWriteLock extends RedissonLock implements RLock {
        @Override
        <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);

            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                                "if (mode == false) then " +
                                      "redis.call('hset', KEYS[1], 'mode', 'write'); " +
                                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                      "return nil; " +
                                  "end; " +
                                  "if (mode == 'write') then " +
                                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                          "local currentExpire = redis.call('pttl', KEYS[1]); " +
                                          "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                                          "return nil; " +
                                      "end; " +
                                    "end;" +
                                    "return redis.call('pttl', KEYS[1]);",
                            Arrays.<Object>asList(getName()), 
                            internalLockLeaseTime, getLockName(threadId));
        }
    }

    还是像上面一样,一行行来分析每句lua脚本执行语义。

    客户端A先加读写、再加写锁

    KEYS和ARGV参数:

    • KEYS1 = anyLock
    • ARGV1 = 30000
    • ARGV[2] = UUID_01:threadId_01:write
    1. hget anyLock mode,此时没人加锁,mode=false
    2. hset anyLock mode write
    3. hset anyLock UUID_01:threadId_01:write 1
    4. pexpire anyLock 30000

    此时redis中数据格式为:

    anyLock: {
        "mode": "write",
        "UUID_01:threadId_01:write": 1
    }

    此时再次来加写锁,直接到另一个if语句中:

    1. hexists anyLock UUID_01:threadId_01:write
    2. hincrby anyLock UUID_01:threadId_01:write 1
    3. pexpire anyLock pttl + 30000

    此时redis中数据格式为:

    anyLock: {
        "mode": "write",
        "UUID_01:threadId_01:write": 2
    }

    客户端A和客户端B,先后加读锁,客户端C来加写锁

    读锁加完后,此时redis数据格式为:

    anyLock: {
      "mode": "read",
      "UUID_01:threadId_01": 1,
      "UUID_02:threadId_02": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1
    {anyLock}:UUID_02:threadId_02:rwlock_timeout:1    1

    客户端C参数为:

    • KEYS1 = anyLock
    • ARGV1 = 30000
    • ARGV[2] = UUID_03:threadId_03:write

    hget anyLock mode,mode = read,已经有人加了读锁,不是写锁,此时会直接执行:pttl
    anyLock,返回一个anyLock的剩余生存时间

    1. hget anyLock mode,mode = read,已经有人加了读锁,不是写锁,所以if语句不会成立
    2. pttl anyLock,返回一个anyLock的剩余生存时间

    客户端C加锁失败,就会不断的尝试重试去加锁

    客户端A先加写锁、客户端B接着加读锁

    加完写锁后此时Redis数据格式为:

    anyLock: {
      "mode": "write",
      "UUID_01:threadId_01:write": 1
    }

    客户端B执行读锁逻辑参数为:

    • KEYS1 = anyLock
    • KEYS[2] = {anyLock}:UUID_02:threadId_02:rwlock_timeout
    • ARGV1 = 30000毫秒
    • ARGV[2] = UUID_02:threadId_02
    • ARGV[3] = UUID_02:threadId_02:write

    接着看下加锁逻辑:

    image.pngimage.png

    如上图,客户端B加读锁会走到红框中的if逻辑:

    1. hget anyLock mode,mode = write
      客户端A已经加了一个写锁
    2. hexists anyLock UUID_02:threadId_02:write,存在的话,如果客户端B自己之前加过写锁的话,此时才能进入这个分支
    3. 返回pttl anyLock,导致加锁失败

    客户端A先加写锁、客户端A接着加读锁

    还是接着上面的逻辑,继续分析:

    1. hget anyLock mode,mode = write
      客户端A已经加了一个写锁
    2. hexists anyLock UUID_01:threadId_01:write,此时存在这个key,所以可以进入if分支
    3. hincrby anyLock UUID_01:threadId_01 1,也就是说此时,加了一个读锁
    4. set {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1,
    5. pexpire anyLock 30000
    6. pexpire {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 30000

    此时redis中数据格式为:

    anyLock: {
      "mode": "write",
      "UUID_01:threadId_01:write": 1,
      "UUID_01:threadId_01": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

    客户端A先加读锁、客户端A接着加写锁

    客户端A加读锁后,redis中数据结构为:

    anyLock: {
      "mode": "read",
      "UUID_01:threadId_01": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1  1

    此时客户端A再来加写锁,逻辑如下:

    image.pngimage.png

    此时客户端A先加的读锁,mode=read,所以再次加写锁是不能成功的

    如果是同一个客户端同一个线程,先加了一次写锁,然后加读锁,是可以加成功的,默认是在同一个线程写锁的期间,可以多次加读锁

    而同一个客户端同一个线程,先加了一次读锁,是不允许再被加写锁的

    总结

    显然还有写锁与写锁互斥的逻辑就不分析了,通过上面一些场景的分析,我们可以知道:

    • 读锁与读锁非互斥
    • 读锁与写锁互斥
    • 写锁与写锁互斥
    • 读读、写写 同个客户端同个线程都可重入
    • 先写锁再加读锁可重入
    • 先读锁再写锁不可重入

    Redisson读写锁释放原理

    Redission 读锁释放原理

    不同客户端加了读锁 / 同一个客户端+线程多次可重入加了读锁

    例如客户端A先加读锁,然后再次加读锁
    最后客户端B来加读锁

    此时Redis中数据格式为:

    anyLock: {
      "mode": "read",
      "UUID_01:threadId_01": 2,
      "UUID_02:threadId_02": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1        1
    {anyLock}:UUID_01:threadId_01:rwlock_timeout:2        1
    {anyLock}:UUID_02:threadId_02:rwlock_timeout:1        1

    接着我们看下释放锁的核心代码:

    public class RedissonReadLock extends RedissonLock implements RLock {
        @Override
        protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
            String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                    "if (mode == false) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
                    "if (lockExists == 0) then " +
                        "return nil;" +
                    "end; " +

                    "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
                    "if (counter == 0) then " +
                        "redis.call('hdel', KEYS[1], ARGV[2]); " + 
                    "end;" +
                    "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

                    "if (redis.call('hlen', KEYS[1]) > 1) then " +
                        "local maxRemainTime = -3; " + 
                        "local keys = redis.call('hkeys', KEYS[1]); " + 
                        "for n, key in ipairs(keys) do " + 
                            "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                            "if type(counter) == 'number' then " + 
                                "for i=counter, 1, -1 do " + 
                                    "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                                    "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                                "end; " + 
                            "end; " + 
                        "end; " +

                        "if maxRemainTime > 0 then " +
                            "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                            "return 0; " +
                        "end;" + 

                        "if mode == 'write' then " + 
                            "return 0;" + 
                        "end; " +
                    "end; " +

                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; ",
                    Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), 
                    LockPubSub.unlockMessage, getLockName(threadId));
        }
    }

    客户端A来释放锁:
    对应的KEYS和ARGV参数为:

    • KEYS1 = anyLock

    • KEYS[2] = redisson_rwlock:{anyLock}

    • KEYS[3] = {anyLock}:UUID_01:threadId_01:rwlock_timeout

    • KEYS[4] = {anyLock}

    • ARGV1 = 0

    • ARGV[2] = UUID_01:threadId_01

    接下来开始执行操作:

    1. hget anyLock mode,mode = read
    2. hexists anyLock UUID_01:threadId_01,肯定是存在的,因为这个客户端A加过读锁
    3. hincrby anyLock UUID_01:threadId_01 -1,将这个客户端对应的加锁次数递减1,现在就是变成1,counter = 1
    4. del {anyLock}:UUID_01:threadId_01:rwlock_timeout:2,删除了一个timeout key

    此时Redis中的数据结构为:

    anyLock: {
      "mode": "read",
      "UUID_01:threadId_01": 1,
      "UUID_02:threadId_02": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1
    {anyLock}:UUID_02:threadId_02:rwlock_timeout:1    1

    此时继续往下,具体逻辑如图:

    image.pngimage.png

    1. hlen anyLock > 1,就是hash里面的元素超过1个
    2. pttl {anyLock}:UUID_01:threadId_01:rwlock_timeout:1,此时获取那个timeout key的剩余生存时间还有多少毫秒,比如说此时这个key的剩余生存时间是20000毫秒

    这个for循环的含义是获取到了所有的timeout key的最大的一个剩余生存时间,假设最大的剩余生存时间是25000毫秒

    客户端A继续来释放锁:

    此时客户端A执行流程还会和上面一直,执行完成后Redis中数据结构为:

    anyLock: {
      "mode": "read",
      "UUID_02:threadId_02": 1
    }

    {anyLock}:UUID_02:threadId_02:rwlock_timeout:1    1

    因为这里会走counter == 0的逻辑,所以会执行"redis.call('hdel', KEYS[1], ARGV[2]); "

    客户端B继续来释放锁:

    客户端B流程也和上面一直,执行完后就会删除anyLock这个key

    同一个客户端/线程先加写锁再加读锁

    上面已经分析过这种情形,操作过后Redis中数据结构为:

    anyLock: {
      "mode": "write",
      "UUID_01:threadId_01:write": 1,
      "UUID_01:threadId_01": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

    此时客户端A来释放读锁:

    1. hincrby anyLock UUID_01:threadId_01 -1,将这个客户端对应的加锁次数递减1,现在就是变成1,counter = 0
    2. hdel anyLock UUID_01:threadId_01,此时就是从hash数据结构中删除客户端A这个加锁的记录
    3. del {anyLock}:UUID_01:threadId_01:rwlock_timeout:1,删除了一个timeout key

    此时Redis中数据变成:

    anyLock: {
      "mode": "write",
      "UUID_01:threadId_01:write": 1
    }

    Redisson写锁释放原理

    先看下写锁释放的核心逻辑:

    public class RedissonWriteLock extends RedissonLock implements RLock {
        @Override
        protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                    "if (mode == false) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                    "if (mode == 'write') then " +
                        "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
                        "if (lockExists == 0) then " +
                            "return nil;" +
                        "else " +
                            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                            "if (counter > 0) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                "return 0; " +
                            "else " +
                                "redis.call('hdel', KEYS[1], ARGV[3]); " +
                                "if (redis.call('hlen', KEYS[1]) == 1) then " +
                                    "redis.call('del', KEYS[1]); " +
                                    "redis.call('publish', KEYS[2], ARGV[1]); " + 
                                "else " +
                                    // has unlocked read-locks
                                    "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                                "end; " +
                                "return 1; "+
                            "end; " +
                        "end; " +
                    "end; "
                    + "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), 
            LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
        }
    }
    同一个客户端多次可重入加写锁 / 同一个客户端先加写锁再加读锁

    客户端A加两次写锁释放

    此时Redis中数据为:

    anyLock: {
      "mode": "write",
      "UUID_01:threadId_01:write": 2,
      "UUID_01:threadId_01": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

    客户端A来释放锁KEYS和ARGV参数:

    • KEYS1 = anyLock

    • KEYS[2] = redisson_rwlock:{anyLock}

    • ARGV1 = 0

    • ARGV[2] = 30000

    • ARGV[3] = UUID_01:threadId_01:write

    直接分析lua代码:

    1. 上面mode=write,后面使用hincrby进行-1操作,此时count=1
    2. 如果count>0,此时使用pexpire然后返回0
    3. 此时客户端A再来释放写锁,count=0
    4. hdel anyLock UUID_01:threadId_01:write

    此时Redis中数据:

    anyLock: {
      "mode": "write",
      "UUID_01:threadId_01": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

    后续还会接着判断,如果count=0,代表写锁都已经释放完了,此时hlen如果>1,代表加的还有读锁,所以接着执行:hset anyLock mode read, 将写锁转换为读锁

    最终Redis数据为:

    anyLock: {
      "mode": "read",
      "UUID_01:threadId_01": 1
    }

    {anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

    总结

    Redisson陆续也更新了好几篇了,疫情期间宅在家里一直学习Redisson相关内容,这篇文章写了2天,从早到晚。

    读写锁这块内容真的很多,本篇篇幅很长,如果学习本篇文章最好跟着源码一起读,后续还会继续更新Redisson相关内容,如有不正确的地方,欢迎指正!

    申明

    本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

    感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

    22.jpg

  • 相关阅读:
    面向对象property属性、静态方法和类方法
    【枚举】枚举简单使用
    【Mac】Mac快捷键与Mac下的Idea快捷键
    @PostConstruct注解
    【通过ssh oracle11g安装】centos6静默安装oracle11g
    【Zookeeper02】ZK的作用以及使用
    【linux杂记】Ubuntu查看端口使用情况
    【数据结构--二叉树】Java递归实现二叉树遍历
    【python3两小时根本不够】入门笔记04:线程+Lock安全同步
    【python3两小时快速入门】入门笔记03:简单爬虫+多线程爬虫
  • 原文地址:https://www.cnblogs.com/wang-meng/p/12543108.html
Copyright © 2011-2022 走看看