zoukankan      html  css  js  c++  java
  • 分布式缓存技术redis系列(五)——redis实战(redis与spring整合,分布式锁实现)

    本文是redis学习系列的第五篇,点击下面链接可回看系列文章

    《redis简介以及linux上的安装》

    《详细讲解redis数据结构(内存模型)以及常用命令》

    《redis高级应用(主从、事务与锁、持久化)》

    《redis高级应用(集群搭建、集群分区原理、集群操作》

    本文我们继续学习redis与spring的整合,整合之后就可以用redisStringTemplate的setNX()和delete()方法实现分布式锁了。

    Redis与spring的整合

    相关依赖jar包

    spring把专门的数据操作独立封装在spring-data系列中,spring-data-redis是对Redis的封装

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
        <version>1.4.2.RELEASE</version>
    </dependency>
     
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.6.2</version>
    </dependency>
     
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.4.2</version>
    </dependency>

    Spring 配置文件applicationContext.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    <!--命令空间中加入下面这行-->
    xmlns:p="http://www.springframework.org/schema/p"
     
    <!-- redis连接池配置文件 -->
    <context:property-placeholder location="classpath:redis.properties" /> 
     
    <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"
        <property name="maxIdle" value="${redis.maxIdle}" /> 
        <property name="maxTotal" value="${redis.maxTotal}" /> 
        <property name="MaxWaitMillis" value="${redis.MaxWaitMillis}" /> 
        <property name="testOnBorrow" value="${redis.testOnBorrow}" /> 
    </bean> 
       
    <bean id="connectionFactory" class="org.springframework.data.    redis.connection.jedis.JedisConnectionFactory" 
        p:host-name="${redis.host}" p:port="${redis.port}"
        p:password="${redis.pass}"  p:pool-config-ref="poolConfig"/> 
       
    <bean id="redisTemplate" class="org.springframework.data.    redis.core.RedisTemplate"
        <property name="connectionFactory"   ref="connectionFactory" /> 
    </bean>

      

    注意新版的maxTotal,MaxWaitMillis这两个字段与旧版的不同。

    redis连接池配置文件redis.properties

    1
    2
    3
    4
    5
    6
    7
    8
    redis.host=192.168.2.129
    redis.port=6379 
    redis.pass=redis129 
     
    redis.maxIdle=300 
    redis.maxTotal=600 
    redis.MaxWaitMillis=1000 
    redis.testOnBorrow=true

    好了,配置完成,下面写上代码

    测试代码

    User

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Entity
    @Table(name = "t_user")
    public class User {
        //主键
        private String id;
        //用户名
        private String userName;
            //...省略get,set...
    }

    BaseRedisDao

    1
    2
    3
    4
    5
    6
    7
    @Repository
    public abstract class BaseRedisDao<K,V> {
         
        @Autowired(required=true
        protected RedisTemplate<K, V> redisTemplate;
     
    }

    IUserDao 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public interface IUserDao {
         
        public boolean save(User user);
         
        public boolean update(User user);
     
        public boolean delete(String userIds);
         
        public User find(String userId);
         
    }

    UserDao 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    @Repository
    public class UserDao extends BaseRedisDao<String, User> implements IUserDao {
         
        @Override
        public boolean save(final User user) {
            boolean res = redisTemplate.execute(new RedisCallback<Boolean>() {
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                    RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                    byte[] key = serializer.serialize(user.getId());
                    byte[] value = serializer.serialize(user.getUserName());
                    //set not exits
                    return connection.setNX(key, value);
                }
            });
            return res;
        }
     
        @Override
        public boolean update(final User user) {
            boolean result = redisTemplate.execute(new RedisCallback<Boolean>() { 
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException { 
                    RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); 
                    byte[] key  = serializer.serialize(user.getId()); 
                    byte[] name = serializer.serialize(user.getUserName()); 
                    //set
                    connection.set(key, name); 
                    return true
                
            }); 
            return result;
        }
     
        @Override
        public User find(final String userId) {
            User result = redisTemplate.execute(new RedisCallback<User>() { 
                public User doInRedis(RedisConnection connection) throws DataAccessException { 
                    RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); 
                    byte[] key = serializer.serialize(userId);
                    //get
                    byte[] value = connection.get(key); 
                    if (value == null) { 
                        return null
                    
                    String name = serializer.deserialize(value);
                    User resUser = new User();
                    resUser.setId(userId);
                    resUser.setUserName(name);
                    return resUser; 
                
            }); 
            return result; 
        }
     
        @Override
        public boolean delete(final String userId) {
            boolean result = redisTemplate.execute(new RedisCallback<Boolean>() { 
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException { 
                    RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); 
                    byte[] key  = serializer.serialize(userId); 
                    //delete
                    connection.del(key);
                    return true
                
            }); 
            return result;
        }
     
    }

    Test

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath*:applicationContext.xml"}) 
    public class RedisTest extends AbstractJUnit4SpringContextTests { 
           
        @Autowired 
        private IUserDao userDao;
         
        @Test 
        public void testSaveUser() { 
            User user = new User(); 
            user.setId("402891815170e8de015170f6520b0000"); 
            user.setUserName("zhangsan"); 
            boolean res = userDao.save(user);
            Assert.assertTrue(res); 
        
         
        @Test 
        public void testGetUser() { 
            User user = new User(); 
            user = userDao.find("402891815170e8de015170f6520b0000");
            System.out.println(user.getId() + "-" + user.getUserName() ); 
        
         
        @Test 
        public void testUpdateUser() { 
            User user = new User(); 
            user.setId("402891815170e8de015170f6520b0000"); 
            user.setUserName("lisi"); 
            boolean res = userDao.update(user);
            Assert.assertTrue(res); 
        
         
        @Test 
        public void testDeleteUser() { 
            boolean res = userDao.delete("402891815170e8de015170f6520b0000");
            Assert.assertTrue(res); 
        
        
    }

      

    String类型的增删该查已完成,Hash,List,Set数据类型的操作就不举例了,和使用命令的方式差不多。如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    connection.hSetNX(key, field, value);
    connection.hDel(key, fields);
    connection.hGet(key, field);
     
    connection.lPop(key);
    connection.lPush(key, value);
    connection.rPop(key);
    connection.rPush(key, values);
     
    connection.sAdd(key, values);
    connection.sMembers(key);
    connection.sDiff(keys);
    connection.sPop(key);

      

    整合可能遇到的问题

    1.NoSuchMethodError

    1
    2
    3
    java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V
     
    Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V

      

    类似找不到类,找不到方法的问题,当确定依赖的jar已经引入之后,此类问题多事spring-data-redis以及jedis版本问题,多换个版本试试,本文上面提到的版本可以使用。

    1.No qualifying bean

    1
    No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found for dependency

      

    找不到bean,考虑applicationContext.xml中配置redisTemplate bean时实现类是否写错。例如,BaseRedisDao注入的是RedisTemplate类型的对象,applicationContext.xml中配置的实现类却是RedisTemplate的子类StringRedisTemplate,那肯定报错。整合好后,下面我们着重学习基于redis的分布式锁的实现。

    基于redis实现的分布式锁

    我们知道,在多线程环境中,锁是实现共享资源互斥访问的重要机制,以保证任何时刻只有一个线程在访问共享资源。锁的基本原理是:用一个状态值表示锁,对锁的占用和释放通过状态值来标识,因此基于redis实现的分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁,DEL相当于释放锁,当然,在下面的具体实现中会更复杂些。之所以称为分布式锁,是因为客户端可以在redis集群环境中向集群中任一个可用Master节点请求上锁(即SETNX命令存储key到redis缓存中是随机的),不像传统的synchronized锁只能锁住本机,分布式锁则用于分布式环境中对集群其他节点上锁。

    现在相信你已经对在基于redis实现的分布式锁的基本概念有了解,需要注意的是,这个和前面文章提到的使用WATCH 命令对key值进行锁操作没有直接的关系。java中synchronized和Lock对象都能对共享资源进行加锁,下面我们将学习用java实现的redis分布式锁。

    java中的锁技术

    在分析java实现的redis分布式锁之前,我们先来回顾下java中的锁技术,为了直观的展示,我们采用“多个线程共享输出设备”来举例。

    不加锁共享输出设备

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    public class LockTest {
        //不加锁
        static class Outputer {
            public void output(String name) {
                for(int i=0; i<name.length(); i++) {
                    System.out.print(name.charAt(i));
                }
                System.out.println();
            }
        }
        public static void main(String[] args) {
            final Outputer output = new Outputer();
            //线程1打印zhangsan
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true) {
                         try{
                             Thread.sleep(1000);
                         }catch(InterruptedException e) {
                             e.printStackTrace();
                         }
                         output.output("zhangsan");
                    }  
                }
            }).start();
             
            //线程2打印lingsi
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true) {
                         try{
                             Thread.sleep(1000);
                         }catch(InterruptedException e) {
                             e.printStackTrace();
                         }
                         output.output("lingsi");
                    }
                }
            }).start();
             
            //线程3打印wangwu
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true) {
                         try{
                             Thread.sleep(1000);
                         }catch(InterruptedException e) {
                             e.printStackTrace();
                         }
                         output.output("huangwu");
                    }
                }
            }).start();
        }
    }

    上面例子中,三个线程同时共享输出设备output,线程1需要打印zhangsan,线程2需要打印lingsi,线程3需要打印wangwu。在不加锁的情况,这三个线程会不会因为得不到输出设备output打架呢,我们来看看运行结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    huangwu
    zhangslingsi
    an
    huangwu
    zlingsi
    hangsan
    huangwu
    lzhangsan
    ingsi
    huangwu
    lingsi

      

    从运行结果可以看出,三个线程打架了,线程1没打印完zhangsan,线程2就来抢输出设备......可见,这不是我们想要的,我们想要的是线程之间能有序的工作,各个线程之间互斥的使用输出设备output。

    使用java5中的Lock对输出设备加锁

    现在我们对Outputer进行改进,给它加上锁,加锁之后每次只有一个线程能访问它。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    //使用java5中的锁
    static class Outputer{
        Lock lock = new ReentrantLock();
        public void output(String name) {
            //传统java加锁
            //synchronized (Outputer.class){
            lock.lock();
            try {
                for(int i=0; i<name.length(); i++) {
                    System.out.print(name.charAt(i));
                }
                System.out.println();
            }finally{
                //任何情况下都有释放锁
                lock.unlock();
            }  
            //}
        }
    }

      

    看看加锁后的输出结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    zhangsan
    lingsi
    huangwu
    zhangsan
    lingsi
    huangwu
    zhangsan
    lingsi
    huangwu
    zhangsan
    lingsi
    huangwu
    zhangsan
    lingsi
    huangwu
    ......

      

    从运行结果中可以看出,三个线程之间不打架了,线程之间的打印变得有序。有个这个基础,下面我们来学习基于Redis实现的分布式锁就更容易了。

    Redis分布式锁

    实现分析

    从上面java锁的使用中可以看出,锁对象主要有lock与unlock方法,在lock与unlock方法之间的代码(临界区)能保证线程互斥访问。基于redis实现的Java分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁(lock),DEL相当于释放锁(unlock)。我们只要实现Lock接口重写lock()和unlock()即可。但是这还不够,安全可靠的分布式锁应该满足满足下面三个条件:

    l 互斥,不管任何时候,只有一个客户端能持有同一个锁。

    l 不会死锁,最终一定会得到锁,即使持有锁的客户端对应的master节点宕掉。

    l 容错,只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。

    那么什么情况下会不满足上面三个条件呢。多个线程(客户端)同时竞争锁可能会导致多个客户端同时拥有锁。比如,

    (1)线程1在master节点拿到了锁(存入key)

    (2)master节点在把线程1创建的key写入slave之前宕机了,此时集群中的节点已经没有锁(key)了,包括master节点的slaver节点

    (3)slaver节点升级为master节点

    (4)线程2向新的master节点发起锁(存入key)请求,很明显,能请求成功。

    可见,线程1和线程2同时获得了锁。如果在更高并发的情况,可能会有更多线程(客户端)获取锁,这种情况就会导致上文所说的线程“打架”问题,线程之间的执行杂乱无章。

    那什么情况下又会发生死锁的情况呢。如果拥有锁的线程(客户端)长时间的执行或者因为某种原因造成阻塞,就会导致锁无法释放(unlock没有调用),其它线程就不能获取锁而而产生无限期死锁的情况。其它线程在执行lock失败后即使粗暴的执行unlock删除key之后也不能正常释放锁,因为锁就只能由获得锁的线程释放,锁不能正常释放其它线程仍然获取不到锁。解决死锁的最好方式是设置锁的有效时间(redis的expire命令),不管是什么原因导致的死锁,有效时间过后,锁将会被自动释放。

    为了保障容错功能,即只要有Redis节点正常工作,客户端应该都能获取和释放锁,我们必须用相同的key不断循环向Master节点请求锁,当请求时间超过设定的超时时间则放弃请求锁,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,应该尽快尝试下一个master节点。释放锁比较简单,因为只需要在所有节点都释放锁就行,不管之前有没有在该节点获取锁成功。

    Redlock算法

    根据上面的分析,官方提出了一种用Redis实现分布式锁的算法,这个算法称为RedLock。RedLock算法的主要流程如下:

     

    RedLock算法主要流程

     

     

    Java实现

    结合上面的流程图,加上下面的代码解释,相信你一定能理解redis分布式锁的实现原理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    public class RedisLock implements Lock{
     
        protected StringRedisTemplate redisStringTemplate;
     
        // 存储到redis中的锁标志
        private static final String LOCKED = "LOCKED";
     
        // 请求锁的超时时间(ms)
        private static final long TIME_OUT = 30000;
     
        // 锁的有效时间(s)
        public static final int EXPIRE = 60;
     
        // 锁标志对应的key;
        private String key;
     
        // state flag
        private volatile boolean isLocked = false;
     
        public RedisLock(String key) {
            this.key = key;
            @SuppressWarnings("resource")
            ApplicationContext  ctx =  new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
            redisStringTemplate = (StringRedisTemplate)ctx.getBean("redisStringTemplate");
        }
     
        @Override
        public void lock() {
            //系统当前时间,毫秒
            long nowTime = System.nanoTime();
            //请求锁超时时间,毫秒
            long timeout = TIME_OUT*1000000;
            final Random r = new Random();
            try {
                //不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁
                //这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间
                //如果一个master节点不可用了,应该尽快尝试下一个master节点
                while ((System.nanoTime() - nowTime) < timeout) {
                    //将锁作为key存储到redis缓存中,存储成功则获得锁
                    if (redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),
                            LOCKED.getBytes())) {
                        //设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
                        //可以防止因异常情况无法释放锁而造成死锁情况的发生
                        redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);
                        isLocked = true;
                        //上锁成功结束请求
                        break;
                    }
                    //获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现
                    //睡眠3毫秒后继续请求锁
                    Thread.sleep(3, r.nextInt(500));
                }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
     
        @Override
        public void unlock() {
            //释放锁
            //不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作
            if (isLocked) {
                redisStringTemplate.delete(key);
            }
        }
     
        @Override
        public void lockInterruptibly() throws InterruptedException {
            // TODO Auto-generated method stub
             
        }
     
        @Override
        public boolean tryLock() {
            // TODO Auto-generated method stub
            return false;
        }
     
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            // TODO Auto-generated method stub
            return false;
        }
     
        @Override
        public Condition newCondition() {
            // TODO Auto-generated method stub
            return null;
        }
    }

     

    好了,RedisLock已经实现,我们对Outputer使用RedisLock进行修改

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /使用RedisLock
    static class Outputer {
        //创建一个名为redisLock的RedisLock类型的锁
        RedisLock redisLock = new RedisLock("redisLock");
        public void output(String name) {
            //上锁
            redisLock.lock();
            try {
                for(int i=0; i<name.length(); i++) {
                    System.out.print(name.charAt(i));
                }
                System.out.println();
            }finally{
                //任何情况下都要释放锁
                redisLock.unlock();
            }  
        }
    }

      

    看看使用RedisLock加锁后的的运行结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    lingsi
    zhangsan
    huangwu
    lingsi
    zhangsan
    huangwu
    lingsi
    zhangsan
    huangwu
    lingsi
    zhangsan
    huangwu
    lingsi
    zhangsan
    huangwu
    ......

      

    可见,使用RedisLock加锁后线程之间不再“打架”,三个线程互斥的访问output。

    问题

    现在我无法论证RedLock算法在分布式、高并发环境下的可靠性,但从本例三个线程的运行结果看,RedLock算法确实保证了三个线程互斥的访问output(redis.maxIdle=300 redis.maxTotal=600,运行到Timeout waiting for idle object都没有出现线程“打架”的问题)。我认为RedLock算法仍有些问题没说清楚,比如,如何防止拥有锁的Master节点宕机而未来得及同步key到Slave节点时导致其他线程(客户端)获得锁?RedLock算法在释放锁的处理上,不管线程是否获取锁成功,只要上了锁,就会到每个master节点上释放锁,这就会导致一个线程上的锁可能会被其他线程释放掉,这就和每个锁只能被获得锁的线程释放相互矛盾。这些有待后续进一步交流学习研究。

    参考文档

    http://redis.io/topics/distlock

    http://ifeve.com/redis-lock/

  • 相关阅读:
    SQL Server需要监控哪些计数器
    将表里的数据批量生成INSERT语句的存储过程 继续增强版
    [Java]
    [Linux] 安装JBoss
    [Spring]
    [Spring]
    [Maven]
    [Maven]
    [Spring MVC]
    [Spring MVC]
  • 原文地址:https://www.cnblogs.com/eer123/p/11803906.html
Copyright © 2011-2022 走看看