zoukankan      html  css  js  c++  java
  • 【连载】redis库存操作,分布式锁的四种实现方式[二]--基于Redisson实现分布式锁

    一、redisson介绍

    redisson实现了分布式和可扩展的java数据结构,支持的数据结构有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。并且是线程安全的,底层使用Netty 4实现网络通信。和jedis相比,功能比较简单,不支持排序,事务,管道,分区等redis特性,可以认为是jedis的补充,不能替换jedis。

    二、redisson几种锁介绍

    1、可重入锁(Reentrant Lock)

    基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口

    1 RLock lock = redisson.getLock("anyLock");
    2 // 最常见的使用方法
    3 lock.lock();

    另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

    1 // 加锁以后10秒钟自动解锁
    2 // 无需调用unlock方法手动解锁
    3 lock.lock(10, TimeUnit.SECONDS);
    4 
    5 // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
    6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
    7 ...
    8 lock.unlock();

    Redisson同时还为分布式锁提供了异步执行的相关方法:

    1 RLock lock = redisson.getLock("anyLock");
    2 lock.lockAsync();
    3 lock.lockAsync(10, TimeUnit.SECONDS);
    4 Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

    2.公平锁(Fair Lock)

    基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。

    1 RLock fairLock = redisson.getFairLock("anyLock");
    2 // 最常见的使用方法
    3 fairLock.lock();

    同样的,Fair lock也提供加锁时间

    1 // 10秒钟以后自动解锁
    2 // 无需调用unlock方法手动解锁
    3 fairLock.lock(10, TimeUnit.SECONDS);
    4 
    5 // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
    6 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
    7 ...
    8 fairLock.unlock();

    Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:

    1 RLock fairLock = redisson.getFairLock("anyLock");
    2 fairLock.lockAsync();
    3 fairLock.lockAsync(10, TimeUnit.SECONDS);
    4 Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

    3.联锁(MultiLock)

    基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

     1 RLock lock1 = redissonInstance1.getLock("lock1");
     2 RLock lock2 = redissonInstance2.getLock("lock2");
     3 RLock lock3 = redissonInstance3.getLock("lock3");
     4 
     5 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
     6 // 同时加锁:lock1 lock2 lock3
     7 // 所有的锁都上锁成功才算成功。
     8 lock.lock();
     9 ...
    10 lock.unlock();

    另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

    1 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
    2 // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
    3 lock.lock(10, TimeUnit.SECONDS);
    4 
    5 // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
    6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
    7 ...
    8 lock.unlock();

    4.红锁(Red Lock)

    基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

     1 RLock lock1 = redissonInstance1.getLock("lock1");
     2 RLock lock2 = redissonInstance2.getLock("lock2");
     3 RLock lock3 = redissonInstance3.getLock("lock3");
     4 
     5 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
     6 // 同时加锁:lock1 lock2 lock3
     7 // 红锁在大部分节点上加锁成功就算成功。
     8 lock.lock();
     9 ...
    10 lock.unlock();

    另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

    1 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
    2 // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
    3 lock.lock(10, TimeUnit.SECONDS);
    4 
    5 // 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
    6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
    7 ...
    8 lock.unlock();

    5. 读写锁(ReadWriteLock)

    基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。同时还支持自动过期解锁。该对象允许同时有多个读取锁,但是最多只能有一个写入锁。

    1 RReadWriteLock rwlock = redisson.getLock("anyRWLock");
    2 // 最常见的使用方法
    3 rwlock.readLock().lock();
    4 //
    5 rwlock.writeLock().lock();

    另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

    // 10秒钟以后自动解锁
    // 无需调用unlock方法手动解锁
    rwlock.readLock().lock(10, TimeUnit.SECONDS);
    //
    rwlock.writeLock().lock(10, TimeUnit.SECONDS);
    
    // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
    boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
    //
    boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
    ...
    lock.unlock();

    6. 闭锁(CountDownLatch)

    基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

    RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
    latch.trySetCount(1);
    latch.await();
    
    // 在其他线程或其他JVM里
    RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
    latch.countDown();

    三、redisson分布式锁在业务中的应用

    1、引入相关pom

            <!-- redisson -->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.5.0</version>
            </dependency>

    2、将redisson交由spring管理,本文采用redisson的集群模式装配,另外可配置哨兵模式,单点等

    /**
     * redisson客户端参数配置类
     */
    @Configuration
    @Data
    public class RedissonProperties {
    
        private int idleConnectionTimeout = 10000;
    
        private int pingTimeout = 1000;
    
        private int connectTimeout = 10000;
    
        private int timeout = 3000;
    
        private int retryAttempts = 3;
    
        private int retryInterval = 1500;
    
        private int reconnectionTimeout = 3000;
    
        private int failedAttempts = 3;
    
        private int subscriptionsPerConnection = 5;
    
        private String clientName = "none";
    
        private int subscriptionConnectionMinimumIdleSize = 64;
    
        private int subscriptionConnectionPoolSize = 256;
    
        private int slaveConnectionMinimumIdleSize = 64;
    
        private int slaveConnectionPoolSize = 256;
    
        private int masterConnectionMinimumIdleSize = 64;
    
        private int masterConnectionPoolSize = 256;
    
        private ReadMode readMode = ReadMode.MASTER;
    
        private SubscriptionMode subscriptionMode = SubscriptionMode.MASTER;
    
        private int scanInterval = 1000;
    
        @Value("${rediscluster.pwd}")
        private String password;
    
        @Value("${redis.cluster}")
        private String nodeAddress;
    
        @Value("${redis.cluster1}")
        private String nodeAddress1;
    
        @Value("${redis.cluster2}")
        private String nodeAddress2;
    
    }
    /**
     * 初始化redisson Bean
     *
     * @author LiJunJun
     * @date 2018/10/19
     */
    @Configuration
    public class RedissonAutoConfiguration {
    
        @Autowired
        private RedissonProperties redssionProperties;
    
        /**
         * 集群模式自动装配
         *
         * @return
         */
        @Bean
        public RedissonClient redissonClient() {
    
            Config config = new Config();
            String passWord = redssionProperties.getPassword();
            ClusterServersConfig serverConfig = config.useClusterServers();
            serverConfig.addNodeAddress(redssionProperties.getNodeAddress(), redssionProperties.getNodeAddress1(), redssionProperties.getNodeAddress2());
            serverConfig.setPingTimeout(redssionProperties.getPingTimeout());
            serverConfig.setConnectTimeout(redssionProperties.getConnectTimeout());
            serverConfig.setTimeout(redssionProperties.getTimeout());
            serverConfig.setRetryAttempts(redssionProperties.getRetryAttempts());
            serverConfig.setRetryInterval(redssionProperties.getRetryInterval());
            serverConfig.setReconnectionTimeout(redssionProperties.getReconnectionTimeout());
            serverConfig.setFailedAttempts(redssionProperties.getFailedAttempts());
            serverConfig.setSubscriptionsPerConnection(redssionProperties.getSubscriptionsPerConnection());
            serverConfig.setClientName(redssionProperties.getClientName());
            serverConfig.setSubscriptionConnectionMinimumIdleSize(redssionProperties.getSubscriptionConnectionMinimumIdleSize());
            serverConfig.setSubscriptionConnectionPoolSize(redssionProperties.getSubscriptionConnectionPoolSize());
            serverConfig.setSlaveConnectionMinimumIdleSize(redssionProperties.getSlaveConnectionMinimumIdleSize());
            serverConfig.setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize());
            serverConfig.setMasterConnectionMinimumIdleSize(redssionProperties.getMasterConnectionMinimumIdleSize());
            serverConfig.setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize());
            serverConfig.setReadMode(redssionProperties.getReadMode());
            serverConfig.setSubscriptionMode(redssionProperties.getSubscriptionMode());
            serverConfig.setScanInterval(redssionProperties.getScanInterval());
            serverConfig.setPassword(StringUtils.isNotBlank(passWord) && !"null".equals(passWord) ? passWord : null);
            return Redisson.create(config);
        }
    }

    3、业务代码中的应用。此处使用的是悲观锁,即必须拿到锁之后才能继续往下执行,也可使用乐观锁,tryLock,利用重试去获取锁

        /**
         * redissonClient
         */
        @Resource
        private RedissonClient redissonClient;
    
        /**
         * 减库存
         *
         * @param trace 请求流水
         * @param stockManageReq(stockId、decrNum)
         * @return -1为失败,大于-1的正整数为减后的库存量,-2为库存不足无法减库存
         */
        @Override
        @ApiOperation(value = "减库存", notes = "减库存")
        @RequestMapping(value = "/decrByStock", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public int decrByStock(@RequestHeader(name = "Trace") String trace, @RequestBody StockManageReq stockManageReq) {
    
            long startTime = System.currentTimeMillis();
    
            LOGGER.reqPrint(Log.CACHE_SIGN, Log.CACHE_REQUEST, trace, "decrByStock", JSON.toJSONString(stockManageReq));
    
            int res = 0;
            String stockId = stockManageReq.getStockId();
            Integer decrNum = stockManageReq.getDecrNum();
    
            // 添加分布式锁
            RLock stockLock = null;
    
            try {
                if (null != stockId && null != decrNum) {
    
                    stockId = PREFIX + stockId;
    
                    // 添加分布式锁
                    stockLock = redissonClient.getFairLock(stockId);
    
                    stockLock.lock();
    
                    // redis 减库存逻辑
                    String vStock = redisStockPool.get(stockId);
                    long realV = 0L;
                    if (StringUtils.isNotEmpty(vStock)) {
                        realV = Long.parseLong(vStock);
                    }
                    //库存数  大于等于 要减的数目,则执行减库存
                    if (realV >= decrNum) {
                        Long v = redisStockPool.decrBy(stockId, decrNum);
                        res = v.intValue();
                    } else {
                        res = -2;
                    }
    
                    stockLock.unlock();
                }
            } catch (Exception e) {
                LOGGER.error(trace, "decr sku stock failure.", e);
                res = -1;
            } finally {
                if (stockLock != null && stockLock.isLocked() && stockLock.isHeldByCurrentThread()) {
                    stockLock.unlock();
                }
                LOGGER.respPrint(Log.CACHE_SIGN, Log.CACHE_RESPONSE, trace, "decrByStock", System.currentTimeMillis() - startTime, String.valueOf(res));
            }
            return res;
        }

    四、ab压测及结果分析
    同样的,我们以5000的请求量100的并发量来压、tps在330左右,相对于zk做分布式锁来看,提升了10倍的性能,但仍然不能满足我们的要求

     五、总结

    redisson提供了丰富的分布式锁实现机制,并且使用起来相对比较简单方便,具体选用哪种锁,可以根据业务来选择,但在高并发的情况下,性能还是有些差强人意,下一篇,我们使用redis的watch来实现分布式锁。

  • 相关阅读:
    编写一个ComputerAverage抽象类,类中有一个抽象方法求平均分average,可以有参数。定义 Gymnastics 类和 School 类,它们都是 ComputerAverage 的子类。Gymnastics 类中计算选手的平均成绩的方法是去掉一个最低分,去掉一个最高分,然后求平均分;School 中计算平均分的方法是所有科目的分数之和除以总科目数。 要求:定义ComputerAv
    创建一个接口Shape,其中有抽象方法area,类Circle 、Rectangle实现area方法计算其面积并返回。又有Star实现Shape的area方法,其返回值是0,Star类另有一返回值boolean型方法isStar;在main方法里创建一个Vector,根据随机数的不同向其中加入Shape的不同子类对象(如是1,生成Circle对象;如是2,生成Rectangle对象;如是3,生成S
    学校中有老师和学生两类人,而在职研究生既是老师又是学生,对学生的管理和对教师的管理在他们身上都有体现。
    定义抽象类Shape,抽象方法为showArea(),求出面积并显示,定义矩形类Rectangle,正方形类Square,圆类 Circle,根据各自的属性,用showArea方法求出各自的面积,在main方法中构造3个对象,调用showArea方法。(体现多态)
    分别编写两个类Point2D,Point3D来表示二维空间和三维空间的点,使之满足下列要求:
    2.编写实现:有一个三角形类Triangle,成员变量有底边x和另一条边y,和两边的夹角a(0<a<180),a为静态成员,成员方法有两个:求面积s(无参数)和修改角度(参数为角度)。 编写实现: 构造函数为 Triangle(int xx,int yy,int aa) 参数分别为x,y,a赋值 在main方法中构造两个对象,求出其面积,然后使用修改角度的方法,修改两边的夹角,再求出面积值。(提示
    定义一个复数(z=x+iy)类Complex,包含: 两个属性:实部x和虚部y 默认构造函数 Complex(),设置x=0,y=0 构造函数:Complex(int i,int j) 显示复数的方法:showComp()将其显示为如: 5+8i或5-8i 的形式。 求两个复数的和的方法:(参数是两个复数类对象,返回值是复数类对象)public Complex addComp(Compl
    定义一个类Point,代表一个点,public属性有x和y,方法有显示点坐标 show(),构造函数有两个参数分别给x,y赋值,在main方法中构造两个对象,再创建一方法(getMiddle)为取两个点构成线段的中点的坐标,参数为2个点对象,调用此方法后得到一个新的点,编写Application,显示该对象的坐标值。
    编写程序读取一组正数,找出它们的最大数,然后计算该数的出现次数,输入是以 0结束。比如:输入 3 5 2 5 5 5 0,程序找出最大数是 5,它出现的次数是 4。
    vim快捷键-02
  • 原文地址:https://www.cnblogs.com/ft535535/p/10149526.html
Copyright © 2011-2022 走看看