zoukankan      html  css  js  c++  java
  • SpringBoot中RedisTemplate订阅发布对象

    解说

    1. RedisMessageListenerContainer Redis订阅发布的监听容器,你的消息发布、订阅配置都必须在这里面实现
      * addMessageListener(MessageListenerAdapter,PatternTopic) 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息。
      * setTopicSerializer(RedisSerializer) 对频道内容进行序列化解析
    1. MessageListenerAdapter 监听适配器
      • MessageListenerAdapter(Object , defaultListenerMethod) 订阅者及其方法
    2. redisTemplate redis模版类
      • convertAndSend(String channel, Object message) 消息发布

    问题

    1. 多人同时订阅一个频道 有没有更好的实现方式?
      2、一个频道内容广播,有没更好的实现方式?

    代码

    RedisConfig

    核心类,实现了Redis连接,订阅以及发布配置

    package com.mengxiangxiang.newtech.redisMQ; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

    @Configuration
    public class RedisConfig {

    @Bean
    public RedisTemplate<String,Object> redisTemplate(JedisConnectionFactory redisConnectionFactory){
    
        RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
    
        //自定义序列化方式
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
    
        redisTemplate.setKeySerializer(jackson2JsonRedisSerializer);
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
    
        return redisTemplate;
    }
    
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    //MessageListenerAdapter 表示监听频道的不同订阅者
    @Bean
    RedisMessageListenerContainer container2(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter2,MessageListenerAdapter listenerAdapter){
         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅多个频道
        container.addMessageListener(listenerAdapter2,new PatternTopic("fullDataUpload"));
        container.addMessageListener(listenerAdapter2,new PatternTopic("analysis"));
        container.addMessageListener(listenerAdapter,new PatternTopic("fullDataUpload"));
    
        //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
    
        container.setTopicSerializer(seria);
        return container;
    }
    
     //表示监听一个频道
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiveTwo receiver){
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceiveTwo ”
        return new MessageListenerAdapter(receiver,"getMessage");
    }
    
    @Bean
    MessageListenerAdapter listenerAdapter2(MessageReceiveOne receiver){
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceiveOne ”
        return new MessageListenerAdapter(receiver,"getMessage");
    }
    

    }

    被消费的对象(即传输的数据)

    User

    package com.mengxiangxiang.newtech.hello.entity;

    public class User {
    private String name;
    private String phone;

    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    public String getPhone() {
        return phone;
    }
    
    public void setPhone(String phone) {
        this.phone = phone;
    }
    

    }

    消费者

    MessageReceiveOne

    第一个消费者
    获取到内容后,是圆是扁随便搓。

    package com.mengxiangxiang.newtech.redisMQ;

    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.feinno.newtech.hello.entity.User;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.stereotype.Component;

    @Component
    public class MessageReceiveOne {
    public void getMessage(String object){
    //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
    Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(User.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    seria.setObjectMapper(objectMapper);

        User user = (User)seria.deserialize(object.getBytes());
        System.out.println("消息客户端2号:"+object);
    }
    

    }

    ###MessageReceiveTwo >第二个消费者 package com.feinno.newtech.redisMQ;

    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.mengxiangxiang.newtech.hello.entity.User;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.stereotype.Component;

    @Component
    public class MessageReceiveTwo {
    public void getMessage(String object){
    //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
    Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(User.class);
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    seria.setObjectMapper(objectMapper);

        User user = (User)seria.deserialize(object.getBytes());
        System.out.println("消息客户端2号:"+object);
    }
    

    }

    发布消息

    @Test public void testMQ(){ String channel1 = "fullDataUpload"; String channel2 = "analysis";
        User user = new User();
        user.setPhone("18675830623");
        user.setName("刘大");
    
        User user2 = new User();
        user2.setPhone("17856232365");
        user2.setName("李二");
    
        redisTemplate.convertAndSend(channel1,user2);
        redisTemplate.convertAndSend(channel2,user);
    }
    
  • 相关阅读:
    LeetCode 965. Univalued Binary Tree
    LeetCode 961. N-Repeated Element in Size 2N Array
    LeetCode 832. Flipping an Image
    语法设计——基于LL(1)文法的预测分析表法
    简单的词法设计——DFA模拟程序
    LeetCode 905. Sort Array By Parity
    LeetCode 804. Unique Morse Code Words
    【原创】用事实说话,Firefox 的性能是 Chrome 的 2 倍,Edge 的 4 倍,IE11 的 6 倍!
    【新特性速递】新增单标签页模式,界面更加清爽!
    【新特性速递】重构表格列锁定代码,只有一个横向滚动条,更加现代化!
  • 原文地址:https://www.cnblogs.com/liuyp-ken/p/10538658.html
Copyright © 2011-2022 走看看