redis的发布订阅模式,使发布者和订阅者完全解耦
1.pom.xml and application.properties
<!-- 引入redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
spring: redis: #数据库索引 database: 5 host: 127.0.0.1 port: 6379 password: 123456 jedis: pool: #最大连接数 max-active: 8 #最大阻塞等待时间(负数表示没限制) #最大空闲 max-idle: 8 #最小空闲 min-idle: 0
2.消息发布者、消息处理者POJO、redis消息监听器容器以及redis监听器注入IOC容器
@Configuration //相当于xml中的beans public class RedisConfig { /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param connectionFactory * @param listenerAdapter * @return */ @Bean //相当于xml中的bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, MessageListenerAdapter listenerAdapter2) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat 的通道 container.addMessageListener(listenerAdapter, new PatternTopic("chat")); container.addMessageListener(listenerAdapter2, new PatternTopic("chat2")); //这个container 可以添加多个 messageListener return container; } /** * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 * @param receiver * @return */ @Bean MessageListenerAdapter listenerAdapter(MessageReceiver receiver) { //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看 return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) { //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage2” //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看 return new MessageListenerAdapter(receiver, "receiveMessage2"); } /**redis 读取内容的template */ @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } @Bean RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(connectionFactory); template.setKeySerializer(jackson2JsonRedisSerializer); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashKeySerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
MessageListenerAdapter通过反射使普通的POJO就可以处理消息。具体情况见MessageListenerAdapter的onMessage方法。
3.消息发布者
@EnableScheduling //开启定时器功能 @Component public class MessageSender { @Autowired private StringRedisTemplate stringRedisTemplate; @Scheduled(fixedRate = 2000) //间隔2s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息 public void sendMessage(){ stringRedisTemplate.convertAndSend("chat",String.valueOf(Math.random())); stringRedisTemplate.convertAndSend("chat2",String.valueOf(Math.random())); } }
4.普通的消息处理器POJO
@Component public class MessageReceiver { /**接收消息的方法*/ public void receiveMessage(String message){ System.out.println("收到一条chat的消息:"+message); } /**接收消息的方法*/ public void receiveMessage2(String message){ System.out.println("收到一条chat2的消息:"+message); } }
MessageListenerAdapter通过反射调用receiveMessage方法处理消息
5.其他方式(参考)
配置
package com.example.day3_30.redisConfiBack; /** * redis消息队列配置-订阅者 */ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import java.util.concurrent.CountDownLatch; /** * redis消息队列配置-订阅者 */ @Configuration public class RedisMessageListener { /** * 创建连接工厂 * @param connectionFactory * @param listenerAdapter * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, MessageListenerAdapter listenerAdapterTest2){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //接受消息的key container.addMessageListener(listenerAdapter,new PatternTopic("phone")); container.addMessageListener(listenerAdapterTest2,new PatternTopic("phoneTest2")); return container; } /** * 绑定消息监听者和接收监听的方法 * @param receiver * @return */ @Bean public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver){ return new MessageListenerAdapter(receiver,"receiveMessage"); } /** * 绑定消息监听者和接收监听的方法 * @param receiver * @return */ @Bean public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage receiver){ return new MessageListenerAdapter(receiver,"receiveMessage2"); } /** * 注册订阅者 * @param latch * @return */ @Bean ReceiverRedisMessage receiver(CountDownLatch latch) { return new ReceiverRedisMessage(latch); } /** * 计数器,用来控制线程 * @return */ @Bean public CountDownLatch latch(){ return new CountDownLatch(1);//指定了计数的次数 1 } }
消息处理
package com.example.day3_30.redisConfiBack; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.CountDownLatch; /** * 注入消息接受类 */ public class ReceiverRedisMessage { private static final Logger log = LoggerFactory.getLogger(ReceiverRedisMessage.class); private CountDownLatch latch; @Autowired public ReceiverRedisMessage(CountDownLatch latch) { this.latch = latch; } /** * 队列消息接收方法 * * @param jsonMsg */ public void receiveMessage(String jsonMsg) { log.info("[开始消费REDIS消息队列phone数据...]"); try { System.out.println(jsonMsg); log.info("[消费REDIS消息队列phone数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } /** * 队列消息接收方法 * * @param jsonMsg */ public void receiveMessage2(String jsonMsg) { log.info("[开始消费REDIS消息队列phoneTest2数据...]"); try { System.out.println(jsonMsg); /** * 此处执行自己代码逻辑 例如 插入 删除操作数据库等 */ log.info("[消费REDIS消息队列phoneTest2数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phoneTest2数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } }
测试
package com.example.day3_30.redisConfiBack; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping public class PublisherController { private static final Logger log = LoggerFactory.getLogger(PublisherController.class); @Autowired private RedisTemplate redisTemplate; @GetMapping(value = "pub/{id}") public String pubMsg(@PathVariable String id){ redisTemplate.convertAndSend("phone","223333"); redisTemplate.convertAndSend("phoneTest2","34555665"); log.info("Publisher sendes Topic... "); return "success"; } }