一、慢查询
找到 系统中瓶颈的命令
1. 客户端请求的生命周期:
①. 慢查询通常发生在第三阶段。
②. 客户端超时不一定是慢查询,但慢查询是客户端超时的一个可能因素。
2. 相关配置
慢查询命令会存放在一个先进先出的队列
查询队列的长度:
config get slowlog-max-len
默认值是 128,我们通常建议设置为 1000
config set slowlog-max-len=1000
查询慢查询的定义时长:
config get slowlog-log-slower-than
默认值是 10000 微秒= 10 毫秒,我们建议设置为 1 毫秒
config set slowlog-log-slower-than=1000
3. 相关命令
slowlog get [n] # 查询慢查询队列的 n 条 slowlog len # 获取慢查询队列长度 slowlog reset # 清空慢查询队列
二、pipeline 流水线
流水线是一个类似于 mget / mset 的一个批量操作。
区别在于 m 操作是 redis 原生的命令,他在执行队列中作为一个整体在排队。
而流水线是 Java 客户端的命令,在排队时会跟其他命令杂乱在一起排队,非原子性的。但返回时会一起返回。
1. Jedis 客户端直连:
Jedis jedis = new Jedis("127.0.0.1", 6379); for (int i = 0; i < 100; i++) { Pipeline pipeline = jedis.pipelined(); for (int j = i * 100; j < (i + 1) * 100; j++) { pipeline.hset("hashkey:" + j, "field" + j, "value" + j); } pipeline.syncAndReturnAll(); }
2. SpringBoot 提供的 RedisTemplate 客户端:
// 1 重写入参 RedisCallback 类的 doInRedis 方法 List<Object> list = redisTemplate.executePipelined((RedisConnection connection) -> { // 2 打开连接 connection.openPipeline(); // 3 要一次性执行的命令 // 3.1 一个 set 操作 connection.set("key1".getBytes(), "value1".getBytes()); // 3.2 一个 mSet 操作 Map<byte[], byte[]> tuple = new HashMap(); tuple.put("m_key1".getBytes(), "m_value1".getBytes()); tuple.put("m_key2".getBytes(), "m_value2".getBytes()); tuple.put("m_key3".getBytes(), "m_value3".getBytes()); connection.mSet(tuple); // 3.3 一个 get 操作 connection.get("m_key2".getBytes()); // 4 返回 null 即可 return null; }, RedisSerializer.string()); // 5 遍历结果 for (Object obj : list) { System.out.println(String.valueOf(obj)); }
执行结果:
三、消息队列与发布订阅
参考博客:https://www.cnblogs.com/qlqwjy/p/9763754.html
在 Redis 中,发布订阅与消息队列属于不同的概念。
消息队列:Redis 的列表类型天然支持消息队列,并且支持阻塞式读取。多个消费者之间需要抢一个消息。
发布订阅:多个消费者都可以消费到同一条消息,但是无法订阅以往的消息。
1. 消息队列
我们重新熟悉一下 Redis 的列表类型相关 API。
①. 从左边插入,从右边取出
127.0.0.1:6379> lpush mylist a b c (integer) 3 127.0.0.1:6379> lrange mylist 0 -1 1) "c" 2) "b" 3) "a" 127.0.0.1:6379> rpop mylist "a" 127.0.0.1:6379> rpop mylist "b"
在实际使用中,我们为了及时消费,需要不停的 rpop 监听是否有消息进入,这样造成资源浪费。
②. 为了解决这一问题,redis 为我们提供了阻塞命令 brpop 和 blpop。
客户端 1 消费上次剩余的消息:
127.0.0.1:6379> brpop mylist 0 1) "mylist" 2) "c"
客户端 1 继续消费:
127.0.0.1:6379> brpop mylist 0
我们发现客户端阻塞,正在等待中
客户端 2 往键 mylist 中添加消息:
127.0.0.1:6379> lpush mylist 1 (integer) 1
这时查看 客户端 1,消息已经拿到 且耗时 10 秒:
127.0.0.1:6379> brpop mylist 0 1) "mylist" 2) "1" (10.45s)
2. 发布订阅
①. 客户端 1 发布消息
127.0.0.1:6379> publish channel:1 hi (integer) 0
向命名为 channel:1 的频道发布一个 hi。
结果返回 0,表示接收到这条消息的订阅者数量。发出去的消息不会被持久化,也就是说后续的订阅者是不会收到这条消息的。
②. 客户端 2 订阅频道
127.0.0.1:6379> subscribe channel:1 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "channel:1" 3) (integer) 1
执行上面这条命令会进入订阅状态。
在订阅状态客户端可能会收到 3 种类型的回复。每种类型包含 3 个值。第 1 个值是消息的类型,根据消息类型的不同,第二个和第三个参数的含义可能不同。
消息类型分为:
subscribe。表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。
message。表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。
unsubscribe。表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。
③. 客户端 1 再次发布消息
127.0.0.1:6379> publish channel:1 hi (integer) 1
返回值表示有 1 个订阅者收到消息。
④. 客户端 2 已看到内容为 hi 的 message 类型的消息
127.0.0.1:6379> subscribe channel:1 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "channel:1" 3) (integer) 1 1) "message" 2) "channel:1" 3) "hi"
频道可以不用具体的名字,而使用通配符命名:
? 表示 1 个占位符
* 表示任意个占位符,包含 0 个
?* 表示最少 1 个占位符
当我们向命名为 channel* 的频道发送消息时。
订阅命名为 channel1 channel2 channel_a 这 3 个频道的订阅者都会收到消息。
3. 在 SpringBoot 中实现订阅
①. 声明配置 Bean,订阅 channel:1 频道
@Configuration public class RedisListener { @Bean RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("channel:1")); return container; } @Bean MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) { return new MessageListenerAdapter(redisReceiver, "receiveMessage"); } } @Service class RedisReceiver { public void receiveMessage(String message) { System.out.println("订阅者:" + message); } }
②. 使用 Junit 实现向频道 channel:1 发布消息
@Test public void publish() { redisTemplate.convertAndSend("channel:1", "Java客户端向你示好"); }
③. 执行单元测试后,控制台输出
订阅者:"Java客户端向你示好"
四、bitmap 位图
减少内存的方案
比如统计每日用户的登录数
1. API
getbit key offset # 获取指定 key 的指定位置上的值 setbit key offset value # 对指定 key 的指定位置上设置值,只能设置为 0 或 1 bitcount key [start end] # 获取位图指定范围内值为 1 的个数。 bitop op destKey key1 [key2] # 做多个 bitmap 的 and (交集)、or (并集)、not(非)、xor(异或) 操作并将结果保存到 destKey 中。 bitpos key tartgetBit [start end] # 获取指定范围内第一个等于 tartgetBit 的值的位置,找不到返回 -1
2. 演示
127.0.0.1:6379> set hello big OK 127.0.0.1:6379> getbit hello 0 (integer) 0 127.0.0.1:6379> getbit hello 1 (integer) 1 127.0.0.1:6379> setbit hello 7 1 (integer) 0 127.0.0.1:6379> get hello "cig" 127.0.0.1:6379> bitcount hello (integer) 13 127.0.0.1:6379> bitcount hello 1 3 (integer) 9 127.0.0.1:6379> set world small OK 127.0.0.1:6379> bitop and helloWorld hello world (integer) 5
127.0.0.1:6379> bitcount helloWorld (integer) 11 127.0.0.1:6379> bitpos hello 1 (integer) 1 127.0.0.1:6379> bitpos hello 0 1 2 (integer) 8
五、hyperloglog
极端的减少内存的方案 / 数据结构
可以用来做独立用户统计,缺陷是有错误率,并且只能查询去重后的总数而不是查看具体元素。
1. API
pfadd key element [element ...] # 添加元素 pfcount key # 计算去重后的总数 pfmerge destKey sourceKey # 合并多个 key 到 destKey
2. 演示
127.0.0.1:6379> pfadd 14:user:list "user1" "user2" "user3" "user4" (integer) 1 127.0.0.1:6379> pfcount 14:user:list (integer) 4 127.0.0.1:6379> pfadd 14:user:list "user1" "user2" "user3" "user5" (integer) 1 127.0.0.1:6379> pfcount 14:user:list (integer) 5
六、geo(地理信息定位)
存储经纬度,计算两地距离,范围计算等