zoukankan      html  css  js  c++  java
  • redis 高并发分布式锁实现

    一般在单体应用中,如果遇到高并发可以通过 synchronized 或者 Lock 进行加锁,但是现在大部分应用都是采用分布式的方式进行部署,这样像 synchronized 和 Lock 这样的锁就不适用了。

    这个使用我们可以使用分布式锁来实现,分布式锁的实现方式主要有:

    • 基于数据库的分布式锁
    • 基于缓存的分布式锁
    • 基于 Zookeeper 的分布式锁

    本次主要记录一下如果是用 redis 实现分布式锁。

    首先看一个示例:
    本例使用 springboot 结合 redisTemplate 实现,具体如何配置,可以参考上一边文章:springboot 整合 redisTemplate
    这里只贴核心代码:

    @Controller
    @RequestMapping("/")
    public class HelloV2Controller {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @RequestMapping("delStock")
        @ResponseBody
        public String delStock() {
            Integer result = (Integer) redisTemplate.opsForValue().get("stock");
            if (result > 0) {
                int remainStock = result - 1;
                redisTemplate.opsForValue().set("stock", remainStock);
                System.out.println("Remain Stock: " + remainStock);
            } else {
                System.out.println("Remain Stock: 0");
            }
            return "success";
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    在这里插入图片描述
    在 redis 中设置 stock 的值为 50,然后访问 http://localhost:8080/delStock
    每次 -1

    Remain Stock: 49
    Remain Stock: 48
    Remain Stock: 47
    Remain Stock: 46
    Remain Stock: 45
    Remain Stock: 44
    Remain Stock: 43
    Remain Stock: 42
    Remain Stock: 41
    Remain Stock: 40
    Remain Stock: 39
    Remain Stock: 38
    Remain Stock: 37
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    发现其实正常的,那么使用多线程访问呢?
    使用 groboutils 模拟多线程并发,你也可以使用 jmeter

    <!-- junit 多线程测试 -->
    <!-- https://mvnrepository.com/artifact/net.sourceforge.groboutils/groboutils-core -->
    <dependency>
    	<groupId>net.sourceforge.groboutils</groupId>
    	<artifactId>groboutils-core</artifactId>
    	<version>5</version>
    	<scope>test</scope>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    代码:

    public class Test {
    
        @org.junit.Test
        public void test() {
    
            TestRunnable runner = new TestRunnable() {
                @Override
                public void runTest() throws Throwable {
                    String url = "http://localhost:8080/delStock";
                    HttpGet get = new HttpGet(url);
                    CloseableHttpClient client = HttpClientBuilder.create().build();
                    CloseableHttpResponse response = client.execute(get);
                    response.close();
                }
            };
    
            int runnerCount = 10;
            // Rnner数组,想当于并发多少个。
            TestRunnable[] trs = new TestRunnable[runnerCount];
            for (int i = 0; i < runnerCount; i++) {
                trs[i] = runner;
            }
            // 用于执行多线程测试用例的Runner,将前面定义的单个Runner组成的数组传入
            MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(trs);
            try {
                // 开发并发执行数组里定义的内容
                mttr.runTestRunnables();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    打印结果:

    Remain Stock: 21
    Remain Stock: 21
    Remain Stock: 21
    Remain Stock: 21
    Remain Stock: 21
    Remain Stock: 21
    Remain Stock: 21
    Remain Stock: 21
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    发现其出现了并发问题。
    使用 synchronized 对其加锁呢?

    @RequestMapping("delStock")
    @ResponseBody
    public String delStock() {
        synchronized (HelloV2Controller.class) {
            Integer result = (Integer) redisTemplate.opsForValue().get("stock");
            if (result > 0) {
                int remainStock = result - 1;
                redisTemplate.opsForValue().set("stock", remainStock);
                System.out.println("Remain Stock: " + remainStock);
            } else {
                System.out.println("Remain Stcock: 0");
            }
        }
        return "success";
    }
    
    ---------------------------------------------
    Remain Stock: 49
    Remain Stock: 48
    Remain Stock: 47
    Remain Stock: 46
    Remain Stock: 45
    Remain Stock: 44
    Remain Stock: 43
    Remain Stock: 42
    Remain Stock: 41
    Remain Stock: 40
    Remain Stock: 39
    Remain Stock: 38
    Remain Stock: 37
    Remain Stock: 36
    .....
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    解决了并发的问题,但是这是在单体应用下场景,那要是分布式的环境下呢?
    使用 nginx 搭建分布式环境,首先安装 nginx
    nginx 配置
    在这里插入图片描述
    更改 springboot 启动端口,server.port=8081
    在使用 Test 测试类访问端口号:
    在这里插入图片描述
    8081 端口结果:

    Remain Stock: 49
    Remain Stock: 48
    Remain Stock: 47
    Remain Stock: 46
    Remain Stock: 45
    Remain Stock: 44
    Remain Stock: 43
    Remain Stock: 42
    Remain Stock: 41
    Remain Stock: 40
    Remain Stock: 39
    Remain Stock: 38
    Remain Stock: 37
    Remain Stock: 36
    Remain Stock: 35
    Remain Stock: 34
    Remain Stock: 33
    Remain Stock: 32
    Remain Stock: 31
    Remain Stock: 30
    Remain Stock: 29
    Remain Stock: 28
    Remain Stock: 27
    Remain Stock: 26
    Remain Stock: 25
    Remain Stock: 24
    Remain Stock: 23
    Remain Stock: 22
    Remain Stock: 21
    Remain Stock: 20
    Remain Stock: 19
    Remain Stock: 18
    Remain Stock: 17
    Remain Stock: 16
    Remain Stock: 15
    Remain Stock: 14
    Remain Stock: 13
    Remain Stock: 12
    Remain Stock: 11
    Remain Stock: 10
    Remain Stock: 9
    Remain Stock: 8
    Remain Stock: 7
    Remain Stock: 6
    Remain Stock: 5
    Remain Stock: 4
    Remain Stock: 3
    Remain Stock: 2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    8080 端口结果:

    Remain Stock: 49
    Remain Stock: 48
    
    • 1
    • 2

    发现还是会有并发问题。

    redis 实现分布式锁

    那使用 redis 实现分布式锁:

    public String delStock() {
            // synchronized (HelloV2Controller.class) { // 单体应用没有问题,但是在分布式情况下就不适用了
            String lockKey = "lockKey";
            Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue"); // 相当于jedis中的 jedis.setnx
            if (!lock) {
                return "false";
            }
            Integer result = (Integer) redisTemplate.opsForValue().get("stock");
            if (result > 0) {
                int remainStock = result - 1;
                redisTemplate.opsForValue().set("stock", remainStock);
                System.out.println("Remain Stock: " + remainStock);
            } else {
                System.out.println("Remain Stcock: 0");
                // }
            }
            // 加锁记得删除锁
            redisTemplate.delete(lockKey);
            return "success";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    Test 测试类并发访问:
    8081 端口结果:

    Remain Stock: 499
    Remain Stock: 498
    Remain Stock: 497
    Remain Stock: 496
    Remain Stock: 493
    Remain Stock: 491
    Remain Stock: 487
    Remain Stock: 486
    Remain Stock: 485
    Remain Stock: 483
    Remain Stock: 481
    Remain Stock: 479
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    8080 端口访问结果:

    Remain Stock: 495
    Remain Stock: 494
    Remain Stock: 492
    Remain Stock: 490
    Remain Stock: 489
    Remain Stock: 488
    Remain Stock: 484
    Remain Stock: 482
    Remain Stock: 480
    Remain Stock: 478
    Remain Stock: 477
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    解决了并发问题。
    但是这样就安全了吗 ?

    1. 业务代码发生异常
      在这里插入图片描述
      这里发生异常,就无法删除锁,导致死锁。
      在业务代码中加入 try catch finally
      在这里插入图片描述
    2. 服务端突然宕机
      比如拿到锁执行到业务代码时,应用重启。解决方式:设置一个超时时间
      在这里插入图片描述
      这种方式不能保证原子性,还是会出现死锁的问题。
      redisTemplate 的方法实现了原子性的方法:
    public String delStock() {
       // synchronized (HelloV2Controller.class) { // 单体应用没有问题,但是在分布式情况下就不适用了
        String lockKey = "lockKey";
        try {
            // Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey,
            // "lockValue"); // 相当于jedis中的
            // jedis.setnx
            Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue", 10, TimeUnit.SECONDS);
            // redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); // 不能保证原子性
            if (!lock) {
                return "false";
            }
            Integer result = (Integer) redisTemplate.opsForValue().get("stock");
            if (result > 0) {
                int remainStock = result - 1;
                redisTemplate.opsForValue().set("stock", remainStock);
                System.out.println("Remain Stock: " + remainStock);
            } else {
                System.out.println("Remain Stcock: 0");
                // }
            }
        } catch (Exception e) {
        } finally {
            // 加锁记得删除锁
            redisTemplate.delete(lockKey);
        }
        return "success";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    但是这种情况可能出现业务代码 10 秒还没有执行完,redis 锁就失效了, 甚至可能导致锁的永久失效。

    比如 线程 Thread1 拿到锁,执行了10没有执行完,这个时候锁失效了。
    线程 Thread2 拿到锁,在执行到 15 秒时,Thread1 将锁删掉了,这个时候 Thread3 过来又拿到了锁,从而可能导致锁的永久失效。
    那又该如何解决?
    解决方法:
    自己删除自己的锁:
    在这里插入图片描述
    但是这种方式还是会导致会可能有两个线程持有锁,那么该如何保证只有一个线程持有锁呢?
    解决方案:
    在后台开启一个线程,给锁续期。Redisson 实现了给锁续期。

    Redisson 实现原理

    在这里插入图片描述

    <dependency>
    	<groupId>org.redisson</groupId>
    	<artifactId>redisson</artifactId>
    	<version>3.5.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    config

    @Bean
     public Redisson redisson() {
          Config config = new Config();
          config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
          return (Redisson) Redisson.create(config);
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    修改 controller
    在这里插入图片描述
    其实 redisson 默认实现的续期的原理,就相当于使用

    Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue", 10, TimeUnit.SECONDS);
    redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); // 不能保证原子性
    
    • 1
    • 2

    还有一种情况,线程Thread1获得锁,开始执行业务逻辑,这个时候 redis 的master 突然宕机,但是还没有同步到slave,这个时候主从集群从 slave 节点中重新选择master节点,但是slave中并没有 Thread1 的锁,这个时候 Thread2 来,便可能获得锁。

    解决方法:
    保证锁在 master 及 slave 均存在的情况下,才能加锁成功。

    如果你有什么好的实现方式,欢迎留言探讨。

     
     
    转载: 
    https://blog.csdn.net/wjavadog/article/details/103221855
  • 相关阅读:
    ftp的基本工作原理
    ubuntu自带输入法ibus 无法按数字键取词
    C语言教程
    【数据结构】---线性表
    python搭建opencv
    第六届Code+程序设计网络挑战赛
    整除分块
    ac自动机
    算法梳理 (CSP 2019
    lougu main page
  • 原文地址:https://www.cnblogs.com/lovezbs/p/14139317.html
Copyright © 2011-2022 走看看