zoukankan      html  css  js  c++  java
  • redis-分布式锁-设计与使用

    死锁

    错误例子

    解决方式

     防止死锁 通过设置超时时间
     不要使用setnx key   expire 20  不能保证原子性 如果setnx程序就挂了 没有执行expire就死锁了
     reidis2.8版本提供 set lock:key1 true ex 5 nx 方式 保证了  setnx+expire原子性方式执行(秒为单位)

    锁超时

    错误例子

            String lockKey="stock:product:1";
            boolean  isGetLock=false;
            try{
                //假设是原子性的 获取锁并设置锁10秒
                isGetLock==setnx(lockKey,10);
                if(!isGetLock){
                    throw  new Exception("系统繁忙!请稍后再试");
                }
    //模拟需要执行12秒 Thread.sleep(
    12); }finally { if(isGetLock){ del(lockKey); } }

    假设有线程A线程B 2个线程

    线程A率先拿到锁因为我们设置的锁10秒自动释放(redis过期时间10秒) 而我们程序需要执行10秒以上

    10.1ms秒的时候线程B进来 因为redis锁key已经过期成功拿到锁 并阻塞在12秒处

    12秒后线程A 执行完 执行del操作 导致释放了线程B的锁

    解决方式1

     String lockKey="stock:product:1";
            boolean  isGetLock=false;
            //用来标识当前身份
            String currentIndex=UUID.randomUUID().toString();
            try{
                //假设是原子性的 获取锁并设置锁10秒 同时设置一个值为currentIndex
                isGetLock==setnx(lockKey,currentIndex,10);
                if(!isGetLock){
                    throw  new Exception("系统繁忙!请稍后再试");
                }
                //模拟需要执行12秒
                Thread.sleep(12);
            }finally {
                if(isGetLock){
                    String lockValue=get(lockKey);
                    //表示是当前线程的锁 释放
                    if(lockValue!=null&&lockValue.equals(currentIndex)) {
                        del(lockKey);
                    }
                }
            }

    方式1优化方案

    简单一看 好像并没有什么问题 但是需要注意 get 比较 和del并不是原子性的

    比如 线程A get完之后 lockkey因为超时释放   线程B 成功获得锁    线程A再执行if判断 会删除调线程B的锁

    改为lua脚本 

    if redis.call("get",KEYS[1]==ARGV[1])  then
    return redis.call("del","KEYS1")
    else
    return 0
    end

    主从切换

    线程A从主节点加锁成功  这个时候主节点挂掉,从节点替换主节点 锁数据并没有同步过来 导致2个线程会获得锁  只会在 挂掉时 从节点还未同步时导致这样的情况 极少情况发生 不过一般业务场景都能接受
     

    可重入锁实现

    /**
     * @Auther: liqiang
     * @Date: 2019/7/14 14:59
     * @Description:
     */
    public class RedisWithReentrantLock {
        private ThreadLocal<Map<String,Integer>> lockers=new ThreadLocal<>();
        private Jedis jedis;
        public  RedisWithReentrantLock(Jedis jedis){
            this.jedis=jedis;
        }
        /**
         * 加锁
         */
        private boolean _lock(String key){
            String value=String.valueOf(System.currentTimeMillis());;
            return jedis.set(key,value,"nx","ex",5L)!=null;
        }
        /**
         * 释放锁
         * @param key
         */
        private void  _unlock(String key){
            jedis.del(key);
        }
    
        /**
         * 从线程缓存获取map 没有就初始化一个
         * @return
         */
        private  Map<String,Integer> currentLockers(){
            Map<String,Integer> refs=lockers.get();
            if(refs==null){
                refs=new HashMap<String,Integer>();
                lockers.set(refs);
            }
            return lockers.get();
        }
    
        /**
         * 可重入锁
         * @param key
         * @return
         */
        public boolean lock(String key){
            /**
             * 选择map的原因是 一个线程里面可能有很多加锁的地方
             */
            Map<String,Integer> lockers=currentLockers();
            /**
             *如果存在 表示是重入加锁
             */
            if(lockers.containsKey(key)){
                lockers.put(key,lockers.get(key)+1);
                //延长过期时间
                jedis.expire(key,5000);
                return true;
            }
            //走到这里表示是头部第一次加锁 加锁并对应map数量+1
            boolean isGetLock=_lock(key);
            lockers.put(key,1);
            return  isGetLock;
        }
    
        /**
         * 释放锁
         * @param key
         * @return
         */
        public boolean unLock(String key){
            /**
             * 获得map
             */
            Map<String,Integer> lockers=currentLockers();
            /**
             * 表示key未加过锁 或者释放了
             */
            Integer refCnt=lockers.get(key);
            if(refCnt==null){
                return false;
            }
            //-1
            refCnt-=1;
            //大于0表示不是头部锁释放
            if(refCnt>0){
                lockers.put(key,refCnt);
            }else{
                //小于等于0 表示是头部锁释放 删除mapkey
                lockers.remove(key);
                /**
                 * 释放锁
                 */
                _unlock(key);
            }
            return true;
        }
        public static void main(String[] args) {
            Jedis conn = new Jedis("127.0.0.1",6379);
            conn.select(1);
            RedisWithReentrantLock redisWithReentrantLock=new RedisWithReentrantLock(conn);
            String lockKey="lock:key3";
            redisWithReentrantLock.lock(lockKey);
            redisWithReentrantLock.lock(lockKey);
    
            redisWithReentrantLock.unLock(lockKey);
            redisWithReentrantLock.unLock(lockKey);
        }
    }

    一些建议

    建议涉及并发的地方能用原子性操作就用原子性

    例子一

           tock stock=stockDao.get(id);
            if(stock.getNumber()-10<0){
                throw new Exception("库存不足");
            }
            stock.setNumber(stock.getNumber-10);
            stockDao.update(stock);

    这种情况就算加锁的情况 如果出现上面说的几种极端情况 或者锁失效了 会导致超卖以及库存异常问题

    优化方案

             Stock stock=stockDao.get(id);
            /**
             * 这里可能会疑惑 下面有原子性的update加 where校验超卖 这一步是否不需要了
             * 个人理解 程序进行校验 总比全部堆到数据库校验好的多
             * 比如库存卖完了 还持续有并发请求 在这里就可以全部挡在外面
             */
            if(stock.getNumber()-10<0){
                throw new Exception("库存不足");
            }
            stock.setNumber(stock.getNumber-10);
            //原子性的update
            Integer updateNumber=stockDao.excuteSql("update stock set number-=10 where id=:id and number>=0",id);
            //表示未能成功修改 
            if(updateNumber<=0){
                throw new Exception("库存不足");
            }

     redis则使用对应redis递增递减

    对于提供给管理员的库存盘点 也是使用原子性递增递减 

    盘增

    比如当前库存是10  管理员调整20  则是+10  而不要直接set 20   不然并发时 10 卖了5  这个时候20才提交 则变成了20  如果+10 则变成15

    盘减

    比如当前库存是10 管理员 需要调整为5    并发时减成了0  执行update stock set number-=5 where id=:id and number>=0   number>=0并不成立所以修改失败

    高并发时建议(比如秒杀场景)

    将库存全量到redis  通过Incrby 命令实现原子性递增递减   如果消息发送失败需要进行补偿

    update stock set number-=10 where id=:id and number>=0 通过mq 队列异步执行 否则会出现同一个库存并发改 部分是失败数据库抛出waitLock  tps就上不去 还会有大量请求到数据库 可能把redis

    弄挂

     
  • 相关阅读:
    23岁的这一年
    迁移ORACLE数据库文件到ASM
    无归档情况下使用BBED处理ORA-01113错误
    Oracle RMAN-06023 和ORA-19693错误
    手工创建Oracle数据库
    使用BBED理解和修改Oracle数据块
    Oracle 11g 重建EM需要删除的对象
    各大主流编程语言-常用爬虫框架以及优劣分析
    pyspider入门
    Centos7上安装docker及使用scrapy-splash
  • 原文地址:https://www.cnblogs.com/LQBlog/p/12059965.html
Copyright © 2011-2022 走看看