zoukankan      html  css  js  c++  java
  • Redis分布式锁的使用与实现原理

    模拟一个电商里面下单减库存的场景。
    1.首先在redis里加入商品库存数量。

    2.新建一个Spring Boot项目,在pom里面引入相关的依赖。

      <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
    

    3.接下来,在application.yml配置redis属性和指定应用的端口号:

    server:
      port: 8090
    
    spring:
      redis:
        host: 192.168.0.60
        port: 6379
    

    4.新建一个Controller类,扣减库存第一版代码:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    import java.util.Objects;
    
    @RestController
    public class StockController {
    
        private static final Logger logger = LoggerFactory.getLogger(StockController.class);
    
        @Resource
        private StringRedisTemplate stringRedisTemplate;
    
        @RequestMapping("/reduceStock")
        public String reduceStock() {
            // 从redis中获取库存数量
            int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
            if (stock > 0) {
                // 减库存
                int restStock = stock - 1;
                // 剩余库存再重新设置到redis中
                stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                logger.info("扣减成功,剩余库存:{}", restStock);
            } else {
                logger.info("库存不足,扣减失败。");
            }
    
            return "success";
        }
    }
    

    上面第一版的代码存在什么问题:超卖。假如多个线程同时调用获取库存数量的代码,那么每个线程拿到的都是100,判断库存都大于0,都可以执行减库存的操作。假如两个线程都做减库存更新缓存,那么缓存的库存变成99,但实际上,应该是减掉2个库存。
    那么很多人的第一个想法是加synchronized同步代码块,因为获取数量和减库存不是原子性操作,有多个线程来执行代码的时候,只允许一个线程执行代码块里的代码。那么改完的第二版的代码如下:

     @RequestMapping("/reduceStock")
        public String reduceStock() {
            synchronized (this) {
                // 从redis中获取库存数量
                int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
                if (stock > 0) {
                    // 减库存
                    int restStock = stock - 1;
                    // 剩余库存再重新设置到redis中
                    stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                    logger.info("扣减成功,剩余库存:{}", restStock);
                } else {
                    logger.info("库存不足,扣减失败。");
                }
            }
    
            return "success";
        }
    

    但使用synchronize存在的问题,就是只能保证单机环境运行时没有问题的。但现在的软件公司里,基本上都是集群架构,是多实例,前面使用Nginx做负载均衡,大概架构如下:

    Nginx分发请求,把请求发送到不同的Tomcat容器,而synchronize只能保证一个应用是没有问题的。

    那么代码改进第三版,就是引入redis分布式锁,具体代码如下:

     @RequestMapping("/reduceStock")
        public String reduceStock() {
            String lockKey = "stockKey";
            try {
                boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
                if (!result) {
                    return "errorCode";
                }
                // 从redis中获取库存数量
                int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
                if (stock > 0) {
                    // 减库存
                    int restStock = stock - 1;
                    // 剩余库存再重新设置到redis中
                    stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                    logger.info("扣减成功,剩余库存:{}", restStock);
                } else {
                    logger.info("库存不足,扣减失败。");
                }
            } finally {
                stringRedisTemplate.delete(lockKey)
            }
            return "success";
        }
    

    如果有一个线程拿到锁,那么其他的线程就会等待。一定要记得在finally里面把使用完的锁要删除掉。否则一旦抛出异常,只有一个线程会一直持有锁,其他线程没有机会获取。
    但如果在执行if (stock > 0) {代码块里的代码,因为宕机或重启没有执行完,也会一直持有锁,所以,这里需要把锁加一个超时时间:

       boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
       stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
    

    但如果上面两行代码在中间执行出问题了,设置超时时间的代码还没执行,也会出现锁不能释放的问题。好在有对应的方法:就是把上面两行代码设置成一个原子操作:

       // 这里默认设置超时时间为10秒
       boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
    

    到此为止,如果并发量不是很大的话,基本上是没有问题的。

    但是,如果请求的并发量很大,就会出现新的问题:有种比较特殊的情况,第一个线程执行了15秒,但是执行到10秒钟的时候,锁已经失效释放了,那么在高并发场景下,第二个线程发现锁已经失效,那么它就可以拿到这把锁进行加锁,
    假设第二个线程执行需要8秒,它执行到5秒钟后,此时第一个线程已经执行完了,执行完那一刻,进行了删除key的操作,但是此时的锁是第二个线程加的,这样第一个线程把第二个线程加的锁删掉了。
    那意味着第三个线程又可以拿到锁,第三个线程执行了3秒钟,此时第二个线程执行完毕,那么第二个线程把第三个线程的锁又删除了。导致锁失效。
    那么解决的思路就是,我自己加的锁,不要被别人删掉。那么可以为每个进来的请求生成一个唯一的id,作为分布式锁的值,然后在释放时,判断一下当前线程的id,是不是和缓存里的id是否相等。

     @RequestMapping("/reduceStock")
        public String reduceStock() {
            String lockKey = "stockKey";
            String id = UUID.randomUUID().toString();
            try {
                // 这里默认设置超时时间为30秒
                boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS);
                if (!result) {
                    return "errorCode";
                }
                // 从redis中获取库存数量
                int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
                if (stock > 0) {
                    // 减库存
                    int restStock = stock - 1;
                    // 剩余库存再重新设置到redis中
                    stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                    logger.info("扣减成功,剩余库存:{}", restStock);
                } else {
                    logger.info("库存不足,扣减失败。");
                }
            } finally {
                if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) {
                    stringRedisTemplate.delete(lockKey);
                }
            }
            return "success";
        }
    

    到此为止,一个比较完善的锁就实现了,可以应付大部分场景。
    当然,上面的代码还有一个问题,就是一个线程执行时间超过了过期时间,后面的代码还没有执行完,锁就已经删除了,还是会有些bug存在。解决的方法是给锁续命的操作。
    在当前主线程获取到锁以后,可以fork出一个线程,执行Timer定时器操作,假如默认超时时间为30秒,那么定时器每隔10秒去看下这把锁还是否存在,存在就说明这个锁里的逻辑还没有执行完,那么就可以把当前主线程的超时时间重新设置为30秒;如果不存在,就直接结束掉。

    但是上面的逻辑,在高并发场景下,实现比较完善还是比较困难的。好在现在已经有比较成熟的框架,那就是Redisson。官方地址https://redisson.org。
    下面用Redisson来实现分布式锁。
    首先引入依赖包:

           <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.6.5</version>
            </dependency>
    
    

    配置类:

    @Configuration
    public class RedissonConfig {
        @Bean
        public Redisson redisson() {
            // 单机模式
            Config config = new Config();
            config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0);
            return (Redisson) Redisson.create(config);
        }
    }
    

    接下来用redisson重写上面的减库存操作:

     @Resource
        private Redisson redisson;
        
        @RequestMapping("/reduceStock")
        public String reduceStock() {
            String lockKey = "stockKey";
            RLock redissonLock = redisson.getLock(lockKey);
            try {
                // 加锁,锁续命
                redissonLock.lock();
                // 从redis中获取库存数量
                int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
                if (stock > 0) {
                    // 减库存
                    int restStock = stock - 1;
                    // 剩余库存再重新设置到redis中
                    stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
                    logger.info("扣减成功,剩余库存:{}", restStock);
                } else {
                    logger.info("库存不足,扣减失败。");
                }
            } finally {
               redissonLock.unlock();
            }
            return "success";
        }
    

    其实就是三个步骤:获取锁,加锁,释放锁。

    先简单看下Redisson的实现原理:

    这里先说一下Redis很多操作使用Lua脚本来实现原子性操作,关于Lua语法,可以去网上找下相关教程。
    使用Lua脚本的好处有:
    1.减少网络开销,多个命令可以使用一次请求完成;
    2.实现了原子性操作,Redis会把Lua脚本作为一个整体去执行;
    3.实现事务,Redis自带的事务功能有限,而Lua脚本实现了事务的常规操作,而且还支持回滚。

    但是Lua实际上不会使用很多,如果Lua脚本执行时间过长,因为Redis是单线程,因此会导致堵塞。

    最后,说下Redisson分布式锁的代码实现,
    找到上面的redissonLock.lock();
    lock方法点进去,一直点到RedissonLock类里面的lockInterruptibly方法:

        @Override
        public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
            // 获取线程id
            long threadId = Thread.currentThread().getId();
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return;
            }
    
            RFuture<RedissonLockEntry> future = subscribe(threadId);
            commandExecutor.syncSubscription(future);
    
            try {
                while (true) {
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        break;
                    }
    
                    // waiting for message
                    if (ttl >= 0) {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        getEntry(threadId).getLatch().acquire();
                    }
                }
            } finally {
                unsubscribe(future, threadId);
            }
    //        get(lockAsync(leaseTime, unit));
        }
    

    重点看下tryAcquire方法,把线程id作为一个参数传递进来,在这个方法里面,找到tryLockInnerAsync方法点进去,

      <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "return redis.call('pttl', KEYS[1]);",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    

    这里就是一堆Lua脚本,先看第一个if命令,先去判断 KEYS[1](就是对应的锁key的名字),如果不存在,在hashmap里,设置一个属性为线程id,值为1,再把map的过期时间设置为internalLockLeaseTime,这个值默认是30秒,

    上面的操作对应的命令是:

    hset keyname id:thread 1
    pexpire keyname 30
    

    然后返回nil,相当于null,那程序return了。
    另外,Redisson还支持重入锁,那第二个if就是执行重入锁的操作,会判断锁是否存在,并且传入的线程id是否是当前线程的id,若果是,支持重复加锁进行自增操作;
    如果是其他线程调用lock方法,上面两个if判断不会走,会返回锁剩余过期时间。

    接着返回到tryAcquireAsync方法里面往下看:
    实际上是加了一个监听器,在监听器里面有个很重要的方法scheduleExpirationRenewal,一看这个名字就能大概猜出是什么功能,
    里面有个定时任务的轮询,

     private void scheduleExpirationRenewal(final long threadId) {
            if (expirationRenewalMap.containsKey(getEntryName())) {
                return;
            }
    
            Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    // 判断传递进来的线程id是否是我们之前主线程设置的id,如果是,则增加续命,增加30秒。
                    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                "return 1; " +
                            "end; " +
                            "return 0;",
                              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                    
                    future.addListener(new FutureListener<Boolean>() {
                        @Override
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            expirationRenewalMap.remove(getEntryName());
                            if (!future.isSuccess()) {
                                log.error("Can't update lock " + getName() + " expiration", future.cause());
                                return;
                            }
                            
                            if (future.getNow()) {
                                // reschedule itself
                                scheduleExpirationRenewal(threadId);
                            }
                        }
                    });
                }
            }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
            if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
                task.cancel();
            }
        }
    

    接着推迟10秒钟(internalLockLeaseTime / 3),再执行续命操作逻辑。

    到最后,再回到lockInterruptibly方法,
    如果ttl 为null,说明加锁成功了,就返回null,那如果其他线程的话,就会返回剩余过期时间,那么就会进入到while死循环里,一直尝试加锁,调用tryAcquire方法,在琐失效以后,再会尝试获取加锁。

    到此为止,分析完毕。

  • 相关阅读:
    【反射】Java反射机制
    Composer教程之常用命令
    Composer教程之基础用法
    Composer教程之初识Composer
    Composer 的结构详解
    现代 PHP 新特性系列(七) —— 内置的 HTTP 服务器
    现代 PHP 新特性系列(一) —— 命名空间
    现代 PHP 新特性系列(二) —— 善用接口
    现代 PHP 新特性系列(三) —— Trait 概览
    现代 PHP 新特性系列(四) —— 生成器的创建和使用
  • 原文地址:https://www.cnblogs.com/IcanFixIt/p/14012661.html
Copyright © 2011-2022 走看看