zoukankan      html  css  js  c++  java
  • SpringBoot中使用Redis的发布/订阅模式

    redis的发布订阅模式,使发布者和订阅者完全解耦 

     1.pom.xml and application.properties

    <!-- 引入redis -->
     
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>io.lettuce</groupId>
                        <artifactId>lettuce-core</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
     
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>
    spring:
       redis:
         #数据库索引
         database: 5
         host: 127.0.0.1
         port: 6379
         password: 123456
         jedis:
           pool:
             #最大连接数
             max-active: 8
             #最大阻塞等待时间(负数表示没限制)
             #最大空闲
             max-idle: 8
             #最小空闲
             min-idle: 0

    2.消息发布者、消息处理者POJO、redis消息监听器容器以及redis监听器注入IOC容器

    @Configuration //相当于xml中的beans
    public class RedisConfig {
     
        /**
         * redis消息监听器容器
         * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
         * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         */
        @Bean //相当于xml中的bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                MessageListenerAdapter listenerAdapter,
                                                MessageListenerAdapter listenerAdapter2) {
     
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //订阅了一个叫chat 的通道
            container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
            container.addMessageListener(listenerAdapter2, new PatternTopic("chat2"));
            //这个container 可以添加多个 messageListener
            return container;
        }
     
        /**
         * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
         * @param receiver
         * @return
         */
        @Bean
        MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
            //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
            //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
            return new MessageListenerAdapter(receiver, "receiveMessage");
        }
        
        @Bean
        MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) {
            //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage2”
            //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
            return new MessageListenerAdapter(receiver, "receiveMessage2");
        }
     
        /**redis 读取内容的template */
        @Bean
        StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
            return new StringRedisTemplate(connectionFactory);
        }
        
        @Bean
        RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
     
             Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
             ObjectMapper om = new ObjectMapper();
             om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
             om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
             jackson2JsonRedisSerializer.setObjectMapper(om);
             RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
             template.setConnectionFactory(connectionFactory);
             template.setKeySerializer(jackson2JsonRedisSerializer);
             template.setValueSerializer(jackson2JsonRedisSerializer);
             template.setHashKeySerializer(jackson2JsonRedisSerializer);
             template.setHashValueSerializer(jackson2JsonRedisSerializer);
             template.afterPropertiesSet();
             return template;
     
        }
     
    }

    MessageListenerAdapter通过反射使普通的POJO就可以处理消息。具体情况见MessageListenerAdapter的onMessage方法。

    3.消息发布者

    @EnableScheduling //开启定时器功能
    @Component
    public class MessageSender {
     
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
     
        @Scheduled(fixedRate = 2000) //间隔2s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息
        public void sendMessage(){
            stringRedisTemplate.convertAndSend("chat",String.valueOf(Math.random()));
            stringRedisTemplate.convertAndSend("chat2",String.valueOf(Math.random()));
        }
    }

    4.普通的消息处理器POJO

    @Component
    public class MessageReceiver {
     
        /**接收消息的方法*/
        public void receiveMessage(String message){
            System.out.println("收到一条chat的消息:"+message);
        }
        
        /**接收消息的方法*/
        public void receiveMessage2(String message){
            System.out.println("收到一条chat2的消息:"+message);
        }
     
    }

    MessageListenerAdapter通过反射调用receiveMessage方法处理消息


    5.其他方式(参考)

    配置

    package com.example.day3_30.redisConfiBack;
     
    /**
     * redis消息队列配置-订阅者
     */
     
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
     
    import java.util.concurrent.CountDownLatch;
     
    /**
     * redis消息队列配置-订阅者
     */
    @Configuration
    public class RedisMessageListener {
        /**
         * 创建连接工厂
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                       MessageListenerAdapter listenerAdapter,
                                                       MessageListenerAdapter listenerAdapterTest2){
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //接受消息的key
            container.addMessageListener(listenerAdapter,new PatternTopic("phone"));
            container.addMessageListener(listenerAdapterTest2,new PatternTopic("phoneTest2"));
            return container;
        }
     
     
        /**
         * 绑定消息监听者和接收监听的方法
         * @param receiver
         * @return
         */
        @Bean
        public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage  receiver){
            return new MessageListenerAdapter(receiver,"receiveMessage");
        }
        /**
         * 绑定消息监听者和接收监听的方法
         * @param receiver
         * @return
         */
        @Bean
        public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage  receiver){
            return new MessageListenerAdapter(receiver,"receiveMessage2");
        }
        /**
         * 注册订阅者
         * @param latch
         * @return
         */
        @Bean
        ReceiverRedisMessage receiver(CountDownLatch latch) {
            return new ReceiverRedisMessage(latch);
        }
     
     
        /**
         * 计数器,用来控制线程
         * @return
         */
        @Bean
        public CountDownLatch latch(){
            return new CountDownLatch(1);//指定了计数的次数 1
        }
    }

    消息处理

    package com.example.day3_30.redisConfiBack;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
     
    import java.util.concurrent.CountDownLatch;
     
    /**
     * 注入消息接受类
     */
    public class ReceiverRedisMessage {
        private static final Logger log = LoggerFactory.getLogger(ReceiverRedisMessage.class);
        private CountDownLatch latch;
     
        @Autowired
        public ReceiverRedisMessage(CountDownLatch latch) {
            this.latch = latch;
        }
     
     
        /**
         * 队列消息接收方法
         *
         * @param jsonMsg
         */
        public void receiveMessage(String jsonMsg) {
            log.info("[开始消费REDIS消息队列phone数据...]");
            try {
                System.out.println(jsonMsg);
                log.info("[消费REDIS消息队列phone数据成功.]");
            } catch (Exception e) {
                log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage());
            }
            latch.countDown();
        }
     
        /**
         * 队列消息接收方法
         *
         * @param jsonMsg
         */
        public void receiveMessage2(String jsonMsg) {
            log.info("[开始消费REDIS消息队列phoneTest2数据...]");
            try {
                System.out.println(jsonMsg);
                /**
                 *  此处执行自己代码逻辑 例如 插入 删除操作数据库等
                 */
     
                log.info("[消费REDIS消息队列phoneTest2数据成功.]");
            } catch (Exception e) {
                log.error("[消费REDIS消息队列phoneTest2数据失败,失败信息:{}]", e.getMessage());
            }
            latch.countDown();
        }
     
    }

    测试

    package com.example.day3_30.redisConfiBack;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
     
    @RestController
    @RequestMapping
    public class PublisherController {
        private static final Logger log = LoggerFactory.getLogger(PublisherController.class);
        @Autowired
        private RedisTemplate redisTemplate;
        @GetMapping(value = "pub/{id}")
        public String pubMsg(@PathVariable String id){
            redisTemplate.convertAndSend("phone","223333");
            redisTemplate.convertAndSend("phoneTest2","34555665");
            log.info("Publisher sendes Topic... ");
            return "success";
        }
    }
  • 相关阅读:
    洛谷 P1508 Likecloud-吃、吃、吃
    Codevs 1158 尼克的任务
    2017.10.6 国庆清北 D6T2 同余方程组
    2017.10.6 国庆清北 D6T1 排序
    2017.10.3 国庆清北 D3T3 解迷游戏
    2017.10.3 国庆清北 D3T2 公交车
    2017.10.3 国庆清北 D3T1 括号序列
    2017.10.4 国庆清北 D4T1 财富
    2017.10.7 国庆清北 D7T2 第k大区间
    2017.10.7 国庆清北 D7T1 计数
  • 原文地址:https://www.cnblogs.com/xiejn/p/15698826.html
Copyright © 2011-2022 走看看