在并发量比较高的情况下redis有很多应用场景,提升查询效率,缓解底层DBio ,下面列举两个平时开发中应用过的两个例子,欢迎各位一起讨论改进。
1 . redis 惊群处理
1.1 方案的由来
Redis的缓存数据库是为快速响应客户端减轻数据库压力的有效手段之一,其中有一种功能是失效缓存,其优点是可以不定期的释放使用频率低的业务空间而增加有限的内存,但对于同步数据库和缓存之间的数据来说需要面临一个问题就是:在并发量比较大的情况下当一个缓存数据失效之后会导致同时有多个并发线程去向后端数据库发起请求去获取同一业务数据,这样如果在一段时间内同时生成了大量的缓存,然后在另外一段时间内又有大量的缓存失效,这样就会导致后端数据库的压力陡增,这种现象就可以称为“缓存过期产生的惊群现象”!
1.2 处理逻辑
缓存内真实失效时间time1
缓存value中存放人为失效时间戳 :time2 ( time2 永远小于time1)
缓存value对应的lock锁(就是一个与value 对应的 另一个key),主要用于判断是第几个线程来读取redis的value
当把数据库的数据写入缓存后,这时有客户端第一次来读取缓存,取当前系统时间:system_time 如果system_time >= time2 则认为默认缓存已过期(如果system_time< time1 则还没真实失效 ),这时再获取value的lock锁,调用redis的incr函数(单线程自增函数)判断是第几个获取锁的线程,当且仅当是第一个线程时返回1,以后都逐渐递增。第一个访问的线程到数据库中获取最新值重新放入缓存并删除lock锁的key,并重新设置时间戳;在删除lock之前所有访问value客户端线程获取lock的value都大于1,这些线程仍然读取redis中的旧值,而不会集中访问数据库。
1.3 伪代码
private long expirt_time = 1000 * 40 ;//人为过期时间
private long time = 1000 * 60;//一分钟
private long second = 60 * 6;//六分钟
KooJedisClient client =SpringContextUtils.getBean("redisClient", KooJedisClient.class);
private final String user_key ="USER_REDIS";
private final String user_key_lock ="USER_REDIS_lock";
public void setExpireTime( HttpServletRequestrequest ){
StringuserId = request.getParameter( "userId");
//数组里存放:1:真实value ,2:过期时间
Stringkey = org.apache.commons.lang3.StringUtils.join(new Object[]{user_key,userId});
String[]info = client.get( key , String[].class);
longnowTime = System.currentTimeMillis();
if( null!= info ){
longexpireRealTime = new Long( info[1] );
//如果已过期并且是第一个访问的线程
if(nowTime >= expireRealTime ){
Long lockNum = client.incr( user_key_lock+userId ); // 可以实现原子性的递增,可应用于高并发的秒杀活动、分布式序列号生成等场景
if( ( lockNum == 1 || lockNum ==null )){
//重新从数据库获取
User user = teacherDataMaintain.findUserInfo(new Integer(userId));
info[ 0 ] = user.getUserName();
info[ 1 ] =org.apache.commons.lang3.StringUtils.join(new Object[]{(nowTime + expirt_time),""});
client.setex( key ,60, info );//六分后过期
client.del( user_key_lock+userId );
}else{
System.out.println( "缓存过期但不是第一个线程,返回旧值" );
}
}else{
//返回缓存中旧值
System.out.println( "缓存未过期" );
}
}else{
Useruser = teacherDataMaintain.findUserInfo(new Integer(userId));
String[] userInfo = { user.getUserName() ,(nowTime + expirt_time ) +"" };
client.setex( key ,60, userInfo );// 过期
}
}
2.redis 分布式锁应用
2.1 分布式锁主要是解决分布式环境对共享资源的同步访问
在但进程的环境下程序上完全可以用synchronized同步锁来限制多线程对共享资源的访问,但在分布式环境下同步锁无法控制不同进程之间的线程,这种情况下就需要找一种单进程可串行处理的“锁”,redis 就是其中的一种选择,
2.2. 应用场景:
场景1: A 系统于B系统均是分布式部署单台服务器多实例,采用SOA接口方式通信,两个系统需要对共享信息进行实时同步。
1):比如A系统的订单信息需要共享给B,同时B系统会在系统中再保留一个副本,A系统设计到任何关于订单的信息都需要同步给B系统
2):B 接收到A的信息变更后发送至rabbitMQ
3):B负责消费订单信息或变更请求同时保存至数据库。(因为是分布式部署,所有存在多个实例消费一个消息的可能)
4):技术的关键点在当B的多台实例同时消费任务时有可能产生多个任务,但是数据库里只允许保存一条记录。当RabbitMq发生阻塞时会造成消费不及时,等RabbitMq回复后也可能存在多个B的server消费一个消息而对数据库产生多个请求。
最终会导致:
1).数据库事务瞬间处理过多,可能造成死锁,
2).队列中的对象信息有可能是有序的,可能会出现状态的相互覆盖。
例如:秒杀
Redis 缓存中与数据库的库存在秒杀前是一样的,当秒杀开始的时候,同一时间点会有很多客户端访问缓存和数据库,不同的进程同时访问缓存或者数据库,当缓存中的数据变化后并且没被修改之前有可能又被另一个线程获取,数据有可能出现脏读和数据被覆盖的可能。(脏读 < 不可重复读 < 幻读)
2.3 解决思路:
对共享资源的操作要是互斥且良性的竞争,即在分布式条件下怎样做到一次只能有一个线程来处理共享资源,且线程之间不会出现死锁的情况。
2.3.1 几个基本的函数:
Setnx key value :如果没有key 则可以获得锁并返回1 ,如果已存在key 则不做操作并返回0 。
Getset key value :设置value ,并返回key的旧值,若key不存在则返回null
Get key :
2.3.2 死锁
Setnx是单线程处理的,但仍有可能出现死锁
Eg:thread0 操作超时了,但它还持有着锁,thread 1和thread 2读取lock.foo检查时间戳,然后发现超时了;
thread 1 发送DEL lock.foo;
thread 1 发送SETNX lock.foo并且成功了;
thread 2 发送DEL lock.foo;
thread 2 发送SETNX lock.foo并且成功了。
这样一来,thread 1、thread 2都拿到了锁!锁安全性被破坏了!
2.3.3 解决死锁
- thread 3 发送SETNX lock.foo 想要获得锁,由于thread 0 还持有锁,所以Redis返回给thread 3 一个0;
- thread 3 发送GET lock.foo 以检查锁是否超时了,如果没超时,则等待或重试;
- 反之,如果已超时,thread 3 通过下面的操作来尝试获得锁:
GETSET lock.foo <current Unix time + lock timeout + 1>
通过getSet,thread 3 拿到的时间戳如果仍然是超时的,那就说明,thread 3 如愿以偿拿到锁了。 - 如果在thread 3 之前,有个叫thread 4 的客户端比thread 3 快一步执行了上面的操作,那么thread 3 拿到的时间戳是个未超时的值,这时,thread 3 没有如期获得锁,需要再次等待或重试。留意一下,尽管thread 3 没拿到锁,但它改写了thread 4 设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计。
2.3.4
1).基于redisson分布式锁框架实现
2).基于SpringRedisTemplate实现分布式锁
3).基于Jedis实现分布式锁
原理一样
public synchronized boolean acquire(Jedis jedis,String lockKey, long expires) throws InterruptedException {
inttimeoutMsecs = 10 * 1000;
inttimeout = timeoutMsecs;
while (timeout >= 0 ) {
String expiresStr = String.valueOf(expires ); // 锁到期时间
if (jedis.setnx( lockKey, expiresStr ) == 1 ) {
// lock acquired
return true;
}
String currentValueStr = jedis.get(lockKey); //redis里的时间
if(currentValueStr!=null&& Long.parseLong(currentValueStr) <System.currentTimeMillis()) {
// 判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
// lock is expired
//Getset 命令用于设置指定 key 的值,并返回 key 旧的值。
String oldValueStr = jedis.getSet(lockKey, expiresStr);
// 获取上一个锁到期时间,并设置现在的锁到期时间
// 只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
if (oldValueStr != null && oldValueStr.equals(currentValueStr)){
// 如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
// lock acquired
return true;
}
}
timeout -= 100;
Thread.sleep(100); // 每100毫秒重试一次,直至timeout用尽
}
//Expire命令用于设定键有效期。到期时间后键不会在Redis中使用。
returnfalse;
}
当然,方法不是唯一的,也可以不用GetSet方法,单用setnx也可以实现,在while循环里处理线程的sleep时间,这里就不举例了