要点:
RedisMessageListenerContainer Redis订阅发布的监听容器,你的消息发布、订阅配置都必须在这里面实现
* addMessageListener(MessageListenerAdapter,PatternTopic) 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息。
* setTopicSerializer(RedisSerializer) 对频道内容进行序列化解析
MessageListenerAdapter 监听适配器
- MessageListenerAdapter(Object , defaultListenerMethod) 订阅者及其方法
redisTemplate redis模版类
- convertAndSend(String channel, Object message) 消息发布
第一种:
RedisConfig核心类,实现了Redis连接,订阅以及发布配置
package com.example.demo.config; import com.example.demo.project.MessageReceive1; import com.example.demo.project.MessageReceive2; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @author lzg * @date 2019/12/5 15:37 */ @Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param redisConnectionFactory * @param listenerAdapter1 * @return */ //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化) @Bean public RedisMessageListenerContainer container1(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter1, MessageListenerAdapter listenerAdapter2) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); // 订阅多个频道 redisMessageListenerContainer.addMessageListener(listenerAdapter1, new PatternTopic("test1")); redisMessageListenerContainer.addMessageListener(listenerAdapter1, new PatternTopic("test2")); //不同的订阅者 redisMessageListenerContainer.addMessageListener(listenerAdapter2, new PatternTopic("test2")); //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化) Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); redisMessageListenerContainer.setTopicSerializer(seria); return redisMessageListenerContainer; } //表示监听一个频道 @Bean public MessageListenerAdapter listenerAdapter1(MessageReceive1 messageReceive1) { //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceive1 ” return new MessageListenerAdapter(messageReceive1, "getMessage"); } //表示监听一个频道 @Bean public MessageListenerAdapter listenerAdapter2(MessageReceive2 messageReceive2) { //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceive2 ” return new MessageListenerAdapter(messageReceive2, "getMessage"); } }
被消费的对象(即传输的数据)
/** * @author lzg * @date 2019/12/5 15:46 */ @Data public class Person implements Serializable { private final long serialVersionUID = 1L; private String id; private String userName; private String memberName; private String password; private String email; private String status; private String pwdSalt; }
客户端:
@Component public class MessageReceive1 { public void getMessage(String object) { //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化) Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Person.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); Person user = (Person) seria.deserialize(object.getBytes()); System.out.println("消息客户端1号:" + user.toString()); } }
@Component public class MessageReceive2 { public void getMessage(String object) { //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化) Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Person.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); Person user = (Person) seria.deserialize(object.getBytes()); System.out.println("消息客户端2号:" + user); } }
测试类:
@RunWith(SpringRunner.class) @SpringBootTest public class TestPack { @Resource private RedisTemplate redisTemplate; @Test public void test() { Person person1 = new Person(); person1.setId("001"); person1.setUserName("一号"); Person person2 = new Person(); person2.setId("002"); person2.setUserName("二号"); redisTemplate.convertAndSend("test1", person1); redisTemplate.convertAndSend("test2", person2); } }
第二种(简单版):
配置类:
@Configuration public class MyRedisConf { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("test1")); return container; } /** * 绑定消息监听者和接收监听的方法,必须要注入这个监听器,不然会报错 */ @Bean public MessageListenerAdapter listenerAdapter() { return new MessageListenerAdapter(new Receiver(), "receiveMessage"); } } @Slf4j class Receiver { public void receiveMessage(String message) { System.out.println(message); } }
测试类:
@RunWith(SpringRunner.class) @SpringBootTest public class TestPack { @Resource private RedisTemplate redisTemplate; @Test public void test(){ for (int i = 0; i < 10; i++) { System.out.println(i); redisTemplate.convertAndSend("test1","这是我发送的第"+i+"个消息"); } } }