分布式高级篇(三) - 缓存与分布式锁
缓存
缓存使用
-
为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而db承担数据落盘工作
-
哪些数据适合放入缓存?
-
即时性、数据一致性要求不高的
-
访问量大且更新频率不高的数据(读多、写少)
举例:电商类应用,商品分类,商品列表等适合加入缓存并加一个失效时间(根据数据更新频率来定),后台如果发布一个商品,买家需要5分钟才能看到新的商品一般还是可以接受的
#代码演示 data = cache.load(id); //从缓存中加载数据 if(data == null){ data = db.load(id); //从数据库加载数据 cache.put(id,data); //保存到cache中 } return data;
注意:在开发中,凡是放入缓存中的数据我们应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致的问题
-
整合redis作为缓存
本地缓存
-
在单机模式下,本地缓存基本没有什么问题,但是在分布式环境下,本地缓存会存在数据一致性的问题
-
本地模式在分布式下的问题
分布式缓存
-
在分布式系统下,使用分布式缓存
-
整合
redis
-
引入依赖
<!--redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
简单配置
redis
的host
信息#默认安装无密码 spring: redis: host: 192.168.83.133 port: 6379
-
使用
SpringBoot
自动配置好的RedisTemplate
、StringRedisTemplate
来操作redis
//测试 @Test public void testStringRedisTemplate() { ValueOperations<String, String> valueOperations = stringRedisTemplate.opsForValue(); //保存 valueOperations.set("hello", "world_" + UUID.randomUUID().toString()); //查询 System.out.println("之前保存的数据是:"+valueOperations.get("hello")); }
-
-
改造三级分类业务
-
加入
redis
缓存功能,提高吞吐量// 1、加入缓存逻辑 String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson"); if (StrUtil.isNotEmpty(catalogJson)) { //转为指定的对象 Map<String, List<Catalog2Vo>> map = JSON.parseObject(catalogJson,new TypeReference<Map<String,List<Catalog2Vo>>>(){}); return map; } else { //2、缓存中没有,查询数据库 Map<String, List<Catalog2Vo>> catalogJsonFromDB = getCatalogJsonFromDB(); //3、查询的数据再放入缓存,将对象转为json放入缓存 stringRedisTemplate.opsForValue().set("catalogJson", JSON.toJSONString(catalogJsonFromDB)); return catalogJsonFromDB; }
-
压测对比
压测内容 压测线程数 吞吐量/s 90%响应时间 99%响应时间 三级分类数据获取
localhost:12000/index/catalog.json50 3.8(db) 13205 13519 三级分类数据获取(多次查库变一次查库)
localhost:12000/index/catalog.json50 218 330 536 三级分类数据获取(redis缓存)
localhost:12000/index/catalog.json50 1407 48 73 -
内存泄漏及解决办法
当进行压力测试时后期后出现堆外内存溢出OutOfDirectMemoryError
产生原因:
1)、springboot2.0以后默认使用lettuce操作redis的客户端,它使用通信
2)、lettuce的bug导致netty堆外内存溢出
解决方案:由于是lettuce的bug造成,不能直接使用-Dio.netty.maxDirectMemory去调大虚拟机堆外内存1)、升级lettuce客户端。 2)、切换使用jedis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>
-
缓存失效问题
缓存穿透
- 缓存穿透:指查询一个一定不存在的数据,由于缓存没有命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义
- 风险:利用不存在的数据进行攻击,数据库瞬间压力增大,最终导致崩溃
- 解决:将null结果也缓存 0/1(true/false),并加入短暂的过期时间
缓存雪崩
- 缓存雪崩:缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬间压力过重雪崩
- 解决:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件
缓存击穿
-
缓存击穿:对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发的访问,是一种非常“热点”的数据
如果这个key在大量请求同时进来前正好失效,那么所有对这个Key的数据查询都落到db,称为缓存击穿
-
解决:加锁
大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取锁,先查缓存,就会有数据,不用去db
分布式锁
分布式锁原理
-
加锁解决缓存击穿(单机模式)
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDB() { //只要是同一把锁,就能锁住需要这个锁的所有线程 //1、synchronized(this):SpringBoot所有的组件在容器中都是单例的 synchronized (this) { //得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询 String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson"); if (StrUtil.isNotEmpty(catalogJson)) { //如果缓存不为null,直接返回 Map<String, List<Catalog2Vo>> result = JSON.parseObject(catalogJson,new TypeReference<Map<String,List<Catalog2Vo>>>(){}); return result; } /** * 1、将数据库的多次查询变为一次 */ List<CategoryEntity> selectList = baseMapper.selectList(null); //1、查出所有1级分类 List<CategoryEntity> firstLevelCategroys = getParent_cid(selectList, 0L); //2、封装数据 Map<String, List<Catalog2Vo>> map = firstLevelCategroys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), item -> { //2.1、查到这个一级分类下的所有二级分类 List<CategoryEntity> category2Entities = getParent_cid(selectList, item.getCatId()); List<Catalog2Vo> catalog2Vos = null; if (CollUtil.isNotEmpty(category2Entities)) { catalog2Vos = category2Entities.stream().map(categoryEntity2 -> { Catalog2Vo catalog2Vo = new Catalog2Vo(item.getCatId().toString(), null, categoryEntity2.getName(), categoryEntity2.getCatId().toString()); //2.2、查找当前二级分类下的三级分类 List<CategoryEntity> category3Entities = getParent_cid(selectList, categoryEntity2.getCatId()); List<Catalog2Vo.Catalog3Vo> catalog3VoList = category3Entities.stream().map(categoryEntity3 -> { Catalog2Vo.Catalog3Vo catalog3Vo = new Catalog2Vo.Catalog3Vo(categoryEntity2.getCatId().toString(), categoryEntity3.getName(), categoryEntity3.getCatId().toString()); return catalog3Vo; }).collect(Collectors.toList()); catalog2Vo.setCatalog3List(catalog3VoList); return catalog2Vo; }).collect(Collectors.toList()); } return catalog2Vos; })); stringRedisTemplate.opsForValue().set("catalogJson", JSON.toJSONString(catalogJsonFromDB), 1, TimeUnit.DAYS); return map; } }
-
分布式环境下如何加锁
-
本地锁--时序问题
-
分布式锁演进
-
分布式锁的两个核心
- 原子加锁
- 原子解锁(Lua脚本)
-
基本原理:我们可以同时去一个地方占位置,如果占到,就执行逻辑。否则就必须等待,直到释放锁。占位置可以去
redis
,可以去数据库,可以去任何大家能访问的地方。等待可以自旋的方式SET key value [EX seconds] [PX milliseconds] [NX|XX] EX seconds – 设置键key的过期时间,单位时秒 PX milliseconds – 设置键key的过期时间,单位时毫秒 NX – 只有键key不存在的时候才会设置key的值 XX – 只有键key存在的时候才会设置key的值
分布式锁就是基于
set
模拟带了NX
参数来做的 -
阶段一
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDBWithRedisLock() { //1、占分布式锁,去redis占位置 //1.1、setIfAbsent 就是redis的 setNX命令 Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111"); if (lock) { //加锁成功 //设置锁的过期时间,避免死锁 stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS); //业务执行成功,删除锁 Map<String, List<Catalog2Vo>> dataFromDb = getDataFromDb(); stringRedisTemplate.delete("lock"); return dataFromDb; } else { //加锁失败 //休眠100ms重试 //自旋 return getCatalogJsonFromDBWithRedisLock(); } }
-
阶段二
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDBWithRedisLock() { //1、占分布式锁,去redis占位置 //1.1、setIfAbsent 就是redis的 setNX命令 Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111", 30, TimeUnit.SECONDS); if (lock) { //加锁成功 //设置锁的过期时间,避免死锁,必须和加锁是同步的、原子的 //stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS); //业务执行成功,删除锁 Map<String, List<Catalog2Vo>> dataFromDb = getDataFromDb(); stringRedisTemplate.delete("lock"); return dataFromDb; } else { //加锁失败 //休眠100ms重试 //自旋 return getCatalogJsonFromDBWithRedisLock(); } }
-
阶段三
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDBWithRedisLock() { //1、占分布式锁,去redis占位置 //1.1、setIfAbsent 就是redis的 setNX命令 String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock",uuid, 300, TimeUnit.SECONDS); if (lock) { //加锁成功 //设置锁的过期时间,避免死锁,必须和加锁是同步的、原子的 //stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS); //业务执行成功,删除锁 Map<String, List<Catalog2Vo>> dataFromDb = getDataFromDb(); String lockValue = stringRedisTemplate.opsForValue().get("lock"); if (uuid.equals(lockValue)) { //删除我自己的锁 stringRedisTemplate.delete("lock"); } return dataFromDb; } else { //加锁失败 //休眠100ms重试 //自旋 return getCatalogJsonFromDBWithRedisLock(); } }
-
阶段四
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDBWithRedisLock() { //1、占分布式锁,去redis占位置 //1.1、setIfAbsent 就是redis的 setNX命令 String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS); if (lock) { //加锁成功 //设置锁的过期时间,避免死锁,必须和加锁是同步的、原子的 //stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS); //业务执行成功,删除锁 Map<String, List<Catalog2Vo>> dataFromDb = getDataFromDb(); //获取值对比+对比成功删除 这两步也必须是原子操作 // String lockValue = stringRedisTemplate.opsForValue().get("lock"); // if (uuid.equals(lockValue)) { // //删除我自己的锁 // stringRedisTemplate.delete("lock"); // } //删除锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; stringRedisTemplate.execute(new DefaultRedisScript<Integer>(script, Integer.class), Arrays.asList("lock"), uuid); return dataFromDb; } else { //加锁失败 //休眠100ms重试 //自旋 return getCatalogJsonFromDBWithRedisLock(); } }
-
阶段五——最终形态
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDBWithRedisLock() { //1、占分布式锁,去redis占位置 //1.1、setIfAbsent 就是redis的 setNX命令 String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS); if (lock) { //加锁成功 //设置锁的过期时间,避免死锁,必须和加锁是同步的、原子的 //stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS); //业务执行成功,删除锁 Map<String, List<Catalog2Vo>> dataFromDb; try { dataFromDb = getDataFromDb(); }finally { //删除锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; stringRedisTemplate.execute(new DefaultRedisScript<Integer>(script, Integer.class), Arrays.asList("lock"), uuid); } //获取值对比+对比成功删除 这两步也必须是原子操作 // String lockValue = stringRedisTemplate.opsForValue().get("lock"); // if (uuid.equals(lockValue)) { // //删除我自己的锁 // stringRedisTemplate.delete("lock"); // } return dataFromDb; } else { //加锁失败 //休眠100ms重试 //自旋 return getCatalogJsonFromDBWithRedisLock(); } }
测试分布式锁
-
启动四个商品微服务,进行压力测试
-
本地锁情况下:打印了四次
查询了数据库
,只能锁住当前微服务的线程 -
分布式锁情况下:只打印了一次
查询了数据库
,锁住了全部
-
Redisson
-
官方文档地址
-
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上
Redisson采用了基于NIO的Netty框架,不仅能作为Redis底层驱动客户端,具备提供对Redis各种组态形式的连接功能,对Redis命令能以同步发送、异步形式发送、异步流形式发送或管道形式发送的功能,LUA脚本执行处理,以及处理返回结果的功能,还在此基础上融入了更高级的应用方案
整合redisson
-
第一步:引入依赖
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <!-- 使用redisson作为所有分布式锁,分布式对象等功能框架 --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.14.1</version> </dependency>
-
第二步:配置redisson
@Configuration public class RedissonConfig { /** * 所有对redisson的使用 都是通过RedissonClient对象 * @return * @throws IOException */ @Bean(destroyMethod="shutdown") RedissonClient redisson() throws IOException { //1、创建配置 Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.83.133:6379"); //、根据Config 创建出RedissonClient实例 RedissonClient redissonClient = Redisson.create(config); return redissonClient; } }
-
第三步:测试配置
@Resource private RedissonClient redissonClient; @Test public void testRedisson() { System.out.println(redissonClient); }
-
可重入锁(Reentrant Lock)
-
死锁:B等A释放锁再执行,A等B执行完再释放锁
-
基于Redis的Redisson分布式可重入锁
RLock
Java对象实现了java.util.concurrent.locks.Lock
接口-
测试
@ResponseBody @GetMapping("/hello") public String hello() { //1、获取一把锁,只要锁的名字一样,就是同一把锁 RLock lock = redissonClient.getLock("my-lock"); //2、加锁 //阻塞式等待 lock.lock(); /** * Redisson解决了两大难点 * * 1. 锁的自动续期,如果业务超长,运行期间会自动给锁续上新的30s,不用担心业务实际长,锁自动过期而被删除 * 2. 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除 */ try { System.out.println("加锁成功,执行业务"+Thread.currentThread().getId()); Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //3、解锁 假设解锁代码没有运行,redis会不会出现死锁 System.out.println("释放锁...."+Thread.currentThread().getId()); lock.unlock(); } return "hello"; }
-
模拟测试:启动两个商品微服务 12000、120001 都请求hello,当12000服务,加锁成功未释放锁之前,中断服务,观察12001 是否还能抢占锁资源,redis是否会出现死锁问题
观察发现:12001可以抢占锁资源,不会出现死锁问题
-
-
如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定
另外Redisson还通过加锁的方法提供了
leaseTime
的参数来指定加锁的时间。超过这个时间后锁便自动解开// 加锁以后10秒钟自动解锁 // 无需调用unlock方法手动解锁 lock.lock(10, TimeUnit.SECONDS); // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
Lock看门狗原理(redisson如何解决死锁)
-
Redisson解决了两大难点
- 锁的自动续期,如果业务超长,运行期间会自动给锁续上新的30s,不用担心业务实际长,锁自动过期而被删除
- 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除
-
总结
-
只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的时间】
-
定时任务执行:【看门狗时间】/3
启动测试用例,观察redis客户端工具,会发现每10秒过后,就会再次自动续期,TTL会续成满时间30s
-
-
最佳实战(指定锁超时时间)
-
#超时时间 一定要大于业务执行时间 lock.lock(100, TimeUnit.SECONDS);
-
如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间 如果我们未指定超时时间,就使用30*1000【LockWatchdogTimeOut看门狗的默认时间】
-
指定锁的超时时间,可以省掉整个续期的操作,执行成功后,手动解锁。就算不手动,超时时间到达也会自动解锁
-
读写锁(ReadWriteLock)
-
官方文档示例
-
保证一定可以读到最新数据,修改期间,写锁是一个排他锁(互斥锁),读锁是一个共享锁。写锁没有释放,读操作就必须等待
-
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态
-
先写再读:等待写锁释放
-
先写再写:阻塞,下个等待上个挨个释放
-
先读再写:写锁等待读锁释放
-
并发读:互不影响,相当于无锁,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
总之:只要有写锁的存在,都必须等待
-
信号量(Semaphore)
-
官方文档示例
-
举例描述(使用场景):
停车场有30个车位,来一辆车需要占用一个车位,走一辆车就要释放一个车位,想要停车就要看车位够不够,这种场景就可以使用信号量
-
模拟场景测试
-
park 占用一个车位
-
go 释放一个车位
-
连续占用3个车位,第四次请求park,查看反应
请求进入阻塞状态,等待go操作释放,这时候请求go释放车位,会发现刚刚阻塞的park操作,立马返回ok
-
-
信号量也可以用作分布式限流
闭锁(CountDownLatch)
-
官方文档示例
-
使用场景:
放假,学校锁门: 1班没人了,2班没人了...,直到5个班全部走完,才可以锁校门
#锁门操作 http://localhost:12000/lockDoor #一班全部走完操作 http://localhost:12000/goHome/1
缓存数据一致性
双写模式
-
最终一致性
失效模式
-
失效模式
解决方案
-
无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新都会出事,怎么办?
-
如果是用户维度数据(订单数据,用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加上过期时间,每隔一段时间出发读的主动更新即可
-
如果是菜单,商品介绍等基础数据,也可以使用canal订阅binlog的方式
-
缓存数据+过期时间也足够解决大部分业务对应缓存的要求
-
通过加锁保证并发读写,写写的时候按顺序排好队
读读无所谓,所以适合使用读写锁(业务不关心脏数据,允许临时脏数据可忽略)
-
-
总结
- 我们能放入缓存的数据就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每条拿到当前最新数据即可
- 我们不应该过度设计,增加系统的复杂性
- 遇到实时性、一致性的要求高的数据,就应该查数据库,即使慢点
Spring Cache
简介
-
官方文档地址
-
从版本3.1开始,Spring 框架就支持向现有的 Spring 应用程序透明地添加缓存。与事务支持类似,缓存抽象允许一致地使用各种缓存解决方案,对代码的影响最小。
org.springframework.cache.Cache
和org.springframework.cache.CacheManager
接口来统一不同的缓存技术。并支持使用JCache(JSR-107)注解简化开发 -
每次调用需要缓存功能的方法时,Spring会检查指定参数的 指定的目标方法是否已经被调用过;如果有就直接从缓存中获取方法调用后的结果,如果没有就调用方法并缓存结果后返回给用户。下次调用直接从缓存中获取
-
使用Spring缓存抽象时我们需要关注一下两点:
- 1、缓存声明: 确定需要缓存的方法及其策略
- 2、缓存配置: 存储数据并从中读取数据的后备缓存
基础概念
-
结构图
整合SpringCache简化开发
第一步引入依赖
<!--cache-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
第二步 添加配置
-
查看自动配置了哪些
CacheAutoConfiguration
会导入RedisCacheConfiguration
- 自动配好了缓存管理器
RedisCacheManager
- 自动配好了缓存管理器
-
我们需要手动配置哪些
-
配置指定使用redis作为缓存
spring: cache: type: redis
-
第三步测试使用缓存(注解)
-
第一步:开启缓存功能
@EnableCaching
@cacheable
: 触发将数据保存到缓存
-
代表当前方法的结果需要缓存,如果缓存中有,方法不用调用。如果缓存中没有,会调用方法,最后将方法的结果放入缓存
redis
中缓存内容的key是自动生成的:缓存的名字::SimpleKey
(自主生成的key值)缓存的value的值,默认使用
jdk
序列号机制,将序列号后的数据存到redis默认时间 ttl 时间是 -1,不符合规范,需要自定义
/** * @Cacheable * 代表当前方法的结果需要缓存,如果缓存中有,方法不用调用。如果缓存中没有,会调用方法,最后将方法的结果放入缓存 * 每一个需要缓存的数据,我们都来指定要放到哪个名字的缓存【缓存的分区(推荐按照业务类型分)】 * * @return */ @Cacheable({"category"}) @Override public List<CategoryEntity> getFirstLevelCategroys() { System.out.println("getFirstLevelCategroys...."); List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0)); return categoryEntities; }
@cacheevict
: 触发将数据从缓存删除 (失效模式)
-
失效模式案例
确定缓存中已经存在key为
CACHE_level1Categorys
的缓存数据,然后调用更新接口,更新成功后,会发现,缓存的数据key为CACHE_level1Categorys
被删除,下次访问首页,会重新生成
@cacheput
: 在不干扰方法执行的情况下更新缓存 (双写模式)
- 在有返回值的接口上加上该注解,会在返回值的同时,将值写入到缓存,即使不是最新的数据,但当接口稳定之后,最终缓存数据是一致的
@caching
: 组合以上多个操作
-
同时进行多种缓存操作
-
第二种:直接删除某个分区下的缓存,也能达到操作多个缓存的效果
存储同一个类型的数据,都可以指定为同一个分,分区名默认就是缓存的前缀。这样处理的好处,当有修改更新操作时,可以将整个分区删除
@cacheconfig
: 在类级别上共享一些常见的缓存相关设置
表达式语法使用
SpEL
-
语法解释
自定义缓存配置
-
1、指定生成的缓存使用key
key属性指定,接受一个SpEL
#设置key名称 @Cacheable(value = {"category"},key = "'level1Categorys'") @Cacheable(cacheNames="books", key="#isbn") public Book findBook(ISBN isbn, boolean checkWarehouse, boolean includeUsed) @Cacheable(cacheNames="books", key="#isbn.rawNumber") public Book findBook(ISBN isbn, boolean checkWarehouse, boolean includeUsed) #同步缓存:可以使用 sync 属性来指示底层缓存提供程序在计算值时锁定缓存条目。因此,只有一个线程忙于计算值,而其他线程则被阻塞,直到在缓存中更新条目 @Cacheable(cacheNames="foos", sync=true) public Foo executeExpensiveOperation(String id) {...} #Conditional Caching 条件缓存:只有当参数名称的长度小于32时,下面的方法才会被缓存 @Cacheable(cacheNames="book", condition="#name.length() < 32") public Book findBook(String name)
-
2、指定缓存的数据存活时间
配置文件中修改
spring: cache: type: redis redis: time-to-live: 3600000
-
3、将数据保存为json格式
-
原理:加载过程
CacheAutoConfiguration
-->RedisCacheConfiguration
-->自动配置了
RedisCacheManager
-->初始化所有的缓存 -->每个缓存决定使用什么配置-->如果
RedisCacheConfiguration
有就用已有的,没有就用默认配置-->想改缓存的配置,只需要给容器中放一个
RedisCacheConfiguration
即可-->就会应用到当前
RedisCacheManager
管理的所有缓存中 -
添加配置类
com.touch.air.mall.product.config.CacheConfig
@EnableConfigurationProperties(CacheProperties.class) @Configuration @EnableCaching public class CacheConfig { /** * 配置文件中的东西没用上 * * 1、原来和配置文件绑定的配置类是这样子的 * @ConfigurationProperties(prefix = "spring.cache") * public class CacheProperties * * 2、要让他生效 导入 * @EnableConfigurationProperties(CacheProperties.class) * * @return */ @Bean RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) { RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig(); redisCacheConfiguration = redisCacheConfiguration.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); redisCacheConfiguration = redisCacheConfiguration.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer())); //将配置文件中的所有配置,都在这里让其生效 CacheProperties.Redis redisProperties = cacheProperties.getRedis(); if (redisProperties.getTimeToLive() != null) { redisCacheConfiguration = redisCacheConfiguration.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { redisCacheConfiguration = redisCacheConfiguration.prefixKeysWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { redisCacheConfiguration = redisCacheConfiguration.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { redisCacheConfiguration = redisCacheConfiguration.disableKeyPrefix(); } return redisCacheConfiguration; } }
-
缓存穿透问题解决
-
SpringCache也考虑到了缓存穿透问题
添加配置即可
spring: cache: type: redis redis: time-to-live: 3600000 #key前缀 区别redis中其他非缓存数据 key-prefix: CACHE_ #key前缀是否使用 use-key-prefix: true #是否缓存空值,防止缓存穿透 cache-null-values: true
SpringCache原理与不足
读模式
-
缓存穿透:查询一个一定不存在的数据 解决:缓存空数据,并设置过期时间;ache-null-value=true
-
缓存雪崩:大量key同时过期 解决:过期时间加随机时间:
spring.cache.redis.time-to-live
-
缓存击穿(*):大量并发同时查询一个正好过期的数据 解决方案:加锁,先只放一个过去查?Spring Cache默认是无加锁的
sync=true
解决击穿
写模式(保证缓存与数据库一致)
-
写模式 Spring Cache 没有加锁
-
读写加锁(写操作多的,并发高的不合适,一致阻塞)
-
引入Canal,感知到MySQL的更新去更新数据库
-
读多写多的,直接去数据库查
原理
- CacheManager(RedisCacheManager) --> Cache(RedisCache) --> Cache负责缓存的读写
总结
-
什么时候使用Spring Cache?
答:常规数据(读多写少,即时性,一致性要求不高的数据),完全可以使用Spring Cache;写模式(只要缓存的数据有过期时间即可)
特殊数据:特殊设计