如何利用Redis分布式锁处理高并发?
一、添加项目依赖
<!-- redis依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
二、配置文件
spring:
#Redis配置
redis:
host: localhost
password: 123456
三、模拟 抢购商品 的 Service 层
- 接口类
/**
*/
public interface SellService {
/**
* 根据商品ID抢购商品并且返回商品的抢购详情
* @param productId
* @return
*/
String orderGoods(String productId);
/**
* 根据商品ID查询商品抢购详情
* @param productId
* @return
*/
String queryGoods(String productId);
}
- 实现类
@Service
@Slf4j
/**
*/
public class SellServiceImpl implements SellService {
@Autowired
private RedisLock redisLock;
/**
设置超时时间10秒
*/
private static final int TIMEOUT = 10*1000;
/**
* 例如国庆大甩卖 图书大甩卖 库存 1000 件
*/
/**
* 库存
*/
static Map<String, Integer> products;
/**
* 库存余量
*/
static Map<String, Integer> stock;
/**
* 抢购成功者信息
*/
static Map<String, String> orders;
static {
products = new HashMap<>();
stock = new HashMap<>();
orders = new HashMap<>();
products.put("book", 1000);
stock.put("book", 1000);
}
public String queryMap(String productId){
return "国庆图书大甩卖,库存 " + products.get(productId) + " 件,现余 " + stock.get(productId) + " 件,已被抢购 " + orders.size() + " 件";
}
@Override
public String orderGoods(String productId) {
//先获取商品余量
int number = stock.get(productId);
if(number == 0){
throw new RuntimeException("商品已抢购完,请您下次再来,谢谢您的理解...");
}else {
//模拟下单(不同用户拥有不同ID)
orders.put(String.valueOf(UUID.randomUUID()), productId);
//减库存
number = number - 1;
//模拟延迟
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
stock.put(productId, number);
}
log.info("共抢购 {} 件,抢购详情:{}", orders.size(), orders);
//再返回商品的抢购详情
return this.queryMap(productId);
}
@Override
public String queryGoods(String productId) {
return this.queryMap(productId);
}
}
四、Controller 层
/**
*
*/
@RestController
public class SellController {
@Autowired
private SellService sellService;
/**
* 根据商品ID进行抢购
* @param productId
* @return 商品抢购详情
*/
@GetMapping("/order/{productId}")
public String sellGoods(@PathVariable String productId){
return sellService.orderGoods(productId);
}
/**
* 根据商品ID进行查询余量
* @param productId
* @return 商品抢购详情
*/
@GetMapping("/query/{productId}")
public String queryGoods(@PathVariable String productId){
return sellService.queryGoods(productId);
}
}
五、模拟高并发
- 使用 Apache ab 模拟高并发
ab -n 500 -c 80 http://localhost:8080/order/book
六、结果
七、利用Redis分布式锁 解决高并发问题
1、实现Redis分布式锁
/**
*
*/
@Component
@Slf4j
public class RedisLock {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加锁
* @param key
* @param value 当前时间+超时时间
* @return
*/
public boolean lock(String key, String value){
if(redisTemplate.opsForValue().setIfAbsent(key, value)){
return true;
}
String currentValue = redisTemplate.opsForValue().get(key);
//如果锁过期
if(!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()){
//获取上一个锁的时间
String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
if(!StringUtils.isEmpty(oldValue) && currentValue.equals(oldValue)){
return true;
}
}
return false;
}
/**
* 解锁
* @param key
* @param value 当前时间+超时时间
*/
public void unlock(String key, String value){
try{
String currentValue = redisTemplate.opsForValue().get(key);
if(!StringUtils.isEmpty(currentValue) && currentValue.equals(value)){
redisTemplate.opsForValue().getOperations().delete(key);
}
}catch (Exception e){
log.error("【Redis分布式锁】 解锁异常 {}", e.getMessage());
}
}
}
2、利用分布式锁处理Service层方法
/**
* 第一种方法 synchronized 锁机制,解决高并发产生的超卖问题 但效率大大降低 不推荐使用
* 第二种方法 使用 Redis 分布式锁,解决高并发产生的超卖问题 并且效率相对高很多
*/
@Override
public String orderGoods(String productId) {
//加锁
Long time = System.currentTimeMillis() + TIMEOUT;
//加锁失败 说明有人正在使用
if(!redisLock.lock(productId, String.valueOf(time))){
log.info("抢购失败,请再试试吧...");
//return null;
throw new RuntimeException("服务器刚才好像睡着了,请再试试吧...");
}
//先获取商品余量
int number = stock.get(productId);
if(number == 0){
throw new RuntimeException("商品已抢购完,请您下次再来,谢谢您的理解...");
}else {
//模拟下单(不同用户拥有不同ID)
orders.put(String.valueOf(UUID.randomUUID()), productId);
//减库存
number = number - 1;
//模拟延迟
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
stock.put(productId, number);
}
log.info("共抢购 {} 件,抢购详情:{}", orders.size(), orders);
//解锁
redisLock.unlock(productId, String.valueOf(time));
//再返回商品的抢购详情
return this.queryMap(productId);
}
八、模拟高并发
- 使用 Apache ab 模拟高并发
ab -n 500 -c 80 http://localhost:8080/order/book
九、结果
- 浏览器显示
- 控制台打印
原创:点击打开
------------------------------------
原创:点击打开
2.1 引入redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.2 配置redis
spring:
redis:
host: localhost
port: 6379
2.3 编写加锁和解锁的方法
package com.vito.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* Created by VitoYi on 2018/4/5.
*/
@Component
public class RedisLock {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加锁
* @param key 商品id
* @param value 当前时间+超时时间
* @return
*/
public boolean lock(String key, String value) {
if (redisTemplate.opsForValue().setIfAbsent(key, value)) { //这个其实就是setnx命令,只不过在java这边稍有变化,返回的是boolea
return true;
}
//避免死锁,且只让一个线程拿到锁
String currentValue = redisTemplate.opsForValue().get(key);
//如果锁过期了
if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {
//获取上一个锁的时间
String oldValues = redisTemplate.opsForValue().getAndSet(key, value);
/*
只会让一个线程拿到锁
如果旧的value和currentValue相等,只会有一个线程达成条件,因为第二个线程拿到的oldValue已经和currentValue不一样了
*/
if (!StringUtils.isEmpty(oldValues) && oldValues.equals(currentValue)) {
return true;
}
}
return false;
}
/**
* 解锁
* @param key
* @param value
*/
public void unlock(String key, String value) {
try {
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
redisTemplate.opsForValue().getOperations().delete(key);
}
} catch (Exception e) {
logger.error("『redis分布式锁』解锁异常,{}", e);
}
}
}
为什么要有避免死锁的一步呢?
假设没有『避免死锁』这一步,结果在执行到下单代码的时候出了问题,毕竟操作数据库、网络、io的时候抛了个异常,这个异常是偶然抛出来的,就那么偶尔一次,那么会导致解锁步骤不去执行,这时候就没有解锁,后面的请求进来自然也或得不到锁,这就被称之为死锁。
而这里的『避免死锁』,就是给锁加了一个过期时间,如果锁超时了,就返回true
,解开之前的那个死锁。
2.4 下单代码中引入加锁和解锁,确保只有一个线程操作
@Autowired
private RedisLock redisLock;
@Override
@Transactional
public String seckill(Integer id)throws RuntimeException {
//加锁
long time = System.currentTimeMillis() + 1000*10; //超时时间:10秒,最好设为常量
boolean isLock = redisLock.lock(String.valueOf(id), String.valueOf(time));
if(!isLock){
throw new RuntimeException("人太多了,换个姿势再试试~");
}
//查库存
Product product = productMapper.findById(id);
if(product.getStock()==0) throw new RuntimeException("已经卖光");
//写入订单表
Order order=new Order();
order.setProductId(product.getId());
order.setProductName(product.getName());
orderMapper.add(order);
//减库存
product.setPrice(null);
product.setName(null);
product.setStock(product.getStock()-1);
productMapper.update(product);
//解锁
redisLock.unlock(String.valueOf(id),String.valueOf(time));
return findProductInfo(id);
}
这样再来跑几次压测,就不会超卖了: