一、前言
最近由于工作需要,将数据更新到redis之后,系统每一个小时读取一次redis,时效性极差,最多可能需要等待一个小时才能生效,如果减少轮询访问的时间间隔,无形中又会增加redis的压力,而且时间间隔真心不好控制。这时候redis的发布订阅可以在一定程度上解决这个问题
二、流程
1、消费方需要向redis订阅channel,设置监听
2、生产者发生数据变更时,向redis发送变更通知
3、redis向消费方发送变更消息
4、消费方得知数据发生变化,执行相关操作
三、实现
常量
public class RedisConstant { /** * redis发布订阅channel */ public static final String REDIS_SUB_PUB_CHANNEL = "test_topic"; }
@Service public class RedisPubService { @Resource private RedisTemplate<String, Object> redisTemplate; public void publish(){ for (int i = 0; i < 3; i++) { redisTemplate.convertAndSend(RedisConstant.REDIS_SUB_PUB_CHANNEL, createList()); System.out.println("发送成功!"); try { Thread.sleep(new Random().nextInt(3) * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } private List<String> createList() { return Arrays.asList("aaa", "测试消息", "!@#_"); } }
@Configuration public class RedisSubConfig { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListener listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //配置要订阅的订阅项,可以有多个 container.addMessageListener(listenerAdapter, new PatternTopic(RedisConstant.REDIS_SUB_PUB_CHANNEL)); return container; } }
@Component public class RedisSubListener implements MessageListener { private RedisSerializer<?> serializer; @Resource private RedisTemplate<String, Object> redisTemplate; @PostConstruct private void init(){ serializer = redisTemplate.getDefaultSerializer(); //如果没有默认的序列化器,则使用jdk序列化器 if(serializer == null){ serializer = new JdkSerializationRedisSerializer(); } } @Override public void onMessage(Message message, byte[] pattern) { byte[] body = message.getBody(); byte[] channel = message.getChannel(); Object deserialize = serializer.deserialize(body); String topic = redisTemplate.getStringSerializer().deserialize(channel); System.out.println("我是sub,监听" + topic + ",我收到消息:" + deserialize); } }
发送成功! 我是sub,监听test_topic,我收到消息:[aaa, 测试消息, !@#_] 发送成功! 我是sub,监听test_topic,我收到消息:[aaa, 测试消息, !@#_] 发送成功! 我是sub,监听test_topic,我收到消息:[aaa, 测试消息, !@#_]
tips:这里顺便小小吐槽一下spring的RedisTemplate真心没有Jedis好用
四、使用场景
redis的发布订阅适用的场景不算多,但有时候可以在一定程度上提高系统的时效性。这里需要明确一点,redis的通知不一定能够到达,这点和zookeeper的watcher类似(watcher是一次性的,redis可以一直监听),也就是说redis只负责发送变更通知,至于消费方是否接收到,接收到后进行什么操作redis是不关心的,可以理解为MQ中没有ack,所以redis并不能保证100%的通知到达。故redis的发布订阅需要满足以下条件才可使用:
1)对消息通知要求不是100%的严格触达,如果要求消息一定能触达,请使用mq
2)变更通知可以丢失,redis是基于内存的Nosql,可能存在数据丢失的风险
3)变更的数据量较小,redis的发布订阅可以理解为一个事件通知,尽量不要把变更数据放在其中,只需要告诉消费方数据发生变化即可(可以适当加入哪条数据发生了增删改的变化)
五、总结
有人可能会说,既然redis的发布订阅不是100%通知到,为什么还要使用它?这个问题之前在使用zookeeper的watcher时也考虑过,100% + n%和100%有什么区别,而这个n却需要额外开发。这里举个简单的例子
银行转账时,提示最多两小时到账,但是有时候几秒钟就到账了,用户体验更好一些
换句话说就是,告知了下限,却将大部分数据做到了上限。
写在最后,这篇文章的名称纠结了很久,感觉叫发布订阅其实并不是十分的友好,因为这个词通常与MQ息息相关,如果基于redis做MQ,应该使用list这种数据结构,不过这里redis调用的其实是redis的publish和subscribe命令,直译过来也就是发布订阅了,只是希望不要和MQ弄混淆