zoukankan      html  css  js  c++  java
  • Redis之分布式锁的使用

    一、分布式锁

      分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。

    二、分布式锁的演进

    业务:电商网站卖东西需要去减库存,本篇文章假设下的订单数量都为1;

    第1版的代码

    @Service
    public class RedisLockDemo {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
            //获取redis中的库存
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if(stock > 0) {
                int newStock = stock - 1;
                valueOperations.set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            }
            else {
                System.out.println("库存已经为0,不能继续扣减");
            }
         return "success"; } }

    以上代码在高并发的场景下会产生超卖的问题,所以我们修改一下代码(增加synchronized);

    第2版代码

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
            synchronized (this) {
                //获取redis中的库存
                int stock = Integer.valueOf(valueOperations.get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //减库存
                    valueOperations.set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
            }
         
    return "success"; } }

    以上代码在服务为多实例的情况下,还是会出现超卖的问题,这个时候就要引入分布式锁来解决了。

    第3版代码

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            String lockKey = "lockKey";
    
            //加锁: setnx
            Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");
            if(null == isSuccess || isSuccess) {
                System.out.println("服务器繁忙, 请稍后重试");
                return "error";
            }
            
            //------ 执行业务逻辑 ----start------
            int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //执行业务操作减库存
                redisTemplate.opsForValue().set("stock", newStock + "");
                System.out.println("扣减库存成功, 剩余库存:" + newStock);
            } else {
                System.out.println("库存已经为0,不能继续扣减");
            }
            //------ 执行业务逻辑 ----end------
    
            //释放锁
            redisTemplate.delete(lockKey);
            return "success";
        }
    }
    

    以上代码的问题:

    (1)若在执行业务逻辑的过程中出现了异常,则会造成锁不会被释放,使其他有关的线程全部阻塞住(死锁);我们可以把锁释放操作放入到 finally 语句中来解决;

    (2)若在执行业务逻辑的过程中服务给挂掉了,仍然会造成锁不会被释放,使其他有关的线程全部阻塞住(死锁);我们可以给 redis 的 key 增加一个超时时间(超过指定的时间则会删除key及其对应的数据),虽然在超时时间到达之前其他有关的线程会一直阻塞住,但是这个时间比较小,且可以解决死锁的问题,所以这个解决方案也是可以接受的。代码如下:

    第4版代码:

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            String lockKey = "lockKey";
    
            try {
                //加锁: setnx,expire(10秒超时)
                Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
                if(null == isSuccess || isSuccess) {
                    System.out.println("服务器繁忙, 请稍后重试");
                    return "error";
                }
    
                //------ 执行业务逻辑 ----start------
                int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //执行业务操作减库存
                    redisTemplate.opsForValue().set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
                //------ 执行业务逻辑 ----end------
            } finally {
                //释放锁
                redisTemplate.delete(lockKey);
            }
            return "success";
        }
    }
    

    以上代码还是会出现问题:

      当线程1的业务执行到一半的时候,设置的锁超时时间到了,则锁的key会被删除;线程2就加锁成功了,线程2还在执行的时候,线程1的业务执行完了,线程1接着执行删除锁的操作,但是线程1删除的锁实际上是线程2加的锁,导致锁失效的问题。

    方法一:可以使用 “只要自己加锁,只能自己去释放” 来解决这个问题(第5版代码);

    第5版代码:

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public String deduceStock() {
            String lockKey = "lockKey";
    
            String clientId = UUID.randomUUID().toString();
            try {
                //加锁: setnx,expire
                Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
                if(null == isSuccess || isSuccess) {
                    System.out.println("服务器繁忙, 请稍后重试");
                    return "error";
                }
    
                //------ 执行业务逻辑 ----start------
                int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //执行业务操作减库存
                    redisTemplate.opsForValue().set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
                //------ 执行业务逻辑 ----end------
            } finally {
                if(clientId.equals(redisTemplate.opsForValue().get(lockKey))) {
                    //释放锁
                    redisTemplate.delete(lockKey);
                }
            }
            return "success";
        }
    }
    

    以上代码虽然解决了锁被其他线程释放的问题,但是还是会出现问题;当前线程的业务还没有执行完,锁的超时时间到了,这样其他线程就可以去加锁并执行业务逻辑了,这样就有两个线程都在执行了,有可能导致bug。

    方法二:可以给锁进行续命,每次锁快超时的时候就给锁重新在设置一个时间(引入另一个redis的java客户端 Redisson

    三、分布式锁的Redisson实现

    (1)引入maven坐标;

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.13.4</version>
    </dependency>

    (2)增加配置文件,将Redisson注入到容器中;

    @Configuration
    public class RedissonConfig {
    
        @Bean
        public Redisson redisson() {
            Config config = new Config();
            //单机版
            //config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);
    
            //集群版
            config.useClusterServers()
                    .addNodeAddress("redis://192.168.1.1:8001")
                    .addNodeAddress("redis://192.168.1.1:8002")
                    .addNodeAddress("redis://192.168.1.2:8001")
                    .addNodeAddress("redis://192.168.1.2:8002")
                    .addNodeAddress("redis://192.168.1.3:8001")
                    .addNodeAddress("redis://192.168.1.3:8002");
            return (Redisson) Redisson.create(config);
        }
    }
    

    (3)分布式锁的实现

    @Service
    public class RedisLockDemo {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        @Autowired
        private Redisson redisson;
    
        public String deduceStock() {
            String lockKey = "lockKey";
            RLock redissonLock = redisson.getLock(lockKey);
    
            try {
                //加锁(超时默认30s), 实现锁续命的功能(后台启动一个timer, 默认每10s检测一次是否持有锁)
                redissonLock.lock();
    
                //------ 执行业务逻辑 ----start------
                int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
                if (stock > 0) {
                    int newStock = stock - 1;
                    //执行业务操作减库存
                    redisTemplate.opsForValue().set("stock", newStock + "");
                    System.out.println("扣减库存成功, 剩余库存:" + newStock);
                } else {
                    System.out.println("库存已经为0,不能继续扣减");
                }
                //------ 执行业务逻辑 ----end------
            } finally {
                //解锁
                redissonLock.unlock();
            }
            return "success";
        }
    }
    

    Redisson的实现原理

    RedissonLock的使用介绍 

    Redisson的官网:https://redisson.org/

    // 锁默认有效时间30秒,每10秒去检查并重新设置超时时间
    void lock();  
    
    // 超过锁有效时间 leaseTime,就会释放锁
    void lock(long leaseTime, TimeUnit unit);
    
    // 尝试获取锁;成功则返回true,失败则返回false
    boolean tryLock();
    
    // 不会去启动定时任务;在 time 时间内还没有获取到锁,则返回false
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
    // 不会去启动定时任务;当 waitTime 的时间到了,还没有获取到锁则返回false;若获取到锁了,锁的有效时间设置为 leaseTime
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    

      

    Jedis和Redisson的比较 

    Jedis提供了比Redisson更丰富的操作;

    Redisson底层多使用 lua 脚本实现,对原子性的操作封装较好,尤其是在分布式锁上的封装;

    Redis实现的分布式锁还会出现一点问题:

    线程1加了锁去执行业务了,此时Redis的 master 挂掉了,还没有将数据同步到 slave 上。因为集群会选举一个新的 master 出来,但是新的 master 上并没有这个锁;线程2可以在新选举产生的 master 上去加锁,然后处理业务。

    (1)针对以上问题,我们可以使用 zookeeper 去实现分布式锁,因为它是强一致性的。但是zookeeper的性能是低于Redis,使用Redis是完全够了。

    (2)当然,对于以上的问题,我们也可以使用 RedLock 去解决Redis上的那个问题,RedLock 实现的原理:给多个Redis节点发送加锁的消息,只有超过一半以上的节点加锁成功才算加锁成功。

    但是不推荐使用RedLock,当前的 RedLock 是有bug的,它的实现原理和 zookeeper 是差不多的。

    高并发的高性能的Redis

    怎么在高并发的场景去实现一个高性能的分布式锁呢?

    电商网站在大促的时候并发量很大:

    (1)若抢购不是同一个商品,则可以增加Redis集群的cluster来实现,因为不是同一个商品,所以通过计算 key 的hash会落到不同的 cluster上;

    (2)若抢购的是同一个商品,则计算key的hash值会落同一个cluster上,所以加机器也是么有用的。

    我们可以使用库存分段锁的方式去实现。

    分段锁

      假如产品1有200个库存,我们可以将这200个库存分为10个段存储(每段20个),每段存储到一个cluster上;将key使用hash计算,使这些key最后落在不同的cluster上。

      每个下单请求锁了一个库存分段,然后在业务逻辑里面,就对数据库或者是Redis中的那个分段库存进行操作即可,包括查库存 -> 判断库存是否充足 -> 扣减库存。

    可以参照 ConcurrentHashMap 的源码去实现,它使用的就是分段锁。

    高性能分布式锁参考链接:https://blog.csdn.net/eluanshi12/article/details/84616173

  • 相关阅读:
    npm教程2
    02.RIP——CCNP学习笔记
    01.静态路由——CCNP学习笔记
    The 10th SWJTU ACM Online Tutorial
    visual studio 2005 常用按键
    Python垃圾回收机制
    私人网盘系统2.0—全部升级为layUI+PHP(持续更新中)
    Layui框架+PHP打造个人简易版网盘系统
    翻译app的开发全过程---编码+打包+上架
    值得认真学习的6 个 JavaScript 框架
  • 原文地址:https://www.cnblogs.com/yufeng218/p/13733205.html
Copyright © 2011-2022 走看看