zoukankan      html  css  js  c++  java
  • 【分布式锁】Redis实现可重入的分布式锁

    一、前言

      之前写的一篇文章《细说分布式锁》介绍了分布式锁的三种实现方式,但是Redis实现分布式锁关于Lua脚本实现、自定义分布式锁注解以及需要注意的问题都没描述。本文就是详细说明如何利用Redis实现重入的分布式锁。

    二、方案

    死锁问题

      当一个客户端获取锁成功之后,假如它崩溃了导致它再也无法和 Redis 节点通信,那么它就会一直持有这个锁,导致其它客户端永远无法获得锁了,因此锁必须要有一个自动释放的时间。
      我们需要保证setnx命令和expire命令以原子的方式执行,否则如果客户端执行setnx获得锁后,这时客户端宕机了,那么这把锁没有设置过期时间,导致其他客户端永远无法获得锁了。

    锁被其他线程释放

      如果不加任何处理即简单使用 SETNX 实现 Redis 分布式锁,就会遇到一个问题:如果线程 C1 获得锁,但由于业务处理时间过长,锁在线程 C1 还未处理完业务之前已经过期了,这时线程 C2 获得锁,在线程 C2 处理业务期间线程 C1 完成业务执行释放锁操作,但这时线程 C2 仍在处理业务线程 C1 释放了线程 C2 的锁,导致线程 C2 业务处理实际上没有锁提供保护机制;同理线程 C2 可能释放线程 C3 的锁,从而导致严重的问题。
      因此每个线程释放锁的时候只能释放自己的锁,即锁必须要有一个拥有者的标记,并且也需要保证释放锁的原子性操作。
      在释放锁的时候判断拥有者的标记(value是否相同),只有相同时才可以删除,同时利用Lua脚本来达到原子操作,脚本如下:

    1. if redis.call("get", KEYS[1]) == ARGV[1] then 
    2. return redis.call("del", KEYS[1]) 
    3. else 
    4. return 0 
    5. end 

    可重入问题

    可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,如果没有可重入锁的支持,在第二次尝试获得锁时将会进入死锁状态。
    这里有两种解决方案:

    1. 客户端在获得锁后保存value(拥有者标记),然后释放锁的时候将value和key同时传过去。
    2. 利用ThreadLocal实现,获取锁后将Redis中的value保存在ThreadLocal中,同一线程再次尝试获取锁的时候就先将 ThreadLocal 中的 值 与 Redis 的 value 比较,如果相同则表示这把锁所以该线程,即实现可重入锁。

    这里的实现的方案是基于单机Redis,之前说的集群问题这里暂不考虑。

    三、编码

    我们通过自定义分布式锁注解+AOP可以更加方便的使用分布式锁,只需要在加锁的方法上加上注解即可。
    Redis分布式锁接口

    /**
     * Redis分布式锁接口
     * Created by 2YSP on 2019/9/20.
     */
    public interface IRedisDistributedLock {
    
      /**
       *
       * @param key
       * @param requireTimeOut 获取锁超时时间 单位ms
       * @param lockTimeOut 锁过期时间,一定要大于业务执行时间 单位ms
       * @param retries 尝试获取锁的最大次数
       * @return
       */
      boolean lock(String key, long requireTimeOut, long lockTimeOut, int retries);
    
      /**
       * 释放锁
       * @param key
       * @return
       */
      boolean release(String key);
    
    }
    

    Redis 分布式锁实现类

    /**
     * Redis 分布式锁实现类
     * Created by 2YSP on 2019/9/20.
     */
    @Slf4j
    @Component
    public class RedisDistributedLockImpl implements IRedisDistributedLock {
    
      /**
       * key前缀
       */
      public static final String PREFIX = "Lock:";
      /**
       * 保存锁的value
       */
      private ThreadLocal<String> threadLocal = new ThreadLocal<>();
    
      private static final Charset UTF8 = Charset.forName("UTF-8");
      /**
       * 释放锁脚本
       */
      private static final String UNLOCK_LUA;
    
      /*
       * 释放锁脚本,原子操作
       */
      static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call("get",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call("del",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
      }
    
      @Autowired
      private RedisTemplate redisTemplate;
    
      @Override
      public boolean lock(String key, long requireTimeOut, long lockTimeOut, int retries) {
        //可重入锁判断
        String originValue = threadLocal.get();
        if (!StringUtils.isBlank(originValue) && isReentrantLock(key, originValue)) {
          return true;
        }
        String value = UUID.randomUUID().toString();
        long end = System.currentTimeMillis() + requireTimeOut;
        int retryTimes = 1;
    
        try {
          while (System.currentTimeMillis() < end) {
            if (retryTimes > retries) {
              log.error(" require lock failed,retry times [{}]", retries);
              return false;
            }
            if (setNX(wrapLockKey(key), value, lockTimeOut)) {
              threadLocal.set(value);
              return true;
            }
            // 休眠10ms
            Thread.sleep(10);
    
            retryTimes++;
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
        return false;
      }
    
      private boolean setNX(String key, String value, long expire) {
        /**
         * List设置lua的keys
         */
        List<String> keyList = new ArrayList<>();
        keyList.add(key);
        return (boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
          Boolean result = connection
              .set(key.getBytes(UTF8),
                  value.getBytes(UTF8),
                  Expiration.milliseconds(expire),
                  SetOption.SET_IF_ABSENT);
          return result;
        });
    
      }
    
      /**
       * 是否为重入锁
       */
      private boolean isReentrantLock(String key, String originValue) {
        String v = (String) redisTemplate.opsForValue().get(key);
        return v != null && originValue.equals(v);
      }
    
      @Override
      public boolean release(String key) {
        String originValue = threadLocal.get();
        if (StringUtils.isBlank(originValue)) {
          return false;
        }
        return (boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
          return connection
              .eval(UNLOCK_LUA.getBytes(UTF8), ReturnType.BOOLEAN, 1, wrapLockKey(key).getBytes(UTF8),
                  originValue.getBytes(UTF8));
        });
      }
    
    
      private String wrapLockKey(String key) {
        return PREFIX + key;
      }
    }
    

    分布式锁注解

    @Retention(value = RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DistributedLock {
    
      /**
       * 默认包名加方法名
       * @return
       */
      String key() default "";
    
      /**
       * 过期时间 单位:毫秒
       * <pre>
       *     过期时间一定是要长于业务的执行时间.
       * </pre>
       */
      long expire() default 30000;
    
      /**
       * 获取锁超时时间 单位:毫秒
       * <pre>
       *     结合业务,建议该时间不宜设置过长,特别在并发高的情况下.
       * </pre>
       */
      long timeout() default 3000;
    
      /**
       * 默认重试次数
       * @return
       */
      int retryTimes() default Integer.MAX_VALUE;
    
    }
    

    aop切片类

    @Component
    @Aspect
    @Slf4j
    public class RedisLockAop {
    
      @Autowired
      private IRedisDistributedLock redisDistributedLock;
    
    
      @Around(value = "@annotation(lock)")
      public Object doAroundAdvice(ProceedingJoinPoint proceedingJoinPoint, DistributedLock lock) {
        // 加锁
        String key = getKey(proceedingJoinPoint, lock);
        Boolean success = null;
        try {
            success = redisDistributedLock
              .lock(key, lock.timeout(), lock.expire(), lock.retryTimes());
          if (success) {
            log.info(Thread.currentThread().getName() + " 加锁成功");
            return proceedingJoinPoint.proceed();
          }
          log.info(Thread.currentThread().getName() + " 加锁失败");
          return null;
        } catch (Throwable throwable) {
          throwable.printStackTrace();
          return null;
        } finally {
          if (success){
            boolean result = redisDistributedLock.release(key);
            log.info(Thread.currentThread().getName() + " 释放锁结果:{}",result);
          }
        }
      }
    
      private String getKey(JoinPoint joinPoint, DistributedLock lock) {
        if (!StringUtils.isBlank(lock.key())) {
          return lock.key();
        }
        return joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature()
            .getName();
      }
    }
    
    

    业务逻辑处理类

    @Service
    public class TestService {
    
      @DistributedLock(retryTimes = 1000,timeout = 1000)
      public String lockTest() {
        try {
          System.out.println("模拟执行业务逻辑。。。");
          Thread.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
          return "error";
        }
    
        return "ok";
      }
    }
    

    四、测试

    1. 启动本地redis,启动项目
    2. 打开cmd,利用ab压力测试设置3个线程100个请求。

    ab -c 3 -n 100 http://localhost:8000/lock/test

    idea控制台输出如下:

     

    enter description here
    enter description here

     

    至此大功告成,代码地址

    ps: 遇到一个奇怪的问题,我用 RedisTemplate.execute(RedisScript script, List keys, Object... args) 这个方法,通过加载resource目录下的lua脚本来释放锁的时候一直不成功,参数没任何问题,而且我之前的文章就是用这个方法可以正确的释放锁。

  • 相关阅读:
    git 获取领先落后的命令
    django orm 时间字段讲解
    在SAE上同步djanogo的mysql数据库
    使用JS来实现验证码功能
    一个基于python的即时通信程序
    关于python多线程编程中join()和setDaemon()的一点儿探究
    Django1.6添加comments应用的简单过程
    使用saltstack批量部署服务器运行环境事例——批量部署nagios客户端
    Web服务器集群搭建关键步骤纪要
    关于rsync的密码问题
  • 原文地址:https://www.cnblogs.com/2YSP/p/11563448.html
Copyright © 2011-2022 走看看