zoukankan      html  css  js  c++  java
  • SpringBoot中使用Redis的发布/订阅模式

    redis的发布订阅模式,使发布者和订阅者完全解耦 

     1.pom.xml and application.properties

    <!-- 引入redis -->
     
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>io.lettuce</groupId>
                        <artifactId>lettuce-core</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
     
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>
    spring:
       redis:
         #数据库索引
         database: 5
         host: 127.0.0.1
         port: 6379
         password: 123456
         jedis:
           pool:
             #最大连接数
             max-active: 8
             #最大阻塞等待时间(负数表示没限制)
             #最大空闲
             max-idle: 8
             #最小空闲
             min-idle: 0

    2.消息发布者、消息处理者POJO、redis消息监听器容器以及redis监听器注入IOC容器

    @Configuration //相当于xml中的beans
    public class RedisConfig {
     
        /**
         * redis消息监听器容器
         * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
         * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         */
        @Bean //相当于xml中的bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                MessageListenerAdapter listenerAdapter,
                                                MessageListenerAdapter listenerAdapter2) {
     
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //订阅了一个叫chat 的通道
            container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
            container.addMessageListener(listenerAdapter2, new PatternTopic("chat2"));
            //这个container 可以添加多个 messageListener
            return container;
        }
     
        /**
         * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
         * @param receiver
         * @return
         */
        @Bean
        MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
            //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
            //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
            return new MessageListenerAdapter(receiver, "receiveMessage");
        }
        
        @Bean
        MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) {
            //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage2”
            //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
            return new MessageListenerAdapter(receiver, "receiveMessage2");
        }
     
        /**redis 读取内容的template */
        @Bean
        StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
            return new StringRedisTemplate(connectionFactory);
        }
        
        @Bean
        RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
     
             Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
             ObjectMapper om = new ObjectMapper();
             om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
             om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
             jackson2JsonRedisSerializer.setObjectMapper(om);
             RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
             template.setConnectionFactory(connectionFactory);
             template.setKeySerializer(jackson2JsonRedisSerializer);
             template.setValueSerializer(jackson2JsonRedisSerializer);
             template.setHashKeySerializer(jackson2JsonRedisSerializer);
             template.setHashValueSerializer(jackson2JsonRedisSerializer);
             template.afterPropertiesSet();
             return template;
     
        }
     
    }

    MessageListenerAdapter通过反射使普通的POJO就可以处理消息。具体情况见MessageListenerAdapter的onMessage方法。

    3.消息发布者

    @EnableScheduling //开启定时器功能
    @Component
    public class MessageSender {
     
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
     
        @Scheduled(fixedRate = 2000) //间隔2s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息
        public void sendMessage(){
            stringRedisTemplate.convertAndSend("chat",String.valueOf(Math.random()));
            stringRedisTemplate.convertAndSend("chat2",String.valueOf(Math.random()));
        }
    }

    4.普通的消息处理器POJO

    @Component
    public class MessageReceiver {
     
        /**接收消息的方法*/
        public void receiveMessage(String message){
            System.out.println("收到一条chat的消息:"+message);
        }
        
        /**接收消息的方法*/
        public void receiveMessage2(String message){
            System.out.println("收到一条chat2的消息:"+message);
        }
     
    }

    MessageListenerAdapter通过反射调用receiveMessage方法处理消息


    5.其他方式(参考)

    配置

    package com.example.day3_30.redisConfiBack;
     
    /**
     * redis消息队列配置-订阅者
     */
     
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
     
    import java.util.concurrent.CountDownLatch;
     
    /**
     * redis消息队列配置-订阅者
     */
    @Configuration
    public class RedisMessageListener {
        /**
         * 创建连接工厂
         * @param connectionFactory
         * @param listenerAdapter
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                       MessageListenerAdapter listenerAdapter,
                                                       MessageListenerAdapter listenerAdapterTest2){
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //接受消息的key
            container.addMessageListener(listenerAdapter,new PatternTopic("phone"));
            container.addMessageListener(listenerAdapterTest2,new PatternTopic("phoneTest2"));
            return container;
        }
     
     
        /**
         * 绑定消息监听者和接收监听的方法
         * @param receiver
         * @return
         */
        @Bean
        public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage  receiver){
            return new MessageListenerAdapter(receiver,"receiveMessage");
        }
        /**
         * 绑定消息监听者和接收监听的方法
         * @param receiver
         * @return
         */
        @Bean
        public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage  receiver){
            return new MessageListenerAdapter(receiver,"receiveMessage2");
        }
        /**
         * 注册订阅者
         * @param latch
         * @return
         */
        @Bean
        ReceiverRedisMessage receiver(CountDownLatch latch) {
            return new ReceiverRedisMessage(latch);
        }
     
     
        /**
         * 计数器,用来控制线程
         * @return
         */
        @Bean
        public CountDownLatch latch(){
            return new CountDownLatch(1);//指定了计数的次数 1
        }
    }

    消息处理

    package com.example.day3_30.redisConfiBack;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
     
    import java.util.concurrent.CountDownLatch;
     
    /**
     * 注入消息接受类
     */
    public class ReceiverRedisMessage {
        private static final Logger log = LoggerFactory.getLogger(ReceiverRedisMessage.class);
        private CountDownLatch latch;
     
        @Autowired
        public ReceiverRedisMessage(CountDownLatch latch) {
            this.latch = latch;
        }
     
     
        /**
         * 队列消息接收方法
         *
         * @param jsonMsg
         */
        public void receiveMessage(String jsonMsg) {
            log.info("[开始消费REDIS消息队列phone数据...]");
            try {
                System.out.println(jsonMsg);
                log.info("[消费REDIS消息队列phone数据成功.]");
            } catch (Exception e) {
                log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage());
            }
            latch.countDown();
        }
     
        /**
         * 队列消息接收方法
         *
         * @param jsonMsg
         */
        public void receiveMessage2(String jsonMsg) {
            log.info("[开始消费REDIS消息队列phoneTest2数据...]");
            try {
                System.out.println(jsonMsg);
                /**
                 *  此处执行自己代码逻辑 例如 插入 删除操作数据库等
                 */
     
                log.info("[消费REDIS消息队列phoneTest2数据成功.]");
            } catch (Exception e) {
                log.error("[消费REDIS消息队列phoneTest2数据失败,失败信息:{}]", e.getMessage());
            }
            latch.countDown();
        }
     
    }

    测试

    package com.example.day3_30.redisConfiBack;
     
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
     
    @RestController
    @RequestMapping
    public class PublisherController {
        private static final Logger log = LoggerFactory.getLogger(PublisherController.class);
        @Autowired
        private RedisTemplate redisTemplate;
        @GetMapping(value = "pub/{id}")
        public String pubMsg(@PathVariable String id){
            redisTemplate.convertAndSend("phone","223333");
            redisTemplate.convertAndSend("phoneTest2","34555665");
            log.info("Publisher sendes Topic... ");
            return "success";
        }
    }
  • 相关阅读:
    Go标准库Context
    事务并发处理: DB+ORM+逻辑代码
    日志:slf4j+log4j+maven配置
    Shiro workshop
    JSP Workshop
    sql records
    Java内存模型(JMM)
    Application, JDBC, 数据库连接池, Session, 数据库的关系
    Java位操作全面总结
    Effective Java总结
  • 原文地址:https://www.cnblogs.com/xiejn/p/15698826.html
Copyright © 2011-2022 走看看