https://blog.csdn.net/xiaolyuh123/article/details/78551345
分布式锁的解决方式
- 基于数据库表做乐观锁,用于分布式锁。(适用于小并发)
- 使用memcached的add()方法,用于分布式锁。
- 使用memcached的cas()方法,用于分布式锁。(不常用)
- 使用redis的setnx()、expire()方法,用于分布式锁。
- 使用redis的setnx()、get()、getset()方法,用于分布式锁。
- 使用redis的watch、multi、exec命令,用于分布式锁。(不常用)
- 使用zookeeper,用于分布式锁。(不常用)
这里主要介绍第四种和第五种:
前文提供的两种方式其实都有些问题,要么是死锁,要么是依赖服务器时间同步。从Redis 2.6.12 版本开始, SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果。这样我们的可以将加锁操作用一个set命令来实现,直接是原子性操作,既没有死锁的风险,也不依赖服务器时间同步,可以完美解决这两个问题。
在redis文档上有详细说明:
http://doc.redisfans.com/string/set.html
使用redis的SET resource-name anystring NX EX max-lock-time 方式,用于分布式锁
原理
命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
客户端执行以上的命令:
- 如果服务器返回 OK ,那么这个客户端获得锁。
- 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
- 设置的过期时间到达之后,锁将自动释放。
可以通过以下修改,让这个锁实现更健壮:
- 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。
- 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。
这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。
以下是一个简单的解锁脚本示例:
-
if redis.call("get",KEYS[1]) == ARGV[1]
-
then
-
return redis.call("del",KEYS[1])
-
else
-
return 0
-
end
可能存在的问题
占时没发现
具体实现
锁具体实现RedisLock:
-
package com.xiaolyuh.lock; -
-
import org.slf4j.Logger; -
import org.slf4j.LoggerFactory; -
import org.springframework.dao.DataAccessException; -
import org.springframework.data.redis.connection.RedisConnection; -
import org.springframework.data.redis.core.RedisCallback; -
import org.springframework.data.redis.core.StringRedisTemplate; -
import org.springframework.data.redis.core.script.RedisScript; -
import org.springframework.util.Assert; -
import org.springframework.util.StringUtils; -
import redis.clients.jedis.Jedis; -
import redis.clients.jedis.JedisCluster; -
import redis.clients.jedis.Protocol; -
import redis.clients.util.SafeEncoder; -
-
import java.util.ArrayList; -
import java.util.List; -
import java.util.Random; -
import java.util.UUID; -
-
/** -
* Redis分布式锁 -
* 使用 SET resource-name anystring NX EX max-lock-time 实现 -
* <p> -
* 该方案在 Redis 官方 SET 命令页有详细介绍。 -
* http://doc.redisfans.com/string/set.html -
* <p> -
* 在介绍该分布式锁设计之前,我们先来看一下在从 Redis 2.6.12 开始 SET 提供的新特性, -
* 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中: -
* <p> -
* EX seconds — 以秒为单位设置 key 的过期时间; -
* PX milliseconds — 以毫秒为单位设置 key 的过期时间; -
* NX — 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。 -
* XX — 将key 的值设为value ,当且仅当key 存在,等效于 SETEX。 -
* <p> -
* 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。 -
* <p> -
* 客户端执行以上的命令: -
* <p> -
* 如果服务器返回 OK ,那么这个客户端获得锁。 -
* 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。 -
* -
* @author yuhao.wangwang -
* @version 1.0 -
* @date 2017年11月3日 上午10:21:27 -
*/ -
public class RedisLock3 { -
-
private static Logger logger = LoggerFactory.getLogger(RedisLock3.class); -
-
private StringRedisTemplate redisTemplate; -
-
/** -
* 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。 -
*/ -
public static final String NX = "NX"; -
-
/** -
* seconds — 以秒为单位设置 key 的过期时间,等效于EXPIRE key seconds -
*/ -
public static final String EX = "EX"; -
-
/** -
* 调用set后的返回值 -
*/ -
public static final String OK = "OK"; -
-
/** -
* 默认请求锁的超时时间(ms 毫秒) -
*/ -
private static final long TIME_OUT = 100; -
-
/** -
* 默认锁的有效时间(s) -
*/ -
public static final int EXPIRE = 60; -
-
/** -
* 解锁的lua脚本 -
*/ -
public static final String UNLOCK_LUA; -
-
static { -
StringBuilder sb = new StringBuilder(); -
sb.append("if redis.call("get",KEYS[1]) == ARGV[1] "); -
sb.append("then "); -
sb.append(" return redis.call("del",KEYS[1]) "); -
sb.append("else "); -
sb.append(" return 0 "); -
sb.append("end "); -
UNLOCK_LUA = sb.toString(); -
} -
-
/** -
* 锁标志对应的key -
*/ -
private String lockKey; -
-
/** -
* 记录到日志的锁标志对应的key -
*/ -
private String lockKeyLog = ""; -
-
/** -
* 锁对应的值 -
*/ -
private String lockValue; -
-
/** -
* 锁的有效时间(s) -
*/ -
private int expireTime = EXPIRE; -
-
/** -
* 请求锁的超时时间(ms) -
*/ -
private long timeOut = TIME_OUT; -
-
/** -
* 锁标记 -
*/ -
private volatile boolean locked = false; -
-
final Random random = new Random(); -
-
/** -
* 使用默认的锁过期时间和请求锁的超时时间 -
* -
* @param redisTemplate -
* @param lockKey 锁的key(Redis的Key) -
*/ -
public RedisLock3(StringRedisTemplate redisTemplate, String lockKey) { -
this.redisTemplate = redisTemplate; -
this.lockKey = lockKey + "_lock"; -
} -
-
/** -
* 使用默认的请求锁的超时时间,指定锁的过期时间 -
* -
* @param redisTemplate -
* @param lockKey 锁的key(Redis的Key) -
* @param expireTime 锁的过期时间(单位:秒) -
*/ -
public RedisLock3(StringRedisTemplate redisTemplate, String lockKey, int expireTime) { -
this(redisTemplate, lockKey); -
this.expireTime = expireTime; -
} -
-
/** -
* 使用默认的锁的过期时间,指定请求锁的超时时间 -
* -
* @param redisTemplate -
* @param lockKey 锁的key(Redis的Key) -
* @param timeOut 请求锁的超时时间(单位:毫秒) -
*/ -
public RedisLock3(StringRedisTemplate redisTemplate, String lockKey, long timeOut) { -
this(redisTemplate, lockKey); -
this.timeOut = timeOut; -
} -
-
/** -
* 锁的过期时间和请求锁的超时时间都是用指定的值 -
* -
* @param redisTemplate -
* @param lockKey 锁的key(Redis的Key) -
* @param expireTime 锁的过期时间(单位:秒) -
* @param timeOut 请求锁的超时时间(单位:毫秒) -
*/ -
public RedisLock3(StringRedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) { -
this(redisTemplate, lockKey, expireTime); -
this.timeOut = timeOut; -
} -
-
/** -
* 尝试获取锁 超时返回 -
* -
* @return -
*/ -
public boolean tryLock() { -
// 生成随机key -
lockValue = UUID.randomUUID().toString(); -
// 请求锁超时时间,纳秒 -
long timeout = timeOut * 1000000; -
// 系统当前时间,纳秒 -
long nowTime = System.nanoTime(); -
while ((System.nanoTime() - nowTime) < timeout) { -
if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) { -
locked = true; -
// 上锁成功结束请求 -
return true; -
} -
-
// 每次请求等待一段时间 -
seleep(10, 50000); -
} -
return locked; -
} -
-
/** -
* 尝试获取锁 立即返回 -
* -
* @return 是否成功获得锁 -
*/ -
public boolean lock() { -
lockValue = UUID.randomUUID().toString(); -
//不存在则添加 且设置过期时间(单位ms) -
String result = set(lockKey, lockValue, expireTime); -
return OK.equalsIgnoreCase(result); -
} -
-
/** -
* 以阻塞方式的获取锁 -
* -
* @return 是否成功获得锁 -
*/ -
public boolean lockBlock() { -
lockValue = UUID.randomUUID().toString(); -
while (true) { -
//不存在则添加 且设置过期时间(单位ms) -
String result = set(lockKey, lockValue, expireTime); -
if (OK.equalsIgnoreCase(result)) { -
return true; -
} -
-
// 每次请求等待一段时间 -
seleep(10, 50000); -
} -
} -
-
/** -
* 解锁 -
* <p> -
* 可以通过以下修改,让这个锁实现更健壮: -
* <p> -
* 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。 -
* 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。 -
* 这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。 -
*/ -
public Boolean unlock() { -
// 只有加锁成功并且锁还有效才去释放锁 -
// 只有加锁成功并且锁还有效才去释放锁 -
if (locked) { -
return redisTemplate.execute(new RedisCallback<Boolean>() { -
-
public Boolean doInRedis(RedisConnection connection) throws DataAccessException { -
Object nativeConnection = connection.getNativeConnection(); -
Long result = 0L; -
-
List<String> keys = new ArrayList<>(); -
keys.add(lockKey); -
List<String> values = new ArrayList<>(); -
values.add(lockValue); -
-
// 集群模式 -
if (nativeConnection instanceof JedisCluster) { -
result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values); -
} -
-
// 单机模式 -
if (nativeConnection instanceof Jedis) { -
result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values); -
} -
-
if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) { -
logger.info("Redis分布式锁,解锁{}失败!解锁时间:{}", lockKeyLog, System.currentTimeMillis()); -
} -
-
locked = result == 0; -
return result == 1; -
} -
}); -
} -
-
return true; -
} -
-
/** -
* 重写redisTemplate的set方法 -
* <p> -
* 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。 -
* <p> -
* 客户端执行以上的命令: -
* <p> -
* 如果服务器返回 OK ,那么这个客户端获得锁。 -
* 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。 -
* -
* @param key 锁的Key -
* @param value 锁里面的值 -
* @param seconds 过去时间(秒) -
* @return -
*/ -
private String set(final String key, final String value, final long seconds) { -
Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空"); -
return redisTemplate.execute(new RedisCallback<String>() { -
-
public String doInRedis(RedisConnection connection) throws DataAccessException { -
Object nativeConnection = connection.getNativeConnection(); -
String result = null; -
// 集群模式 -
if (nativeConnection instanceof JedisCluster) { -
result = ((JedisCluster) nativeConnection).set(key, value, NX, EX, seconds); -
} -
// 单机模式 -
if (nativeConnection instanceof Jedis) { -
result = ((Jedis) nativeConnection).set(key, value, NX, EX, seconds); -
} -
-
if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) { -
logger.info("获取锁{}的时间:{}", lockKeyLog, System.currentTimeMillis()); -
} -
-
return result; -
} -
}); -
} -
-
/** -
* @param millis 毫秒 -
* @param nanos 纳秒 -
* @Title: seleep -
* @Description: 线程等待时间 -
* @author yuhao.wang -
*/ -
private void seleep(long millis, int nanos) { -
try { -
Thread.sleep(millis, random.nextInt(nanos)); -
} catch (InterruptedException e) { -
logger.info("获取分布式锁休眠被中断:", e); -
} -
} -
-
public String getLockKeyLog() { -
return lockKeyLog; -
} -
-
public void setLockKeyLog(String lockKeyLog) { -
this.lockKeyLog = lockKeyLog; -
} -
-
public int getExpireTime() { -
return expireTime; -
} -
-
public void setExpireTime(int expireTime) { -
this.expireTime = expireTime; -
} -
-
public long getTimeOut() { -
return timeOut; -
} -
-
public void setTimeOut(long timeOut) { -
this.timeOut = timeOut; -
} -
} -
调用方式:
-
public void redisLock3(int i) { -
RedisLock3 redisLock3 = new RedisLock3(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500); -
try { -
long now = System.currentTimeMillis(); -
if (redisLock3.tryLock()) { -
logger.info("=" + (System.currentTimeMillis() - now)); -
// TODO 获取到锁要执行的代码块 -
logger.info("j:" + j++); -
} else { -
logger.info("k:" + k++); -
} -
} catch (Exception e) { -
logger.info(e.getMessage(), e); -
} finally { -
redisLock2.unlock(); -
} -
} -
对于这种种redis实现分布式锁的方案还是有一个问题:就是你获取锁后执行业务逻辑的代码只能在redis锁的有效时间之内,因为,redis的key到期后会自动清除,这个锁就算释放了。所以这个锁的有效时间一定要结合业务做好评估。
源码: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-data-redis-distributed-lock 工程
参考: