zoukankan      html  css  js  c++  java
  • redis消息队列——发布订阅

    一、相关依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    

    二、redis 监听器配置

    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    
    
    @Configuration
    @EnableCaching
    public class RedisConfig{
        /**
         * Redis消息监听器容器
         * @param connectionFactory
         * @return
         */
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
    
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //订阅了一个叫pmp和channel 的通道,多通道
            container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));
            container.addMessageListener(listenerAdapter(new RedisChannelSub()),new PatternTopic("channel"));
            //这个container 可以添加多个 messageListener
            return container;
        }
    
        /**
         * 配置消息接收处理类
         * @param redisMsg  自定义消息接收类
         * @return
         */
        @Bean()
        @Scope("prototype")
        MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
            //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
            //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
            return new MessageListenerAdapter(redisMsg, "receiveMessage");//注意2个通道调用的方法都要为receiveMessage
        }
    
    }
    

    三、消息处理器

    import org.springframework.stereotype.Component;
    
    
    @Component
    public interface RedisMsg {
    
        public void receiveMessage(String message);
    }
    
    
    public class RedisPmpSub implements RedisMsg{
    
        /**
         * 接收消息的方法
         * @param message 订阅消息
         */
        public void receiveMessage(String message){
            //注意通道调用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter参数2相同
            System.out.println(message);
        }
    }
    

    消息发布者

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    //定时器
    @EnableScheduling
    @Component
    public class TestSenderController {
        @Autowired
            private StringRedisTemplate stringRedisTemplate;
    
        //向redis消息队列index通道发布消息
        @Scheduled(fixedRate = 2000)
        public void sendMessage(){
            stringRedisTemplate.convertAndSend("pmp",String.valueOf(Math.random()));
            stringRedisTemplate.convertAndSend("channel",String.valueOf(Math.random()));
        }
    }

    备注: 使用策略模式优化消息处理器,模板化建立通道

    参考链接 https://www.cnblogs.com/IT-study/p/11352254.html  

  • 相关阅读:
    JavaScript cookie详解
    Javascript数组的排序:sort()方法和reverse()方法
    javascript中write( ) 和 writeln( )的区别
    div做表格
    JS 盒模型 scrollLeft, scrollWidth, clientWidth, offsetWidth 详解
    Job for phpfpm.service failed because the control process exited with error code. See "systemctl status phpfpm.service" and "journalctl xe" for details.
    orm查询存在价格为空问题
    利用救援模式破解系统密码
    SSH服务拒绝了密码
    C# 调用 C++ DLL 中的委托,引发“对XXX::Invoke类型的已垃圾回收委托进行了回调”错误的解决办法
  • 原文地址:https://www.cnblogs.com/yuarvin/p/14539537.html
Copyright © 2011-2022 走看看