关于Redis分布式锁网上有很多优秀的博文,这篇文章仅作为我这段时间遇到的新问题的记录。
1.什么是分布式锁:
在单机部署的情况下,为了保证数据的一致性,不出现脏数据等,就需要使用synchronized关键字、semaphore、ReentrantLock或者我们可以基于AQS定制锁。锁是在多线程间共享;在分布式部署情况下,锁是在多进程间共享的;所以为了保证锁在多进程之间的唯一性,就需要实现锁在多进程之间的共享。
2.分布式锁的特性:
2.1要保证某个时刻中只有一个服务的一个方法获取到这个锁
2.2要保证是可重入锁(避免死锁)
2.3要保证锁的获取和释放的高可用。
3.分布式锁考虑的要点:
3.1需要在何时释放锁(finally)
3.2锁超时设置
3.3锁刷新设置(timeOut)
3.4如果锁超时了,为了避免误删了其他其他线程的锁,可以将当前线程的id存入redis中,当前线程释放锁的时候,需要判断存入redis的值是否为当前线程的id
3.5可重入
4.Redis分布式锁:
RedisLockRegistry是spring-integration-redis中提供Redis的实现类;主要通过redis锁+本地锁两个锁方式实现。
4.1在pomx.xml文件中导入spring-integration-redis的依赖:
<!-- 分布式锁支持 start--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 分布式锁支持 end-->
4.2 RedisLockRegistry类主要内部结构如图:
RedisLockRegistry类的静态String的常量OBTAIN_LOCK_SCRIPT是RedisLockRegistry类的一个上锁的lua脚本。KEYS[1]代表当前锁的key值,ARGV[1]代表当前的客户端标识,ARGV[2]代表过期时间。
private static final String OBTAIN_LOCK_SCRIPT = "local lockClientId = redis.call('GET', KEYS[1]) "+ "if lockClientId == ARGV[1] then "+ "redis.call('PEXPIRE', KEYS[1], ARGV[2]) "+ "return true "+ "elseif not lockClientId then "+ "redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]) "+ "return true "+ "end "+
"return false";
基本逻辑就是:拿着KEYS[1]去redis中获取值,如果值等于ARGV[1]就表示这条数据已经被上锁了,并且延长锁的过期时间,如果想要获取锁锁就要等待拿到锁的进程释放锁;如果这个键KEYS[1]不存在,那么设置KEYS[1]的值为ARGV[1],并且设置过期时间为ARGV[2],即当前进程就获取到这个数据的锁,并设置过期时间。(对lua脚本和redis命令不熟悉的可以上redis中文网)
4.3RedisLockRegistry类的内部类RedisLock的结构如下:
RedisLockRegistry类中获取锁的方法:
......
private final Map<String, RedisLockRegistry.RedisLock> locks;
......
public Lock obtain(Object lockKey) { Assert.isInstanceOf(String.class, lockKey); String path = (String)lockKey; return (Lock)this.locks.computeIfAbsent(path, (x$0) -> { return new RedisLockRegistry.RedisLock(x$0); }); }
如上面代码显示,locks是RedisLockRegistry类的Map类型的常量,以String类型作为key,以RedisLockRegistry的内部类RedisLock作为value;
拿着lockKey作为key去这个map中查找是否已经存在(即这条数据是否已经上锁),如果存在就返回这个lockKey对应的RedisLock,如果不存在就创建一个RedisLock并将其以此lockKey为key放入map中。
每个分布式部署的应用都会自己创建一个RedisLockRegistry实例,到这里,同一个应用的多个线程都可以获取到这条共享数据的RedisLock对象,本地锁+Redis锁真正开始于调用通过RedisLockRegistry实例.obtain(lockKey)方法获取到的RedisLock实例对象.trylock()方法,参见下文。
4.4RedisLockRegistry类的内部类的属性和部分构造方法:
private final class RedisLock implements Lock { private final String lockKey; private final ReentrantLock localLock; //用于记录上锁的时间 private volatile long lockedAt; private RedisLock(String path) { this.localLock = new ReentrantLock(); this.lockKey = this.constructLockKey(path); } ...... }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { long now = System.currentTimeMillis(); if (!this.localLock.tryLock(time, unit)) { return false; } else { try { long expire = now + TimeUnit.MILLISECONDS.convert(time, unit); boolean acquired; while(!(acquired = this.obtainLock()) && System.currentTimeMillis() < expire) { Thread.sleep(100L); } if (!acquired) { this.localLock.unlock(); } return acquired; } catch (Exception var9) { this.localLock.unlock(); this.rethrowAsLockException(var9); return false; } } } private boolean obtainLock() { Boolean success = (Boolean)RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript, Collections.singletonList(this.lockKey), new Object[]{RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)}); boolean result = Boolean.TRUE.equals(success); if (result) { this.lockedAt = System.currentTimeMillis(); } return result; }
redisTemplate的execute方法参数:
第一个参数就是要执行的lua脚本;
第二个参数就是表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推);
第三个参数那些不是键名参数的附加参数 arg [arg …] ,可以在 Lua 中通过全局变量 ARGV 数组访问,访问的形式和 KEYS 变量类似( ARGV[1] 、 ARGV[2] ,诸如此类)
分析tryLock源码可以看出,首先获取本地锁,如果获取失败,即表示某个请求线程已经获取到了锁,直接返回false;如果获取成功,就调用obtainLock方法执行OBTAIN_LOCK_SCRIPT这段lua脚本来获取redis锁,判断其他进程的某个请求线程获取到了这个redis锁,如果获取redis失败,则acquired变量为false,同时释放本地锁,tryLock方法直接返回false,获取锁失败。
为什么要用本地锁?一个是为了可重入,另一个是为了减轻redis服务器的压力。
4.5 释放锁:
public void unlock() { if (!this.localLock.isHeldByCurrentThread()) { throw new IllegalStateException("You do not own lock at " + this.lockKey); } else if (this.localLock.getHoldCount() > 1) { this.localLock.unlock(); } else { try { if (!this.isAcquiredInThisProcess()) { throw new IllegalStateException("Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised."); } if (Thread.currentThread().isInterrupted()) { RedisLockRegistry.this.executor.execute(this::removeLockKey); } else { this.removeLockKey(); } if (RedisLockRegistry.logger.isDebugEnabled()) { RedisLockRegistry.logger.debug("Released lock; " + this); } } catch (Exception var5) { ReflectionUtils.rethrowRuntimeException(var5); } finally { this.localLock.unlock(); } } }
释放锁的过程也比较简单,第一步通过本地锁判断当前线程是否持有锁,第二步通过本地锁判断当前线程持有锁的计数。
如果当前线程持有锁的计数 > 1,说明本地锁被当前线程多次获取,这时只释放本地锁(释放之后当前线程持有锁的计数-1)。
如果当前线程持有锁的计数 = 1,释放本地锁和redis锁。
redis分布式锁的使用参见另一篇博文:springboot实现分布式锁