  • Sending and Receiving Messages with RabbitMQ Topic Mode

    1. RabbitMQ working modes

    RabbitMQ has six working modes: simple mode, work (work-queue) mode, publish/subscribe mode, routing mode, topic mode, and RPC mode.

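    Routing mode (figure 1):

    Topic mode:

    As the diagrams show, topic mode is a special case of routing mode. In figure 1, routing mode binds the exchange to a queue with one exact key: a message enters the queue only when its routing key equals the queue's binding key. Topic mode, also called wildcard mode, works the same way as routing mode except for how the binding key is matched against the routing key. Keys are split into words on the "." character, and a binding key may contain two wildcards:

    * matches exactly one word

    # matches zero or more words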
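
    The two matching rules can be sketched in plain Java. This is only an illustration of the semantics described above, not RabbitMQ's actual implementation; the class name TopicPatternSketch and its method names are made up for this example, and edge cases such as empty words are ignored.

```java
/** Sketch of AMQP-style key matching; not RabbitMQ's real implementation. */
final class TopicPatternSketch {

    /** Routing (direct) mode: the routing key must equal the binding key. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** Topic mode: '*' stands for exactly one word, '#' for zero or more words. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.split("\\.");
        return matchFrom(pattern, 0, words, 0);
    }

    private static boolean matchFrom(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb 0..n of the remaining words; try each possibility
            for (int next = w; next <= words.length; next++) {
                if (matchFrom(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // a literal word or '*' needs one more word in the key
        }
        boolean sameWord = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return sameWord && matchFrom(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        String binding = "liufuqiang_rountingKey.*";
        System.out.println(topicMatches(binding, "liufuqiang_rountingKey.random")); // true
        System.out.println(topicMatches(binding, "liufuqiang_rountingKey.a.b"));    // false
        System.out.println(topicMatches("liufuqiang_rountingKey.#", "liufuqiang_rountingKey.a.b")); // true
        System.out.println(directMatches(binding, "liufuqiang_rountingKey.random")); // false
    }
}
```

    Note that the same binding key string behaves differently in the two modes: under exact matching the "*" is just a literal character, so only topic matching accepts liufuqiang_rountingKey.random.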

    A simple usage example follows:

    application.properties:

    spring.rabbitmq.host=xxx.xxx.xxx.xxx
    spring.rabbitmq.username=liufuqiang
    spring.rabbitmq.password=wapj1314
    spring.rabbitmq.port=5672

    RabbitmqConfig.java

    Create the queue:

        @Bean
        Queue customerQueue() {
            return new Queue("liufuqiang_customer");
        }

    Declare the topic exchange:

        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange("liufuqiang_topic_exchange");
        }

    Bind customerQueue to topicExchange:

        @Bean
        Binding myTopicBinding(){
            Binding binding = BindingBuilder.bind(customerQueue()).to(topicExchange()).with("liufuqiang_rountingKey.*");
            return binding;
        }

    The complete RabbitmqConfig.java:

    package com.example.rabbitmqdemo.config;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.converter.MappingJackson2MessageConverter;
    import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
    import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
    
    @Configuration
    public class RabbitmqConfig implements RabbitListenerConfigurer {
    
        @Value("${spring.rabbitmq.host}")
        private String rabbitmqHost;
    
        @Value("${spring.rabbitmq.username}")
        private String rabbitmqUsername;
    
        @Value("${spring.rabbitmq.password}")
        private String rabbitmqPassword;
    
        @Value("${spring.rabbitmq.port}")
        private int rabbitmqPort;
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
            rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        }
    
        @Bean
        MessageHandlerMethodFactory messageHandlerMethodFactory(){
            DefaultMessageHandlerMethodFactory defaultMessageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
            defaultMessageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
            return defaultMessageHandlerMethodFactory;
        }
    
        @Bean
        public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
            return new MappingJackson2MessageConverter();
        }
    
        @Bean
        Queue customerQueue(){
           return new Queue("liufuqiang_customer");
        }
    
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange("liufuqiang_topic_exchange");
        }
    
    
        @Bean
        Binding myTopicBinding(){
            Binding binding = BindingBuilder.bind(customerQueue()).to(topicExchange()).with("liufuqiang_rountingKey.*");
            return binding;
        }
    
        /**
         * Delay queue.
         */
        @Bean
        Queue myDelayQueue() {
            return new Queue("liufuqiang_queue_delay");
        }
    
        @Bean
        TopicExchange topicExchangeDelay() {
            return new TopicExchange("liufuqiang_topic_exchange_delay");
        }
    
        @Bean
        Binding myTopicBindingDelay() {
            Binding binding = BindingBuilder.bind(myDelayQueue()).to(topicExchangeDelay()).with("liufuqiang_delay_routing");
            return binding;
        }
    
        @Bean
        public com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory() {
            com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setUsername(rabbitmqUsername);
            connectionFactory.setPassword(rabbitmqPassword);
            connectionFactory.setHost(rabbitmqHost);
            connectionFactory.setPort(rabbitmqPort);
            connectionFactory.setVirtualHost("/");
            return connectionFactory;
        }
    
        @Bean("rabbitListenerContainerFactory")
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
                SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setPrefetchCount(2);
            factory.setConcurrentConsumers(3);
    //        factory.setRecoveryInterval(recoveryInterval);
            configurer.configure(factory,  connectionFactory);
            return factory;
        }
    
    
    
    }

    2. Producer: sending messages

    RabbitProducer.java

        @Autowired
        private RabbitTemplate rabbitTemplate;
    
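        public void firstMessage(){
            Map<String, Object> message = new HashMap<>();
            message.put("name", "liufuqiang");
            message.put("age", "23");
            // The routing key "liufuqiang_rountingKey.random" matches the binding key "liufuqiang_rountingKey.*".
            // JSONUtils comes from a JSON helper library whose import the original post does not show.
            rabbitTemplate.convertAndSend("liufuqiang_topic_exchange", "liufuqiang_rountingKey.random", JSONUtils.toJSONString(message));
        }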

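    Call the send-message endpoint to start sending messages.

    In the management UI you can see that the exchange has been created and that its type is topic.

    The queue liufuqiang_customer is bound to the exchange with the binding key liufuqiang_rountingKey.*.

    The message we just sent used the routing key liufuqiang_rountingKey.random. The * wildcard matches the word random, so the exchange delivers the message to the queue.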
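
    The exchange, binding, and queue relationship seen in the management UI can be modelled with a small in-memory sketch. It is a simplification for illustration only, not how the broker works internally: the class TopicExchangeSketch and the second queue audit_queue are hypothetical, binding keys are translated to regular expressions, and a message that matches no binding is simply dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/** In-memory model of one topic exchange with bound queues (illustration only). */
final class TopicExchangeSketch {

    private static final class Binding {
        final Pattern regex;
        final String queue;

        Binding(Pattern regex, String queue) {
            this.regex = regex;
            this.queue = queue;
        }
    }

    private final List<Binding> bindings = new ArrayList<>();
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange under the given binding key. */
    void bind(String queue, String bindingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.add(new Binding(toRegex(bindingKey), queue));
    }

    /** Copies the message into every queue that has at least one matching binding. */
    void publish(String routingKey, String body) {
        Set<String> targets = new LinkedHashSet<>();
        for (Binding b : bindings) {
            // A '.' is appended so that every word of the key ends with a dot.
            if (b.regex.matcher(routingKey + ".").matches()) {
                targets.add(b.queue);
            }
        }
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    /** Each pattern word becomes a regex fragment that consumes "word." from the key. */
    static Pattern toRegex(String bindingKey) {
        StringBuilder regex = new StringBuilder();
        for (String word : bindingKey.split("\\.")) {
            if (word.equals("*")) {
                regex.append("[^.]+\\.");        // exactly one word
            } else if (word.equals("#")) {
                regex.append("(?:[^.]+\\.)*");   // zero or more words
            } else {
                regex.append(Pattern.quote(word)).append("\\.");
            }
        }
        return Pattern.compile(regex.toString());
    }

    public static void main(String[] args) {
        TopicExchangeSketch exchange = new TopicExchangeSketch();
        exchange.bind("liufuqiang_customer", "liufuqiang_rountingKey.*");
        exchange.bind("audit_queue", "liufuqiang_rountingKey.#"); // hypothetical second queue
        exchange.publish("liufuqiang_rountingKey.random", "m1");
        exchange.publish("liufuqiang_rountingKey.a.b", "m2");
        System.out.println(exchange.messagesIn("liufuqiang_customer")); // [m1]
        System.out.println(exchange.messagesIn("audit_queue"));         // [m1, m2]
    }
}
```

    With the single binding used in this post, only routing keys of the form liufuqiang_rountingKey.<one word> reach liufuqiang_customer; a key with two trailing words would need a # binding such as the hypothetical one above.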

    3. Consuming messages

    RabbitmqConsumer.java

    @Component
    public class RabbitmqConsumer {
    
        private static final Logger logger = LoggerFactory.getLogger(RabbitmqConsumer.class);
    
        @RabbitListener(queues = "liufuqiang_customer")
        @RabbitHandler
        //@Payload
        public void getRabbitmqMessage(String message){
            System.out.println(message);
            logger.warn("Received message: {}", message);
        }
    }

    You can see that the messages in the queue liufuqiang_customer have been consumed.

  • Original article: https://www.cnblogs.com/LiuFqiang/p/14283136.html