zoukankan      html  css  js  c++  java
  • Springboot通过redisTemplate实现发布订阅

    要点:

    RedisMessageListenerContainer Redis订阅发布的监听容器,订阅相关的配置(订阅哪些频道、由哪个订阅者处理)都在这里完成;消息发布则通过下文的redisTemplate完成
    * addMessageListener(MessageListenerAdapter,PatternTopic) 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息。
    * setTopicSerializer(RedisSerializer) 设置频道(topic)名称的序列化器,默认为StringRedisSerializer;它作用于频道名称而不是消息内容,消息内容的反序列化由MessageListenerAdapter的序列化器或订阅者自行处理

    MessageListenerAdapter 监听适配器

    • MessageListenerAdapter(Object , defaultListenerMethod) 订阅者及其方法

    redisTemplate redis模版类

    • convertAndSend(String channel, Object message) 消息发布

    第一种:

    RedisConfig核心类,实现了Redis连接,订阅以及发布配置

    package com.example.demo.config;
    
    import com.example.demo.project.MessageReceive1;
    import com.example.demo.project.MessageReceive2;
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    /**
     * Redis configuration for the publish/subscribe demo.
     *
     * <p>Declares the JSON-serializing {@link RedisTemplate} used to publish messages, the
     * {@link RedisMessageListenerContainer} that subscribes to the {@code test1} and
     * {@code test2} channels, and one {@link MessageListenerAdapter} per subscriber.
     *
     * @author lzg
     * @date 2019/12/5 15:37
     */
    @Configuration
    public class RedisConfig {

        /**
         * Builds the template used for publishing: String keys, JSON values.
         *
         * @param redisConnectionFactory connection factory the template operates on
         * @return a fully initialised template
         */
        @Bean
        public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            // Replace the default JDK serialization: String keys, Jackson JSON values.
            redisTemplate.setKeySerializer(new StringRedisSerializer());
            redisTemplate.setValueSerializer(jsonValueSerializer());
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }

        /**
         * Redis message listener container.
         *
         * <p>Several listeners on different topics can be registered here; each
         * {@link MessageListenerAdapter} forwards the messages it receives to the handler
         * method of the subscriber it wraps.
         *
         * @param redisConnectionFactory connection factory used for the subscription connection
         * @param listenerAdapter1 adapter for the first subscriber; registered on {@code test1} and {@code test2}
         * @param listenerAdapter2 adapter for the second subscriber; registered on {@code test2}
         * @return the configured container
         */
        @Bean
        public RedisMessageListenerContainer container1(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter1, MessageListenerAdapter listenerAdapter2) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(redisConnectionFactory);
            // One subscriber listening on several channels.
            container.addMessageListener(listenerAdapter1, new PatternTopic("test1"));
            container.addMessageListener(listenerAdapter1, new PatternTopic("test2"));
            // A different subscriber on a channel that is already subscribed.
            container.addMessageListener(listenerAdapter2, new PatternTopic("test2"));

            // The topic serializer is intentionally left at its default. It converts channel
            // NAMES (not message payloads) to bytes; a JSON serializer here would encode names
            // differently from the plain strings the publisher sends to. Payload decoding is
            // done by the subscribers themselves.
            return container;
        }

        /**
         * Adapter that delivers messages to {@link MessageReceive1}.
         *
         * @param messageReceive1 the subscriber bean
         * @return an adapter invoking {@code getMessage} on the subscriber via reflection
         */
        @Bean
        public MessageListenerAdapter listenerAdapter1(MessageReceive1 messageReceive1) {
            return new MessageListenerAdapter(messageReceive1, "getMessage");
        }

        /**
         * Adapter that delivers messages to {@link MessageReceive2}.
         *
         * @param messageReceive2 the subscriber bean
         * @return an adapter invoking {@code getMessage} on the subscriber via reflection
         */
        @Bean
        public MessageListenerAdapter listenerAdapter2(MessageReceive2 messageReceive2) {
            return new MessageListenerAdapter(messageReceive2, "getMessage");
        }

        /**
         * Creates the JSON value serializer used by the template.
         *
         * <p>All fields are serialized regardless of visibility, and type information is
         * embedded for non-final types so values can be read back as their concrete class.
         * Subscribers must configure their {@link ObjectMapper} the same way.
         */
        private static Jackson2JsonRedisSerializer<Object> jsonValueSerializer() {
            Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            // NOTE(review): enableDefaultTyping is deprecated in newer Jackson releases and is
            // unsafe with untrusted input; kept so the wire format stays unchanged.
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            serializer.setObjectMapper(objectMapper);
            return serializer;
        }
    }

    被消费的对象(即传输的数据)

    /**
     * Payload object published to and consumed from the Redis channels.
     *
     * <p>NOTE(review): Lombok's {@code @Data} presumably includes {@code password} and
     * {@code pwdSalt} in the generated {@code toString()}, and the subscribers print the
     * object; confirm this is acceptable outside a demo.
     *
     * @author lzg
     * @date 2019/12/5 15:46
     */
    @Data
    public class Person implements Serializable {
        // Must be static: an instance field is ignored by Java serialization and, with
        // field visibility ANY on the ObjectMapper, would be written into the JSON payload.
        private static final long serialVersionUID = 1L;

        private String id;

        private String userName;

        private String memberName;

        private String password;

        private String email;

        private String status;

        private String pwdSalt;

    }

    客户端:

    /**
     * Subscriber #1: decodes each received JSON payload into a {@link Person} and prints it.
     */
    @Component
    public class MessageReceive1 {

        /**
         * Built once and reused for every message. Its ObjectMapper settings must mirror
         * the publisher's value serializer so the embedded type information is understood.
         */
        private final Jackson2JsonRedisSerializer<Person> personSerializer = newPersonSerializer();

        /**
         * Handler method invoked by the listener adapter for every message.
         *
         * @param object the message payload as JSON text
         */
        public void getMessage(String object) {
            // Decode with an explicit charset; the platform default may not be UTF-8 and
            // would corrupt non-ASCII field values.
            Person user = personSerializer.deserialize(object.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Plain concatenation is null-safe, unlike user.toString().
            System.out.println("消息客户端1号:" + user);
        }

        /** Creates the Person serializer with the same mapper settings as the publisher. */
        private static Jackson2JsonRedisSerializer<Person> newPersonSerializer() {
            Jackson2JsonRedisSerializer<Person> serializer = new Jackson2JsonRedisSerializer<>(Person.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            serializer.setObjectMapper(objectMapper);
            return serializer;
        }
    }
    /**
     * Subscriber #2: decodes each received JSON payload into a {@link Person} and prints it.
     */
    @Component
    public class MessageReceive2 {

        /**
         * Built once and reused for every message. Its ObjectMapper settings must mirror
         * the publisher's value serializer so the embedded type information is understood.
         */
        private final Jackson2JsonRedisSerializer<Person> personSerializer = newPersonSerializer();

        /**
         * Handler method invoked by the listener adapter for every message.
         *
         * @param object the message payload as JSON text
         */
        public void getMessage(String object) {
            // Decode with an explicit charset; the platform default may not be UTF-8 and
            // would corrupt non-ASCII field values.
            Person user = personSerializer.deserialize(object.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("消息客户端2号:" + user);
        }

        /** Creates the Person serializer with the same mapper settings as the publisher. */
        private static Jackson2JsonRedisSerializer<Person> newPersonSerializer() {
            Jackson2JsonRedisSerializer<Person> serializer = new Jackson2JsonRedisSerializer<>(Person.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            serializer.setObjectMapper(objectMapper);
            return serializer;
        }
    }

    测试类:

    /**
     * Publishes one {@link Person} to each of the {@code test1} and {@code test2} channels.
     *
     * <p>The test makes no assertions; the result is observed through the subscribers'
     * console output.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class TestPack {
        // Parameterized to match the declared redisTemplate bean instead of a raw type.
        @Resource
        private RedisTemplate<Object, Object> redisTemplate;

        @Test
        public void test() {
            Person person1 = newPerson("001", "一号");
            Person person2 = newPerson("002", "二号");
            redisTemplate.convertAndSend("test1", person1);
            redisTemplate.convertAndSend("test2", person2);
        }

        /** Creates a Person with only the id and user name populated. */
        private static Person newPerson(String id, String userName) {
            Person person = new Person();
            person.setId(id);
            person.setUserName(userName);
            return person;
        }
    }

    第二种(简单版):

    配置类:

    /**
     * Minimal pub/sub configuration: one JSON-serializing {@link RedisTemplate} for publishing
     * and one listener on the {@code test1} channel.
     */
    @Configuration
    public class MyRedisConf {

        /**
         * Builds the template used for publishing: String keys, JSON values.
         *
         * @param redisConnectionFactory connection factory the template operates on
         * @return a fully initialised template
         */
        @Bean
        public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);

            // Use Jackson JSON instead of the default JDK serialization for values.
            Jackson2JsonRedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            // NOTE(review): enableDefaultTyping is deprecated in newer Jackson releases and is
            // unsafe with untrusted input; kept so the wire format stays unchanged.
            objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            valueSerializer.setObjectMapper(objectMapper);

            // Key and value serialization rules.
            redisTemplate.setKeySerializer(new StringRedisSerializer());
            redisTemplate.setValueSerializer(valueSerializer);
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }

        /**
         * Listener container subscribing the adapter to the {@code test1} channel.
         *
         * @param connectionFactory connection factory used for the subscription connection
         * @param listenerAdapter adapter that forwards messages to the receiver
         * @return the configured container
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                       MessageListenerAdapter listenerAdapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.addMessageListener(listenerAdapter, new PatternTopic("test1"));
            return container;
        }

        /**
         * Binds the message receiver to its handler method.
         *
         * <p>This bean must exist because {@link #container} takes a
         * {@link MessageListenerAdapter} parameter. The {@link Receiver} is created directly
         * here rather than being obtained as a bean.
         *
         * @return an adapter invoking {@code receiveMessage} on a new {@link Receiver}
         */
        @Bean
        public MessageListenerAdapter listenerAdapter() {
            return new MessageListenerAdapter(new Receiver(), "receiveMessage");
        }
    }
    
    /**
     * Message receiver for the simple configuration; its handler is invoked by the
     * listener adapter registered in {@code MyRedisConf}.
     */
    @Slf4j
    class Receiver {

        /**
         * Prints the received message to standard output.
         *
         * @param message the message text delivered by the listener adapter
         */
        public void receiveMessage(final String message) {
            System.out.println(message);
        }
    }

    测试类:

    /**
     * Publishes ten text messages to the {@code test1} channel.
     *
     * <p>The test makes no assertions; the result is observed through the receiver's
     * console output.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class TestPack {
        // Parameterized to match the declared redisTemplate bean instead of a raw type.
        @Resource
        private RedisTemplate<Object, Object> redisTemplate;

        @Test
        public void test() {
            for (int i = 0; i < 10; i++) {
                System.out.println(i);
                redisTemplate.convertAndSend("test1", "这是我发送的第" + i + "个消息");
            }
        }

    }
  • 相关阅读:
    8.10
    今日头条笔试题 1~n的每个数,按字典序排完序后,第m个数是什么?
    Gym 100500B Conference Room(最小表示法,哈希)
    CodeForces 438D The Child and Sequence(线段树)
    UVALIVE 6905 Two Yachts(最小费用最大流)
    Gym Conference Room (最小表示法,哈希)
    hdu 2389 Rain on your Parade(二分图HK算法)
    Codeforces Fox And Dinner(最大流)
    zoj 3367 Counterfeit Money(dp)
    ZOJ3370. Radio Waves(2-sat)
  • 原文地址:https://www.cnblogs.com/lzghyh/p/12680583.html
Copyright © 2011-2022 走看看