zoukankan      html  css  js  c++  java
  • 【rabbitmq】之Exchange

    rabbitmq常用Exchange有3个,Direct,Topic,Fanout

    全局配置文件

    spring.rabbitmq.host=dev-mq.ttsingops.com
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=xxx
    spring.rabbitmq.password=xxxxx
    spring.rabbitmq.virtual-host=/cd

    三个完整交换机配置

    @Configuration
    public class RabbitMQExchangeConfig {

        /** Name of the direct exchange: point-to-point delivery, routing key must match exactly. */
        public static final String DIRECT_EXCHANGE = "direct_exchange";

        /** Name of the topic exchange: delivery by routing-key pattern matching. */
        public static final String TOPIC_EXCHANGE = "topic_exchange";

        /** Name of the fanout exchange: broadcast delivery. */
        public static final String FANOUT_EXCHANGE = "fanout_exchange";

        /** Value passed as the "durable" constructor argument of every exchange declared here. */
        private static final boolean DURABLE = true;

        /** Value passed as the "autoDelete" constructor argument of every exchange declared here. */
        private static final boolean AUTO_DELETE = false;

        /**
         * Declares the direct exchange named {@link #DIRECT_EXCHANGE}.
         *
         * @return the direct exchange bean, registered as "directExchange"
         */
        @Bean(name = "directExchange")
        public DirectExchange directExchange() {
            return new DirectExchange(DIRECT_EXCHANGE, DURABLE, AUTO_DELETE);
        }

        /**
         * Declares the topic exchange named {@link #TOPIC_EXCHANGE}.
         *
         * @return the topic exchange bean, registered as "topicExchange"
         */
        @Bean(name = "topicExchange")
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, DURABLE, AUTO_DELETE);
        }

        /**
         * Declares the fanout exchange named {@link #FANOUT_EXCHANGE}.
         *
         * @return the fanout exchange bean, registered as "fanoutExchange"
         */
        @Bean(name = "fanoutExchange")
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUT_EXCHANGE, DURABLE, AUTO_DELETE);
        }

    }

    RabbitTemplate配置

    @Configuration
    public class RabbitMQConfig {

        /**
         * Provides the message converter used by the template below.
         *
         * @return a Jackson-based JSON message converter
         */
        @Bean
        public MessageConverter jsonMessageConverter() {
            final MessageConverter converter = new Jackson2JsonMessageConverter();
            return converter;
        }

        /**
         * Builds the {@code RabbitTemplate} used for publishing messages.
         *
         * @param connectionFactory the connection factory the template publishes through
         * @return the configured template
         */
        @Bean
        RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            final RabbitTemplate template = new RabbitTemplate();
            template.setMessageConverter(jsonMessageConverter());
            // mandatory = true: a message the exchange cannot route to any queue is returned
            // to the producer; with false such a message is silently dropped.
            template.setMandatory(true);
            template.setConnectionFactory(connectionFactory);
            return template;
        }

    }

    DirectExchange

    可以理解为发布/订阅、点对点的一种交换机:A发消息,B消费消息。它是一种完全匹配的交换机,只有消息的routing key与队列绑定的key完全相同时,消息才会被路由到该队列。

    配置DirectExchange  绑定direct_queue 绑定direct_queue_key

    /********************direct************************/
        /**
         * Declares the queue "direct_queue".
         *
         * @return the queue bean, registered as "directQueue"
         */
        @Bean(name = "directQueue")
        public Queue directQueue() {
            final boolean durable = true;
            final boolean exclusive = false;
            final boolean autoDelete = false;
            return new Queue("direct_queue", durable, exclusive, autoDelete);
        }
    
        /**
         * Binds the direct queue to the direct exchange under the routing key "direct_queue_key".
         *
         * @param directExchange the exchange bean qualified as "directExchange"
         * @return the binding bean, registered as "directBind"
         */
        @Bean(name = "directBind")
        public Binding directBind(@Autowired @Qualifier("directExchange") DirectExchange directExchange) {
            final Queue queue = directQueue();
            return BindingBuilder
                    .bind(queue)
                    .to(directExchange)
                    .with("direct_queue_key");
        }

    发送DirectMQ消息

    /**
     * Publishes the request parameter to the direct exchange with routing key "direct_queue_key".
     *
     * @param direct the payload to publish
     * @return the literal string "ok"
     */
    @GetMapping("direct")
        public String direct(String direct) {
            final String exchange = RabbitMQExchangeConfig.DIRECT_EXCHANGE;
            final String routingKey = "direct_queue_key";
            rabbitTemplate.convertAndSend(exchange, routingKey, direct);
            return "ok";
        }

    监听direct_queue消息

    /**
     * Listens on the queue whose name is exposed by the "directQueue" bean and logs each message.
     *
     * @param channel the channel header of the delivery (not used by this method)
     * @param msg     the message payload
     * @param message the raw message
     * @throws Exception declared by the signature; nothing is thrown explicitly here
     */
    @RabbitListener(queues = "#{directQueue.name}")
        public void directQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
            final String rawMessage = message.toString();
            log.info("msg:{},mq.message:{}", msg, rawMessage);
        }

    image

    TopicExchange

    TopicExchange配置

    /********************topic1,************************/
        /**
         * Declares the queue "topic_queue".
         *
         * @return the queue bean, registered as "topicQueue"
         */
        @Bean(name = "topicQueue")
        public Queue topicQueue() {
            final boolean durable = true;
            final boolean exclusive = false;
            final boolean autoDelete = false;
            return new Queue("topic_queue", durable, exclusive, autoDelete);
        }
        /**
         * Binds the topic queue to the topic exchange with the pattern "*.queue.key".
         * In a topic pattern, {@code *} stands for exactly one word.
         *
         * @param topicExchange the exchange bean qualified as "topicExchange"
         * @return the binding bean, registered as "topicBind"
         */
        @Bean(name = "topicBind")
        public Binding topicBind(@Autowired @Qualifier("topicExchange") TopicExchange topicExchange) {
            final Queue queue = topicQueue();
            return BindingBuilder
                    .bind(queue)
                    .to(topicExchange)
                    .with("*.queue.key");
        }
    
        /********************topic2************************/
        /**
         * Declares the second topic queue, "topic2_queue".
         *
         * <p>The queue needs a name distinct from the first topic queue ("topic_queue");
         * otherwise both beans describe the same broker queue and the two listeners
         * compete for the same messages.
         *
         * @return the queue bean, registered as "topic2Queue"
         */
        @Bean("topic2Queue")
        public Queue topic2Queue() {
            return new Queue("topic2_queue", true, false, false);
        }

        /**
         * Binds the second topic queue to the topic exchange with the pattern "#.queue.key".
         * In a topic pattern, {@code #} stands for zero or more words, so this binding matches
         * "order.1.queue.key" as well as "order.queue.key".
         *
         * @param topicExchange the exchange bean qualified as "topicExchange"
         * @return the binding bean, registered as "topic2Bind"
         */
        @Bean("topic2Bind")
        public Binding topic2Bind(@Autowired @Qualifier("topicExchange") TopicExchange topicExchange) {
            // Bind the second queue; binding topicQueue() here would attach both patterns to the first queue.
            return BindingBuilder.bind(topic2Queue()).to(topicExchange).with("#.queue.key");
        }

    发送TopicExchange消息

    /**
     * Publishes the request parameter to the topic exchange four times, once per routing key.
     *
     * @param topic the payload to publish
     * @return the literal string "ok"
     */
    @GetMapping("topic")
        public String topic(String topic) {
            final String[] routingKeys = {
                    // A single word before ".queue.key": fits the "*" (one word) pattern.
                    "order.queue.key",
                    "bill.queue.key",
                    // Two words before ".queue.key": needs the "#" (multiple words) pattern.
                    "order.1.queue.key",
                    "bill.1.queue.key",
            };
            for (String routingKey : routingKeys) {
                rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.TOPIC_EXCHANGE, routingKey, topic);
            }
            return "ok";
        }

    监听TopicExchange消息

    /**
     * Listens on the queue whose name is exposed by the "topicQueue" bean and logs each message.
     *
     * @param channel the channel header of the delivery (not used by this method)
     * @param msg     the message payload
     * @param message the raw message
     * @throws Exception declared by the signature; nothing is thrown explicitly here
     */
    @RabbitListener(queues = "#{topicQueue.name}")
        public void topicQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
            final String rawMessage = message.toString();
            log.info("topicQueue,msg:{},mq.message:{}", msg, rawMessage);
        }
    
    
        /**
         * Listens on the queue whose name is exposed by the "topic2Queue" bean and logs each message.
         *
         * @param channel the channel header of the delivery (not used by this method)
         * @param msg     the message payload
         * @param message the raw message
         * @throws Exception declared by the signature; nothing is thrown explicitly here
         */
        @RabbitListener(queues = "#{topic2Queue.name}")
        public void topic2Queue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
            final String rawMessage = message.toString();
            log.info("topic2Queue,msg:{},mq.message:{}", msg, rawMessage);
        }

    image

    FanoutExchange

    /********************fanout************************/
        /**
         * Declares the queue "fanout_queue".
         *
         * @return the queue bean, registered as "fanoutQueue"
         */
        @Bean(name = "fanoutQueue")
        public Queue fanoutQueue() {
            final boolean durable = true;
            final boolean exclusive = false;
            final boolean autoDelete = false;
            return new Queue("fanout_queue", durable, exclusive, autoDelete);
        }
    
        /**
         * Binds the fanout queue to the fanout exchange; no routing key is supplied.
         *
         * @param fanoutExchange the exchange bean qualified as "fanoutExchange"
         * @return the binding bean, registered as "fanoutBind"
         */
        @Bean(name = "fanoutBind")
        public Binding fanoutBind(@Autowired @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
            final Queue queue = fanoutQueue();
            return BindingBuilder
                    .bind(queue)
                    .to(fanoutExchange);
        }
    /**
     * Publishes the request parameter to the fanout exchange with an empty routing key.
     *
     * @param fanout the payload to publish
     * @return the literal string "ok"
     */
    @GetMapping("fanout")
        public String fanout(String fanout) {
            final String exchange = RabbitMQExchangeConfig.FANOUT_EXCHANGE;
            final String routingKey = "";
            rabbitTemplate.convertAndSend(exchange, routingKey, fanout);
            return "ok";
        }
    /**
     * Listens on the queue whose name is exposed by the "fanoutQueue" bean and logs each message.
     *
     * @param channel the channel header of the delivery (not used by this method)
     * @param msg     the message payload
     * @param message the raw message
     * @throws Exception declared by the signature; nothing is thrown explicitly here
     */
    @RabbitListener(queues = "#{fanoutQueue.name}")
        public void fanoutQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
            final String rawMessage = message.toString();
            log.info("msg:{},mq.message:{}", msg, rawMessage);
        }

    image

    以上就是Rabbitmq三个常用Exchange的基本用法。

  • 相关阅读:
    四、Ubuntu16.04下TestLink的部署【测试管理必备工具】
    配置反向代理服务器
    三、Ubuntu16.04 安装Jira8.2.2(自带中文包)和破解
    二、Ubuntu16.04安装搜狗wps
    【C#实现漫画算法系列】-判断 2 的乘方
    [Entity Framework+MVC复习总结1]-WebForm与Asp.Net MVC
    【数据结构总结1】-数据结构的自述
    快速理解区块链
    CSS容器属性
    CSS background-clip 属性
  • 原文地址:https://www.cnblogs.com/gyjx2016/p/13622097.html
Copyright © 2011-2022 走看看