zoukankan      html  css  js  c++  java
  • RLock的代码及学习

    1:高性能锁

    1.1 互斥

    在分布式高并发的条件下,需要保证,同一时刻只能有一个线程获得锁。

    1.2 防止死锁

    在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。

    所以分布式非常有必要设置锁的有效时间,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。

    1.3 性能

    对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。

    所以在锁的设计时,需要考虑两点。

    • 1、锁的颗粒度要尽量小。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。

    • 2、锁的范围尽量要小`。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。

    1.4 可重入

    ReentrantLock是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。

    2:Redission的原理分析

    img

    2.1 加锁机制

    线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。

    线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。

    2.2 watch dog自动延期机制

    在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。

    2.3 为啥要用lua脚本

    主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性

    在分布式锁中,加锁的操作需要多条命令,使用lua脚本保证了原子性。

    2.4 可重入加锁机制

    Redisson可以实现可重入加锁机制基于以下两点进行实现:

    1、Redis存储锁的数据类型是 Hash类型
    2、Hash数据类型的key值包含了当前线程信息。

    使用的是数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value>> 类型,这里key是指 加锁的key。例如:key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是:

    guid + 当前线程的ID。后面的value就是重入的次数。

    3:Redis分布式锁的缺点

    (1):如果使用的是单Master节点,那么可能会因为主备节点切换时候,锁数据没有同步完成就出现一次加锁,可能出现问题。

    4:代码分析

    4.1 RLock接口

    其中继承的接口Lock,是JUC中的接口,具有Lock中的能力。同时它还有很多新特性:强制锁释放,带有效期的锁。

    public interface RLock extends Lock, RLockAsync
    public interface RRLock {
      //----------------------Lock接口方法-----------------------

      /**
        * 加锁 锁的有效期默认30秒
        */
      void lock();
      /**
        * tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
        */
      boolean tryLock();
      /**
        * tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,
        * 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
        *
        * @param time 等待时间
        * @param unit 时间单位 小时、分、秒、毫秒等
        */
      boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
      /**
        * 解锁
        */
      void unlock();
      /**
        * 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过
        * Thread.currentThread().interrupt(); 方法真正中断该线程
        */
      void lockInterruptibly();

      //----------------------RLock接口方法-----------------------
      /**
        * 加锁 上面是默认30秒这里可以手动设置锁的有效时间
        *
        * @param leaseTime 锁有效时间
        * @param unit     时间单位 小时、分、秒、毫秒等
        */
      void lock(long leaseTime, TimeUnit unit);
      /**
        * 这里比上面多一个参数,多添加一个锁的有效时间
        *
        * @param waitTime 等待时间
        * @param leaseTime 锁有效时间
        * @param unit     时间单位 小时、分、秒、毫秒等
        */
      boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
      /**
        * 检验该锁是否被线程使用,如果被使用返回True
        */
      boolean isLocked();
      /**
        * 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)
        * 这个比上面那个实用
        */
      boolean isHeldByCurrentThread();
      /**
        * 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间
        * @param leaseTime 锁有效时间
        * @param unit       时间单位 小时、分、秒、毫秒等
        */
      void lockInterruptibly(long leaseTime, TimeUnit unit);  
    }
    4.2:RLock实现类RedissonLock
    public class RedissonLock extends RedissonExpirable implements RLock
    4.2.1 void lock()方法
    • 1:RedissionLock的Lock方法

      @Override
      public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
      }
    • 2:Lock带参数

      private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
            long threadId = Thread.currentThread().getId();
            //1:开始进行尝试获取锁
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return;
            }
      //2:订阅锁的释放信息
            RFuture<RedissonLockEntry> future = subscribe(threadId);
            //3:等待锁的释放
            if (interruptibly) {
                commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                commandExecutor.syncSubscription(future);
            }

            try {
                while (true) {
                //4:循环多次尝试获取锁
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    //5:获取锁成功,返回
                    if (ttl == null) {
                        break;
                    }

                    // waiting for message
                    //6:休眠等待锁被释放的信号,释放被唤醒
                    if (ttl >= 0) {
                        try {
                            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            if (interruptibly) {
                                throw e;
                            }
                            future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else {
                        if (interruptibly) {
                            future.getNow().getLatch().acquire();
                        } else {
                            future.getNow().getLatch().acquireUninterruptibly();
                        }
                    }
                }
            } finally {
            //7:退订锁的信息
                unsubscribe(future, threadId);
            }
      //       get(lockAsync(leaseTime, unit));
        }
    • 3:tryAcquire

      异步尝试进行加锁,尝试加锁的时候leaseTime为-1。通常如果客户端没有加锁成功,则会进行阻塞,leaseTime为锁释放的时间。

      private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        //使用get方法,获取RFuture数据,没有返回则阻塞
        return get(tryAcquireAsync(leaseTime, unit, threadId));
      }
      private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        //进行加锁,返回RFuture
        if (leaseTime != -1) {
        //异步进行获取锁的操作
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
            //进行看门狗的处理,见4.2.3
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
      }

      生成lua脚本:

      <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
      }

    上述lua脚本:

    img

    总结:

    • 1:加锁,使用Lock没有携带过期时间,但是会使用一个默认值: lockWatchdogTimeout = 30 * 1000;

    • 2:和ReentranLock一样,在进行获取锁的时候,首先会先尝试一次获取锁的操作,如果获取锁失败使用信号量的方式,订阅锁的释放信息,有了释放锁的信息,则进行尝试加锁,依此循环。

    • 3:根据锁释放的逻辑,锁释放的时候会发布锁解除的消息,应该在2中订阅对应,在释放锁的时候释放锁的信息。

    4.2.2 解锁
    @Override
    public void unlock() {
      try {
          get(unlockAsync(Thread.currentThread().getId()));
      } catch (RedisException e) {
          if (e.getCause() instanceof IllegalMonitorStateException) {
              throw (IllegalMonitorStateException) e.getCause();
          } else {
              throw e;
          }
      }
    • 1:unlockAsync方法

      @Override
      public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        //核心的解锁流程
        RFuture<Boolean> future = unlockInnerAsync(threadId);
      //后取是解除看门狗进程
        future.onComplete((opStatus, e) -> {
            if (e != null) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(e);
                return;
            }

            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
             
            cancelExpirationRenewal(threadId);
            result.trySuccess(null);
        });

        return result;
      }
    • 2:解锁unlockInnerAsync

      protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        /**
        *(1):判断key是否存在,不存在即释放成功;
        *(2):减少锁定值1,减去后如果小于等于0,则释放锁,并且进行事件通知锁释放。
        */
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

      }
    4.2.3 看门狗实现

    在方法:tryAcquireAsync中尝试加锁之后,会启动定时任务进行看门狗的进程

    entryName:锁的唯一标记

    entry:线程ID信息的包装类

    private void scheduleExpirationRenewal(long threadId) {
      ExpirationEntry entry = new ExpirationEntry();
      //在EXPIRATION_RENEWAL_MAP中添加entryNme和entry的数据
      ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
      if (oldEntry != null) {
          oldEntry.addThreadId(threadId);
      } else {
          entry.addThreadId(threadId);
          //启动一个开门狗的线程
          renewExpiration();
      }
    }
    private void renewExpiration() {
      ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
      if (ee == null) {
          return;
      }
      //在internalLockLeaseTime / 3时间后执行
      Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
          @Override
          public void run(Timeout timeout) throws Exception {
              ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
              if (ent == null) {
                  return;
              }
              Long threadId = ent.getFirstThreadId();
              if (threadId == null) {
                  return;
              }
              //看门狗的核心方法
              RFuture<Boolean> future = renewExpirationAsync(threadId);
              future.onComplete((res, e) -> {
                  if (e != null) {
                      log.error("Can't update lock " + getName() + " expiration", e);
                      return;
                  }
                   
                  if (res) {
                      // reschedule itself
                      //命令执行成功之后,再循环调用
                      renewExpiration();
                  }
              });
          }
      }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
       
      ee.setTimeout(task);
    }
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
      //使用lua脚本进行锁的延长
      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return 1; " +
              "end; " +
              "return 0;",
          Collections.<Object>singletonList(getName()),
          internalLockLeaseTime, getLockName(threadId));
    }

    总结:看门狗就是维护一个静态的Map集合中,集合中有线程信息ID,在锁没有释放的情况下,定时的重新赋值锁的失效时间,达到延长锁的机制。

    5:RedLock

    RedLock是基于redis实现的分布式锁,它能够保证以下特性:

    • 互斥性:在任何时候,只能有一个客户端能够持有锁;避免死锁:
    • 当客户端拿到锁后,即使发生了网络分区或者客户端宕机,也不会发生死锁;(利用key的存活时间)
    • 容错性:只要多数节点的redis实例正常运行,就能够对外提供服务,加锁或者释放锁; RedLock算法思想,意思是不能只在一个redis实例上创建锁,应该是在多个redis实例上创建锁,n / 2 + 1,必须在大多数redis节点上都成功创建锁,才能算这个整体的RedLock加锁成功,避免说仅仅在一个redis实例上加锁而带来的问题。

    Redisson中有一个MultiLock的概念,可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁。而Redisson中实现RedLock就是基于MultiLock 去做的。实现原理其实很简单,基于RedLock思想,遍历所有的Redis客户端,然后依次加锁,最后统计成功的次数来判断是否加锁成功。

  • 相关阅读:
    Codeforces Round #325 (Div. 2) F:(meet in the middle)
    Educational Codeforces Round 3:E (MST+树链剖分+RMQ)
    Educational Codeforces Round 3:D. Gadgets for dollars and pounds(二分答案+贪心)
    CodeForce 484B:(最大余数)
    CodeForce 540C:(DFS)
    HDU 1010:(DFS)
    Poj1741-Tree(树分治)
    uva10245-The Closest Pair Problem(平面上的点分治)
    hdu1561-The more, The Better(树形dp)
    hdu2196-Computer(树形dp)
  • 原文地址:https://www.cnblogs.com/mayang2465/p/14655899.html
Copyright © 2011-2022 走看看