zoukankan      html  css  js  c++  java
  • 【spring boot】【redis】spring boot 集成redis的发布订阅机制

    一.简单介绍

    1.redis的发布订阅功能,很简单。
      消息发布者和消息订阅者互相不认得,也不关心对方有谁。
      消息发布者,将消息发送给频道(channel)。
      然后是由 频道(channel)将消息发送给对自己感兴趣的 消息订阅者们,进行消费。


    2.redis的发布订阅和专业的MQ相比较

      1>redis的发布订阅只是最基本的功能,不支持持久化,消息发布者将消息发送给频道。如果没有订阅者消费,消息就丢失了。
      2>在消息发布过程中,如果客户端和服务器连接超时,MQ会有重试机制,事务回滚等。但是Redis没有提供消息传输保障。
      3>简单的发布订阅可以使用redis,根据业务需求选择。

    二.spring boot 集成[spring boot 2.x]

    1.pom.xml文件

    <!-- redis -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <!--spring2.0集成redis所需common-pool2-->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.4.2</version>
            </dependency>
            <!-- 使用redis的LUA脚本 需要序列化操作的jar-->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>

    2.redis的config 

    为redis添加消息适配器,绑定消息处理器

    消息适配器 可以添加多个 

    package com.sxd.swapping.config;
    
    import com.sxd.swapping.redisReceiver.RedisReceiver;
    import com.sxd.swapping.redisReceiver.RedisReceiver2;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    
    
    /**
     * @author sxd
     * @date 2019/5/27 16:13
     */
    @Configuration
    @AutoConfigureAfter(RedisAutoConfiguration.class)
    public class RedisConfig {
    
    
    
        /**
         * redis消息监听器容器
         * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
         * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         */
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                MessageListenerAdapter listenerAdapter,
                                                MessageListenerAdapter listenerAdapter2)
        {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
    
            //可以添加多个 messageListener
            //可以对 messageListener对应的适配器listenerAdapter  指定本适配器 适配的消息类型  是什么
            //在发布的地方 对应发布的redisTemplate.convertAndSend("user",msg);  那这边的就对应的可以消费到指定类型的 订阅消息
            container.addMessageListener(listenerAdapter, new PatternTopic("user"));
            container.addMessageListener(listenerAdapter2, new PatternTopic("goods"));
    
            return container;
        }
    
    
        /**
         * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
         *
         * receiveMessage 是默认监听方法 一般不变
         * @param redisReceiver redis消息处理器,自定义的
         * @return
         */
        @Bean
        MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
            System.out.println("消息适配器1进来了");
            return new MessageListenerAdapter(redisReceiver, "receiveMessage");
        }
    
    
        /**
         * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
         *
         * receiveMessage 是默认监听方法 一般不变
         * @param redisReceiver2 redis消息处理器,自定义的
         * @return
         */
        @Bean
        MessageListenerAdapter listenerAdapter2(RedisReceiver2 redisReceiver2) {
            System.out.println("消息适配器2进来了");
            return new MessageListenerAdapter(redisReceiver2, "receiveMessage");
        }
    
    
    
        //使用默认的工厂初始化redis操作模板
        @Bean
        StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
            return new StringRedisTemplate(connectionFactory);
        }
    
    
        @Bean
        public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate redisTemplate = new RedisTemplate();
            redisTemplate.setConnectionFactory(factory);
            RedisSerializer keySerializer = new StringRedisSerializer();
    //        RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
            //key采用字符串反序列化对象
            redisTemplate.setKeySerializer(keySerializer);
            //value也采用字符串反序列化对象
            //原因:管道操作,是对redis命令的批量操作,各个命令返回结果可能类型不同
            //可能是 Boolean类型 可能是String类型 可能是byte[]类型 因此统一将结果按照String处理
            redisTemplate.setValueSerializer(keySerializer);
            return redisTemplate;
        }
    
    
    
    
    
    
    
    
    
    }
    View Code

    3.创建几个消息处理器[处理消息订阅者  接收到消息后的业务]

    package com.sxd.swapping.redisReceiver;
    
    import org.springframework.stereotype.Service;
    
    /**
     *
     * redis 订阅发布  消息接收器/处理器
     * @author sxd
     * @date 2019/5/30 17:12
     */
    @Service
    public class RedisReceiver {
    
        public void receiveMessage(String message) {
            System.out.println("消息处理器1>我处理用户信息:"+message);
            //这里是收到通道的消息之后执行的方法
            //此处执行接收到消息后的 业务逻辑
        }
    }
    View Code
    package com.sxd.swapping.redisReceiver;
    
    import org.springframework.stereotype.Service;
    
    /**
     * redis 订阅发布  消息接收器/处理器2
     * @author sxd
     * @date 2019/5/30 17:15
     */
    @Service
    public class RedisReceiver2 {
    
        public void receiveMessage(String message) {
            System.out.println("消息处理器2>我处理商品信息:"+message);
            //这里是收到通道的消息之后执行的方法
            //此处执行接收到消息后的 业务逻辑
        }
    }
    View Code

    4.消息发布controller

    @Autowired
        RedisTemplate redisTemplate;
    
    
        /**
         * redis 发布订阅pubsub
         */
        @RequestMapping(value = "/redisPubSub")
        public void redisPubSub(String msg){
            if (msg.contains("用户")){
                redisTemplate.convertAndSend("user",msg);
            }else {
                redisTemplate.convertAndSend("goods",msg);
            }
        }
    View Code

    测试:

    发送请求:http://localhost:9666/redistest/redisPubSub?msg=用户---德玛西亚的用户

    结果:

    消息处理器1>我处理用户信息:用户---德玛西亚的用户

    发送请求:http://localhost:9666/redistest/redisPubSub?msg=goods---德玛西亚的用商品

    结果:

    消息处理器2>我处理商品信息:goods---德玛西亚的用商品

  • 相关阅读:
    11、sqlite
    10、正则
    9、bs4
    8、异常与import
    7、文件
    6、函数
    4、字典及集合
    3、元组
    1、python基本语法
    shell编程 15 --- shell 脚本调试技巧
  • 原文地址:https://www.cnblogs.com/sxdcgaq8080/p/10953693.html
Copyright © 2011-2022 走看看