zoukankan      html  css  js  c++  java
  • 一个Redis实现的分布式锁

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisTemplate;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    public class RedisLock implements AutoCloseable {
        private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class);
        public static final String REDIS_LOCK = "RedisLock:";
    
    
        private static final long DEFAULT_WAIT_LOCK_TIME_OUT = 60;//60s 有慢sql,超时时间设置长一点
        private static final long DEFAULT_EXPIRE = 80;//80s 有慢sql,超时时间设置长一点
        private String key;
        private RedisTemplate redisTemplate;
    
        public RedisLock(RedisTemplate redisTemplate,String key) {
            this.redisTemplate = redisTemplate;
            this.key = key;
        }
    
        /**
         * 等待锁的时间,单位为s
         *
         * @param key
         * @param timeout s
         * @param seconds
         */
        public boolean lock(String key, long timeout, TimeUnit seconds) {
            String lockKey = generateLockKey(key);
            long nanoWaitForLock = seconds.toNanos(timeout);
            long start = System.nanoTime();
    
            try {
                while ((System.nanoTime() - start) < nanoWaitForLock) {
                    if (redisTemplate.getConnectionFactory().getConnection().setNX(lockKey.getBytes(), new byte[0])) {
                        redisTemplate.expire(lockKey, DEFAULT_EXPIRE, TimeUnit.SECONDS);//暂设置为80s过期,防止异常中断锁未释放
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("add RedisLock[{}].{}", key, Thread.currentThread());
                        }
                        return true;
                    }
                    TimeUnit.MILLISECONDS.sleep(1000 + new Random().nextInt(100));//加随机时间防止活锁
                }
            } catch (Exception e) {
                LOGGER.error("{}", e.getMessage(), e);
                unlock();
            }
            return false;
        }
    
        public void unlock() {
            try {
                String lockKey = generateLockKey(key);
                RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
                connection.del(lockKey.getBytes());
                connection.del(key.getBytes());
                connection.close();
            } catch (Exception e) {
                LOGGER.error("{}", e.getMessage(), e);
            }
        }
    
        private String generateLockKey(String key) {
            return String.format(REDIS_LOCK + "%s", key);
        }
    
        public boolean lock() {
            return lock(key, DEFAULT_WAIT_LOCK_TIME_OUT, TimeUnit.SECONDS);
        }
    
        @Override
        public void close(){
            try {
                String lockKey = generateLockKey(key);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("release RedisLock[" + lockKey + "].");
                }
                RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
                connection.del(lockKey.getBytes());
                connection.close();
            } catch (Exception e) {
                LOGGER.error("{}", e.getMessage(), e);
            }
        }
    }

    在高并发的使用场景下,如何让redis里的数据尽量保持一致,可以采用分布式锁。以分布式锁的方式来保证对临界资源的互斥读写。

       redis使用缓存作为分布式锁,性能非常强劲,在一些不错的硬件上,redis可以每秒执行10w次,内网延迟不超过1ms,足够满足绝大部分应用的锁定需求。

       redis常用的分布式锁的实现方式:

    一、setbit / getbit

       用索引号为0的第一个比特位来表示锁定状态,其中:0表示未获得锁,1表示已获得锁。

       优势:简单;

       劣势:竞态条件(race condition),死锁。

       获得锁的过程至少需要两步:先getbit判断,后setbit上锁。由于不是原子操作,因此可能存在竞态条件;如果一个客户端使用setbit获取到锁,然后没来得及释放crash掉了,那么其他在等待的客户端将永远无法获得该锁,进而形成了死锁。所以这种形式不太适合实现分布式锁。

    二、setnx / del / getset

      redis官网有一篇文章专门谈论了实现分布式锁的话题。基本的原则是:采用setnx尝试获取锁并判断是否获得了锁,setnx设置的值是它想占用锁的时间(预估):

    • 如返回1,则该客户端获得锁,把lock.foo的键值设置为时间值表示该键已被锁定,该客户端最后可以通过DEL lock.foo来释放该锁。
    • 如返回0,表明该锁已被其他客户端取得,这时我们可以先返回或进行重试等对方完成或等待锁超时。

      通过del删除key来释放锁。某个想获得锁的客户端,先采用setnx尝试获取锁,如果获取失败了,那么会通过get命令来获得锁的过期时间以判断该锁的占用是否过期。如果跟当前时间对比,发现过期,那么先执行del,然后执行setnx获取锁。如果整个流程就这样,可能会产生死锁,请参考下面的执行序列:

       所以,在高并发的场景下,如果检测到锁过期,不能简单地进行del并尝试通过setnx获得锁。我们可以通过getset命令来避免这个问题。来看看,如果存在一个用户user4,它通过调用getset命令如何避免这种情况的发生:

     getset设置的过期时间跟上面的setnx设置的相同:

       如果该命令返回的结果跟上一步通过get获得的过期时间一致,则说明这两步之间,没有新的客户端抢占了锁,则该客户端即获得锁。如果该命令返回的结果跟上一步通过get获得的过期时间不一致,则该锁可能已被其他客户端抢先获得,则本次获取锁失败。

       这种实现方式得益于getset命令的原子性,从而有效得避免了竞态条件。并且,通过将比对锁的过期时间作为获取锁逻辑的一部分,从而避免了死锁。

    三、setnx / del / expire

       这是使用最多的实现方式:setnx的目的同上,用来实现尝试获取锁以及判断是否获取到锁的原子性,del删除key来释放锁,与上面不同的是,使用redis自带的expire命令来防止死锁(可能出现某个客户端获得了锁,但是crash了,永不释放导致死锁)。这算是一种比较简单但粗暴的实现方式:因为,不管实际的情况如何,当你设置expire之后,它一定会在那个时间点删除key。如何当时某个客户端已获得了锁,正在执行临界区内的代码,但执行时间超过了expire的时间,将会导致另一个正在竞争该锁的客户端也获得了该锁,这个问题下面还会谈到。

      我们来看一下宿舍锁的简单实现很简单:

    通过一个while(true),在当前线程上进行阻塞等待,并通过一个计数器进行自减操作,防止永久等待。 

    http://www.cnblogs.com/moonandstar08/p/5682822.html

    多节点的部署中,对锁的控制,参考:

    http://www.jeffkit.info/2011/07/1000/

    直接贴上代码实现,同上一篇文章一样,都是基于AOP

    定义注解,标志切入点:

    package com.ns.annotation;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Inherited;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    public @interface RedisLock {
        /**
         * redis的key
         * @return
         */
        String value();
        /**
         * 持锁时间,单位毫秒,默认一分钟
         */
        long keepMills() default 60000;
        /**
         * 当获取失败时候动作
         */
        LockFailAction action() default LockFailAction.GIVEUP;
        
        public enum LockFailAction{
            /**
             * 放弃
             */
            GIVEUP,
            /**
             * 继续
             */
            CONTINUE;
        }
        /**
         * 睡眠时间,设置GIVEUP忽略此项
         * @return
         */
        long sleepMills() default 1000;
    }

    切面实现:

    package com.redis.aop;
    import java.lang.reflect.Method;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    import com.ns.annotation.RedisLock;
    import com.ns.annotation.RedisLock.LockFailAction;
    import com.ns.redis.dao.base.BaseRedisDao;
    @Aspect
    public class RedisLockAspect extends BaseRedisDao<String, Long>{
      private static final Logger log = LoggerFactory.getLogger(RedisLockAspect.class);
      //execution(* com.ns..*(*,..)) and @within(com.ns.annotation.RedisLock)
      
      @Pointcut("execution(* com.ns..*(..)) && @annotation(com.ns.annotation.RedisLock)")
      private void lockPoint(){}
      @Around("lockPoint()")
      public Object arround(ProceedingJoinPoint pjp) throws Throwable{
        MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
        Method method = methodSignature.getMethod();
        RedisLock lockInfo = method.getAnnotation(RedisLock.class);
        boolean lock = false;
        Object obj = null;
        while(!lock){
          long timestamp = System.currentTimeMillis()+lockInfo.keepMills();
          lock = setNX(lockInfo.value(), timestamp);
          //得到锁,已过期并且成功设置后旧的时间戳依然是过期的,可以认为获取到了锁(成功设置防止锁竞争)
          long now = System.currentTimeMillis();
          if(lock || ((now > getLock(lockInfo.value())) && (now > getSet(lockInfo.value(), timestamp)))){
            //得到锁,执行方法,释放锁
            log.info("得到锁...");
            obj = pjp.proceed();
            //不加这一行,对于只能执行一次的定时任务,时间差上不能保证另一个一定正好放弃
            if(lockInfo.action().equals(LockFailAction.CONTINUE)){
              delete(lockInfo.value());
            }
          }else{
            if(lockInfo.action().equals(LockFailAction.CONTINUE)){
              log.info("稍后重新请求锁...");
              Thread.currentThread().sleep(lockInfo.sleepMills());
            }else{
              log.info("放弃锁...");
              break;
            }
          }
        }
        return obj;
      }
      public boolean setNX(String key,Long value){
        return valueOperations.setIfAbsent(key, value);
      }
      public long getLock(String key){
        return valueOperations.get(key);
      }
      public Long getSet(String key,Long value){
        return valueOperations.getAndSet(key, value);
      }
      public void releaseLock(String key){
        delete(key);
      }
    }

    Python的一个实现

    LOCK_TIMEOUT = 3
    lock = 0
    lock_timeout = 0
    lock_key = 'lock.foo'
    
    # 获取锁
    while lock != 1:
        now = int(time.time())
        lock_timeout = now + LOCK_TIMEOUT + 1
        lock = redis_client.setnx(lock_key, lock_timeout)
        if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)):
            break
        else:
            time.sleep(0.001)
    
    # 已获得锁
    do_job()
    
    # 释放锁
    now = int(time.time())
    if now < lock_timeout:
        redis_client.delete(lock_key)

    http://blog.csdn.net/lihao21/article/details/49104695

    以上有些代码只符合我现在的项目场景,根据实际需要进行调整

    http://www.tuicool.com/articles/EzaM7by

  • 相关阅读:
    Ubuntu14.04+cuda 7.5+cudnn_v4+tensorflow安装
    error C2275: “XXX”: 将此类型用作表达式非法
    DLL调试方法
    OpenCV ——双线性插值(Bilinear interpolation)
    OpenCV ——背景建模之CodeBook(2)
    OpenCV ——背景建模之CodeBook(1)
    OpenCV——GMM混合高斯模型
    OpenCV——运用于pixels war游戏
    【SpringSecurityOAuth2】源码分析@EnableOAuth2Sso在Spring Security OAuth2 SSO单点登录场景下的作用
    【Spring】简述@Configuration配置类注册BeanDefinition到Spring容器的过程
  • 原文地址:https://www.cnblogs.com/softidea/p/5974095.html
Copyright © 2011-2022 走看看