zoukankan      html  css  js  c++  java
  • 【连载】redis库存操作,分布式锁的四种实现方式[三]--基于Redis watch机制实现分布式锁

    一、redis的事务介绍

    1、 Redis保证一个事务中的所有命令要么都执行,要么都不执行。如果在发送EXEC命令前客户端断线了,则Redis会清空事务队列,事务中的所有命令都不会执行。而一旦客户端发送了EXEC命令,所有的命令就都会被执行,即使此后客户端断线也没关系,因为Redis中已经记录了所有要执行的命令。

    2、 除此之外,Redis的事务还能保证一个事务内的命令依次执行而不被其他命令插入。试想客户端A需要执行几条命令,同时客户端B发送了一条命令,如果不使用事务,则客户端B的命令可能会插入到客户端A的几条命令中执行。如果不希望发生这种情况,也可以使用事务。

    3、 若一个事务中有多条命令,若有一条命令错误,事务中的所有命令都不会执行。若在执行阶段有命令执行错误,其他的命令也会正确的执行,需要注意。

    4、与mysql的事务不同,redis的事务执行中时不会回滚的,哪怕出现错误,之前已经执行的命令结果也不会回滚。

    二、Redis watch介绍

    1、 WATCH命令可以监控一个或多个键,一旦其中有一个键被修改(或删除),之后的事务就不会执行。监控一直持续到EXEC命令(事务中的命令是在EXEC之后才执行的,所以在MULTI命令后可以修改WATCH监控的键值)

    2、watch一般配合事务使用

    例:启动一个线程,连接redis,监控key watchKeyTest,sleep10s模拟业务逻辑处理,此时再启动另一个进程去修改该key的值,那么当前线程就会返回null

    /**
     * @author LiJunJun
     * @date 2018/12/10
     */
    public class Test {
    
        private static Jedis jedis;
    
        static {
            jedis = new Jedis("192.168.10.109", 6379);
            jedis.auth("aaa@leadeon.cn");
            jedis.sadd("watchKeyTest", "290");
        }
    
        public static void main(String[] args) {
    
            jedis.watch("watchKeyTest");
    
            System.out.println("开始监控key: watchKeyTest");
    
            Transaction transaction = jedis.multi();
    
            try {
                // sleep 10秒,模拟业务逻辑处理
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("开始获取key: watchKeyTest");
            transaction.sismember("watchKeyTest", "290");
    
            List<Object> result = transaction.exec();
            System.out.println("执行结果:" + result);
            jedis.disconnect();
        }
    }

    启动另一个进程,修改同一个key

    public class Test2 {
    
        public static void main(String[] args) {
    
            Jedis jedis = new Jedis("192.168.10.109", 6379);
            jedis.auth("common@leadeon.cn");
            long result = jedis.sadd("watchKeyTest", "358");
            System.out.println(result);
            jedis.disconnect();
        }
    }

    此时,进程1就会返回null

    若在进程1执行期间,该key没有被其他进程修改,则返回正确的值。

    三、实现思路

    基于以上介绍的redis的事务以及watch机制,我们可以做分布式锁处理,即在分布式系统中,高并发情况下,一个线程watch相应的key后,其他进程若修改了key,则该进程所在的事务就不执行,返回null,我们可以增加重试机制,来做库存操作

    四、业务代码实现

    采用watch机制,做乐观锁处理,重试三次,三次返回均未成功,则接口返回失败
        /**
         * 减库存(基于redis watch机制实现)
         *
         * @param trace 请求流水
         * @param stockManageReq(stockId、decrNum)
         * @return -1为失败,大于-1的正整数为减后的库存量,-2为库存不足无法减库存
         */
        @Override
        @ApiOperation(value = "减库存", notes = "减库存")
        @RequestMapping(value = "/decrByStock", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public int decrByStock(@RequestHeader(name = "Trace") String trace, @RequestBody StockManageReq stockManageReq) {
    
            long startTime = System.currentTimeMillis();
    
            LOGGER.reqPrint(Log.CACHE_SIGN, Log.CACHE_REQUEST, trace, "decrByStock", JSON.toJSONString(stockManageReq));
    
            int res = 0;
            String stockId = stockManageReq.getStockId();
            Integer decrNum = stockManageReq.getDecrNum();
    
            boolean decrByStock = false;
    
            try {
                if (null != stockId && null != decrNum) {
    
                    stockId = PREFIX + stockId;
    
                    // 采用watch机制,做乐观锁处理,重试三次,三次返回均未成功,则接口返回失败
                    for (int i = 0; i < TRY_COUNT; i++) {
                        Integer decrByStockRes = decrByStock(stockId, decrNum, trace);
    
                        // 更新库存时key对应的value发生变更,重试
                        if (decrByStockRes != -1) {
                            res = decrByStockRes;
                            decrByStock = true;
                            break;
                        }
                    }
    
                    if (!decrByStock) {
                        res = -2;
                        LOGGER.info("本次请求减库存失败!decrByStockFailure=1");
                    }
                }
            } catch (Exception e) {
                LOGGER.error(trace, "decr sku stock failure.", e);
                res = -1;
            } finally {
                LOGGER.respPrint(Log.CACHE_SIGN, Log.CACHE_RESPONSE, trace, "decrByStock", System.currentTimeMillis() - startTime, String.valueOf(res));
            }
            return res;
        }
    
        /**
         * 减库存逻辑
         *
         * @param stockId 库存id
         * @param decrNum 减少的量
         * @return 减库存结果(-1:表示更新库存时key对应的value发生变更,即提示调用方重试;-2: 库存不够减,售罄;其它值表示减库存后的值)
         */
        private Integer decrByStock(String stockId, int decrNum, String trace) {
    
            Response<Long> v = null;
            List<Object> result = null;
    
            try (Jedis jedis = jedisPool.getWriteResource()) {
    
                if (!jedis.select(0).equals("OK")) {
                    LOGGER.error(trace, "减库存,本次请求未获取到jedis连接!");
                    return -1;
                }
    
                jedis.watch(stockId);
    
                // redis 减库存逻辑
                String vStock = jedis.get(stockId);
    
                long realV = 0L;
    
                if (StringUtils.isNotEmpty(vStock)) {
                    realV = Long.parseLong(vStock);
                }
                //库存数  大于等于 要减的数目,则执行减库存
                if (realV < decrNum) {
    
                    return -2;
                }
    
                Transaction transaction = jedis.multi();
    
                v = transaction.decrBy(stockId, decrNum);
    
                result = transaction.exec();
            }
    
            return (result == null || result.isEmpty()) ? -1 : v.get().intValue();
        }

    五、ab压测及分析
    同样的,我们以5000的请求量100的并发量来压、tps在640左右,比zk做分布式锁来看,提升了20倍的性能,比redisson分布式锁提升了2倍,性能提升不大

    同时我们发现,5000个请求,有4561个请求失败,我们看下日志统计,有多少请求没有成功执行事务

    也是4561,说明有4561个事务没有成功执行,并不是运行错误。

    六、总结

    watch可以用来控制对redis的操作同步执行,但失败的几率较大,用该机制做抢购的业务还行,但对redis操作结果依赖较强的业务来说,不太适用,下一篇我们讲下终极解决方案,适用redis的lua脚本编程,变相的实现分布式锁。

  • 相关阅读:
    AOP 和 前置通知,后置通知
    使用Spring 简化MyBatis
    核心一:DI
    环境搭建及创建对象方式及赋值(注入)
    核心一:IoC
    Spring框架简介
    判断两个矩形是否相交的4个方法
    计算旋转角度
    浅析adb命令
    如何选择开源许可证?
  • 原文地址:https://www.cnblogs.com/ft535535/p/10150577.html
Copyright © 2011-2022 走看看