假设有一个秒杀程序,库存为50,代码如下:
@GetMapping("/lock")
public String Redis() {
String retVal;
synchronized (this) {
int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int remainStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
retVal = "剩余库存:" + remainStock;
} else {
retVal = "库存不足";
}
log.info(retVal);
}
return retVal;
}
单机下,上面代码没有任何问题,但是在集群下,使用synchronized 就不好使了,启动2台机器,分别是8001,8002,压测情况如下:
可以看到,出现了重复消费的情况,接下来使用分布式锁来解决上面的问题
分布式锁
redis有一个setnx操作,如果key存在,就不进行操作,否则就操作,使用setnx后代码如下:
@GetMapping("/lock1")
public String RedisTest1() {
String retVal;
String lockKey="lockKey";
//加锁
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "");
if(!isLock){
retVal="服务器繁忙";
}
int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int remainStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
retVal = "剩余库存:" + remainStock;
} else {
retVal = "库存不足";
}
log.info(retVal);
//解锁
stringRedisTemplate.delete(lockKey);
return retVal;
}
上面的代码表面上看似实现了加锁,实际上有很多问题,假设有A,B两个请求同时到达,由于redis是执行命令时是单线程,所以只会有一个请求拿到锁,假设A拿到,存在的问题有:
-
1、如果A线程在执行的过程中发生了异常,锁就不会释放;针对这个问题使用try{}finally{}
-
2、如果A线程还未释放锁,但所在的机器突然宕机了,锁也不会释放;针对这个问题设置过期时间
为了解决以上问题,改进后的代码如下:
@GetMapping("/lock2")
public String RedisTest2() {
String retVal;
String lockKey="lockKey";
//加锁
Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "");
stringRedisTemplate.expire(lockKey, 10,TimeUnit.SECONDS);
if(!isLock){
retVal="服务器繁忙";
}
try{
int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int remainStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
retVal = "剩余库存:" + remainStock;
} else {
retVal = "库存不足";
}
log.info(retVal);
}finally {
//解锁
stringRedisTemplate.delete(lockKey);
}
return retVal;
}
仔细分析上面的程序,还是会有一系列问题:
-
1.由于设置过期时间不是原子性的,如果刚拿到锁,还未来得及设置过期时间,机器宕掉了,锁不会释放;
-
2.加锁A线程先拿到锁,还未执行完成,时间到期,然后B线程也拿到了锁,过一段时间后,A执行结束,释放锁,但B还未结束,此时其他请求也可以拿到锁了;
针对第一个问题,可以使用set命令,可以同时到达setnx和设置过期时间的效果,由于只有jedis才有相应的api,RedisTemplate未提供相应的功能,所以需要自己拿到jedis实例,然后调用set方法;
针对第二个问题,可以加锁后设置一个标识,只有锁是自己的,才释放;
改进代码如下:
@GetMapping("/lock2")
public String RedisTest2() {
String retVal;
String lockKey="lockKey";
String clientId= UUID.randomUUID().toString();
//加锁
Boolean isLock = stringRedisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
String result = jedis.set(lockKey, clientId, "NX", "EX", 10);
if ("OK".equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
if(!isLock){
retVal="服务器繁忙";
}
try{
int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int remainStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
retVal = "剩余库存:" + remainStock;
} else {
retVal = "库存不足";
}
log.info(retVal);
}finally {
//解锁
if(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete("stock");
}
}
return retVal;
}
实际上以上代码仍然有问题,体现如下:
-
1.解锁不是原子性的,仍会刚判断了是自己的锁,还未来得及释放就宕机了;针对这个问题要实现原子操作,需要写脚本解决;
-
2.过期时间到底设置多少合适,如果设置短了,可能程序还未执行完,锁就释放了,如果设置长了,万一机器宕掉了,其他机器就会等待很长的时间才能获取锁;针对这个问题,可以拿到锁后,开启一个线程定时检测是否程序持有锁,未完成就把过期时间延迟(重新
设置),具体实现自己动手比较麻烦,后面会使用redisson框架来解决该问题。
改进后的代码如下:
@GetMapping("/lock2")
public String RedisTest2() {
String retVal;
String lockKey="lockKey";
String clientId= UUID.randomUUID().toString();
//加锁
Boolean isLock = stringRedisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
String result = jedis.set(lockKey, clientId, "NX", "EX", 10);
if ("OK".equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
if(!isLock){
retVal="服务器繁忙";
return retVal;
}
try{
int stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int remainStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(remainStock));
retVal = "剩余库存:" + remainStock;
} else {
retVal = "库存不足";
}
log.info(retVal);
}finally {
//解锁
String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
stringRedisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey),
Collections.singletonList(clientId));
Long RELEASE_SUCCESS = 1L;
if (RELEASE_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
return retVal;
}
最终把代码整理后如下:
@GetMapping("/lock")
public String RedisTest() {
String retVal;
String lockKey = "lockKey";
String stockKey = "stock";
String clientId = UUID.randomUUID().toString();
//加锁
Boolean isLock = lockService.tryLock(lockKey, clientId, 10);
if (!isLock) {
retVal = "服务器繁忙";
return retVal;
}
try {
BoundValueOperations<String, String> valueOps = stringRedisTemplate.boundValueOps(stockKey);
Integer stock = Integer.valueOf(valueOps.get());
if (stock > 0) {
int remainStock = stock - 1;
valueOps.set(String.valueOf(remainStock));
retVal = "剩余库存:" + remainStock;
} else {
retVal = "库存不足";
}
log.info(retVal);
} finally {
//解锁
lockService.releaseLock(lockKey,clientId);
}
return retVal;
}
package com.yyb.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.scripting.support.StaticScriptSource;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
*/
@Component
public class NewLockService {
private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 该加锁方法仅针对单实例 Redis 可实现分布式加锁
* 对于 Redis 集群则无法使用
*
* @param lockKey 加锁键
* @param clientId 加锁客户端唯一标识(采用UUID)
* @param seconds 锁过期时间
* @return
*/
public Boolean tryLock(String lockKey, String clientId, long seconds) {
return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
return redisConnection.set(lockKey.getBytes(), clientId.getBytes(), Expiration.seconds(seconds), RedisStringCommands.SetOption.ifAbsent());
});
}
/**
* 释放锁操作
* @param key
* @param value
* @return
*/
public boolean releaseLock(String key, String value) {
DefaultRedisScript<Boolean> lockScript = new DefaultRedisScript<>();
lockScript.setScriptSource(
new StaticScriptSource(RELEASE_LOCK_SCRIPT));
lockScript.setResultType(Boolean.class);
Boolean result = redisTemplate.execute(lockScript,Collections.singletonList(key),value);
return result;
}
}
上述代码实现,仅对 redis 单实例架构有效,当面对 redis 哨兵模式或集群时就无效了。原因是当在主机宕机,从机被升级为主机的一瞬间的时候,如果恰好在这一刻,由于 redis 主从复制的异步性,导致从机中数据没有即时同步,那么上述代码就会无效,导致同
一资源有可能会产生两把锁,违背了分布式锁的原则。
使用Redisson解决分布式问题
@GetMapping("/lock")
public String RedisTest() {
String retVal = "";
String stockKey = "stock";
//加锁
RLock lock = redissonClient.getLock("myLock");
lock.lock();
try {
BoundValueOperations<String, String> valueOps = stringRedisTemplate.boundValueOps(stockKey);
Integer stock = Integer.valueOf(valueOps.get());
Thread.sleep(3000);
if (stock > 0) {
int remainStock = stock - 1;
valueOps.set(String.valueOf(remainStock));
retVal = "剩余库存:" + remainStock;
} else {
retVal = "库存不足";
}
log.info(retVal);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//解锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
return retVal;
}
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}