zoukankan      html  css  js  c++  java
  • redis系列之5----redis实战(redis与spring整合,分布式锁实现)

    本文是redis学习系列的第五篇,点击下面链接可回看系列文章

    《redis简介以及linux上的安装》

    《详细讲解redis数据结构(内存模型)以及常用命令》

    《redis高级应用(主从、事务与锁、持久化)》

    《redis高级应用(集群搭建、集群分区原理、集群操作》

    本文我们继续学习redis与spring的整合,整合之后就可以用redisStringTemplate的setNX()和delete()方法实现分布式锁了。

    Redis与spring的整合

    相关依赖jar包

    spring把专门的数据操作独立封装在spring-data系列中,spring-data-redis是对Redis的封装

    <dependency>
    	<groupId>org.springframework.data</groupId>
    	<artifactId>spring-data-redis</artifactId>
    	<version>1.4.2.RELEASE</version>
    </dependency>
    
    <dependency>
    	<groupId>redis.clients</groupId>
    	<artifactId>jedis</artifactId>
    	<version>2.6.2</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.4.2</version>
    </dependency>
    

    Spring 配置文件applicationContext.xml

        <!--命令空间中加入下面这行-->
        xmlns:p="http://www.springframework.org/schema/p"
    
        <!-- redis连接池配置文件 -->
        <context:property-placeholder location="classpath:redis.properties" />  
      
        <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">  
            <property name="maxIdle" value="${redis.maxIdle}" />  
            <property name="maxTotal" value="${redis.maxTotal}" />  
            <property name="MaxWaitMillis" value="${redis.MaxWaitMillis}" />  
            <property name="testOnBorrow" value="${redis.testOnBorrow}" />  
        </bean>  
          
        <bean id="connectionFactory" class="org.springframework.data.    redis.connection.jedis.JedisConnectionFactory"  
            p:host-name="${redis.host}" p:port="${redis.port}" 
            p:password="${redis.pass}"  p:pool-config-ref="poolConfig"/>  
          
        <bean id="redisTemplate" class="org.springframework.data.    redis.core.RedisTemplate">  
            <property name="connectionFactory"   ref="connectionFactory" />  
        </bean>
    

      

    注意新版的maxTotal,MaxWaitMillis这两个字段与旧版的不同

    redis连接池配置文件redis.properties

    redis.host=192.168.2.129
    redis.port=6379  
    redis.pass=redis129  
    
    redis.maxIdle=300  
    redis.maxTotal=600  
    redis.MaxWaitMillis=1000  
    redis.testOnBorrow=true 
    

    好了,配置完成,下面写上代码

    测试代码

    User

    @Entity
    @Table(name = "t_user")
    public class User {
    	//主键
    	private String id;
    	//用户名
    	private String userName;
            //...省略get,set...
    }
    

    BaseRedisDao

    @Repository
    public abstract class BaseRedisDao<K,V> {
    	
    	@Autowired(required=true)  
        protected RedisTemplate<K, V> redisTemplate;
    
    }
    

    IUserDao 

    public interface IUserDao {
    	
    	public boolean save(User user);
    	
    	public boolean update(User user);
    
    	public boolean delete(String userIds);
    	
    	public User find(String userId);
    	
    }
    

    UserDao 

    @Repository
    public class UserDao extends BaseRedisDao<String, User> implements IUserDao {
    	
    	@Override
    	public boolean save(final User user) {
    		boolean res = redisTemplate.execute(new RedisCallback<Boolean>() {
    			public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
    				RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
    				byte[] key = serializer.serialize(user.getId());
    				byte[] value = serializer.serialize(user.getUserName());
    				//set not exits
    				return connection.setNX(key, value);
    			}
    		});
    		return res;
    	}
    
    	@Override
    	public boolean update(final User user) {
            boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {  
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {  
                    RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  
                    byte[] key  = serializer.serialize(user.getId());  
                    byte[] name = serializer.serialize(user.getUserName());  
                    //set
                    connection.set(key, name);  
                    return true;  
                }  
            });  
            return result;
    	}
    
    	@Override
    	public User find(final String userId) {
    		User result = redisTemplate.execute(new RedisCallback<User>() {  
                public User doInRedis(RedisConnection connection) throws DataAccessException {  
                    RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  
                    byte[] key = serializer.serialize(userId);
                    //get
                    byte[] value = connection.get(key);  
                    if (value == null) {  
                        return null;  
                    }  
                    String name = serializer.deserialize(value); 
                    User resUser = new User();
                    resUser.setId(userId);
                    resUser.setUserName(name);
                    return resUser;  
                }  
            });  
            return result;  
    	}
    
    	@Override
    	public boolean delete(final String userId) {
    		boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {  
                public Boolean doInRedis(RedisConnection connection) throws DataAccessException {  
                    RedisSerializer<String> serializer = redisTemplate.getStringSerializer();  
                    byte[] key  = serializer.serialize(userId);  
                    //delete 
                    connection.del(key);
                    return true;  
                }  
            });  
    		return result;
    	}
    
    }
    

    Test

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath*:applicationContext.xml"})  
    public class RedisTest extends AbstractJUnit4SpringContextTests {  
          
        @Autowired  
        private IUserDao userDao;
        
        @Test  
        public void testSaveUser() {  
            User user = new User();  
            user.setId("402891815170e8de015170f6520b0000");  
            user.setUserName("zhangsan");  
            boolean res = userDao.save(user);
            Assert.assertTrue(res);  
        }  
        
        @Test  
        public void testGetUser() {  
            User user = new User();  
            user = userDao.find("402891815170e8de015170f6520b0000");
            System.out.println(user.getId() + "-" + user.getUserName() );  
        }  
        
        @Test  
        public void testUpdateUser() {  
        	User user = new User();  
            user.setId("402891815170e8de015170f6520b0000");  
            user.setUserName("lisi");  
            boolean res = userDao.update(user);
            Assert.assertTrue(res);  
        }  
        
        @Test  
        public void testDeleteUser() {  
        	boolean res = userDao.delete("402891815170e8de015170f6520b0000");
        	Assert.assertTrue(res);  
        }  
       
    }
    

      

    String类型的增删该查已完成,Hash,List,Set数据类型的操作就不举例了,和使用命令的方式差不多。如下

    connection.hSetNX(key, field, value);
    connection.hDel(key, fields);
    connection.hGet(key, field);
    
    connection.lPop(key);
    connection.lPush(key, value);
    connection.rPop(key);
    connection.rPush(key, values);
    
    connection.sAdd(key, values);
    connection.sMembers(key);
    connection.sDiff(keys);
    connection.sPop(key);
    

      

    整合可能遇到的问题

    1.NoSuchMethodError

     java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V
    
    Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V
    

      

    类似找不到类,找不到方法的问题,当确定依赖的jar已经引入之后,此类问题多事spring-data-redis以及jedis版本问题,多换个版本试试,本文上面提到的版本可以使用。

    1.No qualifying bean

    No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found for dependency
    

      

    找不到bean,考虑applicationContext.xml中配置redisTemplate bean时实现类是否写错。例如,BaseRedisDao注入的是RedisTemplate类型的对象,applicationContext.xml中配置的实现类却是RedisTemplate的子类StringRedisTemplate,那肯定报错。整合好后,下面我们着重学习基于redis的分布式锁的实现。

    基于redis实现的分布式锁

    我们知道,在多线程环境中,锁是实现共享资源互斥访问的重要机制,以保证任何时刻只有一个线程在访问共享资源。锁的基本原理是:用一个状态值表示锁,对锁的占用和释放通过状态值来标识,因此基于redis实现的分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁,DEL相当于释放锁,当然,在下面的具体实现中会更复杂些。之所以称为分布式锁,是因为客户端可以在redis集群环境中向集群中任一个可用Master节点请求上锁(即SETNX命令存储key到redis缓存中是随机的)。

    现在相信你已经对在基于redis实现的分布式锁的基本概念有了解,需要注意的是,这个和前面文章提到的使用WATCH 命令对key值进行锁操作没有直接的关系。java中synchronized和Lock对象都能对共享资源进行加锁,下面我们将学习用java实现的redis分布式锁。

    java中的锁技术

    在分析java实现的redis分布式锁之前,我们先来回顾下java中的锁技术,为了直观的展示,我们采用“多个线程共享输出设备”来举例。

    不加锁共享输出设备

    public class LockTest {
    	//不加锁
    	static class Outputer {
    		public void output(String name) {
    			for(int i=0; i<name.length(); i++) {
    				System.out.print(name.charAt(i));
    			}
    			System.out.println();
    		}
    	}
    	public static void main(String[] args) {
    		final Outputer output = new Outputer();
    		//线程1打印zhangsan
    		new Thread(new Runnable(){
    			@Override
    			public void run() {
    				while(true) {
    					 try{
    						 Thread.sleep(1000);
    					 }catch(InterruptedException e) {
    						 e.printStackTrace();
    					 }
    					 output.output("zhangsan");
    				}	
    			}
    		}).start();
    		
    		//线程2打印lingsi
    		new Thread(new Runnable(){
    			@Override
    			public void run() {
    				while(true) {
    					 try{
    						 Thread.sleep(1000);
    					 }catch(InterruptedException e) {
    						 e.printStackTrace();
    					 }
    					 output.output("lingsi");
    				}
    			}
    		}).start();
    		
    		//线程3打印wangwu
    		new Thread(new Runnable(){
    			@Override
    			public void run() {
    				while(true) {
    					 try{
    						 Thread.sleep(1000);
    					 }catch(InterruptedException e) {
    						 e.printStackTrace();
    					 }
    					 output.output("huangwu");
    				}
    			}
    		}).start();
    	}
    }
    

    上面例子中,三个线程同时共享输出设备output,线程1需要打印zhangsan,线程2需要打印lingsi,线程3需要打印wangwu。在不加锁的情况,这三个线程会不会因为得不到输出设备output打架呢,我们来看看运行结果:

    huangwu
    zhangslingsi
    an
    huangwu
    zlingsi
    hangsan
    huangwu
    lzhangsan
    ingsi
    huangwu
    lingsi
    

      

    从运行结果可以看出,三个线程打架了,线程1没打印完zhangsan,线程2就来抢输出设备......可见,这不是我们想要的,我们想要的是线程之间能有序的工作,各个线程之间互斥的使用输出设备output。

    使用java5中的Lock对输出设备加锁

    现在我们对Outputer进行改进,给它加上锁,加锁之后每次只有一个线程能访问它。

    //使用java5中的锁
    static class Outputer{
    	Lock lock = new ReentrantLock();
    	public void output(String name) {
    		//传统java加锁
    		//synchronized (Outputer.class){
    		lock.lock();
    		try {
    			for(int i=0; i<name.length(); i++) {
    				System.out.print(name.charAt(i));
    			}
    			System.out.println();
    		}finally{
    			//任何情况下都有释放锁
    			lock.unlock();
    		}	
    		//}
    	}
    }
    

      

    看看加锁后的输出结果:

    zhangsan
    lingsi
    huangwu
    zhangsan
    lingsi
    huangwu
    zhangsan
    lingsi
    huangwu
    zhangsan
    lingsi
    huangwu
    zhangsan
    lingsi
    huangwu
    ......
    

      

    从运行结果中可以看出,三个线程之间不打架了,线程之间的打印变得有序。有个这个基础,下面我们来学习基于Redis实现的分布式锁就更容易了。

    Redis分布式锁

    实现分析

    从上面java锁的使用中可以看出,锁对象主要有lock与unlock方法,在lock与unlock方法之间的代码(临界区)能保证线程互斥访问。基于redis实现的Java分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁(lock),DEL相当于释放锁(unlock)。我们只要实现Lock接口重写lock()和unlock()即可。但是这还不够,安全可靠的分布式锁应该满足满足下面三个条件:

    l 互斥,不管任何时候,只有一个客户端能持有同一个锁。

    l 不会死锁,最终一定会得到锁,即使持有锁的客户端对应的master节点宕掉。

    l 容错,只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。

    那么什么情况下回不满足上面三个条件呢。多个线程(客户端)同时竞争锁可能会导致多个客户端同时拥有锁。比如,

    (1)线程1在master节点拿到了锁(存入key)

    (2)master节点在把线程1创建的key写入slave之前宕机了,此时集群中的节点已经没有锁(key)了,包括master节点的slaver节点

    (3)slaver节点升级为master节点

    (4)线程2向新的master节点发起锁(存入key)请求,很明显,能请求成功。

    可见,线程1和线程2同时获得了锁。如果在更高并发的情况,可能会有更多线程(客户端)获取锁,这种情况就会导致上文所说的线程“打架”问题,线程之间的执行杂乱无章。

    那什么情况下又会发生死锁的情况呢。如果拥有锁的线程(客户端)长时间的执行或者因为某种原因造成阻塞,就会导致锁无法释放(unlock没有调用),其它线程就不能获取锁而而产生无限期死锁的情况。其它线程在执行lock失败后即使粗暴的执行unlock删除key之后也不能正常释放锁,因为锁就只能由获得锁的线程释放,锁不能正常释放其它线程仍然获取不到锁。解决死锁的最好方式是设置锁的有效时间(redis的expire命令),不管是什么原因导致的死锁,有效时间过后,锁将会被自动释放。

    为了保障容错功能,即只要有Redis节点正常工作,客户端应该都能获取和释放锁,我们必须用相同的key不断循环向Master节点请求锁,当请求时间超过设定的超时时间则放弃请求锁,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,应该尽快尝试下一个master节点。释放锁比较简单,因为只需要在所有节点都释放锁就行,不管之前有没有在该节点获取锁成功。

    Redlock算法

    根据上面的分析,官方提出了一种用Redis实现分布式锁的算法,这个算法称为RedLock。RedLock算法的主要流程如下:

     

    RedLock算法主要流程

     

     

    Java实现

    结合上面的流程图,加上下面的代码解释,相信你一定能理解redis分布式锁的实现原理

    public class RedisLock implements Lock{
    
    	protected StringRedisTemplate redisStringTemplate;
    
    	// 存储到redis中的锁标志
    	private static final String LOCKED = "LOCKED";
    
    	// 请求锁的超时时间(ms)
    	private static final long TIME_OUT = 30000;
    
    	// 锁的有效时间(s)
    	public static final int EXPIRE = 60;
    
    	// 锁标志对应的key;
    	private String key;
    
    	// state flag
    	private volatile boolean isLocked = false;
    
    	public RedisLock(String key) {
    		this.key = key;
    		@SuppressWarnings("resource")
    		ApplicationContext  ctx =  new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
    		redisStringTemplate = (StringRedisTemplate)ctx.getBean("redisStringTemplate");
    	}
    
    	@Override
    	public void lock() {
    		//系统当前时间,毫秒
    		long nowTime = System.nanoTime();
    		//请求锁超时时间,毫秒
    		long timeout = TIME_OUT*1000000;
    		final Random r = new Random();
    		try {
    			//不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁
    			//这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间
    			//如果一个master节点不可用了,应该尽快尝试下一个master节点
    			while ((System.nanoTime() - nowTime) < timeout) {
    				//将锁作为key存储到redis缓存中,存储成功则获得锁
    				if (redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),
    						LOCKED.getBytes())) {
    					//设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
    					//可以防止因异常情况无法释放锁而造成死锁情况的发生
    					redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);
    					isLocked = true;
    					//上锁成功结束请求
    					break;
    				}
    				//获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现
    				//睡眠3毫秒后继续请求锁
    				Thread.sleep(3, r.nextInt(500));
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	public void unlock() {
    		//释放锁
    		//不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作
    		if (isLocked) {
    			redisStringTemplate.delete(key);
    		}
    	}
    
    	@Override
    	public void lockInterruptibly() throws InterruptedException {
    		// TODO Auto-generated method stub
    		
    	}
    
    	@Override
    	public boolean tryLock() {
    		// TODO Auto-generated method stub
    		return false;
    	}
    
    	@Override
    	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    		// TODO Auto-generated method stub
    		return false;
    	}
    
    	@Override
    	public Condition newCondition() {
    		// TODO Auto-generated method stub
    		return null;
    	}
    }
    

     

    好了,RedisLock已经实现,我们对Outputer使用RedisLock进行修改

    /使用RedisLock
    static class Outputer {
    	//创建一个名为redisLock的RedisLock类型的锁
    	RedisLock redisLock = new RedisLock("redisLock");
    	public void output(String name) {
    		//上锁
    		redisLock.lock();
    		try {
    			for(int i=0; i<name.length(); i++) {
    				System.out.print(name.charAt(i));
    			}
    			System.out.println();
    		}finally{
    			//任何情况下都要释放锁
    			redisLock.unlock();
    		}	
    	}
    }
    

      

    看看使用RedisLock加锁后的的运行结果

    lingsi
    zhangsan
    huangwu
    lingsi
    zhangsan
    huangwu
    lingsi
    zhangsan
    huangwu
    lingsi
    zhangsan
    huangwu
    lingsi
    zhangsan
    huangwu
    ......
    

      

    可见,使用RedisLock加锁后线程之间不再“打架”,三个线程互斥的访问output。

    问题

    现在我无法论证RedLock算法在分布式、高并发环境下的可靠性,但从本例三个线程的运行结果看,RedLock算法确实保证了三个线程互斥的访问output(redis.maxIdle=300 redis.maxTotal=600,运行到Timeout waiting for idle object都没有出现线程“打架”的问题)。我认为RedLock算法仍有些问题没说清楚,比如,如何防止宕机时多个线程同时获得锁;RedLock算法在释放锁的处理上,不管线程是否获取锁成功,只要上了锁,就会到每个master节点上释放锁,这就会导致一个线程上的锁可能会被其他线程释放掉,这就和每个锁只能被获得锁的线程释放相互矛盾。这些有待后续进一步交流学习研究。

    另一种可靠的方案(单机Redis分布式锁的实现)参考我的博文 

    参考文档

    http://redis.io/topics/distlock

    http://ifeve.com/redis-lock/

    本文转载自 https://www.cnblogs.com/hjwublog/p/5637150.html
    作者:ITPSC 出处:http://www.cnblogs.com/hjwublog/

  • 相关阅读:
    OSX安装nginx和rtmp模块(rtmp直播服务器搭建)
    用runtime来重写Coder和deCode方法 归档解档的时候使用
    Homebrew安装卸载
    Cannot create a new pixel buffer adaptor with an asset writer input that has already started writing'
    OSX下面用ffmpeg抓取桌面以及摄像头推流进行直播
    让nginx支持HLS
    iOS 字典转json字符串
    iOS 七牛多张图片上传
    iOS9UICollectionView自定义布局modifying attributes returned by UICollectionViewFlowLayout without copying them
    Xcode6 iOS7模拟器和Xcode7 iOS8模拟器离线下载
  • 原文地址:https://www.cnblogs.com/bcde/p/11163397.html
Copyright © 2011-2022 走看看