zoukankan      html  css  js  c++  java
  • Redis分布式锁

    一、分布式锁使用场景

     单机系统的时候,当处理临界资源的时候需要使用synchronized来锁住资源,以免并发导致临界资源异常。
             在分布式系统中,使用单机的锁已经不能控制临界资源了,这个时候就需要跨系统来控制临界资源,那控制的标识位放到哪里呢?可以是缓存redis,也可以是ZK。
            
            直白的将,单机需要用synchronized解决的问题,在集群部署的时候出现同样的问题就需要用 分布式锁来解决。
     
            redis做分布式操作redisson太强大了:https://github.com/redisson/redisson/wiki
    二、分布式锁需要解决的问题
    1.  同时只能有一个人获取锁 :使用 setnx解决
    2. 锁不能死锁:使用过期时间来解决
    3. 锁在没有使用完的情况下,由于时间超时自动解锁:锁续命
    4. 解锁原则:谁上锁,谁解锁
     三、分布式锁的实现代码
    package test.config;
    
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.dao.DataAccessException;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.util.Assert;
    import org.springframework.util.StringUtils;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisCluster;
    
    
    import java.util.*;
    import java.util.concurrent.TimeUnit;
    
    
    /**
    * Redis分布式锁
    * 使用 SET resource-name anystring NX EX max-lock-time 实现
    * <p>
    * 该方案在 Redis 官方 SET 命令页有详细介绍。
    * http://doc.redisfans.com/string/set.html
    * <p>
    * 在介绍该分布式锁设计之前,我们先来看一下在从 Redis 2.6.12 开始 SET 提供的新特性,
    * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中:
    * <p>
    * EX seconds — 以秒为单位设置 key 的过期时间;
    * PX milliseconds — 以毫秒为单位设置 key 的过期时间;
    * NX — 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。
    * XX — 将key 的值设为value ,当且仅当key 存在,等效于 SETEX。
    * <p>
    * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
    * <p>
    * 客户端执行以上的命令:
    * <p>
    * 如
    * 个客户端获得锁。
    * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
    *
    */
    public class RedisLock {
    
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    
    
        private RedisTemplate<String, Object> redisTemplate;
    
    
        /**
         * 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。
         */
        public static final String NX = "NX";
    
    
        /**
         * seconds — 以秒为单位设置 key 的过期时间,等效于EXPIRE key seconds
         */
        public static final String EX = "EX";
    
    
        /**
         * 调用set后的返回值
         */
        public static final String OK = "OK";
    
    
        /**
         * 默认请求锁的超时时间(ms 毫秒)
         */
        private static final long TIME_OUT = 100;
    
    
        /**
         * 默认锁的有效时间(s)
         */
        public static final int EXPIRE = 60;
    
    
        /**
         * 解锁的lua脚本
         */
        public static final String UNLOCK_LUA;
    
    
        static {
            StringBuilder sb = new StringBuilder();
            sb.append("if redis.call("get",KEYS[1]) == ARGV[1] ");
            sb.append("then ");
            sb.append("    return redis.call("del",KEYS[1]) ");
            sb.append("else ");
            sb.append("    return 0 ");
            sb.append("end ");
            UNLOCK_LUA = sb.toString();
        }
    
    
        /**
         * 锁标志对应的key
         */
        private String lockKey;
    
    
        /**
         * 记录到日志的锁标志对应的key
         */
        private String lockKeyLog = "";
    
    
        /**
         * 锁对应的值
         */
        private String lockValue;
    
    
        /**
         * 锁的有效时间(s)
         */
        private int expireTime = EXPIRE;
    
    
        /**
         * 请求锁的超时时间(ms)
         */
        private long timeOut = TIME_OUT;
    
    
        /**
         * 锁标记
         */
        private volatile boolean locked = false;
    
    
        final Random random = new Random();
        //解决锁续命问题
        private Timer timer;
        //锁是否续命处理, 可以参数传进来控制
        private boolean lockCanNotTimeOut=true;
    
    
        /**
         * 使用默认的锁过期时间和请求锁的超时时间
         *
         * @param redisTemplate
         * @param lockKey       锁的key(Redis的Key)
         */
        public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {
            this.redisTemplate = redisTemplate;
            this.lockKey = lockKey + "_lock";
        }
    
    
        /**
         * 使用默认的请求锁的超时时间,指定锁的过期时间
         *
         * @param redisTemplate
         * @param lockKey       锁的key(Redis的Key)
         * @param expireTime    锁的过期时间(单位:秒)
         */
        public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) {
            this(redisTemplate, lockKey);
            this.expireTime = expireTime;
        }
    
    
        /**
         * 使用默认的锁的过期时间,指定请求锁的超时时间
         *
         * @param redisTemplate
         * @param lockKey       锁的key(Redis的Key)
         * @param timeOut       请求锁的超时时间(单位:毫秒)
         */
        public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) {
            this(redisTemplate, lockKey);
            this.timeOut = timeOut;
        }
    
    
        /**
         * 锁的过期时间和请求锁的超时时间都是用指定的值
         *
         * @param redisTemplate
         * @param lockKey       锁的key(Redis的Key)
         * @param expireTime    锁的过期时间(单位:秒)
         * @param timeOut       请求锁的超时时间(单位:毫秒)
         */
        public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) {
            this(redisTemplate, lockKey, expireTime);
            this.timeOut = timeOut;
        }
    
    
        /**
         * 尝试获取锁 超时返回
         *
         * @return
         */
        public boolean tryLock() {
            // 生成随机key
            lockValue = UUID.randomUUID().toString();
            // 请求锁超时时间,纳秒
            long timeout = timeOut * 1000000;
            // 系统当前时间,纳秒
            long nowTime = System.nanoTime();
            while ((System.nanoTime() - nowTime) < timeout) {
                if(this.set(lockKey, lockValue, expireTime)) {
    //            if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) {
                    locked = true;
                    // 上锁成功结束请求
                    this.continueLock(this.lockKey);
                    return locked;
                }
                // 每次请求等待一段时间
                seleep(10, 50000);
            }
            return locked;
        }
    
    
        /**
         * 尝试获取锁 立即返回
         *
         * @return 是否成功获得锁
         */
        public boolean lock() {
            lockValue = UUID.randomUUID().toString();
            //不存在则添加 且设置过期时间(单位ms)
            boolean result = set(lockKey, lockValue, expireTime);
            if(result){
                continueLock(this.lockKey);
            }
            locked = result;
            return locked;
        }
    
    
        /**
         * 以阻塞方式的获取锁
         * 一直获取锁,直到成功
         * @return 是否成功获得锁
         */
        public boolean lockBlock() {
            lockValue = UUID.randomUUID().toString();
            while (true) {
                //不存在则添加 且设置过期时间(单位ms)
                boolean result = set(lockKey, lockValue, expireTime);
                if(result) {
    //            if (OK.equalsIgnoreCase(result)) {
                    locked = true;
                    return locked;
                }
    
    
                // 每次请求等待一段时间
                seleep(10, 50000);
            }
        }
    
    
        /**
         * 解锁
         * <p>
         * 可以通过以下修改,让这个锁实现更健壮:
         * <p>
         * 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。
         * 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。
         * 这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。
         */
        public Boolean unlock() {
            // 只有加锁成功并且锁还有效才去释放锁
            // 只有加锁成功并且锁还有效才去释放锁
            if (locked) {
                boolean isUnLock =  (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() {
                    @Override
                    public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                        Object nativeConnection = connection.getNativeConnection();
                        Long result = 0L;
    
    
                        List<String> keys = new ArrayList<>();
                        keys.add(lockKey);
                        List<String> values = new ArrayList<>();
                        values.add(lockValue);
    
    
                        // 集群模式
                        if (nativeConnection instanceof JedisCluster) {
                            result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
                        }
    
    
                        // 单机模式
                        if (nativeConnection instanceof Jedis) {
                            result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
                        }
    
    
                        if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) {
                            logger.info("Redis分布式锁,解锁{}失败!解锁时间:{}", lockKeyLog, System.currentTimeMillis());
                        }
    
    
                        locked = result == 0;
                        return result == 1;
                    }
                });
                if(isUnLock && timer!=null){
                    timer.cancel();
                }
    
    
                return isUnLock;
            }
    
    
            return true;
        }
    
    
        /**
         * 获取锁状态
         * @Title: isLock
         * @Description: TODO
         * @return  
         * @author yuhao.wang
         */
        public boolean isLock() {
          
          return locked;
       }
        
        /**
         * 重写redisTemplate的set方法
         * <p>
         * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
         * <p>
         * 客户端执行以上的命令:
         * <p>
         * 如果服务器返回 OK ,那么这个客户端获得锁。
         * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
         *
         * @param key     锁的Key
         * @param value   锁里面的值
         * @param seconds 过去时间(秒)
         * @return
         */
        private boolean set(final String key, final String value, final long seconds) {
            Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空");
            //如果返回为true单表 这个key在redis里面没有,并且返回true  setNX
            //spring start用的2.2.2版本
            return redisTemplate.opsForValue().setIfAbsent(key, value,expireTime, TimeUnit.SECONDS);
    /*        return (String) redisTemplate.execute(new RedisCallback<String>() {
                @Override
                public String doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();
                    String result = null;
                    if (nativeConnection instanceof JedisCommands) {
                        result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds);
                    }
    
    
                    if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) {
                        logger.info("获取锁{}的时间:{}", lockKeyLog, System.currentTimeMillis());
                    }
    
    
                    return result;
                }
            });*/
        }
    
    
        /**
         * @param millis 毫秒
         * @param nanos  纳秒
         * @Title: seleep
         * @Description: 线程等待时间
         * @author yuhao.wang
         */
        private void seleep(long millis, int nanos) {
            try {
                //第二个参数不能起到随机数的作用,这个地方需要自己用随机数在改动
                Thread.sleep(millis, random.nextInt(nanos));
            } catch (InterruptedException e) {
                logger.info("获取分布式锁休眠被中断:", e);
            }
        }
    
    
        public String getLockKeyLog() {
            return lockKeyLog;
        }
    
    
        public void setLockKeyLog(String lockKeyLog) {
            this.lockKeyLog = lockKeyLog;
        }
    
    
        public int getExpireTime() {
            return expireTime;
        }
    
    
        public void setExpireTime(int expireTime) {
            this.expireTime = expireTime;
        }
    
    
        public long getTimeOut() {
            return timeOut;
        }
    
    
        public void setTimeOut(long timeOut) {
            this.timeOut = timeOut;
        }
    
    
        /**
         * 锁续命
         * @param lockKey
         */
        public void continueLock(String lockKey) {
            if(lockCanNotTimeOut){
                timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        //每次续命60s,参数可以传进来
                        redisTemplate.expire(lockKey, 60, TimeUnit.SECONDS);
                    }
                }, 0, 1);
            }
        }
    
    }

    2 )redisTemplate配置

    @Configuration
    public class RedisConfig {
        @Bean(name="myRedisTemplate")
        public RedisTemplate myRedisTemplate(RedisConnectionFactory factory){
            RedisTemplate redisTemplate = new RedisTemplate();
            RedisSerializer stringSerializer = new StringRedisSerializer();
            redisTemplate.setConnectionFactory(factory);
            redisTemplate.setKeySerializer(stringSerializer);
            redisTemplate.setValueSerializer(stringSerializer);
            redisTemplate.setHashKeySerializer(stringSerializer);
            redisTemplate.setHashValueSerializer(stringSerializer);
            return redisTemplate;
        }
    }

    四、Junit进行测试-redis使用 spring的redisTemplate

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Server.class)
    public class RedisTest
    {
    
        @Autowired
        @Qualifier("myRedisTemplate")
        private RedisTemplate myRedisTemplate;
    
        private AtomicInteger count  = new AtomicInteger(0);
    
        @Before
        public void init(){
            System.out.println( myRedisTemplate );
    
        }
    
        //并发控制
        @Test
        public void test4() throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(1) ;
            for (int i = 0; i <5; i++) {
    
                int finalI = i;
                Thread thread = new Thread(()->{
                    try {
                        System.out.println(finalI);
                        countDownLatch.await();
                        this.test3();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                thread.start();
            }
            TimeUnit.SECONDS.sleep(2);
            System.out.println("启动并发");
            countDownLatch.countDown();
            Thread.currentThread().join();
            System.out.println("主线程结束");
        }
    
        //测试锁的使用,因为有锁续命,所以只要线程不自己解锁则会一直持有
        public void test3() throws InterruptedException {
            RedisLock redisLock = new RedisLock(myRedisTemplate,"lockKey") ;
            try {
                if(redisLock.tryLock()){
                    System.out.println("进入锁");
                    TimeUnit.SECONDS.sleep(30);
                    System.out.println("睡眠结束");
                }
            }finally {
                System.out.println( count.incrementAndGet());
                redisLock.unlock();
            }
        }
    }
  • 相关阅读:
    通俗理解时空效应,感受质量、空间与时间的关系_番外篇
    第四十三象 丙午
    第四十二象 乙巳
    第四十一象 甲辰
    第四十象 癸卯
    ServiceComb简介
    Spring-Session实现Session共享
    SpringBoot整合ActiveMQ
    Hbase配置运行
    KafKa配置运行
  • 原文地址:https://www.cnblogs.com/lean-blog/p/14149588.html
Copyright © 2011-2022 走看看