zoukankan      html  css  js  c++  java
  • Java使用Redisson分布式锁实现原理

    本篇文章摘自:https://www.jb51.net/article/149353.htm

    由于时间有限,暂未验证 仅先做记录。有大家注意下哈(会尽快抽时间进行验证)

    1. 基本用法

    添加依赖

    <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>3.8.2</version>
    </dependency>
    Config config = new Config();
    config.useClusterServers()
      .setScanInterval(2000) // cluster state scan interval in milliseconds
      .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
      .addNodeAddress("redis://127.0.0.1:7002");
     
    RedissonClient redisson = Redisson.create(config);
     
    RLock lock = redisson.getLock("anyLock");
     
    lock.lock();
     
    try {
      ...
    } finally {
      lock.unlock();
    }

    针对上面这段代码,重点看一下Redisson是如何基于Redis实现分布式锁的

    Redisson中提供的加锁的方法有很多,但大致类似,此处只看lock()方法

    更多请参见https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers

    2. 加锁

     可以看到,调用getLock()方法后实际返回一个RedissonLock对象,在RedissonLock对象的lock()方法主要调用tryAcquire()方法

    由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

    首先,看一下evalWriteAsync方法的定义

    由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键
    
    首先,看一下evalWriteAsync方法的定义

    最后两个参数分别是keys和params

    实际调用是这样的:

    单独将调用的那一段摘出来看

    commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
             "if (redis.call('exists', KEYS[1]) == 0) then " +
               "redis.call('hset', KEYS[1], ARGV[2], 1); " +
               "redis.call('pexpire', KEYS[1], ARGV[1]); " +
               "return nil; " +
             "end; " +
             "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
               "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
               "redis.call('pexpire', KEYS[1], ARGV[1]); " +
               "return nil; " +
             "end; " +
             "return redis.call('pttl', KEYS[1]);",
              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

    结合上面的参数声明,我们可以知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

    假设前面获取锁时传的name是“abc”,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

    那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

    因此,这段脚本的意思是

      1、判断有没有一个叫“abc”的key

      2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对 ,并设置它的过期时间

      3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间

      4、返回“abc”的生存时间(毫秒)

    这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。

    用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

    所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

    3. 解锁

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
          "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
          "end;" +
          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
          "end; " +
          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
          "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
          "else " +
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; "+
          "end; " +
          "return nil;",
          Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
     
    }

    我们还是假设name=abc,假设线程ID是Thread-1

    同理,我们可以知道

    KEYS[1]是getName(),即KEYS[1]=abc

    KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}

    ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

    ARGV[2]是生存时间

    ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

    因此,上面脚本的意思是:

      1、判断是否存在一个叫“abc”的key

      2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1

      3、如果存在,进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在

      4、若字段不存在,返回空,若字段存在,则字段值减1

      5、若减完以后,字段值仍大于0,则返回0

      6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;

    可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

    4. 等待

    以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?

    再回到前面获取锁的位置

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
      long threadId = Thread.currentThread().getId();
      Long ttl = tryAcquire(leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {
        return;
      }
     
      //  订阅
      RFuture<RedissonLockEntry> future = subscribe(threadId);
      commandExecutor.syncSubscription(future);
     
      try {
        while (true) {
          ttl = tryAcquire(leaseTime, unit, threadId);
          // lock acquired
          if (ttl == null) {
            break;
          }
     
          // waiting for message
          if (ttl >= 0) {
            getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
          } else {
            getEntry(threadId).getLatch().acquire();
          }
        }
      } finally {
        unsubscribe(future, threadId);
      }
    //    get(lockAsync(leaseTime, unit));
    }
     
     
    protected static final LockPubSub PUBSUB = new LockPubSub();
     
    protected RFuture<RedissonLockEntry> subscribe(long threadId) {
      return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
    }
     
    protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
      PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
    }

    这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源

    当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

    5. 小结

     

    6. 其它相关

    基于Redis的分布式锁的简单实现

    @感谢原文作者的分享:https://www.jb51.net/article/149353.htm

  • 相关阅读:
    怎样判断数组类型
    VS2017不能生成Database Unit Test项目
    Windows Server 远程桌面报错:No Remote Desktop License Servers Available
    TFS2018 获取所有Build变量及变量值
    C# 读写bat文件
    【PowerShell 学习系列】-- 删除Win10自带应用
    Win10
    【TFS】TFS2015链接TFS出现TF31002/TF400324问题解决方案
    【SQL Server 学习系列】-- 获取字符串中出现某字符的次数及字符某次出现的下标
    【.Net 学习系列】-- 利用Aspose转换Excel为PDF文件
  • 原文地址:https://www.cnblogs.com/nov5026/p/10764068.html
Copyright © 2011-2022 走看看