zoukankan      html  css  js  c++  java
  • redis分布式锁

    1. maven中添加jedis客户端版本

    <dependency>
         <groupId>redis.clients</groupId>
         <artifactId>jedis</artifactId>
         <version>2.4.2</version>
    </dependency>

    此版本对于单机基本满足redis任何操作

    java代码实现连接方式如下:

    @Configuration
    @EnableCaching
    public class RedisCacheConfiguration extends CachingConfigurerSupport {
        private static Logger logger = LoggerFactory.getLogger(RedisCacheConfiguration.class);
    
        @Value("${spring.redis.host}")
        private String host;
    
        @Value("${spring.redis.port}")
        private int port;
    
        @Value("${spring.redis.timeout}")
        private int timeout;
    
        @Value("${spring.redis.jedis.pool.max-idle}")
        private int maxIdle;
    
        @Value("${spring.redis.jedis.pool.max-wait}")
        private long maxWaitMillis;
    
        @Value("${spring.redis.password}")
        private String password;
    
        @Value("${spring.redis.database}")
        private int database;
    
        @Value("${spring.redis.clientname}")
        private String clienttoken;
    
        @Bean
        public JedisPool redisPoolFactory() {
            logger.info("JedisPool注入成功!!");
            logger.info("redis地址:" + host + ":" + port);
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxIdle(maxIdle);
            jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
            jedisPoolConfig.setTestOnBorrow(true);
            jedisPoolConfig.setTestOnReturn(true);
            JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password,database,clienttoken);
            return jedisPool;
        }
    }
    @Service
    public class RedisManager {
        // 0-表示永远不过期
        private int expire = 0;
    
        @Autowired
        JedisPool jedisPool;
        public RedisManager() {
        }
    
        /**
         * @Title: get
         * @Description: 根据key来获得一条特定的缓存数据
         * @param @param key 序列化后的key
         * @param @return    设定文件
         * @return byte[]    返回类型
         * @throws
         */
        public byte[] get(byte[] key) {
            byte[] value = null;
            Jedis jedis = jedisPool.getResource();
            try {
                value = jedis.get(key);
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return value;
        }
    
        /**
         * @Title: get
         * @Description: 根据key来获得一条特定的缓存数据
         * @param @param key string类型的key
         * @param @return    设定文件
         * @return String    返回类型
         * @throws
         */
        public String get(String key) {
            String value = null;
            Jedis jedis = jedisPool.getResource();
            try {
                value = jedis.get(key);
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return value;
        }
    
        /**
         * @Title: set
         * @Description: 向redis数据库中缓存数据,key,value都为byte[](已经序列化)
         * @param @param key
         * @param @param value
         * @param @return 设定文件
         * @return byte[] 返回类型
         * @throws
         */
        /*public byte[] set(byte[] key, byte[] value) {
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.set(key, value);
                if (this.expire != 0) {
                    jedis.expire(key, this.expire);
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return value;
        }*/
    
        /**
         * @Title: set
         * @Description: 向redis数据库中缓存数据,key,value为字符串类型,缓存时间为永不过期
         * @param @param key
         * @param @param value
         * @param @return 设定文件
         * @return String 返回类型
         * @throws
         */
        public String set(String key, String value) {
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.set(key, value);
                if (this.expire != 0) {
                    jedis.expire(key, this.expire);
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return value;
        }
    
        /**
         * @Title: set
         * @Description: 向redis数据库中缓存数据,key,value都为byte[](已经序列化)
         * @param @param key
         * @param @param value
         * @param @param expire 0为永不过期,其他时间则会设置对应的过期时间
         * @param @return 设定文件
         * @return byte[] 返回类型
         * @throws
         */
        /*public byte[] set(byte[] key, byte[] value, int expire) {
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.set(key, value);
                if (expire != 0) {
                    jedis.expire(key, expire);
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return value;
        }*/
    
        /**
         * @Title: set
         * @Description: 向redis数据库中缓存数据,key,value都为字符串的类型
         * @param @param key
         * @param @param value
         * @param @param expire 0为永不过期,其他时间则会设置对应的过期时间
         * @param @return 设定文件
         * @return String 返回类型
         * @throws
         */
        public String set(String key, String value, int expire) {
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.set(key, value);
                if (expire != 0) {
                    jedis.expire(key, expire);
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return value;
        }
    
        /**
         * @Title: del
         * @Description: 根据byte数组(已经序列化的key)来删除redis数据库中缓存的数据
         * @param @param key 设定文件
         * @return void 返回类型
         * @throws
         */
        public void del(byte[] key) {
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.del(key);
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
        }
    
        /**
         * @Title: del
         * @Description: 根据特定的string类型的key来删除redis数据库中的缓存数据
         * @param @param key 设定文件
         * @return void 返回类型
         * @throws
         */
        public void del(String key) {
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.del(key);
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
        }
    
        /**
         * @Title: flushDB
         * @Description: 清除指定redis数据库中的数据
         * @param
         * @return void 返回类型
         * @throws
         */
        public void flushDB() {
            Jedis jedis = jedisPool.getResource();
            try {
                jedis.flushDB();
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
        }
    
        /**
         * @Title: dbSize
         * @Description: 获得redis缓存数据的大小
         * @param @return 设定文件
         * @return Long 返回类型
         * @throws
         */
        public Long dbSize() {
            Long dbSize = 0L;
            Jedis jedis = jedisPool.getResource();
            try {
                dbSize = jedis.dbSize();
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return dbSize;
        }
    
        /**
         * @Title: keys
         * @Description: 根据泛型来查询所有符合泛型的缓存数据
         * @param @param pattern
         * @param @return 设定文件
         * @return Set<byte[]> 返回类型
         * @throws
         */
        public Set<byte[]> keys(String pattern) {
            Set<byte[]> keys = null;
            Jedis jedis = jedisPool.getResource();
            try {
                keys = jedis.keys(pattern.getBytes());
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return keys;
        }
    
        /**
         * @Title: dels
         * @Description: 根据提供的泛型来删除reids中缓存的数据
         * @param @param pattern 设定文件
         * @return void 返回类型
         * @throws
         */
        public void dels(String pattern) {
            Set<byte[]> keys = null;
            Jedis jedis = jedisPool.getResource();
            try {
                keys = jedis.keys(pattern.getBytes());
                Iterator<byte[]> ito = keys.iterator();
                while (ito.hasNext()) {
                    jedis.del(ito.next());
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
        }
    }
    spring.redis.database=0
    spring.redis.host=111.11.1.1
    spring.redis.password=111111
    spring.redis.port=6379
    spring.redis.jedis.pool.max-active=10
    spring.redis.jedis.pool.max-wait=-1
    spring.redis.jedis.pool.max-idle=6
    spring.redis.jedis.pool.min-idle=0
    #spring.redis.default.expire.time=60
    spring.redis.timeout=10000
    spring.redis.clientname=know

     如果要使用分布式锁用此版本的setnx会有如下问题

    问题1

    public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) {
        //这里的问题是,两次设置已经不再是原子操作
        Long result = jedis.setnx(lockKey, requestId);
        if (result == 1) {
            // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
            jedis.expire(lockKey, expireTime);
        }
    
    }

    问题2

    public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) {
     
        long expires = System.currentTimeMillis() + expireTime;
        String expiresStr = String.valueOf(expires);
     
        // 如果当前锁不存在,返回加锁成功
        if (jedis.setnx(lockKey, expiresStr) == 1) {
            return true;
        }
     
        // 如果锁存在,获取锁的过期时间
        String currentValueStr = jedis.get(lockKey);
        if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
            // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
            String oldValueStr = jedis.getSet(lockKey, expiresStr);
            if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
                return true;
            }
        }
     
        // 其他情况,一律返回加锁失败
        return false;
    }

    错误原因描述

    1. 由于是客户端自己生成过期时间,所以需要强制要求分布式下每个客户端的时间必须同步。

    2. 当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖。

    3. 锁不具备拥有者标识,即任何客户端都可以解锁。

    既然要使用分布式锁,又要避免死锁发生,不破坏原子性。只有升级jedis版本

    <!-- 添加jedis客户端 -->
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>

    一个重要改进的地方就是set操作支持过期时间的设置

    @Service
    public class DistributedLock {
        private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);
        private static final Integer Lock_Timeout = 3;
    
        private static final String LOCK_SUCCESS = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";
        private static final String SET_WITH_EXPIRE_TIME = "PX";
    
        private static final Long RELEASE_SUCCESS = 1L;
    
        @Autowired
        JedisPool jedisPool;
    
        /**
         * 尝试获取分布式锁
         * @param lockKey 锁
         * @param requestId 请求标识
         * @param expireTime 超期时间
         * @return 是否获取成功
         */
        public boolean tryGetDistributedLock(String lockKey, String requestId, int expireTime) {
            Jedis jedis = jedisPool.getResource();
            try {
                String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
                if (LOCK_SUCCESS.equals(result)) {
                    System.out.println("加锁成功");
                    return true;
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return false;
        }
    
        /**
         * 释放分布式锁
         * @param lockKey 锁
         * @param requestId 请求标识
         * @return 是否释放成功
         */
        public boolean releaseDistributedLock(String lockKey, String requestId) {
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Jedis jedis = jedisPool.getResource();
            try {
                Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
                if (RELEASE_SUCCESS.equals(result)) {
                    return true;
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
            return false;
        }
    
        /**
         * 外部调用加锁的方法
         * @param lockKey 锁的名字
         * @param timeout 超时时间(放置时间长度,如:5L)
         * @return
         */
        public boolean tryLock(String lockKey, Long timeout) {
            try {
                Long currentTime = System.currentTimeMillis();//开始加锁的时间
                boolean result = false;
    
                while (true) {
                    if ((System.currentTimeMillis() - currentTime) / 1000 > timeout) {//当前时间超过了设定的超时时间
                        logger.info("Execute DistributedLockHandler.tryLock method, Time out.");
                        break;
                    } else {
                        result = innerTryLock(lockKey);
                        if (result) {
                            break;
                        } else {
                            logger.info("Try to get the Lock,and wait 100 millisecond....");
                            Thread.sleep(100);
                        }
                    }
                }
                return result;
            } catch (Exception e) {
                logger.info("Failed to run DistributedLockHandler.getLock method."+ e.getMessage());
                return false;
            }
        }
    
        /**
         * 释放锁
         * @param lockKey 锁的名字
         */
        public void realseLock(String lockKey) {
            if(!checkIfLockTimeout(System.currentTimeMillis(), lockKey)){
                Jedis jedis = jedisPool.getResource();
                try {
                    jedis.del(lockKey);
                } finally {
                    if (jedis != null) {
                        jedisPool.returnResource(jedis);
                    }
                }
            }
        }
    
        /**
         * 内部获取锁的实现方法
         * @param lockKey 锁的名字
         * @return
         */
        private boolean innerTryLock(String lockKey) {
            long currentTime = System.currentTimeMillis();//当前时间
            String lockTimeDuration = String.valueOf(currentTime + Lock_Timeout + 1);//锁的持续时间
            Jedis jedis = jedisPool.getResource();
            try {
                //Long result = jedis.setnx(lockKey, lockTimeDuration);
                Long result = jedis.setnx(lockKey, lockTimeDuration);
    
                if (result == 1) {
                    return true;
                } else {
                    if (checkIfLockTimeout(currentTime, lockKey)) {
                        String preLockTimeDuration = jedis.getSet(lockKey, lockTimeDuration);
                        if (currentTime > Long.valueOf(preLockTimeDuration)) {
                            return true;
                        }
                    }
                    return false;
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
        }
    
        /**
         * 判断加锁是否超时
         * @param currentTime 当前时间
         * @param lockKey 锁的名字
         * @return
         */
        private boolean checkIfLockTimeout(Long currentTime, String lockKey) {
            Jedis jedis = jedisPool.getResource();
            try {
                if (currentTime > Long.valueOf(jedis.get(lockKey))) {//当前时间超过锁的持续时间
                    return true;
                } else {
                    return false;
                }
            } finally {
                if (jedis != null) {
                    jedisPool.returnResource(jedis);
                }
            }
        }
    }

    在idea工具内发现一个明显的错误,于是查看jedis源码

    public Jedis getResource() {
            Jedis jedis = (Jedis)super.getResource();
            jedis.setDataSource(this);
            return jedis;
        }
    
        /** @deprecated */
        @Deprecated
        public void returnBrokenResource(Jedis resource) {
            if (resource != null) {
                this.returnBrokenResourceObject(resource);
            }
    
        }
    
        /** @deprecated */
        @Deprecated
        public void returnResource(Jedis resource) {
            if (resource != null) {
                try {
                    resource.resetState();
                    this.returnResourceObject(resource);
                } catch (Exception var3) {
                    this.returnBrokenResource(resource);
                    throw new JedisException("Could not return the resource to the pool", var3);
                }
            }
    
        }

    2.9版本建议jedispool对于连接的回收不建议使用returnResource。如果不用此操作,那么对于getResource如何释放链接,继续看源码

    2.4版本 jedis.close();源码如下
    public void close() {
        this.client.close();
    }
    2.9版本 jedis.close();源码如下
    public void close() {
        if (this.dataSource != null) {
             if (this.client.isBroken()) {
                 this.dataSource.returnBrokenResource(this);
             } else {
                 this.dataSource.returnResource(this);
            }
        } else {
            this.client.close();
        }
    }

    到此,我们修改原来DistributedLock和RedisManager的代码,完成版本升级和分布式锁的实现

  • 相关阅读:
    Python-time和datetime模块
    Python-hashlib模块
    Python-利用flask模块创建web接口
    Python-操作Excel
    2
    1
    8
    7
    HDFS元数据管理实战篇
    使用HttpFS网关从防火墙后面访问HDFS
  • 原文地址:https://www.cnblogs.com/mutong1228/p/10606860.html
Copyright © 2011-2022 走看看