zoukankan      html  css  js  c++  java
  • 基于Redis实现分布式锁

    一、Redis实现分布式锁基本原理

     主要就是redis的setnx(id,value)指令

    在Redis中,有一个不常使用的命令如下所示。

    SETNX key value

    这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。

    只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。

    这个命令的返回值如下。

    • 命令在设置成功时返回1。

    • 命令在设置失败时返回0。

    所以,我们在分布式高并发环境下,可以使用Redis的SETNX命令来实现分布式锁。假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行SETNX命令设置加锁状态后继续向下执行。

    引入分布式锁

    了解了如何使用Redis中的命令实现分布式锁后,我们就可以对下单接口进行改造了,加入分布式锁,如下所示。

    **
    * 为了演示方便,我这里就简单定义了一个常量作为商品的id
    * 实际工作中,这个商品id是前端进行下单操作传递过来的参数
    */
    public static final String PRODUCT_ID = "100001";
    
    @RequestMapping("/submitOrder")
    public String submitOrder(){
        //通过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
        //实际上,value可以为任意的字符换
        Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
       //没有拿到锁,返回下单失败
        if(!isLock){
            return "failure";
        }
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("库存扣减成功,当前库存为:{}", stock);
        }else{
            logger.debug("库存不足,扣减库存失败");
            throw new OrderException("库存不足,扣减库存失败");
        }
        //业务执行完成,删除PRODUCT_ID key
        stringRedisTemplate.delete(PRODUCT_ID);
        return "success";
    }
    View Code

    那么,在上述代码中,我们加入了分布式锁的操作,那上述代码是否能够在高并发场景下保证业务的原子性呢?答案是可以保证业务的原子性。但是,在实际场景中,上面实现分布式锁的代码是不可用的!!

    假设当线程A首先执行stringRedisTemplate.opsForValue()的setIfAbsent()方法返回true,继续向下执行,正在执行业务代码时,抛出了异常,线程A直接退出了JVM。此时,stringRedisTemplate.delete(PRODUCT_ID);代码还没来得及执行,之后所有的线程进入提交订单的方法时,调用stringRedisTemplate.opsForValue()的setIfAbsent()方法都会返回false。导致后续的所有下单操作都会失败。这就是分布式场景下的死锁问题。

    所以,上述代码中实现分布式锁的方式在实际场景下是不可取的!!

    引入try-finally代码块

    说到这,相信小伙伴们都能够想到,使用try-finall代码块啊,接下来,我们为下单接口的方法加上try-finally代码块。

    /**
    * 为了演示方便,我这里就简单定义了一个常量作为商品的id
    * 实际工作中,这个商品id是前端进行下单操作传递过来的参数
    */
    public static final String PRODUCT_ID = "100001";
    
    @RequestMapping("/submitOrder")
    public String submitOrder(){
        //通过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
        //实际上,value可以为任意的字符换
        Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
       //没有拿到锁,返回下单失败
        if(!isLock){
            return "failure";
        }
        try{
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0){
                stock -= 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
                logger.debug("库存扣减成功,当前库存为:{}", stock);
            }else{
                logger.debug("库存不足,扣减库存失败");
                throw new OrderException("库存不足,扣减库存失败");
            }
        }finally{
             //业务执行完成,删除PRODUCT_ID key
            stringRedisTemplate.delete(PRODUCT_ID);
        }
        return "success";
    }
    View Code

    那么,上述代码是否真正解决了死锁的问题呢?我们在写代码时,不能只盯着代码本身,觉得上述代码没啥问题了。实际上,生产环境是非常复杂的。如果线程在成功加锁之后,执行业务代码时,

    还没来得及执行删除锁标志的代码,此时,服务器宕机了,程序并没有优雅的退出JVM。也会使得后续的线程进入提交订单的方法时,因无法成功的设置锁标志位而下单失败。所以说,上述的代码仍然存在问题。

    引入Redis超时机制

     在Redis中可以设置缓存的自动过期时间,我们可以将其引入到分布式锁的实现中,如下代码所示。

    /**
    * 为了演示方便,我这里就简单定义了一个常量作为商品的id
    * 实际工作中,这个商品id是前端进行下单操作传递过来的参数
    */
    public static final String PRODUCT_ID = "100001";
    
    @RequestMapping("/submitOrder")
    public String submitOrder(){
        //通过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
        //实际上,value可以为任意的字符换
        Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
       //没有拿到锁,返回下单失败
        if(!isLock){
            return "failure";
        }
        try{
            stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0){
                stock -= 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
                logger.debug("库存扣减成功,当前库存为:{}", stock);
            }else{
                logger.debug("库存不足,扣减库存失败");
                throw new OrderException("库存不足,扣减库存失败");
            }
        }finally{
             //业务执行完成,删除PRODUCT_ID key
            stringRedisTemplate.delete(PRODUCT_ID);
        }
        return "success";
    }
    View Code

     在上述代码中,我们加入了如下一行代码来为Redis中的锁标志设置过期时间。

      stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS); 

    此时,我们设置的过期时间为30秒。

    那么问题来了,这样是否就真正的解决了问题呢?上述程序就真的没有坑了吗?答案是还是有坑的!!!

    “坑位”分析

    我们在下单操作的方法中为分布式锁引入了超时机制,此时的代码还是无法真正避免死锁的问题,那“坑位”到底在哪里呢?试想,当程序执行完stringRedisTemplate.opsForValue().setIfAbsent()方法后,正要执行stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS)代码时,服务器宕机了,你还别说,生产坏境的情况非常复杂,就是这么巧,服务器就宕机了。此时,后续请求进入提交订单的方法时,都会因为无法成功设置锁标志而导致后续下单流程无法正常执行。

    既然我们找到了上述代码的“坑位”,那我们如何将这个”坑“填上?如何解决这个问题呢?别急,Redis已经提供了这样的功能。我们可以在向Redis中保存数据的时候,可以同时指定数据的超时时间。所以,我们可以将代码改造成如下所示。

    /**
    * 为了演示方便,我这里就简单定义了一个常量作为商品的id
    * 实际工作中,这个商品id是前端进行下单操作传递过来的参数
    */
    public static final String PRODUCT_ID = "100001";
    
    @RequestMapping("/submitOrder")
    public String submitOrder(){
        //通过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
        //实际上,value可以为任意的字符换
        Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe", 30, TimeUnit.SECONDS);
       //没有拿到锁,返回下单失败
        if(!isLock){
            return "failure";
        }
        try{
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0){
                stock -= 1;
                stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
                logger.debug("库存扣减成功,当前库存为:{}", stock);
            }else{
                logger.debug("库存不足,扣减库存失败");
                throw new OrderException("库存不足,扣减库存失败");
            }
        }finally{
             //业务执行完成,删除PRODUCT_ID key
            stringRedisTemplate.delete(PRODUCT_ID);
        }
        return "success";
    }
    View Code

    在上述代码中,我们在向Redis中设置锁标志位的时候就设置了超时时间。此时,只要向Redis中成功设置了数据,则即使我们的业务系统宕机,Redis中的数据过期后,也会自动删除。后续的线程进入提交订单的方法后,就会成功的设置锁标志位,并向下执行正常的下单流程。

    到此,上述的代码基本上在功能角度解决了程序的死锁问题,那么,上述程序真的就完美了吗?哈哈,很多小伙伴肯定会说不完美!确实,上面的代码还不是完美的,那大家知道哪里不完美吗?接下来,我们继续分析。

    在开发集成角度分析代码

    在我们开发公共的系统组件时,比如我们这里说的分布式锁,我们肯定会抽取一些公共的类来完成相应的功能来供系统使用。

    这里,假设我们定义了一个RedisLock接口,如下所示。

    public interface RedisLock{
        //加锁操作
        boolean tryLock(String key, long timeout, TimeUnit unit);
        //解锁操作
        void releaseLock(String key);
    }
    View Code

     接下来,使用RedisLockImpl类实现RedisLock接口,提供具体的加锁和解锁实现,如下所示。

    public class RedisLockImpl implements RedisLock{
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        @Override
        public boolean tryLock(String key, long timeout, TimeUnit unit){
            return stringRedisTemplate.opsForValue().setIfAbsent(key, "binghe", timeout, unit);
        }
        @Override
        public void releaseLock(String key){
            stringRedisTemplate.delete(key);
        }
    }
    View Code

    在开发集成的角度来说,当一个线程从上到下执行时,首先对程序进行加锁操作,然后执行业务代码,执行完成后,再进行释放锁的操作。理论上,加锁和释放锁时,操作的Redis Key都是一样的。但是,如果其他开发人员在编写代码时,并没有调用tryLock()方法,而是直接调用了releaseLock()方法,并且他调用releaseLock()方法传递的key与你调用tryLock()方法传递的key是一样的。那此时就会出现问题了,他在编写代码时,硬生生的将你加的锁释放了!!!

    所以,上述代码是不安全的,别人能够随随便便的将你加的锁删除,这就是锁的误删操作,这是非常危险的,所以,上述的程序存在很严重的问题!!

    那如何实现只有加锁的线程才能进行相应的解锁操作呢? 继续向下看。

    如何实现加锁和解锁的归一化?

    什么是加锁和解锁的归一化呢?简单点来说,就是一个线程执行了加锁操作后,后续必须由这个线程执行解锁操作,加锁和解锁操作由同一个线程来完成。

    为了解决只有加锁的线程才能进行相应的解锁操作的问题,那么,我们就需要将加锁和解锁操作绑定到同一个线程中,那么,如何将加锁操作和解锁操作绑定到同一个线程呢?其实很简单,相信很多小伙伴都想到了—— 使用ThreadLocal实现 。没错,使用ThreadLocal类确实能够解决这个问题。

    此时,我们将RedisLockImpl类的代码修改成如下所示。

    public class RedisLockImpl implements RedisLock{
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
        @Override
        public boolean tryLock(String key, long timeout, TimeUnit unit){
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }
        @Override
        public void releaseLock(String key){
            //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
            if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
                 stringRedisTemplate.delete(key);   
            }
        }
    }
    View Code

     继续分析

    我们将加锁和解锁的方法改成如下所示。

    public class RedisLockImpl implements RedisLock{
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
        private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
        private String lockUUID;
        @Override
        public boolean tryLock(String key, long timeout, TimeUnit unit){
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            lockUUID = uuid;
            return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }
        @Override
        public void releaseLock(String key){
            //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
            if(lockUUID.equals(stringRedisTemplate.opsForValue().get(key))){
                 stringRedisTemplate.delete(key);   
            }
        }
    }
    View Code

    相信很多小伙伴都会看出上述代码存在什么问题了!!没错,那就是 线程安全的问题。

    所以,这里,我们需要使用ThreadLocal来解决线程安全问题。

    可重入性分析

    在上面的代码中,当一个线程成功设置了锁标志位后,其他的线程再设置锁标志位时,就会返回失败。还有一种场景就是在提交订单的接口方法中,调用了服务A,服务A调用了服务B,而服务B的方法中存在对同一个商品的加锁和解锁操作。

    所以,服务B成功设置锁标志位后,提交订单的接口方法继续执行时,也不能成功设置锁标志位了。也就是说,目前实现的分布式锁没有可重入性。

    这里,就存在可重入性的问题了。我们希望设计的分布式锁 具有可重入性 ,那什么是可重入性呢?简单点来说,就是同一个线程,能够多次获取同一把锁,并且能够按照顺序进行解决操作。

    其实,在JDK 1.5之后提供的锁很多都支持可重入性,比如synchronized和Lock。

    如何实现可重入性呢?

    映射到我们加锁和解锁方法时,我们如何支持同一个线程能够多次获取到锁(设置锁标志位)呢?可以这样简单的设计:如果当前线程没有绑定uuid,则生成uuid绑定到当前线程,并且在Redis中设置锁标志位。如果当前线程已经绑定了uuid,则直接返回true,证明当前线程之前已经设置了锁标志位,也就是说已经获取到了锁,直接返回true。

    结合以上分析,我们将提交订单的接口方法代码改造成如下所示。

    public class RedisLockImpl implements RedisLock{
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
        @Override
        public boolean tryLock(String key, long timeout, TimeUnit unit){
            Boolean isLocked = false;
            if(threadLocal.get() == null){
                String uuid = UUID.randomUUID().toString();
                threadLocal.set(uuid);
                isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            }else{
                isLocked = true;   
            }
            return isLocked;
        }
        @Override
        public void releaseLock(String key){
            //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
            if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
                 stringRedisTemplate.delete(key);   
            }
        }
    }
    View Code

     这样写看似没有啥问题,但是大家细想一下,这样写就真的OK了吗?

    可重入性的问题分析

    既然上面分布式锁的可重入性是存在问题的,那我们就来分析下问题的根源在哪里!

    假设我们提交订单的方法中,首先使用RedisLock接口对代码块添加了分布式锁,在加锁后的代码中调用了服务A,而服务A中也存在调用RedisLock接口的加锁和解锁操作。而多次调用RedisLock接口的加锁操作时,只要之前的锁没有失效,则会直接返回true,表示成功获取锁。也就是说,无论调用加锁操作多少次,最终只会成功加锁一次。而执行完服务A中的逻辑后,在服务A中调用RedisLock接口的解锁方法,此时,会将当前线程所有的加锁操作获得的锁全部释放掉。

    我们可以使用下图来简单的表示这个过程。

     

    那么问题来了,如何解决可重入性的问题呢?

    解决可重入性问题

    相信很多小伙伴都能够想出使用计数器的方式来解决上面可重入性的问题,没错,就是使用计数器来解决。 整体流程如下所示。

     

    public class RedisLockImpl implements RedisLock{
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
        private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
        @Override
        public boolean tryLock(String key, long timeout, TimeUnit unit){
            Boolean isLocked = false;
            if(threadLocal.get() == null){
                String uuid = UUID.randomUUID().toString();
                threadLocal.set(uuid);
                isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            }else{
                isLocked = true;   
            }
            //加锁成功后将计数器加1
            if(isLocked){
                Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
                threadLocalInteger.set(count++);
            }
            return isLocked;
        }
        @Override
        public void releaseLock(String key){
            //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
            if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
                Integer count = threadLocalInteger.get();
                //计数器减为0时释放锁
                if(count == null || --count <= 0){
                     stringRedisTemplate.delete(key);      
                }
            }
        }
    }
    View Code

    至此,我们基本上解决了分布式锁的可重入性问题。

    说到这里,我还要问大家一句,上面的解决问题的方案真的没问题了吗?

    阻塞与非阻塞锁

    在提交订单的方法中,当获取Redis分布式锁失败时,我们直接返回了failure来表示当前请求下单的操作失败了。试想,在高并发环境下,一旦某个请求获得了分布式锁,那么,在这个请求释放锁之前,其他的请求调用下单方法时,都会返回下单失败的信息。在真实场景中,这是非常不友好的。我们可以将后续的请求进行阻塞,直到当前请求释放锁后,再唤醒阻塞的请求获得分布式锁来执行方法。

    所以,我们设计的分布式锁需要支持 阻塞和非阻塞 的特性。

    那么,如何实现阻塞呢?我们可以使用自旋来实现,继续修改RedisLockImpl的代码如下所示。

    public class RedisLockImpl implements RedisLock{
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
        private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
        @Override
        public boolean tryLock(String key, long timeout, TimeUnit unit){
            Boolean isLocked = false;
            if(threadLocal.get() == null){
                String uuid = UUID.randomUUID().toString();
                threadLocal.set(uuid);
                isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                //如果获取锁失败,则自旋获取锁,直到成功
                if(!isLocked){
                    for(;;){
                        isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                        if(isLocked){
                            break;
                        }
                    }
                }
            }else{
                isLocked = true;   
            }
            //加锁成功后将计数器加1
            if(isLocked){
                Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
                threadLocalInteger.set(count++);
            }
            return isLocked;
        }
        @Override
        public void releaseLock(String key){
            //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
            if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
                Integer count = threadLocalInteger.get();
                //计数器减为0时释放锁
                if(count == null || --count <= 0){
                     stringRedisTemplate.delete(key);      
                }
            }
        }
    }
    View Code

    在分布式锁的设计中,阻塞锁和非阻塞锁 是非常重要的概念,大家一定要记住这个知识点。

    锁失效问题

    尽管我们实现了分布式锁的阻塞特性,但是还有一个问题是我们不得不考虑的。那就是 锁失效的问题。

    当程序执行业务的时间超过了锁的过期时间会发生什么呢? 想必很多小伙伴都能够想到,那就是前面的请求没执行完,锁过期失效了,后面的请求获取到分布式锁,继续向下执行了,程序无法做到真正的互斥,无法保证业务的原子性了。

    那如何解决这个问题呢?答案就是:我们必须保证在业务代码执行完毕后,才能释放分布式锁。方案是有了,那如何实现呢?

    说白了,我们需要在业务代码中,时不时的执行下面的代码来保证在业务代码没执行完时,分布式锁不会因超时而被释放。

      springRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS); 

    这里,我们需要定义一个定时策略来执行上面的代码,需要注意的是:我们不能等到30秒后再执行上述代码,因为30秒时,锁已经失效了。例如,我们可以每10秒执行一次上面的代码。

    有些小伙伴说,直接在RedisLockImpl类中添加一个while(true)循环来解决这个问题,那我们就这样修改下RedisLockImpl类的代码,看看有没有啥问题。

    public class RedisLockImpl implements RedisLock{
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
        private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
        @Override
        public boolean tryLock(String key, long timeout, TimeUnit unit){
            Boolean isLocked = false;
            if(threadLocal.get() == null){
                String uuid = UUID.randomUUID().toString();
                threadLocal.set(uuid);
                isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                //如果获取锁失败,则自旋获取锁,直到成功
                if(!isLocked){
                    for(;;){
                        isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                        if(isLocked){
                            break;
                        }
                    }
                }
                //定义更新锁的过期时间
                while(true){
                    Integer count = threadLocalInteger.get();
                    //当前锁已经被释放,则退出循环
                    if(count == 0 || count <= 0){
                        break;
                    }
                    springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
                    try{
                        //每隔10秒执行一次
                        Thread.sleep(10000);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            }else{
                isLocked = true;   
            }
            //加锁成功后将计数器加1
            if(isLocked){
                Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
                threadLocalInteger.set(count++);
            }
            return isLocked;
        }
        @Override
        public void releaseLock(String key){
            //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
            if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
                Integer count = threadLocalInteger.get();
                //计数器减为0时释放锁
                if(count == null || --count <= 0){
                     stringRedisTemplate.delete(key);      
                }
            }
        }
    }
    View Code

    相信小伙伴们看了代码就会发现哪里有问题了:更新锁过期时间的代码肯定不能这么去写。因为这么写会 导致当前线程在更新锁超时时间的while(true)循环中一直阻塞而无法返回结果。 所以,我们不能将当前线程阻塞,需要异步执行定时任务来更新锁的过期时间。

    此时,我们继续修改RedisLockImpl类的代码,将定时更新锁超时的代码放到一个单独的线程中执行,如下所示。

    public class RedisLockImpl implements RedisLock{
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
        private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
        @Override
        public boolean tryLock(String key, long timeout, TimeUnit unit){
            Boolean isLocked = false;
            if(threadLocal.get() == null){
                String uuid = UUID.randomUUID().toString();
                threadLocal.set(uuid);
                isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                //如果获取锁失败,则自旋获取锁,直到成功
                if(!isLocked){
                    for(;;){
                        isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                        if(isLocked){
                            break;
                        }
                    }
                }
                //启动新线程来执行定时任务,更新锁过期时间
               new Thread(new UpdateLockTimeoutTask(uuid, stringRedisTemplate)).start();
            }else{
                isLocked = true;   
            }
            //加锁成功后将计数器加1
            if(isLocked){
                Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
                threadLocalInteger.set(count++);
            }
            return isLocked;
        }
        @Override
        public void releaseLock(String key){
            //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操作
            String uuid = stringRedisTemplate.opsForValue().get(key);
            if(threadLocal.get().equals(uuid)){
                Integer count = threadLocalInteger.get();
                //计数器减为0时释放锁
                if(count == null || --count <= 0){
                     stringRedisTemplate.delete(key); 
                    //获取更新锁超时时间的线程并中断
                    long threadId = stringRedisTemplate.opsForValue().get(uuid);
                    Thread updateLockTimeoutThread = ThreadUtils.getThreadByThreadId(threadId);
                    if(updateLockTimeoutThread != null){
                         //中断更新锁超时时间的线程
                        updateLockTimeoutThread.interrupt();   
                        stringRedisTemplate.delete(uuid);
                    }
                }
            }
        }
    }
    View Code

     创建UpdateLockTimeoutTask类来执行更新锁超时的时间。

    public class UpdateLockTimeoutTask implements Runnable{
        //uuid
        private long uuid;
        private StringRedisTemplate stringRedisTemplate;
        public UpdateLockTimeoutTask(long uuid, StringRedisTemplate stringRedisTemplate){
            this.uuid = uuid;
            this.stringRedisTemplate = stringRedisTemplate;
        }
        @Override
        public void run(){
            //以uuid为key,当前线程id为value保存到Redis中
            stringRedisTemplate.opsForValue().set(uuid, Thread.currentThread().getId());
             //定义更新锁的过期时间
            while(true){
                springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
                try{
                    //每隔10秒执行一次
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    View Code

     接下来,我们定义一个ThreadUtils工具类,这个工具类中有一个根据线程id获取线程的方法getThreadByThreadId(long threadId)。

    public class ThreadUtils{
        //根据线程id获取线程句柄
        public static Thread getThreadByThreadId(long threadId){
            ThreadGroup group = Thread.currentThread().getThreadGroup();
            while(group != null){
                Thread[] threads = new Thread[(int)(group.activeCount() * 1.2)];
                int count = group.enumerate(threads, true);
                for(int i = 0; i < count; i++){
                    if(threadId == threads[i].getId()){
                        return threads[i];
                    }
                }
            }
        }
    }
    View Code

    上述解决分布式锁失效的问题在分布式锁领域有一个专业的术语叫做 “异步续命” 。需要注意的是:当业务代码执行完毕后,我们需要停止更新锁超时时间的线程。所以,这里,我对程序的改动是比较大的,首先,将更新锁超时的时间任务重新定义为一个UpdateLockTimeoutTask类,并将uuid和StringRedisTemplate注入到任务类中,在执行定时更新锁超时时间时,首先将当前线程保存到Redis中,其中Key为传递进来的uuid。

    在首先获取分布式锁后,重新启动线程,并将uuid和StringRedisTemplate传递到任务类中执行任务。当业务代码执行完毕后,调用releaseLock()方法释放锁时,我们会通过uuid从Redis中获取更新锁超时时间的线程id,并通过线程id获取到更新锁超时时间的线程,调用线程的interrupt()方法来中断线程。

    此时,当分布式锁释放后,更新锁超时的线程就会由于线程中断而退出了。

  • 相关阅读:
    序一
    Python3 网络爬虫开发实战
    爬虫笔记
    celery
    用yield实现协程 和asyncio模块
    Django学习之完成数据库主从复制、读写分离和一主多从情况下的使用办法
    Django学习之缓存和信号
    Django学习之Django-debug-toobar
    Python面向对象
    Python中 if __name__ == "__main__" 的理解
  • 原文地址:https://www.cnblogs.com/cb1186512739/p/12802040.html
Copyright © 2011-2022 走看看