一、分布式锁使用场景
单机系统的时候,当处理临界资源的时候需要使用synchronized来锁住资源,以免并发导致临界资源异常。
在分布式系统中,使用单机的锁已经不能控制临界资源了,这个时候就需要跨系统来控制临界资源,那控制的标识位放到哪里呢?可以是缓存redis,也可以是ZK。
直白的将,单机需要用synchronized解决的问题,在集群部署的时候出现同样的问题就需要用 分布式锁来解决。
redis做分布式操作redisson太强大了:https://github.com/redisson/redisson/wiki
二、分布式锁需要解决的问题
-
同时只能有一个人获取锁 :使用 setnx解决
-
锁不能死锁:使用过期时间来解决
-
锁在没有使用完的情况下,由于时间超时自动解锁:锁续命
-
解锁原则:谁上锁,谁解锁
三、分布式锁的实现代码
package test.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import java.util.*; import java.util.concurrent.TimeUnit; /** * Redis分布式锁 * 使用 SET resource-name anystring NX EX max-lock-time 实现 * <p> * 该方案在 Redis 官方 SET 命令页有详细介绍。 * http://doc.redisfans.com/string/set.html * <p> * 在介绍该分布式锁设计之前,我们先来看一下在从 Redis 2.6.12 开始 SET 提供的新特性, * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中: * <p> * EX seconds — 以秒为单位设置 key 的过期时间; * PX milliseconds — 以毫秒为单位设置 key 的过期时间; * NX — 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。 * XX — 将key 的值设为value ,当且仅当key 存在,等效于 SETEX。 * <p> * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。 * <p> * 客户端执行以上的命令: * <p> * 如 * 个客户端获得锁。 * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。 * */ public class RedisLock { private Logger logger = LoggerFactory.getLogger(getClass()); private RedisTemplate<String, Object> redisTemplate; /** * 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。 */ public static final String NX = "NX"; /** * seconds — 以秒为单位设置 key 的过期时间,等效于EXPIRE key seconds */ public static final String EX = "EX"; /** * 调用set后的返回值 */ public static final String OK = "OK"; /** * 默认请求锁的超时时间(ms 毫秒) */ private static final long TIME_OUT = 100; /** * 默认锁的有效时间(s) */ public static final int EXPIRE = 60; /** * 解锁的lua脚本 */ public static final String UNLOCK_LUA; static { StringBuilder sb = new StringBuilder(); sb.append("if redis.call("get",KEYS[1]) == ARGV[1] "); sb.append("then "); sb.append(" return redis.call("del",KEYS[1]) "); sb.append("else "); sb.append(" return 0 "); sb.append("end "); UNLOCK_LUA = sb.toString(); } /** * 锁标志对应的key */ private String lockKey; /** * 记录到日志的锁标志对应的key */ private String lockKeyLog = ""; /** * 锁对应的值 */ private String lockValue; /** * 锁的有效时间(s) */ private int expireTime = EXPIRE; /** * 请求锁的超时时间(ms) */ private long timeOut = TIME_OUT; /** * 锁标记 */ private volatile boolean locked = false; final Random random = new Random(); //解决锁续命问题 private Timer timer; //锁是否续命处理, 可以参数传进来控制 private boolean lockCanNotTimeOut=true; /** * 使用默认的锁过期时间和请求锁的超时时间 * * @param redisTemplate * @param lockKey 锁的key(Redis的Key) */ public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey) { this.redisTemplate = redisTemplate; this.lockKey = lockKey + "_lock"; } /** * 使用默认的请求锁的超时时间,指定锁的过期时间 * * @param redisTemplate * @param lockKey 锁的key(Redis的Key) * @param expireTime 锁的过期时间(单位:秒) */ public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) { this(redisTemplate, lockKey); this.expireTime = expireTime; } /** * 使用默认的锁的过期时间,指定请求锁的超时时间 * * @param redisTemplate * @param lockKey 锁的key(Redis的Key) * @param timeOut 请求锁的超时时间(单位:毫秒) */ public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) { this(redisTemplate, lockKey); this.timeOut = timeOut; } /** * 锁的过期时间和请求锁的超时时间都是用指定的值 * * @param redisTemplate * @param lockKey 锁的key(Redis的Key) * @param expireTime 锁的过期时间(单位:秒) * @param timeOut 请求锁的超时时间(单位:毫秒) */ public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) { this(redisTemplate, lockKey, expireTime); this.timeOut = timeOut; } /** * 尝试获取锁 超时返回 * * @return */ public boolean tryLock() { // 生成随机key lockValue = UUID.randomUUID().toString(); // 请求锁超时时间,纳秒 long timeout = timeOut * 1000000; // 系统当前时间,纳秒 long nowTime = System.nanoTime(); while ((System.nanoTime() - nowTime) < timeout) { if(this.set(lockKey, lockValue, expireTime)) { // if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) { locked = true; // 上锁成功结束请求 this.continueLock(this.lockKey); return locked; } // 每次请求等待一段时间 seleep(10, 50000); } return locked; } /** * 尝试获取锁 立即返回 * * @return 是否成功获得锁 */ public boolean lock() { lockValue = UUID.randomUUID().toString(); //不存在则添加 且设置过期时间(单位ms) boolean result = set(lockKey, lockValue, expireTime); if(result){ continueLock(this.lockKey); } locked = result; return locked; } /** * 以阻塞方式的获取锁 * 一直获取锁,直到成功 * @return 是否成功获得锁 */ public boolean lockBlock() { lockValue = UUID.randomUUID().toString(); while (true) { //不存在则添加 且设置过期时间(单位ms) boolean result = set(lockKey, lockValue, expireTime); if(result) { // if (OK.equalsIgnoreCase(result)) { locked = true; return locked; } // 每次请求等待一段时间 seleep(10, 50000); } } /** * 解锁 * <p> * 可以通过以下修改,让这个锁实现更健壮: * <p> * 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。 * 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。 * 这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。 */ public Boolean unlock() { // 只有加锁成功并且锁还有效才去释放锁 // 只有加锁成功并且锁还有效才去释放锁 if (locked) { boolean isUnLock = (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); Long result = 0L; List<String> keys = new ArrayList<>(); keys.add(lockKey); List<String> values = new ArrayList<>(); values.add(lockValue); // 集群模式 if (nativeConnection instanceof JedisCluster) { result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values); } // 单机模式 if (nativeConnection instanceof Jedis) { result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values); } if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) { logger.info("Redis分布式锁,解锁{}失败!解锁时间:{}", lockKeyLog, System.currentTimeMillis()); } locked = result == 0; return result == 1; } }); if(isUnLock && timer!=null){ timer.cancel(); } return isUnLock; } return true; } /** * 获取锁状态 * @Title: isLock * @Description: TODO * @return * @author yuhao.wang */ public boolean isLock() { return locked; } /** * 重写redisTemplate的set方法 * <p> * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。 * <p> * 客户端执行以上的命令: * <p> * 如果服务器返回 OK ,那么这个客户端获得锁。 * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。 * * @param key 锁的Key * @param value 锁里面的值 * @param seconds 过去时间(秒) * @return */ private boolean set(final String key, final String value, final long seconds) { Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空"); //如果返回为true单表 这个key在redis里面没有,并且返回true setNX //spring start用的2.2.2版本 return redisTemplate.opsForValue().setIfAbsent(key, value,expireTime, TimeUnit.SECONDS); /* return (String) redisTemplate.execute(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); String result = null; if (nativeConnection instanceof JedisCommands) { result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds); } if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) { logger.info("获取锁{}的时间:{}", lockKeyLog, System.currentTimeMillis()); } return result; } });*/ } /** * @param millis 毫秒 * @param nanos 纳秒 * @Title: seleep * @Description: 线程等待时间 * @author yuhao.wang */ private void seleep(long millis, int nanos) { try { //第二个参数不能起到随机数的作用,这个地方需要自己用随机数在改动 Thread.sleep(millis, random.nextInt(nanos)); } catch (InterruptedException e) { logger.info("获取分布式锁休眠被中断:", e); } } public String getLockKeyLog() { return lockKeyLog; } public void setLockKeyLog(String lockKeyLog) { this.lockKeyLog = lockKeyLog; } public int getExpireTime() { return expireTime; } public void setExpireTime(int expireTime) { this.expireTime = expireTime; } public long getTimeOut() { return timeOut; } public void setTimeOut(long timeOut) { this.timeOut = timeOut; } /** * 锁续命 * @param lockKey */ public void continueLock(String lockKey) { if(lockCanNotTimeOut){ timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { //每次续命60s,参数可以传进来 redisTemplate.expire(lockKey, 60, TimeUnit.SECONDS); } }, 0, 1); } } }
2 )redisTemplate配置
@Configuration public class RedisConfig { @Bean(name="myRedisTemplate") public RedisTemplate myRedisTemplate(RedisConnectionFactory factory){ RedisTemplate redisTemplate = new RedisTemplate(); RedisSerializer stringSerializer = new StringRedisSerializer(); redisTemplate.setConnectionFactory(factory); redisTemplate.setKeySerializer(stringSerializer); redisTemplate.setValueSerializer(stringSerializer); redisTemplate.setHashKeySerializer(stringSerializer); redisTemplate.setHashValueSerializer(stringSerializer); return redisTemplate; } }
四、Junit进行测试-redis使用 spring的redisTemplate
@RunWith(SpringRunner.class) @SpringBootTest(classes = Server.class) public class RedisTest { @Autowired @Qualifier("myRedisTemplate") private RedisTemplate myRedisTemplate; private AtomicInteger count = new AtomicInteger(0); @Before public void init(){ System.out.println( myRedisTemplate ); } //并发控制 @Test public void test4() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1) ; for (int i = 0; i <5; i++) { int finalI = i; Thread thread = new Thread(()->{ try { System.out.println(finalI); countDownLatch.await(); this.test3(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread.start(); } TimeUnit.SECONDS.sleep(2); System.out.println("启动并发"); countDownLatch.countDown(); Thread.currentThread().join(); System.out.println("主线程结束"); } //测试锁的使用,因为有锁续命,所以只要线程不自己解锁则会一直持有 public void test3() throws InterruptedException { RedisLock redisLock = new RedisLock(myRedisTemplate,"lockKey") ; try { if(redisLock.tryLock()){ System.out.println("进入锁"); TimeUnit.SECONDS.sleep(30); System.out.println("睡眠结束"); } }finally { System.out.println( count.incrementAndGet()); redisLock.unlock(); } } }