zoukankan      html  css  js  c++  java
  • Redis分布式锁

    一、分布式锁  

      锁在项目中的应用场景就无须多说,在单应用多线程场景中,可以直接使用synchronize或者ReentrantLock来加锁处理,但是在微服务体系中,为了保证项目的高可用,会部署多个相同的项目,因此单应用的加锁并不能保证只有一个请求能进入到处理流程,因此就需要借助项目外的第三方实现分布式锁,从而保证只有一个请求进入加锁的处理流程。

      分布式锁的处理流程:获取锁,如果获取失败,则返回获取锁失败,如果获取成功,则执行加锁的业务逻辑,处理完毕后,解锁。这里需要强调一下,为了避免死锁,锁需要具备失效机制,在一段时间内,要释放锁。

      目前常用的分布式锁的实现方式有以下几种:基于数据库实现的分布式锁、基于zookeeper节点的分布式锁、基于redis的分布式锁、基于Etcd(分布式键值数据库)的分布式锁

    二、Redis实现分布式锁 

    1、使用lua实现分布式锁

      可以使用lua脚本或者自定义一个redis锁,总体逻辑就是使用setnx命令,设置一个key,值为每次请求的特定值,例如请求ID,如果设置失败(返回0),则加锁失败,如果设置成功(返回1),则返回成功,同时设置超期时间,以防产生死锁;在业务逻辑处理完成后,进行解锁操作,先判断该key中的值是否为本次操作的值,如果不是,说明非本次操作加的锁,不允许解锁,如果是,则删除该key。

      那么,直接上代码,代码中使用lua脚本来做上述的加锁和解锁操作。

    @Slf4j
    @Service
    public class RedisLock {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        //加锁lua脚本
        private static final String SCRIPT_TRY_LOCK = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then redis.call('pexpire', KEYS[1],5000) return true else return false end";
        private static final String SCRIPT_UN_LOCK =  "if redis.call('get'  ,KEYS[1]) == ARGV[1] then redis.call('del',KEYS[1]) return true else return false end";
    
        public boolean tryLockLuaBefore(String lockKey, String requestId, int expire) {
            RedisCallback<Boolean> callback = (connection) -> {
                return connection.eval(SCRIPT_TRY_LOCK.getBytes(), ReturnType.BOOLEAN, 1,
                        lockKey.getBytes(Charset.forName("UTF-8")),
                        requestId.getBytes(Charset.forName("UTF-8")));
            };
            return (boolean) redisTemplate.execute(callback);
        }
    
    
        public boolean releaseLock(String lockKey, String requestId) {
            RedisCallback<Boolean> callback = (connection) -> {
                return connection.eval(SCRIPT_UN_LOCK.getBytes(), ReturnType.BOOLEAN, 1,
                        lockKey.getBytes(Charset.forName("UTF-8")),
                        requestId.getBytes(Charset.forName("UTF-8")));
            };
            return (Boolean) redisTemplate.execute(callback);
        }
    
    }

      然后就模拟一下多线程并发调用的情况

    @Service
    @Slf4j
    public class RedisDistributedLock {
    
        @Autowired
        private RedisLock redisLock;
    
        public static ThreadPoolExecutor threadPoolExecutor;
    
        static {
            final int corePoolSize = 10;
            final int maximumPoolSize = 10;
            final long keepAliveTime = 0L;
            final TimeUnit unit = TimeUnit.MILLISECONDS;
            final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100000);
            final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("RedisDistributedLockTest").build();
            final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
    
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                    keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        public void luaRedisLock(){
            String lockKey = "test";
            int num = 1;
            int whileWhere = 0;
            while (whileWhere++ < 10000){
                int localnum = num++;
                threadPoolExecutor.submit(()->{
                    String requestId = UUID.randomUUID().toString();
    
                    boolean flag = redisLock.tryLockLuaBefore(lockKey, requestId, 3 * 1000);
                    if(flag){
                        log.info("第{}条放入线程池数据,requestId={},第一条数据", localnum,requestId);
                        for (int i = 0; i < 10000; i++){ }
                        log.info("第{}条放入线程池数据,requestId={},最后一条数据", localnum,requestId);
                        redisLock.releaseLock(lockKey,requestId);
                    }
                });
            }
        }
    
    }

      最红调用luaRedisLock方法,发现输出都是成对出现,redis分布式锁处理成功。

      

    2、纯JAVA实现分布式锁

      跟上面一样,只需要在java代码中直接加锁或解锁,不借助lua脚本,逻辑跟上面一样。

    @Slf4j
    @Service
    public class NoLuaRedisLock {
    
        @Autowired
        private RedisTemplate redisTemplate;
    
        public boolean tryLockLua(String lockKey, String requestId, int expire) {
            Boolean flag = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expire, TimeUnit.MILLISECONDS);
            if(flag == null){
                return false;
            }
            return flag;
        }
    
        public boolean releaseLock(String lockKey, String requestId) {
            String value = (String) redisTemplate.opsForValue().get(lockKey);
            if(!requestId.equals(value)){
                return false;
            }
            Boolean flag = redisTemplate.delete(lockKey);
            if(flag == null){
                return false;
            }
            return flag;
        }
    
    }

    三、Redis分布式锁的优缺点

      Redis分布式锁的优点在于Redis是基于内存的,并发性能好;缺点是需要考虑原子性、超时、误删等场景,并且如果要是获取锁失败时,客户端只能自旋等待,在高并发情况下,性能消耗较大。

      在CAP(一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance))模型中,如果是分布式环境,只能满足其中两个,但是在分布式环境下,分区容错性又不能不要(如果不要就是单机),所以只能选择AP或者CP。其中分布式锁是CP模型,但是Redis是AP模型,这样就决定了Redis分布式锁如果不要求强一致性的话,可以使用Redis分布式锁,例如社交场景等;但是如果要求强一致性的话,例如金融场景,就不能使用Redis分布式锁,而是要使用CP模型特点的分布式锁,例如Zookeeper、etcd等

    四、Redisson分布式锁原理

      目前落地生产的分布式锁,一般使用开源框架,例如Redisson。

      

    1、加锁机制

      如果客户端面对的是一个Redis集群,它首先会根据hash节点选择一台服务器,然后发送lua脚本到redis服务器上,脚本如下:

    if (redis.call('exists',KEYS[1])==0) then
        redis.call('hset',KEYS[1],ARGV[2],1) ;
        redis.call('pexpire',KEYS[1],ARGV[1]) ;
        return nil;
    end ;
    if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then
        redis.call('hincrby',KEYS[1],ARGV[2],1) ;
        redis.call('pexpire',KEYS[1],ARGV[1]) ;
        return nil;
    end ;
    return redis.call('pttl',KEYS[1]) ;

      解释一下上述lua脚本,KEYS[1]表示锁的key,ARGV[1]表示锁的过期时间,ARGV[2]表示加锁的线程。

      首先判断锁是否已存在(key是否存在),如果不存在,就加锁,同时设置超期时间;

      然后判断对于本次加锁key中指定的field是否存在(线程id,使用机器号 + 线程ID来表示一个唯一的线程请求),如果存在,值加一,然后重置缓存失效时间

      最终返回key剩余的失效时间。

      使用lua脚本的目的:保证复杂的逻辑以原子性的方式执行

    2、锁互斥机制

      第1步的描述是加锁成功的机制,那么如果这时,有第二个线程进来:

      首先判断锁是否已存在(key是否存在),此时已经存在,则不设置key的field;

      然后判断key的field是否存在,由于field是使用的 机器码 + 线程ID 表示的,如果该请求的线程ID与已有的值一致,则表示有,那么会把值加一,同时重置key的过期时间;如果不一致,就说明锁不是该请求加的,直接就返回过期时间,由于加锁成功返回的都是nil,只有加锁失败返回的是一个整数的过期时间,那么此时线程2就需要进入一个while循环,不停的尝试加锁处理。

    3、自动延时机制

      只要线程1加锁成功,Redisson就会启动一个看门狗(watch dog),他是一个后台线程,会每十秒检查一次,如果线程1还持有该redis锁,那么就会不断的延长锁的过期时间。

    4、可重入锁机制

      可重入锁就是表示同一个线程可以再次进入锁,而不会被拦截,如果如果线程1此时又来加锁,那么第一个判断(判断该key是否存在),就会被跳过,直接判断第二个条件,如果key中的field已存在,就会将value加一,同时重置过期时间。

    5、所释放机制

      如果客户端面对的是一个Redis集群,它首先会根据hash节点选择一台服务器,然后发送lua脚本到redis服务器上,脚本如下:

    --#如果key已经不存在,说明已经被解锁,直接发布(publish)redis消息
    if (redis.call('exists', KEYS[1]) == 0) then
        redis.call('publish', KEYS[2], ARGV[1]);
        return 1;
    end;
    --# key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
        return nil;
    end;
    --# 将value减1
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
    --# 如果counter>0说明锁在重入,不能删除key
    if (counter > 0) then
        redis.call('pexpire', KEYS[1], ARGV[2]);
        return 0;
        --# 删除key并且publish 解锁消息 "
    else
        redis.call('del', KEYS[1]);
        redis.call('publish', KEYS[2], ARGV[1]);
        return 1;
    end;
    return nil;

      解释上述脚本:KEYS[1]表示redis锁的key,KEYS[2]表示解锁的消息通道,ARGV[1] 表示消息实体,ARGV[2]表示redis锁的超期时间,ARGV[3]表示加锁的线程(field)

      首先判断锁是否存在,如果不存在,说明已经解锁成功,直接发布消息已解锁的消息,以唤醒其他订阅锁的线程重新竞争锁;

      如果锁仍然存在,则判断该锁是否是当前线程持有(field和本次的线程id是否一致),如果不一致,说明该锁不是此线程加的锁,不允许解锁。

      如果是该线程加的锁,则将value减一,计算后的结果(加锁的次数)是否大于0 ,如果大于0,说明并没有完全解锁,重置超期时间即可;否则,则说明已经完全解锁,删除key,并通过通道发布已解锁消息。

    五、Redisson分布式锁的使用

      1、引包

            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>2.7.0</version>
            </dependency>

      2、创建RedissonManager,用来获取Redisson对象

    public class RedissonManager {
        private static Config config = new Config(); //声明redisso对象
        private static Redisson redisson = null;
    
        static{
            config.useClusterServers() // 集群状态扫描间隔时间,单位是毫秒
                    .setScanInterval(2000) //cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
                    .addNodeAddress("redis://8.131.245.53:8001" )
                    .addNodeAddress("redis://8.131.245.53:8002" )
                    .addNodeAddress("redis://8.131.245.53:8003" )
                    .addNodeAddress("redis://8.131.245.53:8004" )
                    .addNodeAddress("redis://8.131.245.53:8005" )
                    .addNodeAddress("redis://8.131.245.53:8006" );
            //得到redisson对象
            redisson = (Redisson) Redisson.create(config);
        }
    
        //获取redisson对象的方法
        public static Redisson getRedisson(){
            return redisson;
        }
    }

      3、创建RedissonLock对象,用来处理加锁解锁处理

    public class RedissonLockService {
        //从配置类中获取redisson对象
        private static Redisson redisson = RedissonManager.getRedisson();
        private static final String LOCK_TITLE = "redisLock_";
        //加锁
        public static boolean acquire(String lockName){
            //声明key对象
            String key = LOCK_TITLE + lockName;
            //获取锁对象
            RLock mylock = redisson.getLock(key);
            //加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId
            mylock.lock(3, TimeUnit.MINUTES);
            //加锁成功
            return true;
        }
    
        //锁的释放
        public static void release(String lockName) {
            //必须是和加锁时的同一个key
            String key = LOCK_TITLE + lockName;
            //获取锁对象
            RLock mylock = redisson.getLock(key);
            //释放锁(解锁)
            mylock.unlock();
        }
    }

      验证结果:

      总结:Redisson的使用总体来说还是比较简单的,直接使用redisson.lock()就可以加锁,使用redisson.getlock()就可以获得锁,使用redisson.unlock就可以解锁。

    ------------------------------------------------------------------
    -----------------------------------------------------------
    ---------------------------------------------
    朦胧的夜 留笔~~
  • 相关阅读:
    Qt Error: dependent '..***' does not exist.
    Qt 判断文件是否存在
    Qt 添加资源文件
    Qt lcdNumber 不能显示完整时间
    Qt snippet — 打开文件&保存文件
    right-click an action, missing "Go to slot"
    Code the Tree(图论,树)
    李代桃僵
    Dependency Scope
    Selenium的延迟等待
  • 原文地址:https://www.cnblogs.com/liconglong/p/14369961.html
Copyright © 2011-2022 走看看