1. 背景
在分布式集群系统的开发中,线程锁往往并不能支持全部场景的使用,必须引入新的技术方案分布式锁。
2. 原理
实现分布式锁必须依靠第三方存储介质来存储锁的元数据等信息。比如分布式集群要操作某一行数据时,这个数据的流水号是唯一的,那么就把这个流水号作为一把锁的id,当某进程要操作该数据时,先去第三方存储介质中看该锁id是否存在,如果不存在,则将该锁id写入,然后执对该数据的操作。
当其他进程要访问这个数据时,会先到第三方存储介质中查看有没有这个数据的锁id,有的话就认为这行数据目前已经有其他进程在使用了,就会不断地轮询第三方存储介质看其他进程是否释放掉该锁。
当进程操作完该数据后,该进程就到第三方存储介质中把该锁id删除掉,这样其他轮询的进程就能得到对该锁的控制。
3. 实现
实现分布式锁有三种方式。
1)数据库乐观锁
2)基于Redis的分布式锁
3)基于ZooKeeper的分布式锁
本文将介绍第二种方式,基于Redis实现分布式锁。
4. 指标
为了确保分布式锁可用,要确保锁的实现同时满足以下四个条件。
1)互斥性。在任意时刻,只有一个客户端能持有锁。
2)不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
3)具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
4)加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
5. 代码
5.1依赖
首先要通过Maven引入Jedis开源组件,在pom.xml文件加入代码。
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> |
5.2加锁
redis有个事务锁,命令如下,它的含义是将一个value设置到一个key中,如果不存在将会赋值并且设置超时时间为30秒,如何这个key已经存在了,则不进行设置。
SET key value NX PX 30000 |
这个事务锁很好的解决了两个单独的命令,一个设置set key value nx,即该key不存在的话将对其进行设置,另一个是expire key seconds,设置该key的超时时间。
可以想一下,这两个命令用程序单独使用会存在什么问题。如果一个set key的命令设置了key,然后程序异常了,expire时间没有设置,那么这个key会一直锁住。如果一个set key时出现了异常,但expire执行正常。与此同时,另一个进程进行set key,代码才执行了一部分,key就过期了,别的线程也进入了锁,从而不能达到互斥的效果。
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { String lockValue = UUID.randomUUID().toString(); Thread currentThread = Thread.currentThread(); Jedis jedis = this.getRedisClient(); String result = jedis.set(LOCK_KEY, lockValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, time); if (LOCK_SUCCESS.equals(result)) { lockContex.set(lockValue); exclusiveOwnerThread.set(currentThread); return true; } else if (exclusiveOwnerThread.get() == currentThread) {//当前线程已经持有了锁,可重入 return true; } else { return false; } } |
可以看到,加锁就一行代码:jedis.set(LOCK_KEY, lockValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, time),这个set()方法一共有五个形参:
第一个为key,使用key来当锁,因为key是唯一的。
第二个为value,传的是lockValue,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为lockValue,就知道这把锁是哪个进程加的了,在解锁的时候就可以有依据。lockValue可以使用UUID.randomUUID().toString()方法生成。
第三个为nx,这个参数填的是NX,意思是SET IF NOT EXIST,即当key不存在时,进行set操作;若key已经存在,则不做任何操作;
第四个为px,这个参数传的是PX,意思是要给这个key加一个过期的设置,具体时间由第五个参数决定。
第五个为time,与第四个参数相呼应,代表key的过期时间。
总的来说,执行上面的set()方法就只会导致两种结果。当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。已有锁存在,查看持锁的是不是当前线程,如果不是,不做任何操作。
心细的童鞋会发现加锁代码满足可靠性里描述的三个条件。首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。
最后,因为将value赋值为lockValue,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。由于只考虑Redis单机部署的场景,所以容错性暂不考虑。
5.3解锁
@Override public void unlock() { String script; try { Jedis jedis = this.getRedisClient(); script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; if (StringUtils.isBlank(lockContex.get())) { throw new RuntimeException("can not find redis key"); } // 执行脚本,并向脚本传递参数 Object result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(lockContex.get())); if (RELEASE_SUCCESS.equals(result)) { lockContex.remove(); } else { throw new RuntimeException("redis lock release failed"); } } catch (Exception ex) { throw new RuntimeException(ex.getMessage()); } } |
可以看到,解锁只需要两行代码就搞定了。第一行代码,写了一个简单的Lua脚本代码。第二行代码,将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKey,ARGV[1]赋值为lockValue。eval()方法是将Lua代码交给Redis服务端执行。
那么这段Lua代码的功能是什么呢?其实很简单,首先获取锁对应的value值,检查是否与lockValue相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述操作是原子性的。那么为什么执行eval()方法可以确保原子性,源于Redis的特性,下面是官网对eval命令的部分解释:
简单来说,就是在eval命令执行Lua代码的时候,将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。
6. 结语
至此基于redis的分布式锁已经介绍完成,最后附上全部代码。
import org.apache.commons.lang3.StringUtils; import org.tonny.properties.PropertiesUtil; import redis.clients.jedis.Jedis; import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class RedisDistributedLock implements Lock { private final static long time = 500; private final static String LOCK_KEY = "redis_lock_key"; private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; /** * 存储唯一标识 */ private ThreadLocal<String> lockContex = new ThreadLocal<>(); private ThreadLocal<Thread> exclusiveOwnerThread = new ThreadLocal<>(); @Override public void lock() { while (!tryLock()) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } @Override public void lockInterruptibly() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } while (!tryLock()) { Thread.sleep(100); } } @Override public boolean tryLock() { try { return tryLock(time, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { String lockValue = UUID.randomUUID().toString(); Thread currentThread = Thread.currentThread(); Jedis jedis = this.getRedisClient(); String result = jedis.set(LOCK_KEY, lockValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, time); jedis.close(); if (LOCK_SUCCESS.equals(result)) { lockContex.set(lockValue); exclusiveOwnerThread.set(currentThread); return true; } else if (exclusiveOwnerThread.get() == currentThread) {//当前线程已经持有了锁,可重入 return true; } else { return false; } } @Override public void unlock() { String script; try { Jedis jedis = this.getRedisClient(); script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; if (StringUtils.isBlank(lockContex.get())) { throw new RuntimeException("can not find redis key"); } // 执行脚本,并向脚本传递参数 Object result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(lockContex.get())); jedis.close(); if (RELEASE_SUCCESS.equals(result)) { lockContex.remove(); } else { throw new RuntimeException("redis lock release failed"); } } catch (Exception ex) { throw new RuntimeException(ex.getMessage()); } } @Override public Condition newCondition() { return null; } private Jedis getRedisClient() { PropertiesUtil.readProperties("redis-config/redis.properties"); String host = PropertiesUtil.getProperty("redis.host"); String port = PropertiesUtil.getProperty("redis.port"); return new Jedis(host, Integer.parseInt(port)); } public static void main(String[] args) { Lock lock = new RedisDistributedLock(); lock.lock(); try { //TimeUnit.SECONDS.sleep(20); System.out.println("operation"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } |