zoukankan      html  css  js  c++  java
  • 分布式锁实战,分布式锁方案选择

    分布式锁实现方案优缺点

     不推荐自己编写的分布式锁

    推荐Redisson和Curator实现的分布式锁。

    一、使用锁解决电商中的超卖问题?

    举例:某件商品库存数量10件,结果卖出了20件

    二、超卖现象的产生

    A和B同时看到这个商品,加入购物车,并同时提交订单,导致了超卖的现象。

    三、超卖现象模拟 

    1)问题模拟。假设当前库存为1

    创建订单: 在订单中先获取是否有库存,然后更新库存数量为(原库存数-1),然后插入订单

    单元测试:创建线程池,并发执行创建订单。

     cyclicBarrier.await的作用: 让线程等待,在某一个时刻,让5个线程同时执行。

    测试结果: 产生了5条订单,并且库存为0. 这就是超卖现象的产生。

    四、超卖现象的解决

    1、方法1: 数据库update行锁 (单体应用)

    扣减库存不在程序中进行,而是通过数据库

    向数据库传递库存增量,扣减一个库存,增量为-1

    在数据库update语句计算库存,通过update行锁解决并发。更新后检查库存是否是负数,如果是负数抛出异常,则整个操作回滚。

    2、解决方法二(synchronized和ReentrantLock) (单体应用)

    校验库存、扣减库存统一加锁,使之成为原则的操作。并发时,只有获得锁的线程才能校验、扣减库存。

    注意: 事务的操作通过代码控制。如果事务通过注解的方式写在方法里,并且这个方法synchronized 修饰。 那么会在同步方法结束后,才提交事务。这样当事务还没提交,另外一个线程进入了同步方法。

     代码如下:

     public Integer createOrder() throws Exception{
            Product product = null;
    
            lock.lock();
            try {
                TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
                product = productMapper.selectByPrimaryKey(purchaseProductId);
                if (product==null){
                    platformTransactionManager.rollback(transaction1);
                    throw new Exception("购买商品:"+purchaseProductId+"不存在");
                }
    
                //商品当前库存
                Integer currentCount = product.getCount();
                System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount);
                //校验库存
                if (purchaseProductNum > currentCount){
                    platformTransactionManager.rollback(transaction1);
                    throw
                            new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
                }
    
                productMapper.updateProductCount(purchaseProductNum,"xxx",new Date(),product.getId());
                platformTransactionManager.commit(transaction1);
            }finally {
                lock.unlock();
            }
    
            TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
            Order order = new Order();
            order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
            ...
            orderMapper.insertSelective(order);
    
            OrderItem orderItem = new OrderItem();
            ...
            orderItemMapper.insertSelective(orderItem);
            platformTransactionManager.commit(transaction);
            return order.getId();
        }
    

      

    3、基于数据库实现分布式锁

    多个进程、多个线程访问共同的组件数据库,

    通过select ...... for update 访问同一条数据

    for update锁住这条数据,其他线程只能等待

    接口代码如下,此程序是分布式部署。比如开启了两个程序,localhost:8080/myproject/singleLock   localhost:8081/myproject/singleLock

     selectDistributeLock的SQL语句里select ...... for update

    测试:分别调用localhost:8080/myproject/singleLock   localhost:8081/myproject/singleLock这里这两个接口。当第一个接口执行完后,等待2秒。 第二个请求也获得到了锁。

    基于数据库实现分布式锁的优缺点

    优点: 简单方便,易于理解、易于操作

    缺点: 并发量大时,对数据库压力较大。

    建议:作为锁的数据库与业务数据库分开。

    4、基于Redis的Setnx实现分布式锁

    1) 实现原理:

    获取锁的Redis命令 SET resource_name my_random_value NX PX 30000

    resource_name: 资源名称,可根据不同的业务区分不同的锁

    my_random_value: 随机值,每个线程的随机值都不同,用于释放锁时的校验

     NX: key不存在时设置成功,可以存则设置不成功。

     PX:自动失效时间,出现异常情况,锁可以过期失效。

    利用了NX的原子性,多个线程并发时,只有一个线程可以设置成功。

    设置成功即可获得锁,可以执行后续的业务处理。

    如果出现异常,过了锁的有效期,锁自动释放。

    2) 释放锁原理

    释放锁采用Redis的delete命令

    释放锁时校验之前设置的随机数,相同才能释放。(为了证明这个值是你这个线程设置的)

    释放锁采用LUA脚本

    3) Redis分布式锁实战

    Reids安装可参考 https://www.cnblogs.com/linlf03/p/9465272.html

     创建SpringBoog工程: distribute-lock-demo

    增加依赖:

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
    

      

    配置Redis连接

    创建Controller类

    @RestController
    @Slf4j
    public class RedisLockControler {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @RequestMapping("redisLock")
        public String redisLock(){
            log.info("进入redisLock 方法");
    
            String key = "redisKey";
            String value = UUID.randomUUID().toString() ;
    
            RedisCallback redisCallback = redisConnection -> {
                RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
                //过期时间30秒
                Expiration expiration = Expiration.seconds(30);
                //序列化key
                byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
                // 序列化value
                byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
                //执行setnx操作
               Boolean result = redisConnection.set(redisKey, redisValue, expiration, setOption);
               return  result;
            };
            //获取分布式锁
            Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
            if(lock){
                log.info("进入了锁!!");
    
    
                //模拟业务处理,耗时15秒
                try {
                    Thread.sleep(15000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                    String script = "if redis.call("get",KEYS[1]) == ARGV[1] then
    " +
                            "   return redis.call("del",KEYS[1])
    " +
                            "else
    " +
                            "  return 0
    " +
                            "end";
                    RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class);
                    List<String> keys = Arrays.asList(key);
                    Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
                    log.info("释放锁的结果:" + result);
                }
    
            }else {
                log.info("没有获取到锁");
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    }
    

      

    启动DistributeLockDemoApplication

     一个端口为8080,一个端口为8081

    8081的端口设置如下

    然后分别调用http://localhost:8080/redisLock 和http://localhost:8081/redisLock

     说明8080获得到了锁,并且执行完成后释放了锁。

    8081执行未获得锁。

    5 基于分布式锁解决定时任务(服务集群部署)重复问题

    在前面的基础上对分布式锁进行封装

    @Slf4j
    public class RedisLock  implements AutoCloseable{
    
        private RedisTemplate redisTemplate;
        private String key;
        private String value;
        //单位:秒
        private int expireTime;
    
        public RedisLock(RedisTemplate redisTemplate, String key, int expireTime){
            this.redisTemplate = redisTemplate;
            this.key = key;
            this.expireTime = expireTime;
            this.value = UUID.randomUUID().toString();
        }
    
        /**
         * 获取分布式锁
         * @return
         */
        public boolean getLock(){
            RedisCallback redisCallback = redisConnection -> {
                RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
                //过期时间30秒
                Expiration expiration = Expiration.seconds(30);
                //序列化key
                byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
                // 序列化value
                byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
                //执行setnx操作
                Boolean result = redisConnection.set(redisKey, redisValue, expiration, setOption);
                return  result;
            };
            //获取分布式锁
            Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
            return lock;
        }
    
        public boolean unLock(){
    
            String script = "if redis.call("get",KEYS[1]) == ARGV[1] then
    " +
                    "   return redis.call("del",KEYS[1])
    " +
                    "else
    " +
                    "  return 0
    " +
                    "end";
            RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class);
            List<String> keys = Arrays.asList(key);
            Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
            log.info("释放锁的结果={}", result);
            return  result;
        }
    
        @Override
        public void close() throws Exception {
            unLock();
        }
    }
    

      

    增加定时任务

    增加注解EnableScheduling

    定时任务为每隔5秒发送短信

    @Service
    @Slf4j
    public class SchedulerService {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        //每隔5秒钟执行一次
        @Scheduled(cron = "0/5 * * * * ?")
        public void SendSMS(){
    
            try( RedisLock redisLock = new RedisLock(redisTemplate, "autoSMS",30)) {
                if(redisLock.getLock()){
                    log.info("向18555558888发送短信");
                }else {
                    log.info("没有获取到锁");
                }
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    }
    

      

    启动DistributeLockDemoApplication

     输出结果如下图所示,同一个时间点只有一个服务获取到了锁。

    6、基于Zookeeper的分布式锁

    1)Zookeeper的安装,可以参考Linux下Zookeeper的下载、安装和启动

    2) 原理

    重要概念: Zookkeeper的观察器

    可以设置观察器的3个方法: getData(), getChildren(), exists();

    节点发生变化,发送给客户端

    观察器只能监控一次,再监控需重新设置。

    实现原理:

    利用Zookeeper的瞬时有序节点的特性

    多线程并发创建瞬时节点时,得到有序的序列

    序号最小的线程获得锁

    其他的线程则监听自己序号前一个序号

    前一个线程执行完成,删除自己序号的节点

    下一个序号的线程得到通知,继续执行

    以此类推

    创建节点时,已经确定了线程的执行顺序。

    3) Zookeeper的分布式锁实践

    增加依赖,版本和服务端Zookeeper 3.4.13保持一致。

            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.13</version>
            </dependency>
    

      

    创建Zookeeperr锁

    @Slf4j
    public class ZkLock implements AutoCloseable, Watcher {
    
        private ZooKeeper zooKeeper;
    
        private String znode;
    
    
        public ZkLock() throws IOException {
            this.zooKeeper = new ZooKeeper("47.xx.xx.120:2181", 10000, this);
    
        }
    
        /**
         * 获取分布式锁
         * @return
         */
        public boolean getLock(String businessCode){
            try {
                //创建业务根节点
                Stat stat = zooKeeper.exists("/" + businessCode, false);
                if(stat == null){
                    zooKeeper.create("/" +businessCode , businessCode.getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
                //创建瞬时有序的节点. znode的值为/order/order_00000001
                znode = zooKeeper.create("/" +businessCode + "/" + businessCode + "_" , businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
    
                //获得业务节点下所有的子节点
                List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
                Collections.sort(childrenNodes);
                // 获取序号最小的(第一个)子节点
                String firstNode = childrenNodes.get(0);
                // 如果创建的节点是第一个节点,则获得锁
                if(znode.endsWith(firstNode)){
                    return true;
                }
                // 不是第一个子节点,则监听前一个节点
                String lastNode = firstNode;
                for(String node: childrenNodes){
                    if(znode.endsWith(node)){
                        zooKeeper.exists("/" +businessCode + "/" +lastNode, true);
                        break;
                    }else {
                        lastNode = node;
                    }
                }
                //等待监听的节点发生变化
                synchronized (this){
                    wait();
                }
                return  true;
    
    
            }catch (Exception e){
                e.printStackTrace();
            }
            return  false;
    
        }
    
    
    
        @Override
        public void close() throws Exception {
            zooKeeper.delete(znode, -1);
            zooKeeper.close();
            log.info("已经释放了锁");
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                synchronized (this){
                    //唤起线程
                    notify();
                }
    
            }
        }
    }
    

      

    增加调用接口

    @RestController
    @Slf4j
    public class ZookeeperControler {
    
    
        @RequestMapping("zookeeperLock")
        public String zookeeperLock(){
            log.info("进入zookeeperLock 方法");
            try(ZkLock zkLock = new ZkLock()) {
                if(zkLock.getLock("order")){
                    log.info("获得了锁");
                    //模拟业务处理,耗时15秒
                    Thread.sleep(15000);
                }else {
                    log.info("没有获取到锁");
                }
    
            }catch (Exception e){
                e.printStackTrace();
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    }
    

      

    测试接口,分别启动两个服务,端口分别为8080和8081

    分别调用http://localhost:8080/zookeeperLock 和 http://localhost:8081/zookeeperLock

    7、使用Curator实现分布式锁

    Apache Curator是为ZooKeeper开发的一套Java客户端类库,它是一个分布式协调服务。

    添加依赖

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.2.0</version>
            </dependency>
    

      

    在DistributeLockDemoApplication类中添加Bean

        @Bean(initMethod = "start", destroyMethod = "close")
        public CuratorFramework getCuratorFramework(){
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.newClient("47.xx.xx.120:2181", retryPolicy);
            return client;
        }
    

      

    ZookeeperControler类中添加方法

     @RequestMapping("curatorLock")
        public String curatorLock(){
            log.info("进入curatorLock 方法");
            InterProcessMutex lock = new InterProcessMutex(client,"/order");
            try {
                if(lock.acquire(30, TimeUnit.SECONDS)){
                    log.info("获得了锁!");
                    Thread.sleep(10000);
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                try {
                    log.info("释放了锁");
                    lock.release();
                }catch (Exception e){
                    e.printStackTrace();
                }
    
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    

     测试接口,分别启动两个服务,端口分别为8080和8081

    分别调用http://localhost:8080/curatorLock 和 http://localhost:8081/curatorLock

    8、 redisson分布式锁(API方式)

    1) 引入依赖

            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.11.2</version>
            </dependenc
    

      

    2)Redisson API方式

    Redisson API方式,缺点:使用繁琐,每次需要配置

    @RestController
    @Slf4j
    public class RedissonLockController {
    
        /**
         * Redisson API方式,缺点:使用繁琐
         * @return
         */
        @RequestMapping("redissonLock")
        public String redissonLock() {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://47.98.47.120:6379");
            RedissonClient redisson = Redisson.create(config);
    
            RLock rLock = redisson.getLock("order");
            log.info("进入了redissonLock方法");
            try {
                rLock.lock(30, TimeUnit.SECONDS);
                log.info("获得了锁");
                Thread.sleep(10000);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                rLock.unlock();
                log.info("释放了锁");
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    
    }
    

      

     3) 测试

    测试接口,分别启动两个服务,端口分别为8080和8081

    分别调用http://localhost:8080/redissonLock 和 http://localhost:8081/redissonLock

    9、 spring boot 集成redisson分布式锁

    1) 增加依赖

    注释redisson依赖,引入redisson-spring-boot-starter依赖

    配置文件增加属性

    spring.redis.host=47.xx.xx.120
    

      

    2) 增加方法redissonLock2

    @RequestMapping("redissonLock2")
        public String redissonLock2() {
            RLock rLock = redisson.getLock("order");
            log.info("进入了redissonLock2方法");
            try {
                rLock.lock(30, TimeUnit.SECONDS);
                log.info("获得了锁");
                Thread.sleep(10000);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                rLock.unlock();
                log.info("释放了锁");
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    

      

  • 相关阅读:
    [BJOI2019] 光线
    C# 从零开始写 SharpDx 应用 笔刷
    BAT 脚本判断当前系统是 x86 还是 x64 系统
    BAT 脚本判断当前系统是 x86 还是 x64 系统
    win2d 通过 CanvasActiveLayer 画出透明度和裁剪
    win2d 通过 CanvasActiveLayer 画出透明度和裁剪
    PowerShell 拿到显卡信息
    PowerShell 拿到显卡信息
    win10 uwp 如何使用DataTemplate
    win10 uwp 如何使用DataTemplate
  • 原文地址:https://www.cnblogs.com/linlf03/p/12681186.html
Copyright © 2011-2022 走看看