一般在单体应用中,如果遇到高并发可以通过 synchronized 或者 Lock 进行加锁,但是现在大部分应用都是采用分布式的方式进行部署,这样像 synchronized 和 Lock 这样的锁就不适用了。
这个使用我们可以使用分布式锁来实现,分布式锁的实现方式主要有:
- 基于数据库的分布式锁
- 基于缓存的分布式锁
- 基于 Zookeeper 的分布式锁
本次主要记录一下如果是用 redis 实现分布式锁。
首先看一个示例:
本例使用 springboot 结合 redisTemplate 实现,具体如何配置,可以参考上一边文章:springboot 整合 redisTemplate
这里只贴核心代码:
@Controller
@RequestMapping("/")
public class HelloV2Controller {
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping("delStock")
@ResponseBody
public String delStock() {
Integer result = (Integer) redisTemplate.opsForValue().get("stock");
if (result > 0) {
int remainStock = result - 1;
redisTemplate.opsForValue().set("stock", remainStock);
System.out.println("Remain Stock: " + remainStock);
} else {
System.out.println("Remain Stock: 0");
}
return "success";
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
在 redis 中设置 stock 的值为 50,然后访问 http://localhost:8080/delStock
每次 -1
Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
发现其实正常的,那么使用多线程访问呢?
使用 groboutils 模拟多线程并发,你也可以使用 jmeter
<!-- junit 多线程测试 -->
<!-- https://mvnrepository.com/artifact/net.sourceforge.groboutils/groboutils-core -->
<dependency>
<groupId>net.sourceforge.groboutils</groupId>
<artifactId>groboutils-core</artifactId>
<version>5</version>
<scope>test</scope>
</dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
代码:
public class Test {
@org.junit.Test
public void test() {
TestRunnable runner = new TestRunnable() {
@Override
public void runTest() throws Throwable {
String url = "http://localhost:8080/delStock";
HttpGet get = new HttpGet(url);
CloseableHttpClient client = HttpClientBuilder.create().build();
CloseableHttpResponse response = client.execute(get);
response.close();
}
};
int runnerCount = 10;
// Rnner数组,想当于并发多少个。
TestRunnable[] trs = new TestRunnable[runnerCount];
for (int i = 0; i < runnerCount; i++) {
trs[i] = runner;
}
// 用于执行多线程测试用例的Runner,将前面定义的单个Runner组成的数组传入
MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(trs);
try {
// 开发并发执行数组里定义的内容
mttr.runTestRunnables();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
打印结果:
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
发现其出现了并发问题。
使用 synchronized 对其加锁呢?
@RequestMapping("delStock")
@ResponseBody
public String delStock() {
synchronized (HelloV2Controller.class) {
Integer result = (Integer) redisTemplate.opsForValue().get("stock");
if (result > 0) {
int remainStock = result - 1;
redisTemplate.opsForValue().set("stock", remainStock);
System.out.println("Remain Stock: " + remainStock);
} else {
System.out.println("Remain Stcock: 0");
}
}
return "success";
}
---------------------------------------------
Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
Remain Stock: 36
.....
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
解决了并发的问题,但是这是在单体应用下场景,那要是分布式的环境下呢?
使用 nginx 搭建分布式环境,首先安装 nginx
nginx 配置
更改 springboot 启动端口,server.port=8081
在使用 Test 测试类访问端口号:
8081 端口结果:
Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
Remain Stock: 36
Remain Stock: 35
Remain Stock: 34
Remain Stock: 33
Remain Stock: 32
Remain Stock: 31
Remain Stock: 30
Remain Stock: 29
Remain Stock: 28
Remain Stock: 27
Remain Stock: 26
Remain Stock: 25
Remain Stock: 24
Remain Stock: 23
Remain Stock: 22
Remain Stock: 21
Remain Stock: 20
Remain Stock: 19
Remain Stock: 18
Remain Stock: 17
Remain Stock: 16
Remain Stock: 15
Remain Stock: 14
Remain Stock: 13
Remain Stock: 12
Remain Stock: 11
Remain Stock: 10
Remain Stock: 9
Remain Stock: 8
Remain Stock: 7
Remain Stock: 6
Remain Stock: 5
Remain Stock: 4
Remain Stock: 3
Remain Stock: 2
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
8080 端口结果:
Remain Stock: 49
Remain Stock: 48
- 1
- 2
发现还是会有并发问题。
redis 实现分布式锁
那使用 redis 实现分布式锁:
public String delStock() {
// synchronized (HelloV2Controller.class) { // 单体应用没有问题,但是在分布式情况下就不适用了
String lockKey = "lockKey";
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue"); // 相当于jedis中的 jedis.setnx
if (!lock) {
return "false";
}
Integer result = (Integer) redisTemplate.opsForValue().get("stock");
if (result > 0) {
int remainStock = result - 1;
redisTemplate.opsForValue().set("stock", remainStock);
System.out.println("Remain Stock: " + remainStock);
} else {
System.out.println("Remain Stcock: 0");
// }
}
// 加锁记得删除锁
redisTemplate.delete(lockKey);
return "success";
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
Test 测试类并发访问:
8081 端口结果:
Remain Stock: 499
Remain Stock: 498
Remain Stock: 497
Remain Stock: 496
Remain Stock: 493
Remain Stock: 491
Remain Stock: 487
Remain Stock: 486
Remain Stock: 485
Remain Stock: 483
Remain Stock: 481
Remain Stock: 479
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
8080 端口访问结果:
Remain Stock: 495
Remain Stock: 494
Remain Stock: 492
Remain Stock: 490
Remain Stock: 489
Remain Stock: 488
Remain Stock: 484
Remain Stock: 482
Remain Stock: 480
Remain Stock: 478
Remain Stock: 477
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
解决了并发问题。
但是这样就安全了吗 ?
- 业务代码发生异常
这里发生异常,就无法删除锁,导致死锁。
在业务代码中加入 try catch finally - 服务端突然宕机
比如拿到锁执行到业务代码时,应用重启。解决方式:设置一个超时时间
这种方式不能保证原子性,还是会出现死锁的问题。
redisTemplate 的方法实现了原子性的方法:
public String delStock() {
// synchronized (HelloV2Controller.class) { // 单体应用没有问题,但是在分布式情况下就不适用了
String lockKey = "lockKey";
try {
// Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey,
// "lockValue"); // 相当于jedis中的
// jedis.setnx
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue", 10, TimeUnit.SECONDS);
// redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); // 不能保证原子性
if (!lock) {
return "false";
}
Integer result = (Integer) redisTemplate.opsForValue().get("stock");
if (result > 0) {
int remainStock = result - 1;
redisTemplate.opsForValue().set("stock", remainStock);
System.out.println("Remain Stock: " + remainStock);
} else {
System.out.println("Remain Stcock: 0");
// }
}
} catch (Exception e) {
} finally {
// 加锁记得删除锁
redisTemplate.delete(lockKey);
}
return "success";
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
但是这种情况可能出现业务代码 10 秒还没有执行完,redis 锁就失效了, 甚至可能导致锁的永久失效。
比如 线程 Thread1 拿到锁,执行了10没有执行完,这个时候锁失效了。
线程 Thread2 拿到锁,在执行到 15 秒时,Thread1 将锁删掉了,这个时候 Thread3 过来又拿到了锁,从而可能导致锁的永久失效。
那又该如何解决?
解决方法:
自己删除自己的锁:
但是这种方式还是会导致会可能有两个线程持有锁,那么该如何保证只有一个线程持有锁呢?
解决方案:
在后台开启一个线程,给锁续期。Redisson 实现了给锁续期。
Redisson 实现原理
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.5.0</version>
</dependency>
- 1
- 2
- 3
- 4
- 5
config
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
- 1
- 2
- 3
- 4
- 5
- 6
修改 controller
其实 redisson 默认实现的续期的原理,就相当于使用
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue", 10, TimeUnit.SECONDS);
redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); // 不能保证原子性
- 1
- 2
还有一种情况,线程Thread1获得锁,开始执行业务逻辑,这个时候 redis 的master 突然宕机,但是还没有同步到slave,这个时候主从集群从 slave 节点中重新选择master节点,但是slave中并没有 Thread1 的锁,这个时候 Thread2 来,便可能获得锁。
解决方法:
保证锁在 master 及 slave 均存在的情况下,才能加锁成功。
如果你有什么好的实现方式,欢迎留言探讨。