出处:https://www.cnblogs.com/fixzd/p/9479970.html
redis分布式锁相关命令的背景
Redis 2.6.12之前使用 setnx
和 expire来实现分布式锁,expire和setnx不是原子操作,setnx的节点1突然挂掉,那么expire来不及执行就变成了无止尽的锁;
解决方案:Redis 2.6.12之前,可以自己去删除锁,实现方式就是其他线程在锁到期之后,去帮忙删除这个锁;Redis 2.6.12之后,使用redis的set多参数方法,set(key,1,30,NX) ,使用SET代替SETNX ,相当于SETNX+EXPIRE实现了原子性,不必担心SETNX成功,EXPIRE失败的问题!有效的避免死锁!!!
问题:set获取了锁之后,如果执行的业务逻辑时间很长,expire已过期,那么节点2会拿到该锁,等节点1执行完业务逻辑之后,执行del操作,其实是删掉节点2的锁;
解决方案:把线程id加到锁的value当中去,拿到key的时候判断当前线程id是否为当前线程id;
新问题:判断线程id的逻辑和删除逻辑不在同一个原子操作中,(获取值、判断和删除锁这些操作不是原子操作)
解决办法:使用lua脚本;把判断线程id和删除逻辑放在一个原子操作中;
一、介绍
这篇博文讲介绍如何一步步构建一个基于Redis的分布式锁。会从最原始的版本开始,然后根据问题进行调整,最后完成一个较为合理的分布式锁。
本篇文章会将分布式锁的实现分为两部分,一个是单机环境,另一个是集群环境下的Redis锁实现。在介绍分布式锁的实现之前,先来了解下分布式锁的一些信息。
二、分布式锁
2.1 什么是分布式锁?
分布式锁是控制分布式系统或不同系统之间共同访问共享资源的一种锁实现,如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往需要互斥来防止彼此干扰来保证一致性。
2.2 分布式锁需要具备哪些条件
- 互斥性:在任意一个时刻,只有一个客户端持有锁。
- 无死锁:即便持有锁的客户端崩溃或者其他意外事件,锁仍然可以被获取。
- 容错:只要大部分Redis节点都活着,客户端就可以获取和释放锁
2.4 分布式锁的实现有哪些?
- 数据库
- Memcached(add命令)
- Redis(setnx命令)
- Zookeeper(临时节点)
- 等等
三、单机Redis的分布式锁
3.1 准备工作
- 定义常量类
public class LockConstants { public static final String OK = "OK"; /** NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key if it already exist. **/ public static final String NOT_EXIST = "NX"; public static final String EXIST = "XX"; /** expx EX|PX, expire time units: EX = seconds; PX = milliseconds **/ public static final String SECONDS = "EX"; public static final String MILLISECONDS = "PX"; private LockConstants() {} }
- 定义锁的抽象类
抽象类RedisLock实现java.util.concurrent包下的Lock接口,然后对一些方法提供默认实现,子类只需实现lock方法和unlock方法即可。代码如下
public abstract class RedisLock implements Lock { protected Jedis jedis; protected String lockKey; public RedisLock(Jedis jedis,String lockKey) { this(jedis, lockKey); } public void sleepBySencond(int sencond){ try { Thread.sleep(sencond*1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void lockInterruptibly(){} @Override public Condition newCondition() { return null; } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit){ return false; } }
3.2 最基础的版本1
先来一个最基础的版本,代码如下
public class LockCase1 extends RedisLock { public LockCase1(Jedis jedis, String name) { super(jedis, name); } @Override public void lock() { while(true){ String result = jedis.set(lockKey, "value", NOT_EXIST); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加锁成功!"); break; } } } @Override public void unlock() { jedis.del(lockKey); } }
LockCase1类提供了lock和unlock方法。
其中lock方法也就是在reids客户端执行如下命令
SET lockKey value NX
而unlock方法就是调用DEL命令将键删除。
好了,方法介绍完了。现在来想想这其中会有什么问题?
假设有两个客户端A和B,A获取到分布式的锁。A执行了一会,突然A所在的服务器断电了(或者其他什么的),也就是客户端A挂了。这时出现一个问题,这个锁一直存在,且不会被释放,其他客户端永远获取不到锁。如下示意图
可以通过设置过期时间来解决这个问题
3.3 版本2-设置锁的过期时间
public void lock() { while(true){ String result = jedis.set(lockKey, "value", NOT_EXIST,SECONDS,30); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加锁成功!"); break; } } }
类似的Redis命令如下
SET lockKey value NX EX 30
注:要保证设置过期时间和设置锁具有原子性
这时又出现一个问题,问题出现的步骤如下
- 客户端A获取锁成功,过期时间30秒。
- 客户端A在某个操作上阻塞了50秒。
- 30秒时间到了,锁自动释放了。
- 客户端B获取到了对应同一个资源的锁。
- 客户端A从阻塞中恢复过来,释放掉了客户端B持有的锁。
示意图如下
这时会有两个问题
- 过期时间如何保证大于业务执行时间?
- 如何保证锁不会被误删除?
先来解决如何保证锁不会被误删除这个问题。
这个问题可以通过设置value为当前客户端生成的一个随机字符串,且保证在足够长的一段时间内在所有客户端的所有获取锁的请求中都是唯一的。
版本2的完整代码
import com.learnRedis.lock.RedisLock; import redis.clients.jedis.Jedis; import static com.learnRedis.lock.LockConstants.*; /** * 存在问题: * 1.如何保证锁不会被误删除? * 2.过期时间如何保证大于执行时间? */ public class LockCase2 extends RedisLock { public LockCase2(Jedis jedis, String name) { super(jedis, name); } @Override public void lock() { while(true){ /** * 这里设置key和设置过期时间需要保持原子性, * 设置存活时间30秒,解决LockCase1存在的问题. * @see com.learnRedis.lock.case1.LockCase1 */ String result = jedis.set(lockKey, "value", NOT_EXIST,SECONDS,30); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加锁成功!"); break; } } } @Override public void unlock() { jedis.del(lockKey); } }
3.4 版本3-设置锁的value
抽象类RedisLock增加lockValue字段,lockValue字段的默认值为UUID随机值假设当前线程ID。
public abstract class RedisLock implements Lock { //... protected String lockValue; public RedisLock(Jedis jedis,String lockKey) { this(jedis, lockKey, UUID.randomUUID().toString()+Thread.currentThread().getId()); } public RedisLock(Jedis jedis, String lockKey, String lockValue) { this.jedis = jedis; this.lockKey = lockKey; this.lockValue = lockValue; } //... }
加锁代码
public void lock() { while(true){ String result = jedis.set(lockKey, lockValue, NOT_EXIST,SECONDS,30); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加锁成功!"); break; } } }
解锁代码
public void unlock() { String lockValue = jedis.get(lockKey); if (lockValue.equals(lockValue)){ jedis.del(lockKey); } }
这时看看加锁代码,好像没有什么问题啊。
再来看看解锁的代码,这里的解锁操作包含三步操作:获取值、判断和删除锁。这时你有没有想到在多线程环境下的i++
操作?
3.4.1 i++问题
i++
操作也可分为三个步骤:读i的值,进行i+1,设置i的值。
如果两个线程同时对i进行i++操作,会出现如下情况
- i设置值为0
- 线程A读到i的值为0
- 线程B也读到i的值为0
- 线程A执行了+1操作,将结果值1写入到内存
- 线程B执行了+1操作,将结果值1写入到内存
- 此时i进行了两次i++操作,但是结果却为1
在多线程环境下有什么方式可以避免这类情况发生?
解决方式有很多种,例如用AtomicInteger、CAS、synchronized等等。
这些解决方式的目的都是要确保i++
操作的原子性。那么回过头来看看解锁,同理我们也是要确保解锁的原子性。我们可以利用Redis的lua脚本来实现解锁操作的原子性。
版本3的完整代码
import com.learnRedis.lock.RedisLock; import redis.clients.jedis.Jedis; import java.util.UUID; import static com.learnRedis.lock.LockConstants.*; /** * 存在问题:解锁不具备原子性 */ public class LockCase3 extends RedisLock { public LockCase3(Jedis jedis, String name) { super(jedis, name); } @Override public void lock() { while(true){ /** * 设置value为当前线程特有的值 */ String result = jedis.set(lockKey, lockValue, NOT_EXIST,SECONDS,30); if(OK.equals(result)){ System.out.println(Thread.currentThread().getId()+"加锁成功!"); break; } } } @Override public void unlock() { /** * 此处不具备原子性,可以分为三个步骤 * 1.获取锁对应的value值 * 2.检查是否与requestId相等 * 3.如果相等则删除锁(解锁) */ String lockValue = jedis.get(lockKey); if (lockValue.equals(lockValue)){ jedis.del(lockKey); } } }
3.5 版本4-具有原子性的释放锁
lua脚本内容如下
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
这段Lua脚本在执行的时候要把的lockValue作为ARGV[1]的值传进去,把lockKey作为KEYS[1]的值传进去。现在来看看解锁的java代码
public void unlock() { // 使用lua脚本进行原子删除操作 String checkAndDelScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else " + "return 0 " + "end"; jedis.eval(checkAndDelScript, 1, lockKey, lockValue); }
好了,解锁操作也确保了原子性了,那么是不是单机Redis环境的分布式锁到此就完成了?
别忘了版本2-设置锁的过期时间还有一个,过期时间如何保证大于业务执行时间问题没有解决。
版本4的完整代码
import com.learnRedis.lock.RedisLock; import redis.clients.jedis.Jedis; import java.util.Collections; import static com.learnRedis.lock.LockConstants.*; /** * 存在问题:过期时间如何保证大于执行时间? */ public class LockCase4 extends RedisLock { public LockCase4(Jedis jedis, String lockKey) { super(jedis, lockKey); } @Override public void lock() { while (true) { String result = jedis.set(lockKey, lockValue, NOT_EXIST, SECONDS, 30); if (OK.equals(result)) { System.out.println(Thread.currentThread().getId() + "加锁成功!"); break; } } } @Override public void unlock() { // 使用lua脚本进行原子删除操作 String checkAndDelScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else " + "return 0 " + "end"; jedis.eval(checkAndDelScript, 1, lockKey, lockValue); } }
3.6 版本5-确保过期时间大于业务执行时间
抽象类RedisLock增加一个boolean类型的属性isOpenExpirationRenewal,用来标识是否开启定时刷新过期时间。
在增加一个scheduleExpirationRenewal方法用于开启刷新过期时间的线程。
public abstract class RedisLock implements Lock { //... protected volatile boolean isOpenExpirationRenewal = true; /** * 开启定时刷新 */ protected void scheduleExpirationRenewal(){ Thread renewalThread = new Thread(new ExpirationRenewal()); renewalThread.start(); } /** * 刷新key的过期时间 */ private class ExpirationRenewal implements Runnable{ @Override public void run() { while (isOpenExpirationRenewal){ System.out.println("执行延迟失效时间中..."); String checkAndExpireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('expire',KEYS[1],ARGV[2]) " + "else " + "return 0 end"; jedis.eval(checkAndExpireScript, 1, lockKey, lockValue, "30"); //休眠10秒 sleepBySencond(10); } } } }
加锁代码在获取锁成功后将isOpenExpirationRenewal置为true,并且调用scheduleExpirationRenewal方法,开启刷新过期时间的线程。
public void lock() { while (true) { String result = jedis.set(lockKey, lockValue, NOT_EXIST, SECONDS, 30); if (OK.equals(result)) { System.out.println("线程id:"+Thread.currentThread().getId() + "加锁成功!时间:"+LocalTime.now()); //开启定时刷新过期时间 isOpenExpirationRenewal = true; scheduleExpirationRenewal(); break; } System.out.println("线程id:"+Thread.currentThread().getId() + "获取锁失败,休眠10秒!时间:"+LocalTime.now()); //休眠10秒 sleepBySencond(10); } }
解锁代码增加一行代码,将isOpenExpirationRenewal属性置为false,停止刷新过期时间的线程轮询。
public void unlock() { //... isOpenExpirationRenewal = false; }
版本5的完整代码:
import com.learnRedis.lock.RedisLock; import redis.clients.jedis.Jedis; import java.time.LocalTime; import java.util.Timer; import static com.learnRedis.lock.LockConstants.*; public class LockCase5 extends RedisLock { public LockCase5(Jedis jedis, String lockKey) { super(jedis, lockKey); } @Override public void lock() { while (true) { String result = jedis.set(lockKey, lockValue, NOT_EXIST, SECONDS, 30); if (OK.equals(result)) { System.out.println("线程id:"+Thread.currentThread().getId() + "加锁成功!时间:"+LocalTime.now()); //开启定时刷新过期时间 isOpenExpirationRenewal = true; scheduleExpirationRenewal(); break; } System.out.println("线程id:"+Thread.currentThread().getId() + "获取锁失败,休眠10秒!时间:"+LocalTime.now()); //休眠10秒 sleepBySencond(10); } } @Override public void unlock() { System.out.println("线程id:"+Thread.currentThread().getId() + "解锁!时间:"+LocalTime.now()); String checkAndDelScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else " + "return 0 " + "end"; jedis.eval(checkAndDelScript, 1, lockKey, lockValue); isOpenExpirationRenewal = false; } }
3.7 测试
测试代码如下
public void testLockCase5() { //定义线程池 ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); //添加10个线程获取锁 for (int i = 0; i < 10; i++) { pool.submit(() -> { try { Jedis jedis = new Jedis("localhost"); LockCase5 lock = new LockCase5(jedis, lockName); lock.lock(); //模拟业务执行15秒 lock.sleepBySencond(15); lock.unlock(); } catch (Exception e){ e.printStackTrace(); } }); } //当线程池中的线程数为0时,退出 while (pool.getPoolSize() != 0) {} }
测试结果:
或许到这里基于单机Redis环境的分布式就介绍完了。但是使用java的同学有没有发现一个锁的重要特性
那就是锁的重入,那么分布式锁的重入该如何实现呢?这里就留一个坑了
四、集群Redis的分布式锁
在Redis的分布式环境中,Redis 的作者提供了RedLock 的算法来实现一个分布式锁。
4.1 加锁
RedLock算法加锁步骤如下
- 获取当前Unix时间,以毫秒为单位。
- 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个Redis实例。
- 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
- 如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
- 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功)。
4.2 解锁
向所有的Redis实例发送释放锁命令即可,不用关心之前有没有从Redis实例成功获取到锁.
关于RedLock算法,还有一个小插曲,就是Martin Kleppmann 和 RedLock 作者 antirez的对RedLock算法的互怼。 官网原话如下
Martin Kleppmann analyzed Redlock here. I disagree with the analysis and posted my reply to his analysis here.
更多关于RedLock算法这里就不在说明,有兴趣的可以到官网阅读相关文章。
五、使用Redisson框架实现分布式锁
说实话,如果在公司里落地生产环境用分布式锁的时候,一定是会用开源类库的,比如Redis分布式锁,一般就是用Redisson框架就好了,非常的简便易用。
大家如果有兴趣,可以去看看Redisson的官网,看看如何在项目中引入Redisson的依赖,然后基于Redis实现分布式锁的加锁与释放锁。
下面给大家看一段简单的使用代码片段,先直观的感受一下:
怎么样,上面那段代码,是不是感觉简单的不行!
此外,人家还支持redis单实例、redis哨兵、redis cluster、redis master-slave等各种部署架构,都可以给你完美实现。
二、Redisson实现Redis分布式锁的底层原理
好的,接下来就通过一张手绘图,给大家说说Redisson这个开源框架对Redis分布式锁的实现原理。
(1)加锁机制
咱们来看上面那张图,现在某个客户端要加锁。如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。
这里注意,仅仅只是选择一台机器!这点很关键!
紧接着,就会发送一段lua脚本到redis上,那段lua脚本如下所示:
为啥要用lua脚本呢?
因为一大坨复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。
那么,这段lua脚本是什么意思呢?
KEYS[1]代表的是你加锁的那个key,比如说:
RLock lock = redisson.getLock("myLock");
这里你自己设置了加锁的那个锁key就是“myLock”。
ARGV[1]代表的就是锁key的默认生存时间,默认30秒。
ARGV[2]代表的是加锁的客户端的ID,类似于下面这样:
8743c9c0-0795-4907-87fd-6c719a6b4586:1
给大家解释一下,第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。
如何加锁呢?很简单,用下面的命令:
hset myLock
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:
上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。
接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。
好了,到此为止,ok,加锁完成了。
(2)锁互斥机制
那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?
很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。
接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。
所以,客户端2会获取到 pttl myLock 返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。
此时客户端2会进入一个while循环,不停的尝试加锁。
(3)watch dog自动延期机制
客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?
简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
(4)可重入加锁机制
那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?
比如下面这种代码:
这时我们来分析一下上面那段lua脚本。
第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。
第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此时就会执行可重入加锁的逻辑,他会用:
incrby myLock
8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
通过这个命令,对客户端1的加锁次数,累加1。
此时myLock数据结构变为下面这样:
大家看到了吧,那个myLock的hash数据结构中的那个客户端ID,就对应着加锁的次数
(5)释放锁机制
如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。
其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。
如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:
“del myLock”命令,从redis里删除这个key。
然后呢,另外的客户端2就可以尝试完成加锁了。
这就是所谓的分布式锁的开源Redisson框架的实现机制。
一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。
(6)上述Redis分布式锁的缺点
其实上面那种方案最大的问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例。
但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。
接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。
此时就会导致多个客户端对一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。
所以这个就是redis cluster,或者是redis master-slave架构的主从异步复制导致的redis分布式锁的最大缺陷:在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。
六、总结
这篇文章讲述了一个基于Redis的分布式锁的编写过程及解决问题的思路,但是本篇文章实现的分布式锁并不适合用于生产环境。java环境有 Redisson 可用于生产环境,但是分布式锁还是Zookeeper会比较好一些(可以看Martin Kleppmann 和 RedLock的分析)。
Martin Kleppmann对RedLock的分析:http://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
RedLock 作者 antirez的回应:http://antirez.com/news/101
整个项目的地址存放在Github上,有需要的可以看看:Github地址