zoukankan      html  css  js  c++  java
  • 基于Redis实现分布式锁

    1、前言

      众所周知,对于高并发业务场景通常会考虑加锁机制保证线程安全,比如使用Synchronized对象锁。Synchronized为JVM进程级别,在项目采取单实例部署情况下几乎可以胜任。但是当项目采用分布式架构,考虑采用多实例高可用部署情况时,Synchronized对象锁应对高并发场景已经力不从心。

    分布式高可用部署架构:

      那么,分布式部署架构下如何避免高并发造成的“超买/超卖现象”等类似线程安全问题呢?还好,目前也有不少成熟解决方案,整体上都是围绕实现分布式锁,常见的实现方案有:

    • 基于Redis(缓存等)实现分布式锁。
    • 基于ZooKeeper实现分布式锁。
    • 基于数据库实现分布式锁。

      本文将重点探讨如何采用Redis缓存实现分布式锁。

    2、Redis SETNX

      通常,采用Redis SETNX指令实现基于Redis实现分布式锁。Redis为单线程模型,可以将高并发场景操作映射为单点指令操作。

      Redis数据库指令:SETNX key value  ,SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

      (Refer to :http://redisdoc.com/string/setnx.html)

    • 指令特性

    只在键 key 不存在的情况下, 将键 key 的值设置为 value 。
    若键 key 已经存在, 则 SETNX 命令不做任何动作。
    • 返回值

    命令在设置成功时返回 1 , 设置失败时返回 0
    • setnx是Redis命令中的方法,java中对应的实现方法是setIfAbsent()。

    3、代码验证

       下文我将展示一段购物库存简单的demo示例,若采用分布式部署多实例,那么在高并发情况下会存在哪些重要问题。

    本文将采用JMeter性能测试工具,模拟高并发业务场景,完成高并发压力测试。 

    • 代码

        @PostMapping("/buyProduct1")
        public String buyProduct1() {
            String buyerName = "顾客" + Thread.currentThread().getId();
            Object stObj = redisTemplate.opsForValue().get("stockNum");
            int stockNum = Integer.parseInt(stObj.toString());
            if (stockNum > 0) {
                redisTemplate.opsForValue().set("stockNum", --stockNum);
                System.out.println(buyerName + "下单成功,库存剩余件数:" + stockNum);
            } else {
                System.out.println(buyerName + "下单失败,库存不足.");
                return buyerName + "下单失败!";
            }
            return buyerName + "下单成功!";
        }
    • JMeter测试

      JMeter设置10个用户线程,0.5s内并发请求一次。

      执行成功,模拟购物成功。

     

    •  执行结果

      从IDE控制台日志可以看到,0.5s内10次请求,出现了“超卖现象”,很明显的线程安全问题。

     

       当前代码如果在单实例部署架构中,可以采用Synchronized对象锁实现线程安全控制(在业务代码上添加锁),但是在分布式部署架构下将无法实现有效控制。

    4、优化代码

       采用Redis实现分布式锁,并对上述简单代码添加分布式锁机制,实现线程安全控制。通常,也有两种具体的实现方式,详细见下文代码实现。

    方式一:基于Redis SETNX指令

    • 代码实现

        @PostMapping("/buyProduct2")
        public String buyProduct2() {
            String buyerName = "顾客" + Thread.currentThread().getId();
            String lockKey = "buyProductLock";
            String lockValue = UUID.randomUUID().toString().concat(UUID.randomUUID().toString());
            try {
                // setIfAbsent是java中的方法,setnx是redis命令中的方法
                // 1.保证系统崩溃可以自然释放锁
                // 2.保证redis操作原子性,避免设置超时时刻系统崩溃
                Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
                if (!isSuccess) {
                    System.out.println("系统繁忙,请稍后重试.");
                    return "系统繁忙,请稍后重试.";
                }
                int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString());
                if (stockNum > 0) {
                    redisTemplate.opsForValue().set("stockNum", --stockNum);
                    System.out.println(buyerName + "下单成功,库存剩余件数:" + stockNum);
                } else {
                    System.out.println(buyerName + "下单失败,库存不足.");
                    return buyerName + "下单失败!";
                }
            } finally {//3.保证操作成功和系统异常情况下都能释放锁
                //4.采用线程标识主动检查,保证仅删除自己的锁。避免redis超时时间小于业务逻辑执行时间,前一个线程释放了后一个线程的加锁,造成锁永久失效。
                //lockValue存储方法栈中线程私有
                if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
                    //释放锁
                    redisTemplate.delete(lockKey);
                }
            }
            return buyerName + "下单成功!";
        }
    • 运行结果

      部分线程执行成功,部分线程执行被拦截,保证了用户并发下单库存数据正确性,实现了线程安全控制。

     

     

    方式二:采用Redisson 实现

    • 实现原理

     

      加锁失败情况下,可以设置超时时间T,在时间T内自旋加锁,超过时间T之后加锁失败返回,避免死锁。

      当Redis集群为多Master-Slave模式时,Redis根据hash算法选择一个master尝试加锁。

      Redisson是通过执行lua脚本完成对Redis加锁操作。

    • maven依赖

            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.15.0</version>
            </dependency>
    • Redisson配置

    @Component
    public class redissonConfig {
    
        @Bean
        public Redisson redisson() {
            Config config = new Config();
            config.useSingleServer()
                    .setAddress("redis://127.0.0.1:6379")
                    .setDatabase(0);
            return (Redisson) Redisson.create(config);
        }
    }
    •  代码实现

        @PostMapping("/buyProduct3")
        public String buyProduct3() {
            String buyerName = "顾客" + Thread.currentThread().getId();
            String lockKey = "buyProductLock";
            // redisson加锁
            RLock redissonLock = redisson.getLock(lockKey);
            try {
                //redisson设置锁时间
                redissonLock.lock(10, TimeUnit.SECONDS);
                int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString());
                if (stockNum > 0) {
                    redisTemplate.opsForValue().set("stockNum", --stockNum);
                    System.out.println(buyerName + "下单成功,库存剩余件数:" + stockNum);
                } else {
                    System.out.println(buyerName + "下单失败,库存不足.");
                    return buyerName + "下单失败!";
                }
            } finally {
                //redisson释放锁
                redissonLock.unlock();
            }
            return buyerName + "下单成功!";
        }
    • 执行结果

     

     

    5、总结

    • 比较

      方法一与方法二都实现了在分布式部署场景下,控制高并发业务请求下线程安全。方法一拦截并发线程,直接结束在业务逻辑执行过程中其他线程并发请求,并发吞入量较小。方法二基于Redisson可以设置并发线程等待状态,保证每个线程请求都能完成业务,提高了系统并发吞吐量。另外,方式二的实现代码量较少。

      方法一基于Redis指令面临的问题:当Redis设置超时时间<应用程序执行时间,Redis分布式锁先于程序执行完成释放,导致当前加锁失效。方式二Redisson分布式锁,通过加锁时候开启分线程,定期(小于redis超时时间,eg:1/3)检查redis锁标记,如果存在再延时机制,解决了这类时间差问题。

    • 共同存在的问题

      分布式架构下,Redis也以集群模式部署,当Redis master节点加锁成功之后,返回成功,这是Redis主节点可能宕机故障,slave从节点晋升为主节点,造成Redis锁标识丢失,从而导致分布式锁失效。

    解决方案:

      采取Redlock或者 Zookeeper。Redisson性能更高,确保绝对数据安全采用Zookeeper(也是主从结构)。

    6、防重复提交代码优化

       对前文《防止重复提交解决方案》进行代码优化,支持高并发场景线程安全控制。

    • 代码

        @Around("preventDuplication()")
        public Object before(ProceedingJoinPoint joinPoint) throws Throwable {
            ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = attributes.getRequest();
            Assert.notNull(request, "request cannot be null.");
            //获取执行方法
            Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
            //获取防重复提交注解
            PreventDuplication annotation = method.getAnnotation(PreventDuplication.class);
            // 获取token以及方法标记,生成redisKey和redisValue
            String token = request.getHeader(IdempotentConstant.TOKEN);
            String redisKey = IdempotentConstant.PREVENT_DUPLICATION_PREFIX
                    .concat(token)
                    .concat(getMethodSign(method, joinPoint.getArgs()));
            String redisValue = redisKey.concat(annotation.value()).concat("submit duplication");
            System.out.print("当前线程号:" + Thread.currentThread().getId());
            System.out.println("存储redisKey: " + redisKey);
            redisValue.concat(UUID.randomUUID().toString() + Thread.currentThread().getId());
            try {
                //设置防重复操作限时标记(前置通知)
                //redisTemplate实现jedis.setnx(key,value),setIfAbsent 是java中的方法,setnx 是 redis命令中的方法
                Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(redisKey, redisValue, annotation.expireSeconds(), TimeUnit.SECONDS);
                System.out.println("当前线程号:" + Thread.currentThread().getId() + "," + "startTime:" + isSuccess);
                long startTime = System.currentTimeMillis();
                if (!isSuccess) {
                    throw new RuntimeException("请勿重复提交");
                }
                System.out.println("当前线程号:" + Thread.currentThread().getId() + "," + "startTime:" + startTime + "ms耗时");
                //ProceedingJoinPoint类型参数可以决定是否执行目标方法,且环绕通知必须要有返回值,返回值即为目标方法的返回值
                Object proceed = joinPoint.proceed();
                long endStart = System.currentTimeMillis();
                System.out.println("当前线程号:" + Thread.currentThread().getId() + "," + "endStart:" + endStart + "ms耗时");
                return proceed;
            } finally {
                //释放锁校验是否为当前线程
                if (redisValue.equals(redisTemplate.opsForValue().get(redisKey))) {
                    //释放锁
                    redisTemplate.delete(redisKey);
                }
            }
        }

    7、源代码

     本文代码已经上传托管至GitHub以及Gitee,有需要的读者请自行下载。

    • GitHub:https://github.com/gavincoder/distributedlock.git
    • Gitee:https://gitee.com/gavincoderspace/distributedlock.git
  • 相关阅读:
    IO文件
    eclipse快捷键
    异常
    多例模式
    单例模式
    访问控制权限
    系统常见包

    查找
    【avalon】offsetParent
  • 原文地址:https://www.cnblogs.com/gavincoder/p/14413436.html
Copyright © 2011-2022 走看看