zoukankan      html  css  js  c++  java
  • 分布式锁实战,分布式锁方案选择

    分布式锁实现方案优缺点

     不推荐自己编写的分布式锁

    推荐Redisson和Curator实现的分布式锁。

    一、使用锁解决电商中的超卖问题?

    举例:某件商品库存数量10件,结果卖出了20件

    二、超卖现象的产生

    A和B同时看到这个商品,加入购物车,并同时提交订单,导致了超卖的现象。

    三、超卖现象模拟 

    1)问题模拟。假设当前库存为1

    创建订单: 在订单中先获取是否有库存,然后更新库存数量为(原库存数-1),然后插入订单

    单元测试:创建线程池,并发执行创建订单。

     cyclicBarrier.await的作用: 让线程等待,在某一个时刻,让5个线程同时执行。

    测试结果: 产生了5条订单,并且库存为0. 这就是超卖现象的产生。

    四、超卖现象的解决

    1、方法1: 数据库update行锁 (单体应用)

    扣减库存不在程序中进行,而是通过数据库

    向数据库传递库存增量,扣减一个库存,增量为-1

    在数据库update语句计算库存,通过update行锁解决并发。更新后检查库存是否是负数,如果是负数抛出异常,则整个操作回滚。

    2、解决方法二(synchronized和ReentrantLock) (单体应用)

    校验库存、扣减库存统一加锁,使之成为原则的操作。并发时,只有获得锁的线程才能校验、扣减库存。

    注意: 事务的操作通过代码控制。如果事务通过注解的方式写在方法里,并且这个方法synchronized 修饰。 那么会在同步方法结束后,才提交事务。这样当事务还没提交,另外一个线程进入了同步方法。

     代码如下:

     public Integer createOrder() throws Exception{
            Product product = null;
    
            lock.lock();
            try {
                TransactionStatus transaction1 = platformTransactionManager.getTransaction(transactionDefinition);
                product = productMapper.selectByPrimaryKey(purchaseProductId);
                if (product==null){
                    platformTransactionManager.rollback(transaction1);
                    throw new Exception("购买商品:"+purchaseProductId+"不存在");
                }
    
                //商品当前库存
                Integer currentCount = product.getCount();
                System.out.println(Thread.currentThread().getName()+"库存数:"+currentCount);
                //校验库存
                if (purchaseProductNum > currentCount){
                    platformTransactionManager.rollback(transaction1);
                    throw
                            new Exception("商品"+purchaseProductId+"仅剩"+currentCount+"件,无法购买");
                }
    
                productMapper.updateProductCount(purchaseProductNum,"xxx",new Date(),product.getId());
                platformTransactionManager.commit(transaction1);
            }finally {
                lock.unlock();
            }
    
            TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
            Order order = new Order();
            order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseProductNum)));
            ...
            orderMapper.insertSelective(order);
    
            OrderItem orderItem = new OrderItem();
            ...
            orderItemMapper.insertSelective(orderItem);
            platformTransactionManager.commit(transaction);
            return order.getId();
        }
    

      

    3、基于数据库实现分布式锁

    多个进程、多个线程访问共同的组件数据库,

    通过select ...... for update 访问同一条数据

    for update锁住这条数据,其他线程只能等待

    接口代码如下,此程序是分布式部署。比如开启了两个程序,localhost:8080/myproject/singleLock   localhost:8081/myproject/singleLock

     selectDistributeLock的SQL语句里select ...... for update

    测试:分别调用localhost:8080/myproject/singleLock   localhost:8081/myproject/singleLock这里这两个接口。当第一个接口执行完后,等待2秒。 第二个请求也获得到了锁。

    基于数据库实现分布式锁的优缺点

    优点: 简单方便,易于理解、易于操作

    缺点: 并发量大时,对数据库压力较大。

    建议:作为锁的数据库与业务数据库分开。

    4、基于Redis的Setnx实现分布式锁

    1) 实现原理:

    获取锁的Redis命令 SET resource_name my_random_value NX PX 30000

    resource_name: 资源名称,可根据不同的业务区分不同的锁

    my_random_value: 随机值,每个线程的随机值都不同,用于释放锁时的校验

     NX: key不存在时设置成功,可以存则设置不成功。

     PX:自动失效时间,出现异常情况,锁可以过期失效。

    利用了NX的原子性,多个线程并发时,只有一个线程可以设置成功。

    设置成功即可获得锁,可以执行后续的业务处理。

    如果出现异常,过了锁的有效期,锁自动释放。

    2) 释放锁原理

    释放锁采用Redis的delete命令

    释放锁时校验之前设置的随机数,相同才能释放。(为了证明这个值是你这个线程设置的)

    释放锁采用LUA脚本

    3) Redis分布式锁实战

    Reids安装可参考 https://www.cnblogs.com/linlf03/p/9465272.html

     创建SpringBoog工程: distribute-lock-demo

    增加依赖:

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
    

      

    配置Redis连接

    创建Controller类

    @RestController
    @Slf4j
    public class RedisLockControler {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        @RequestMapping("redisLock")
        public String redisLock(){
            log.info("进入redisLock 方法");
    
            String key = "redisKey";
            String value = UUID.randomUUID().toString() ;
    
            RedisCallback redisCallback = redisConnection -> {
                RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
                //过期时间30秒
                Expiration expiration = Expiration.seconds(30);
                //序列化key
                byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
                // 序列化value
                byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
                //执行setnx操作
               Boolean result = redisConnection.set(redisKey, redisValue, expiration, setOption);
               return  result;
            };
            //获取分布式锁
            Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
            if(lock){
                log.info("进入了锁!!");
    
    
                //模拟业务处理,耗时15秒
                try {
                    Thread.sleep(15000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                    String script = "if redis.call("get",KEYS[1]) == ARGV[1] then
    " +
                            "   return redis.call("del",KEYS[1])
    " +
                            "else
    " +
                            "  return 0
    " +
                            "end";
                    RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class);
                    List<String> keys = Arrays.asList(key);
                    Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
                    log.info("释放锁的结果:" + result);
                }
    
            }else {
                log.info("没有获取到锁");
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    }
    

      

    启动DistributeLockDemoApplication

     一个端口为8080,一个端口为8081

    8081的端口设置如下

    然后分别调用http://localhost:8080/redisLock 和http://localhost:8081/redisLock

     说明8080获得到了锁,并且执行完成后释放了锁。

    8081执行未获得锁。

    5 基于分布式锁解决定时任务(服务集群部署)重复问题

    在前面的基础上对分布式锁进行封装

    @Slf4j
    public class RedisLock  implements AutoCloseable{
    
        private RedisTemplate redisTemplate;
        private String key;
        private String value;
        //单位:秒
        private int expireTime;
    
        public RedisLock(RedisTemplate redisTemplate, String key, int expireTime){
            this.redisTemplate = redisTemplate;
            this.key = key;
            this.expireTime = expireTime;
            this.value = UUID.randomUUID().toString();
        }
    
        /**
         * 获取分布式锁
         * @return
         */
        public boolean getLock(){
            RedisCallback redisCallback = redisConnection -> {
                RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
                //过期时间30秒
                Expiration expiration = Expiration.seconds(30);
                //序列化key
                byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
                // 序列化value
                byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
                //执行setnx操作
                Boolean result = redisConnection.set(redisKey, redisValue, expiration, setOption);
                return  result;
            };
            //获取分布式锁
            Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
            return lock;
        }
    
        public boolean unLock(){
    
            String script = "if redis.call("get",KEYS[1]) == ARGV[1] then
    " +
                    "   return redis.call("del",KEYS[1])
    " +
                    "else
    " +
                    "  return 0
    " +
                    "end";
            RedisScript<Boolean> redisScript = RedisScript.of(script, Boolean.class);
            List<String> keys = Arrays.asList(key);
            Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
            log.info("释放锁的结果={}", result);
            return  result;
        }
    
        @Override
        public void close() throws Exception {
            unLock();
        }
    }
    

      

    增加定时任务

    增加注解EnableScheduling

    定时任务为每隔5秒发送短信

    @Service
    @Slf4j
    public class SchedulerService {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        //每隔5秒钟执行一次
        @Scheduled(cron = "0/5 * * * * ?")
        public void SendSMS(){
    
            try( RedisLock redisLock = new RedisLock(redisTemplate, "autoSMS",30)) {
                if(redisLock.getLock()){
                    log.info("向18555558888发送短信");
                }else {
                    log.info("没有获取到锁");
                }
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    }
    

      

    启动DistributeLockDemoApplication

     输出结果如下图所示,同一个时间点只有一个服务获取到了锁。

    6、基于Zookeeper的分布式锁

    1)Zookeeper的安装,可以参考Linux下Zookeeper的下载、安装和启动

    2) 原理

    重要概念: Zookkeeper的观察器

    可以设置观察器的3个方法: getData(), getChildren(), exists();

    节点发生变化,发送给客户端

    观察器只能监控一次,再监控需重新设置。

    实现原理:

    利用Zookeeper的瞬时有序节点的特性

    多线程并发创建瞬时节点时,得到有序的序列

    序号最小的线程获得锁

    其他的线程则监听自己序号前一个序号

    前一个线程执行完成,删除自己序号的节点

    下一个序号的线程得到通知,继续执行

    以此类推

    创建节点时,已经确定了线程的执行顺序。

    3) Zookeeper的分布式锁实践

    增加依赖,版本和服务端Zookeeper 3.4.13保持一致。

            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.13</version>
            </dependency>
    

      

    创建Zookeeperr锁

    @Slf4j
    public class ZkLock implements AutoCloseable, Watcher {
    
        private ZooKeeper zooKeeper;
    
        private String znode;
    
    
        public ZkLock() throws IOException {
            this.zooKeeper = new ZooKeeper("47.xx.xx.120:2181", 10000, this);
    
        }
    
        /**
         * 获取分布式锁
         * @return
         */
        public boolean getLock(String businessCode){
            try {
                //创建业务根节点
                Stat stat = zooKeeper.exists("/" + businessCode, false);
                if(stat == null){
                    zooKeeper.create("/" +businessCode , businessCode.getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT);
                }
                //创建瞬时有序的节点. znode的值为/order/order_00000001
                znode = zooKeeper.create("/" +businessCode + "/" + businessCode + "_" , businessCode.getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
    
                //获得业务节点下所有的子节点
                List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
                Collections.sort(childrenNodes);
                // 获取序号最小的(第一个)子节点
                String firstNode = childrenNodes.get(0);
                // 如果创建的节点是第一个节点,则获得锁
                if(znode.endsWith(firstNode)){
                    return true;
                }
                // 不是第一个子节点,则监听前一个节点
                String lastNode = firstNode;
                for(String node: childrenNodes){
                    if(znode.endsWith(node)){
                        zooKeeper.exists("/" +businessCode + "/" +lastNode, true);
                        break;
                    }else {
                        lastNode = node;
                    }
                }
                //等待监听的节点发生变化
                synchronized (this){
                    wait();
                }
                return  true;
    
    
            }catch (Exception e){
                e.printStackTrace();
            }
            return  false;
    
        }
    
    
    
        @Override
        public void close() throws Exception {
            zooKeeper.delete(znode, -1);
            zooKeeper.close();
            log.info("已经释放了锁");
        }
    
        @Override
        public void process(WatchedEvent watchedEvent) {
            if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                synchronized (this){
                    //唤起线程
                    notify();
                }
    
            }
        }
    }
    

      

    增加调用接口

    @RestController
    @Slf4j
    public class ZookeeperControler {
    
    
        @RequestMapping("zookeeperLock")
        public String zookeeperLock(){
            log.info("进入zookeeperLock 方法");
            try(ZkLock zkLock = new ZkLock()) {
                if(zkLock.getLock("order")){
                    log.info("获得了锁");
                    //模拟业务处理,耗时15秒
                    Thread.sleep(15000);
                }else {
                    log.info("没有获取到锁");
                }
    
            }catch (Exception e){
                e.printStackTrace();
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    }
    

      

    测试接口,分别启动两个服务,端口分别为8080和8081

    分别调用http://localhost:8080/zookeeperLock 和 http://localhost:8081/zookeeperLock

    7、使用Curator实现分布式锁

    Apache Curator是为ZooKeeper开发的一套Java客户端类库,它是一个分布式协调服务。

    添加依赖

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.2.0</version>
            </dependency>
    

      

    在DistributeLockDemoApplication类中添加Bean

        @Bean(initMethod = "start", destroyMethod = "close")
        public CuratorFramework getCuratorFramework(){
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.newClient("47.xx.xx.120:2181", retryPolicy);
            return client;
        }
    

      

    ZookeeperControler类中添加方法

     @RequestMapping("curatorLock")
        public String curatorLock(){
            log.info("进入curatorLock 方法");
            InterProcessMutex lock = new InterProcessMutex(client,"/order");
            try {
                if(lock.acquire(30, TimeUnit.SECONDS)){
                    log.info("获得了锁!");
                    Thread.sleep(10000);
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                try {
                    log.info("释放了锁");
                    lock.release();
                }catch (Exception e){
                    e.printStackTrace();
                }
    
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    

     测试接口,分别启动两个服务,端口分别为8080和8081

    分别调用http://localhost:8080/curatorLock 和 http://localhost:8081/curatorLock

    8、 redisson分布式锁(API方式)

    1) 引入依赖

            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.11.2</version>
            </dependenc
    

      

    2)Redisson API方式

    Redisson API方式,缺点:使用繁琐,每次需要配置

    @RestController
    @Slf4j
    public class RedissonLockController {
    
        /**
         * Redisson API方式,缺点:使用繁琐
         * @return
         */
        @RequestMapping("redissonLock")
        public String redissonLock() {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://47.98.47.120:6379");
            RedissonClient redisson = Redisson.create(config);
    
            RLock rLock = redisson.getLock("order");
            log.info("进入了redissonLock方法");
            try {
                rLock.lock(30, TimeUnit.SECONDS);
                log.info("获得了锁");
                Thread.sleep(10000);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                rLock.unlock();
                log.info("释放了锁");
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    
    }
    

      

     3) 测试

    测试接口,分别启动两个服务,端口分别为8080和8081

    分别调用http://localhost:8080/redissonLock 和 http://localhost:8081/redissonLock

    9、 spring boot 集成redisson分布式锁

    1) 增加依赖

    注释redisson依赖,引入redisson-spring-boot-starter依赖

    配置文件增加属性

    spring.redis.host=47.xx.xx.120
    

      

    2) 增加方法redissonLock2

    @RequestMapping("redissonLock2")
        public String redissonLock2() {
            RLock rLock = redisson.getLock("order");
            log.info("进入了redissonLock2方法");
            try {
                rLock.lock(30, TimeUnit.SECONDS);
                log.info("获得了锁");
                Thread.sleep(10000);
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                rLock.unlock();
                log.info("释放了锁");
            }
            log.info("方法执行完成");
            return  "方法执行完成";
        }
    

      

  • 相关阅读:
    瀑布流
    轮播图
    封装动画的函数
    回到顶部带动画
    动画setInterval
    模拟滚动条
    放大镜
    刷新
    cookie
    拖拽
  • 原文地址:https://www.cnblogs.com/linlf03/p/12681186.html
Copyright © 2011-2022 走看看