1.
学习Redis,我们首先需要简单学习一下NOSQL的特点,有助于让我们理解,什么是Redis,为什么使用Redis
NoSQL = Not Only SQL(不仅仅是SQL)泛指非关系型数据库
关系型数据库:表,行,列
非关系型数据库:数据类型的存储不需要固定的一个格式!不需要过多的操作,就可以横向扩展;
NOSQL--解耦!
1:方便扩展(数据之间没有关系,很好扩展);
2:大数据量高性能;
3:数据类型是多样型的(不需要事先设计数据库,随取随用);
4:传统关系型数据库和NoSQL的区别
传统的 RDBMS -结构化组织 -SQL -数据和关系都存在单独的表中 -数据操作语言,数据定义语言 -严格的一致性 -基础的事务操作 NoSQL -不仅仅是数据 -没有固定的查询语言 -键值对存储,列存储,文档存储,图形数据库等 -最终一致性 -CAP定理和BASE -高性能,高可用,高可扩。。。。
2.NoSQL的分类
KV键值对
2.1文档型数据库(bson格式和json格式一样)
-
MongoDB MongoDB是一个介于分布式文件存储的数据库,C++编写,主要来处理大量的文档 介于关系型和非关系型数据库中间的产品,是非关系型数据库中功能最丰富,最像关系型数据库的;
2.2列存储数据库
-
HBase
-
分布式文件系统
2.3图关系数据库
存的不是图片,是图片关系
3.Redis入门
3.1.概述
即远程字典服务,是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API
redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。也被称之为结构化数据库
3.2.Redis能干嘛?
-
内存存储,持久化,内存中断电即失,所以持久化很重要
-
效率高,可以用于高速缓存
-
发布订阅系统,地图信息分析,计时器,计数器。。。。
3.3.Redis的特性
多样的数据类型,持久化,集群,事务。。。。。。
3.4.Linux基本命令
Redis默认安装路径 /usr/local/bin 启动Redis redis -server kconfig/redis.conf 测试连接 redis-cli -p 6379 查看Redis进程是否开启 ps -ef|grep redis 如何关闭Redis服务 shutdown 查看所有key keys * 确定某个key是否存在 EXISTS key 移除某个key del key 从某个数据库移除 del key index 进行数据库切换 redis默认有16个数据库,默认使用的是第0个 select index dbsize 查看数据库大小 清空当前库 flushdb 清空所有库 flushall 设置key的过期时间 EXPIRE key 时间 查看当前key的剩余时间 ttl key 判断当前key的数据类型 type key
3.5.Redis是单线程
Redis速度很快,基于内存操作,CPU并不是Redis的瓶颈,Redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程,就使用了单线程;
单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程
核心:为什么Redis是的单线程还这么快?首先高性能的服务器不一定都是多线程的,并且,多线程不一定就比单线程速度快;Redis是将所有的数据全部放在内存中,所以说使用单线程的操作效率就是最高的,多线程(CPU上下文切换,竞争条件都是耗时的操作),对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都是在一个CPU上的,在内存情况下,这个就是最佳的方案。
总结:
(1) 绝大部分请求是纯粹的内存操作(非常快速)
(2) 采用单线程,避免了不必要的上下文切换和竞争条件
(3) 非阻塞IO - IO多路复用
4.Redis五大数据类型
-
String
-
类似的使用场景:value除了是我们的字符串还可以是我们的数字
-
计数器
-
统计多单位的数量
-
粉丝数
-
对象缓存存储
1 1:往String类型数据后面追字符串,如果当前key不存在就相当于set key 2 append key "字符串" 3 2:获取mougekey的长度 4 strlen key 5 3:自增 自减 6 incr key (自增1) 7 decr key (自减1) 8 incrby key number 设置步长指定增量 9 decrby key number 设置步长减去指定增量 10 4:获取字符串的范围(截取字符串) 11 getrange key start end 12 getrange key 0 -1(查看全部字符串) 13 5:替换字符串 14 setrange key start "XX替换的内容" 15 6: 16 setex key second value 17 (set with expire)设置过期时间,如果存在直接覆盖,再设置过期时间,不存在先新增再设置过期时间; 18 19 setnx key value 20 (set if not exist) 21 不存在再设置,存在则不设置(不覆盖原有值),在分布式锁中常常使用。 22 7:批量获取值和设置值 23 mset k1 v1 k2 v2 k3 v3..... 24 mget k1 k2 k3 k4 25 msetnx k1 v1 k2 v2(不存在再设置,存在就不设置)(原子性操作,要么一起成功,要么一起失败) 26 8: 27 mset user:1:name zhangsan user:1:age 12 28 mget user:1 name user:1:age 29 9: 30 getset key value 31 先get再set;如果不存在返回null,然后set会自动创建key;如果存在,返回已存在的key,然后set会替换掉已存在的值
-
-
List
-
在Redis中,可以把list玩成栈,队列,阻塞队列
-
所有的list命令都是以l开头的
-
他实际是一个链表,before Node after left right 都可以插入值;
在两边插入或者改动值,比处理中间值效率高一些
-
使用场景:消息排队;消息队列;栈
1 1:存值 2 lpush key value 3 2:按范围取值 4 lrange key start stop 5 3:往尾部(右边)添加值 6 rpush key value 7 4:移除列表第一个元素,或最后一个元素(移除左,右的值) 8 rpop key 9 5:按下标取值 10 lindex key index 11 6:查看list长度 12 llen key 13 7:移除指定值 14 lrem key count value 15 8:通过下标截取指定长度 16 ltrim key start stop 17 9:移除某个列表最后一个值((source是某个列表的key)),添加到新列表中(destination是新列表的key) 18 rpoplpush source destination 19 10:按指定下标替换值,不存在的key或者index会报错 20 lset key index value 21 11:往指定key的指定value(pivot)前或者后添加指定值 22 linsert key BEFORE|AFTER pivot value
-
-
SET
-
set中的值是不能重复的
-
无需不重复集合
-
数字集合集,比如B站微博等的共同关注
1 1:按key存值 2 sadd key member 3 2:查看指定key的所有值 4 smembers key 5 3:判断某个key中是否包含某值 6 sismember key member 7 4:获取key中元素的个数 8 scard key 9 5:移除key中的指定值 10 srem key member 11 6:随机取key中的值 12 (count是取出随机值的个数,不指定个数,随机抽出一个值) 13 srandmember key [count] 14 7:随机删除元素 15 spop key [count] 16 8:将一个指定值,移到另一个set集合中(如果目标集合不存在,则会创建一个新的集合) 17 smove source destination member 18 9:查集合的差集(以第一个key为参照物) 19 sdiff key [key ...] 20 10:查集合的交集 21 sinter key [key ...] 22 11:查集合的并集 23 sunion key [key ...]
-
-
Hash(哈希)
-
value值可以想象成一个Map集合,key-map
-
本质和String类型没有太大区别
-
应用场景:做一些变更数据的保存,或者用户信息的保存都可以,Hash更适合对象的存储,而String更适合字符串的存储
1 1:存 2 hset key field value 3 2:取 4 hget key field 5 3:批量存 6 hmset key field value [field value ...] 7 4:批量取 8 hmget key field [field ...] 9 5:取出所有键值对 10 hgetall key 11 6:删除指定值 12 hdel key field [field...] 13 7:获取指定key的长度 14 hlen key 15 8:判断key中值是否存在 16 hexists key field 17 9:只获得key中所有的field 18 hkeys keys 19 10:只获得key中所有的value 20 hvals key 21 11:指定field中的value值自增 22 (increment是自增的数量,相当于加法) 23 hincrby key field increment 24 12:hsetnx key field value
-
-
Zset
-
有序集合
-
应用场景:存储成绩,工资表排序等;带权重进行判断;排行榜
1 1:添加 2 score相当于一个排序标志(会按这个自动排序) 3 zadd key score member [score member...] 4 2:查看(正序或者倒序) 5 zrange key start stop [withscores] 6 zrevrange key start stop [withscores] 7 3:从小到大排序 8 (-inf +inf 负无穷到正无穷) 9 zrangebyscore key min max [withscores] [limit offset count] 10 4:移除元素 11 zrem key member [member...] 12 5:获取长度 13 zcard key 14 6:获取指定区间的个数 15 zcount key min max
-
5.三种特殊的数据类型
-
geospatial 地理空间
-
将指定的地理位置(纬度,经度,名称)添加到指定的key中;这个功能可以推算地理位置的信息,两地之间的距离等(java中直接批量导入城市信息即可);
-
GEO底层的实现原理就是Zset ,因此我们可以使用Zset命令来操作geo
1 1:添加指定地理位置(longitude,latitude 经纬度) 2 南极和北极无法添加;经纬度也有一定的范围 3 GEOADD key longitude latitude member [longitude latitude member] 4 5 2:返回两个给定位置之间的距离(计算距离时,会假定地球是个完美的球形) 6 如果两个位置之间的其中一个不存在,命令返回空值;指定单位的参数unit必须是以下单位的其中一个;m(米)km(千米)mi(英里)ft(英尺);如果没有指定单位,那么默认用米做单位 7 GEODIST key member1 member2 [unit] 8 9 3:返回一个或多个位置元素的Geohash表示(二位经纬度转换为一维的字符串) 10 GEOHASH key member [member...] 11 12 4:获取指定的地理位置信息 13 GEOPOS key member 14 15 5:以给定的经纬度为中心,找出一定半径内的元素(按坐标为范围查) 16 GEORADIUS key longitude latitude redius unit [withcoord] [withdist][withhash] [count] 17 18 6:找出位于指定范围内的元素,中心点是由给定的位置元素决定(按城市为范围查) 19 GEORADIUSBYMEMBER key member redius unit [withcoord] [withdist][withhash] [count] 20 21 7:zset的相关命令都可以使用
-
-
hyperloglog
-
基数统计的算法;
-
优点:占用的内存是固定的,而且非常小,2^64仅占用12KB内存;有一定的容错率
-
网页UV(一个人访问一个网站多次,但是还算是一个人)
传统方式,用set保存用户的id,然后通过统计set中元素的数量作为判断用户个数的标准;这种方式如果保存大量的用户id,就会比较麻烦,我们的目的是为了计数而不是为了保存用户id;
1 1:存 2 pfadd key element [element...] 3 2:统计元素(两个key以上时,自动去重计算数量) 4 pfcount key [key...] 5 3:合并集合(可以两个集合合并,也可以新命名一个集合,用于合并其他集合) 6 pfmerge destkey sourcekey [sourcekey...]
-
-
bitmaps
-
位图 位存储
-
都是操作二进制位来进行记录,只有0和1两种状态
-
对于只有两种状态的,比如,用户打卡没打卡。登录没登录等,都可以使用bitmaps
1 1:存 value只有0或者1,offset偏移量,位数,可以从0开始 2 setbit key offset value 3 2:查看某一天,某个偏移量的状态 4 getbit key offset 5 3:统计(统计为1的数量) 6 bitcount key [start end]
-
6.事务
-
Redis单条命令是保证原子性的,但是Redis的事务是不保证原子性的;Redis事务没有隔离级别的概念
所有命令在事务中,并没有直接被执行,只有发起执行命令的时候才会被执行;
-
Redis事务的本质:一组命令的集合;一个事务中的所有命令都会被序列化,在事务执行过程中,会被按照顺序执行;一次性,顺序性,排他性;
-------队列set set set 执行-----
-
Redis的事务:
- 开启事务(命令:multi)
- 命令入队(添加相关命令)
- 执行事务(命令:exec) -
中途放弃事务(事务队列中的命令都不会执行)
dicard
-
事务中的异常
类比java
-
编译型异常(代码有问题;命令有错),事务中的所有命令都不会执行;
-
运行时异常(1/0),如果事务队列中存在语法型错误,其他命令可以正常执行;错误命令会抛出异常;
-
7.监控
一旦事务执行成功,监控就会自动取消掉;当做乐观锁操作
Watch 监视
unwatch 放弃监视
多线程修改值,事务开启之前进行监视;一旦有其他线程在途中修改监视值,最后事务会提交失败;举例(开启两个Redis连接即可简单测试)
线程1
1 127.0.0.1:6379> set money 100 2 OK 3 127.0.0.1:6379> set out 0 4 OK 5 127.0.0.1:6379> watch money 6 OK 7 127.0.0.1:6379> multi 8 OK 9 127.0.0.1:6379> decrby money 20 10 QUEUED 11 127.0.0.1:6379> incrby out 20 12 QUEUED
此时线程2进入
1 127.0.0.1:6379> get money 2 "100" 3 127.0.0.1:6379> incrby money 1000 4 (integer) 1100 5 127.0.0.1:6379>
再回到线程1结束事务(返回nil说明事务执行失败;可以先解锁unwatch,再获取锁watch)
1 127.0.0.1:6379> exec 2 (nil) 3 127.0.0.1:6379>
8.Jedis
-
Jedis是Redis官方推荐的Java连接开发工具;使用Java操作Redis中间件;
-
导入相关依赖
<!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.3.0</version> </dependency> <!--fastjson--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency>
-
编码测试
-
连接数据库
-
操作命令
-
断开连接
事务代码简单示例
-
-
public class TestPing { public static void main(String[] args) { //new Jedis对象即可 Jedis jedis =new Jedis("127.0.0.1",6379); //Jedis所有的命令 之前的指令在这里就是一个个方法 System.out.println(jedis.ping()); //关闭连接 jedis.close(); } } 输出:PONG
-
public class TestTX { public static void main(String[] args) { //连接 Jedis jedis = new Jedis("127.0.0.1",6379); //开启事务 Transaction mutil = jedis.multi(); try { //命令操作 mutil.set("k1","1"); mutil.set("k2","2"); mutil.set("k3","3"); //执行事务 mutil.exec(); }catch (Exception e) { //放弃事务 mutil.discard(); e.printStackTrace(); } finally { System.out.println(jedis.mget("k1","k2","k3")); //关闭连接 jedis.close(); } } }
9.常用API
五大数据类型,三种特殊数据类型;指令就是方法
10.SpringBoot整合
-
说明:在SpringBoot2.x之后,原来使用的jedis被替换成了lettuce
jedis:底层采用的是直连,如果多线程操作,是不安全的,如果想要避免不安全,使用jedis pool连接池!(更像BIO模式)
lettuce:采用netty(实体类必须序列化),实例可以在多个线程中共享,不存在线程不安全的情况,可以减少线程数量;(更像NIO模式)
-
源码分析:
@Configuration(proxyBeanMethods = false) @ConditionalOnClass({RedisOperations.class}) @EnableConfigurationProperties({RedisProperties.class}) @Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class}) public class RedisAutoConfiguration { public RedisAutoConfiguration() { } @Bean @ConditionalOnMissingBean( name = {"redisTemplate"}) @ConditionalOnSingleCandidate(RedisConnectionFactory.class) //这个注解的意思是,当这个bean不存在的时候,这个类就生效,所以也就是说,我们可以自定义一个redisTemplate来替换这个默认的redisTemplate public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { //默认的RedisTemplate 没有过多的设置,redis 对象都是需要序列化! //两个泛型都是object类型,后面使用需要强制转换 RedisTemplate<Object, Object> template = new RedisTemplate(); template.setConnectionFactory(redisConnectionFactory); return template; } @Bean @ConditionalOnMissingBean//由于String是Redis中最常使用的类型,所以说源码单独提出来一个bean @ConditionalOnSingleCandidate(RedisConnectionFactory.class) public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) { StringRedisTemplate template = new StringRedisTemplate(); template.setConnectionFactory(redisConnectionFactory); return template; } }
-
整合测试(RedisProperties.class源码中,可以看到配置相关的信息)
-
下面测试使用的是默认的api,实际开发中都是使用自己重新封装的工具
-
导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
-
配置连接(连接配置相关信息,建议都使用lettuce相关;因为jedis某些配置在springboot中不生效了)
spring.redis.host=127.0.0.1 spring.redis.port=6379
-
测试
@SpringBootTest class RedisSpringbootApplicationTests { @Autowired private RedisTemplate redisTemplate; @Test void contextLoads() { //实际企业开发的过程中。都不会使用这种原生的方式;一般使用工具类,RedisUtils等; /** * redisTemplate 操作不同数据类型 api和学的指令是一样的 * opsForValue 操作字符串 类似String * opsForList 操作List 类似List * ...... */ /** * 除了上述基本操作,常用的方法都可以通过redisTemplate来操作,事务,CRUD;redisTemplate都直接列出来了,可以直接使用 * 比如:redisTemplate.multi(); */ /** * 获取redis的连接对象 * RedisConnection connection = redisTemplate.getConnectionFactory().getConnection(); * connection.flushDb(); * connection.flushAll(); */ redisTemplate.opsForValue().set("test", "jjjjj"); System.out.println(redisTemplate.opsForValue().get("test")); //实际开发一般使用json来传递对象;传递对象时,所有的对象都需要序列化,不然会报错 @Test public void test1() { User user = new User(); user.setName("jpc"); user.setAge("3"); String us = null; //下面是两种转String的方式,都可以实现自动序列化;或者对象直接implements Serializable也可以 // try { // us = new ObjectMapper().writeValueAsString(user); // } catch (Exception e) { // e.printStackTrace(); // } us =JSONObject.toJSONString(user); redisTemplate.opsForValue().set("uesr",us); System.out.println(redisTemplate.opsForValue().get("uesr")); } } }
-
-
序列化
从RedisTemplate源码中可以看到,默认是通过jdk序列化的
this.defaultSerializer = new JdkSerializationRedisSerializer
(命令行中容易转义,导致乱码);我们可以重新写一个redisTemplate配置类来代替默认的redisTemplate
@Configuration public class RedisConfig { //编写我们自己的 redisTemplate;下面模板可以直接使用 //格式可以按照源码中的格式处理 @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate(); //默认的连接工厂 template.setConnectionFactory(redisConnectionFactory); //下面是重新配置redisTemplate具体的序列化方式 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); //用ObjectMapper转义 ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); StringRedisSerializer s = new StringRedisSerializer(); //key采用String序列化方式 template.setKeySerializer(s); //hash采用String序列化方式 template.setKeySerializer(s); //value采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); //hash的value template.setHashKeySerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
-
企业开发中一般使用到工具类,公司一般自己封装,
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @Component public final class RedisUtils { @Autowired private RedisTemplate<String, Object> redisTemplate; // =============================common============================ /** * 指定缓存失效时间 * @param key 键 * @param time 时间(秒) */ public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据key 获取过期时间 * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); } /** * 判断key是否存在 * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * @param key 可以传一个值 或多个 */ @SuppressWarnings("unchecked") public void del(String... key) { if (key != null && key.length > 0) { if (key.length == 1) { redisTemplate.delete(key[0]); } else { redisTemplate.delete(String.valueOf(CollectionUtils.arrayToList(key))); } } } // ============================String============================= /** * 普通缓存获取 * @param key 键 * @return 值 */ public Object get(String key) { return key == null ? null : redisTemplate.opsForValue().get(key); } /** * 普通缓存放入 * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 普通缓存放入并设置时间 * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 递增 * @param key 键 * @param delta 要增加几(大于0) */ public long incr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); } /** * 递减 * @param key 键 * @param delta 要减少几(小于0) */ public long decr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); } // ================================Map================================= /** * HashGet * @param key 键 不能为null * @param item 项 不能为null */ public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); } /** * 获取hashKey对应的所有键值 * @param key 键 * @return 对应的多个键值 */ public Map<Object, Object> hmget(String key) { return redisTemplate.opsForHash().entries(key); } /** * HashSet * @param key 键 * @param map 对应多个键值 */ public boolean hmset(String key, Map<String, Object> map) { try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashSet 并设置时间 * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(String key, Map<String, Object> map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除hash表中的值 * * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(String key, Object... item) { redisTemplate.opsForHash().delete(key, item); } /** * 判断hash表中是否有该项的值 * * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hHasKey(String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); } /** * hash递增 如果不存在,就会创建一个 并把新增后的值返回 * * @param key 键 * @param item 项 * @param by 要增加几(大于0) */ public double hincr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, by); } /** * hash递减 * * @param key 键 * @param item 项 * @param by 要减少记(小于0) */ public double hdecr(String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, -by); } // ============================set============================= /** * 根据key获取Set中的所有值 * @param key 键 */ public Set<Object> sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean sHasKey(String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将数据放入set缓存 * * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sSet(String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 将set数据放入缓存 * * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long sSetAndTime(String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0) expire(key, time); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 获取set缓存的长度 * * @param key 键 */ public long sGetSetSize(String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 移除值为value的 * * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setRemove(String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } // ===============================list================================= /** * 获取list缓存的内容 * * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 */ public List<Object> lGet(String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取list缓存的长度 * @param key 键 */ public long lGetListSize(String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 通过索引 获取list中的值 * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推 */ public Object lGetIndex(String key, long index) { try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 */ public boolean lSet(String key, Object value) { try { redisTemplate.opsForList().rightPush(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) */ public boolean lSet(String key, Object value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @return */ public boolean lSet(String key, List<Object> value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List<Object> value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据索引修改list中的某条数据 * * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lUpdateIndex(String key, long index, Object value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 移除N个值为value * * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lRemove(String key, long count, Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } }
-
所有的redis操作,其实对于java开发人员来说,十分的简单,更重要的是去理解redis的思想和每一种数据结构的用处和使用场景;
-
11.Redis.conf详解
-
启动的时候就是通过配置文件来启动的(下面代码或描述片段是截取了配置文件的相关信息)
-
配置文件对大小写不敏感
# 1k => 1000 bytes # 1kb => 1024 bytes # 1m => 1000000 bytes # 1mb => 1024*1024 bytes # 1g => 1000000000 bytes # 1gb => 1024*1024*1024 bytes # # units are case insensitive so 1GB 1Gb 1gB are all the same.
-
包含INCLUDES(可以将其他配置文件配置进来)
# include path olocal.conf # include path oother.conf
-
网络NETWORK
bind 127.0.0.1 #绑定的ip protected-mode yes #保护模式 默认开启 port 6379 #绑定的端口
-
通用GENERAL
daemonize yes #以后台守护进程的方式运行,默认是no,我们需要自己开启为yes; pidfile /var/run/redis_6379.pid #如果以后台的方式运行,我们就需要指定一个pid文件; # Specify the server verbosity level. # This can be one of: # debug (a lot of information, useful for development/testing) # verbose (many rarely useful info, but not a mess like the debug level) # notice (moderately verbose, what you want in production probably) 默认的生产环境适用 # warning (only very important / critical messages are logged) loglevel notice #日志级别 logfile "" #生成的日志文件 databases 16 #默认数据库的数量 always-show-logo yes #是否总是显示LOGO
-
快照SNAPSHOTTING
-
持久化,在规定的时间内,执行了多少次操作,则会持久化到文件.rdb.aof
-
Redis是内存数据库,如果没有持久化,那么数据断电即失去
#下面是配置文件中,对于持久化的配置规则 #如果900秒内,如果至少有一个key进行了修改,那么我们就进行持久化操作 save 900 1 #如果300秒内,如果至少有10个key进行了修改,那么我们就进行持久化操作 save 300 10 #如果60秒内,如果至少有10000个key进行了修改,那么我们就进行持久化操作 save 60 10000 stop-writes-on-bgsave-error yes #持久化如果出错,是否还需要继续工作 rdbcompression yes #是否压缩rdb文件,需要消耗一些cpu资源 rdbchecksum yes #保存rdb文件的时候,进行一些校验 dbfilename dump.rdb #rdb文件名 dir ./ #rdb文件保存的目录
-
-
复制REPLICATION(主从复制)
replicaof <masterip> <masterport> #配置主从复制,后面跟的是主机的ip+端口;这样这台Redis服务启动之后,自动就变成了从机; masterauth <master-password> #如果主机有密码,还需要把主机的密码配置上去;
-
安全SECURITY
# requirepass foobared 密码默认是空 用命令行查看密码 config get requirepass 可以在配置文件中设置密码 requirepass xxx 或者用命令行设置密码 config set requirepass "xxx" 设置完密码,再操作时,需要验证 auth xxx
-
限制CLIENTS (扩展了解)
maxclients 10000 #设置能连接上Redis的最大客户端数量 maxmemory <bytes> #redis 设置最大的内存容量 maxmemory-policy noevication #内存达到上限后的处理策略 #移除一些过期的key #报错等...... #maxmemory-policy 六种方式 #1、volatile-lru:只对设置了过期时间的key进行LRU(默认值) #2、allkeys-lru : 删除lru算法的key #3、volatile-random:随机删除即将过期key #4、allkeys-random:随机删除 #5、volatile-ttl : 删除即将过期的 #6、noeviction : 永不过期,返回错误
-
APPEND ONLY MODE aof配置
appendonly no #默认不开启aof,默认使用rdb方式持久化,在大部分情况下,rdb够用 appendfilename "appendonly.aof" #持久化的aof文件的名字 # appendfsync always 每次修改都会同步,速度比较慢,消耗性能 appendfsync everysec #每秒执行一次sync,可能会丢失这1秒的数据! # appendfsync no 不执行sync,这个时候操作系统自己同步数据,速度最快 atuo-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb #如果aof文件大于64m,太大了(记录上一次aof文件的大小,超过64m,就会触发重写机制),就会fork一个新的进程来将我们的文件重写。
-
12.Redis持久化
-
Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失,所以Redis提供了持久化功能!
12.1.RDB
-
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存中。
-
在主从复制中,rdb就是备用的,很多时候放在从机上,不占用主机的内存。
-
-
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是很敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。我们默认的就是RDB,一般情况下不需要修改这个配置;
-
生成rdb文件(有时候生产环境我们需要将这个文件备份)
-
RDB保存的文件就是dump.rdb 都是在我们的配置文件中的快照中进行配置的;
-
save规则满足的情况下,会触发rdb规则,生成dump.rdb文件
-
执行flushall命令,也会触发rdb规则产生rdb文件
-
退出redis,也会产生rdb文件
-
-
如何恢复rdb文件
-
只需要将rdb文件放到我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb
-
查看需要存放的位置
congif get dir "dir" "/usr/local/bin" #如果在这个目录下存在dump.rdb文件,启动就会自动恢复其中的数据
-
-
优点
-
适合大规模数据恢复!
-
如果对数据的完整性要求不高,
-
-
缺点
-
需要一定的时间间隔进行操作,如果redis意外down机,最后一次修改的数据就没有了
-
fork进程的时候,会占用一定的内存空间
-
12.2.AOF(Append only File)
-
将我们所有的命令都记录下来,恢复的时候就把这个文件全部再执行一遍
-
默认的是文件无限追加,就会导致文件越来越大;所以我们在配置文件中可以看到重写的配置
-
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作;
-
AOF保存的文件是appendonly.aof文件
-
-
默认是不开启的,我们需要手动进行配置,我们只需要将appendonly改为yes就开启了aof;然后重启redis就可以生效了。
-
如果aof文件有错误,这个时候redis是启动不起来的,这个时候我们需要修复这个aof文件;redis给我们提供了一个工具,redis-check-aof --fix(命令行是:redis-check-aof --fix appendonly.aof)
-
优点
-
每一次修改都同步,文件的完整性会更好!
-
默认开启的同步效率是每秒同步一次,可能会丢失一秒的数据
-
不开启的话,就是从不同步,效率是最高的
-
-
缺点
-
相对于数据文件来说,aof远远大于rdb,修复的数据也比rdb慢
-
aof运行效率也比rdb慢,因为涉及到了读写的IO操作,所以redis默认的持久化操作是rdb而不是aof
-
扩展
-
RDB持久化方式能够在指定的时间间隔内对你的数据进行快照存储
-
AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis协议追加保存每次写的操作到文件的末尾,Redis还能对AOF文件进行后台重写,是的AOF文件的体积不至于过大。
-
只做缓存,如果你只希望你的数据再服务器运行的时候存在,你也可以不使用任何持久化
-
同时开启两种持久化方式
-
在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
-
RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要使用AOF呢?建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF潜在的BUG,留着作为一个万一的手段。
-
-
性能建议
-
因为RDB文件只用作后备用途,建议只在Slave(从机)上持久化RDB文件。而且只要15分钟备份一次就够了,只保留save 900 1 这条规则
-
如果Enable (开启)AOF,好处是在最恶劣情况下也只会丢失不超过两秒的数据,启动脚本较简单,只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值是64M太小了,可以设到5G以上,默认超过原大小100%大小重写,可以改到适当的数值
-
如果不Enable AOF,仅靠Master-Slave Repullcation实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave同时down掉(比如断电),会丢失十几分钟的数据,启动脚本也要比较两个Mater/Slave中的RDB文件,载入较新的哪个,微博就是这种架构;
-
13.Redis发布订阅
-
Redis发布订阅(pub/sub)是一种消息通信模式,发送者(pub)发送消息,订阅者(sub)接收消息
-
Redis客户端可以订阅任意数量的频道
-
订阅/发布消息图
-
命令:这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播,提醒等
1:PSUBSCRIBE pattern [pattern.....] 订阅一个或者多个符合给定模式的频道 2:PUBSUB subcommand [argument [argument......]] 查看订阅与发布系统状态 3:PUBLISH channel message 将信息发送到指定频道 4:PUNSUBSCRIBE [pattern [pattern......]] 退订所有给定模式的频道 5:SUBSCRIBE channel[channel......] 订阅给定的一个或者多个频道的信息 6:UNSUBSCRIBE[channel [channel......]] 只退订给定的频道
测试
订阅 127.0.0.1:6379> subscribe jpc Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "jpc" 3) (integer) 1 发布 127.0.0.1:6379> publish jpc "hello,jpc" (integer) 1 127.0.0.1:6379> 接收到信息 127.0.0.1:6379> subscribe jpc Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "jpc" 3) (integer) 1 1) "message" 2) "jpc" 3) "hello,jpc"
-
原理
-
Redis使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,借此加深对Redis的理解
-
Redis通过PUBLISH,SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能
-
通过SUBSCRIBE命令订阅某个频道后,redis-server里维护了一个字典,字典的键就是一个个channel,而字典的值则是一个链表,链表中保存了所有订阅这个channel的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定channel的订阅链表中
-
通过PUBLISH命令向订阅者发送消息,redis-serve会使用给定的频道作为键,在它维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发送给所有的订阅者。
-
Pub/Sub从字面上理解就是发布与订阅,在Redis中,你可以设定对某一个key值进行消息发布和消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的信息,这一功能最明显的用法就是用作实时消息系统
-
较为复杂的场景,我们会采用消息队列去处理
-
14.Redis主从复制
-
概念
-
主从复制,是指将一台Redis服务器的数据,复制到其他Redis服务器,前者称为主节点master/leader,后者称为从节点slave/follower;数据的复制是单向的,只能从主节点到从节点。master以写为主,slave以读为主。
-
默认情况下,每台Redis服务器都是主节点,且每一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点
-
主从复制的作用主要包括:
-
数据冗(rong)余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式;
-
故障恢复:当主节点出现问题,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余;
-
负债均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的情况下,通过多个节点分担读负载,可以大大提高Redis服务器的并发量;
-
高可用基石:除了上述作用之外,主从复制还是哨兵和集群能够实现的基础,因此说主从复制是Redis高可用的基础。
-
一般情况下,要将Redis运用于工程项目中,一台Redis是万万不能的,因为
-
从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;
-
从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器的容量是256G,也不能将所有内存用作Redis内存存储,一般来说,单个Redis最大使用内存不超过20G。
-
-
环境配置
-
只配置从库,不用配置主库
127.0.0.1:6379> info replication #查看当前库的信息 # Replication role:master #角色 connected_slaves:0 #从机的个数 master_replid:a804b1bf7454729cbfba233118499c5fc3d63111 master_replid2:0000000000000000000000000000000000000000 master_repl_offset:0 second_repl_offset:-1 repl_backlog_active:0 repl_backlog_size:1048576 repl_backlog_first_byte_offset:0 repl_backlog_histlen:0 127.0.0.1:6379>
-
在每一台redis.conf中分别修改相关信息,端口,pidfile文件名(开启后台运行后所需要的文件,不要重名),日志文件名称(不要重名),dump.rdb文件名(不要重名)等。确保Redis服务信息基本不重名;
-
修改完之后,启动Redis服务,查看进程是否开启:ps -ef|grep redis
-
开始配置主从;一般情况下,我们只需配置从机就好了;选一台作为主机,其他作为从机;配置完毕后会发现,主机能写,而从机只能读;主机中的所有信息都会被从机自动保存;
SLAVEOF ip port #在从机中配置,把当前Redis服务器配置成从机,后面ip+端口是默认主机的信息; 这里是用命令配置的,开发中是在配置文件中配置,这样才是永久的;如何在配置文件中配置,见上面的配置文件讲解;
主机断开连接,从机依旧连接到主机的,没有写操作,这时候如果主机回来了,从机立马可以获取到主机写的信息;
如果是使用命令行来配置的主从,这时候如果重启从机,它会立马会变成主机;只要再变回从机,立马就可以从主机中获取值;
-
复制原理
-
Slave启动成功连接到master后会发送一个sync命令
-
Master接收到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步;
-
全量复制
而Slave服务在接收到数据库文件数据后,将其存盘并加载到内存中
-
增量复制
Master继续将新的所有收集到的修改命令依次传给Slave,完成同步
但是只要是重新连接Master,一次完全同步(全量复制)将被自动执行;我们的数据一定可以在从机中看到
-
-
-
在Master没有之后,如果Slave想变成Master,在没有哨兵模式的情况下,只能手动去处理;
可以使用:SlAVEOF no one 命令 使自己的角色由slave变成master;其他的节点需要手动连接到最新的主节点;
-
15.哨兵模式
-
概念:自动选择Master的模式;主从切换技术的方法是:当主机服务器宕机之后,需要手动把一台服务器切换成主服务器,这就需要人工干预,费时费力,还会造成一段时间内服务不可用;更多时候我们选择哨兵模式,Sentinle架构来解决问题;能够后台监控主机是否故障,如果故障了,根据投票数自动将从库转换为主库;
-
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行,其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例;基本模型:
-
这里的哨兵有两个作用
-
通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器;(默认是30秒)
-
当哨兵检测到master宕机后,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让他们切换主机。
-
-
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控,各个哨兵之间还会进行监控,这样就形成了多哨兵模式(哨兵们分别监控每个Redis服务器,而多个哨兵之间也实现互相监控);
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行fail over过程;仅仅是哨兵1主观的认为主服务器不可用,这个现象称为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票(这里有一个投票算法),投票的结果由一个哨兵发起,进行fail over【故障转移】操作,切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
-
建sentinel.conf哨兵配置文件,在其中配置相关信息(下面介绍核心配置)
#sentinel monitor 被监控的名称 ip port 1 #后面的数字代表主机挂了之后,slave会投票选出新的主机 sentinel monitor redis ip port 1
-
启动配置文件:redis-sentinel kconfig/sentinel.conf
-
默认30秒监控一次,如果主机宕机,会进行投票切换,产生新的主机;当原先的主机再连接上线后,会自动作为新主机的从机;
-
-
优点:
-
哨兵集群,基于主从复制模式。所有主从配置的优点,它都有;
-
主从可以切换,故障可以转移,系统的可用性就会更好;
-
哨兵模式就是主从模式的升级,手动到自动,更加健壮;
-
-
缺点:
-
Redis不好在线扩容,集群容量一旦达到上线,在线扩容就十分麻烦
-
实现哨兵模式的配置其实十分麻烦,里面有很多选择
# Example sentinel.conf # 哨兵sentinel实例运行的端口 默认26379 port 26379 # 哨兵sentinel的工作目录 dir /tmp # 哨兵sentinel监控的redis主节点的 ip port # master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。 # quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了 # sentinel monitor <master-name> <ip> <redis-port> <quorum> sentinel monitor mymaster 127.0.0.1 6379 1 # 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码 # 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码 # sentinel auth-pass <master-name> <password> sentinel auth-pass mymaster MySUPER--secret-0123passw0rd # 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒 # sentinel down-after-milliseconds <master-name> <milliseconds> sentinel down-after-milliseconds mymaster 30000 # 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步, # 这个数字越小,完成failover所需的时间就越长, # 但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。 # 可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。 # sentinel parallel-syncs <master-name> <numslaves> sentinel parallel-syncs mymaster 1 # 故障转移的超时时间 failover-timeout 可以用在以下这些方面: #1. 同一个sentinel对同一个master两次failover之间的间隔时间。 #2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。 #3.当想要取消一个正在进行的failover所需要的时间。 #4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了 # 默认三分钟 # sentinel failover-timeout <master-name> <milliseconds> sentinel failover-timeout mymaster 180000 # SCRIPTS EXECUTION #配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。 #对于脚本的运行结果有以下规则: #若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10 #若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。 #如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。 #一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。 #通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本, #这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数, #一个是事件的类型, #一个是事件的描述。 #如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。 #通知脚本 # sentinel notification-script <master-name> <script-path> sentinel notification-script mymaster /var/redis/notify.sh # 客户端重新配置主节点参数脚本 # 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。 # 以下参数将会在调用脚本时传给脚本: # <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port> # 目前<state>总是“failover”, # <role>是“leader”或者“observer”中的一个。 # 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的 # 这个脚本应该是通用的,能被多次调用,不是针对性的。 # sentinel client-reconfig-script <master-name> <script-path> sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
16.Redis缓存穿透和雪崩
服务的高可用问题
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是在数据查询方面。但同时,它带来了一些问题。其中,最要害的就是数据一致性问题,从严格意义上来讲,这个问题无解,如果对数据一致性要求很高,那么就不要使用缓存;另外一些问题,缓存穿透,缓存雪崩,行业中有一些典型的解决方案。
16.1.缓存穿透(查不到数据)
-
概念
用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,也是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库,这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。
-
解决方案
布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;
缓存空对象
当存储层不命中后,即使返回的空对象也将其缓存起来,同时设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;
但是这种方法会存在两个问题
-
如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能有很多空值的键;
-
即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响;
16.2.缓存击穿(量太大,缓存过期)
-
概念
这里需要注意和缓存穿透的区别,缓存击穿是指一个key非常热点,在不停的扛着大并发,大并发集中对一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞;
-
解决方案
设置热点数据永不过期
从缓存层面看,没有设置过期时间,所以不会出现热点key过期后产生的问题。
加互斥锁
分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
16.3.缓存雪崩
-
概念
缓存雪崩是指在某一段时间,缓存集体过期失效。Redis宕机
产生雪崩的原因之一:比如马上就要到双十二零点,很快迎来一波抢购,这波商品时间比较集中的放入缓存,假设缓存一小时,那么到了凌晨一点的时候,这批商品的缓存就都过期了。而对于这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰,于是所有的请求都会到达存储层,存储层的调用量暴增,造成存储层也会挂掉的情况;
其中集中过期倒不是最致命的,比较致命的缓存雪崩,是缓存服务器某个节点宕机或者断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。
双十一:会停掉一些服务,保证主要的服务可用,比如退款等服务暂不能用
-
解决方案
Redis高可用
多增设几台Redis,这样一台挂掉,其他还能工作;搭建Redis集群(异地多活)
限流降级
这个解决思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某一个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的的含义就是正式部署前,我先把可能的数据预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中,在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀点。
文章基于哔哔哩哩UP主:遇见狂神说 关于Redis的的讲解视频学习并整理;
附传送门:https://www.bilibili.com/video/BV1S54y1R7SB