zoukankan      html  css  js  c++  java
  • SpringBoot使用RedisTemplate+Lua脚本实现Redis分布式锁

    SpringBoot使用RedisTemplate+Lua脚本实现Redis分布式锁

    问题:定时任务部署在多台Tomcat上,因此到达指定的定时时间时,多台机器上的定时器可能会同时启动,造成重复数据或者程序异常等问题。

    //发送消息,不能重复发送
    @Scheduled(cron = "0 0/15 * * * ? ")
    public void sendMsg(String userId) {
     
    }
    
    项目部署在Tom 1 ,Tom 2
    
    如何控制只有一个Tomcat在同一时刻执行任务
    

    使用分布式锁来控制,谁抢到了锁就让谁执行。

    一、基于Redis实现分布式锁

    package cn.pconline.pcloud.base.util;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.dao.DataAccessException;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisCluster;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description 使用RedisTemplate+Lua脚本实现Redis分布式锁
     * @Author jie.zhao
     * @Date 2019/11/19 11:46
     */
    @Component
    public class RedisLock {
        @Autowired
        private StringRedisTemplate template;
    
        private static final Long RELEASE_SUCCESS = 1L;
    
        private static final long DEFAULT_TIMEOUT = 1000 * 10;
    
        private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    
        /**
         * 尝试获取锁 立即返回
         *
         * @param key
         * @param value
         * @param timeout
         * @return
         */
        public boolean lock(String key, String value, long timeout) {
            return template.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
        }
    
        /**
         * 以阻塞方式的获取锁
         *
         * @param key
         * @param value
         * @param timeout
         * @return
         */
        public boolean lockBlock(String key, String value, long timeout) {
            long start = System.currentTimeMillis();
            while (true) {
                //检测是否超时
                if (System.currentTimeMillis() - start > timeout) {
                    return false;
                }
                //执行set命令
                //1
                Boolean absent = template.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
                //其实没必要判NULL,这里是为了程序的严谨而加的逻辑
                if (absent == null) {
                    return false;
                }
                //是否成功获取锁
                if (absent) {
                    return true;
                }
            }
        }
    
        public boolean unlock(String key, String value) {
            // 使用Lua脚本:先判断是否是自己设置的锁,再执行删除
            // 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
            // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常EvalSha is not supported in cluster environment.,所以只能拿到原redis的connection来执行脚本
    
            List<String> keys = new ArrayList<>();
            keys.add(key);
            List<String> args = new ArrayList<>();
            args.add(value);
    
            Long result = template.execute(new RedisCallback<Long>() {
                @Override
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();
                    // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                    // 集群模式
                    if (nativeConnection instanceof JedisCluster) {
                        return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
    
                    // 单机模式
                    else if (nativeConnection instanceof Jedis) {
                        return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
                    return 0L;
                }
            });
    
            //返回最终结果
            return RELEASE_SUCCESS.equals(result);
        }
    }
    

    使用方法:

    @Scheduled(cron = "0 0/15 * * * ? ")
    public void sendExamTemplateMsg() {
        if (redisLock.lock(RedisKey.REDIS_JOB_SEND_KEY, RedisKey.REDIS_JOB_SEND_VALUE, 1000 * 60)) {
            
            //....
            log.info("定时轮询考试安排通知结束 	" + new Date());
        } else {
            log.info("定时轮询考试安排,未获取到锁其他应用正在执行 	" + new Date());
        }
    }
    

    二、分布式锁的要求

    • 互斥性。在任意时刻,只有一个客户端能持有锁。
    • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
    • 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
    • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

    三、存在的问题

    注意!!!! 该加锁方法仅针对单实例 Redis 可实现分布式加锁,或者使用场景少的业务。

    原因对于 Redis 集群会有一定几率出现问题

    例如:当进程1对master节点写入了锁,此时master节点宕机,slave节点提升为master而刚刚写入master的锁还未同步,此时进程2也将能够获取锁成功,此时必然会导致数据不同步问题。还有另一个问题即: key 超时之后业务并没有执行完毕但却自动释放锁了,这样就会导致并发问题。

    如果需要更加健壮的Redis集群分布式锁,推荐使用Redisson

  • 相关阅读:
    MYSQL中排序
    编写一个 SQL 查询,获取 Employee 表中第二高的薪水(Salary)
    job1
    python中对于数组的操作
    python中将字符串转为字典类型
    python中的周几
    python 链接redis 获取对应的值
    jenkins 设置定时任务规则
    如何安全close go 的channel
    [转]
  • 原文地址:https://www.cnblogs.com/cnsyear/p/12731963.html
Copyright © 2011-2022 走看看