zoukankan      html  css  js  c++  java
  • 三、RabbitMQ的交换机类型

    四种类型:

    1.FanoutExchange(扇型交换机)-全部路由
    广播模式或者订阅模式,可以同时绑定多个queue, 发送消息时,无需指定Routing Key
     适用场景:广播,群聊,新闻推送
    2.DirectExchange(直连交换机)- 根据Routing Key路由
     直连,通过Routing Key绑定queue,当发送消息到交换机时,会根据配置的Routing Key路由到不同的queue中
     当N个queue的Routing Key相同时,消息会被同时路由到这N个queue中去
    3.HeadersExchange(头交换机)- 根据header的匹配规则路由
    设置交换机的匹配header的规则,支持单个精确匹配where,部分匹配whereAny,全部匹配whereAll,根据匹配的结果路由到相应的queue中
    4.TopicExchange(主题交换机)
    主题交换机通过有匹配规则的路由键和队列绑定,*.a.*,#.b.#,*代表匹配任意一个单词,#代表匹配任意一个或多个单词
    发送消息时,设置路由键,如设置为l.a.m,则会被路由到绑定*.a.*的队列去,设置为l.b则会被路由到绑定#.b.#的队列去,设置为l.a.b.c会同时到两个队列中,设置l.m则会被丢弃

    PS:CustomExchange不是一种固定的类型,是用来配合插件一起使用的,具体参考上篇https://www.cnblogs.com/Hleaves/p/13594278.html

    测试:

    //初始化交换机和队列信息

    import org.springframework.amqp.core.*;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class MQExchangeConfig {
    
        public static final String DIRECT_EXCHANGE_NAME = "direct-Exchange";
        public static final String FANOUT_EXCHANGE_NAME = "fanout-Exchange";
        public static final String CUSTOM_EXCHANGE_NAME = "custom-Exchange";
        public static final String HEADERS_EXCHANGE_NAME = "headers-Exchange";
        public static final String TOPIC_EXCHANGE_NAME = "topic-Exchange";
        public static final String QUEUEA_NAME = "queueA";
        public static final String QUEUEB_NAME = "queueB";
        public static final String ROUTING_KEY_A_NAME = "routingKeyA";
        public static final String ROUTING_KEY_B_NAME = "routingKeyB";
    
    
        @Bean
        DirectExchange directExchange() {
            return new DirectExchange(DIRECT_EXCHANGE_NAME);
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUT_EXCHANGE_NAME);
        }
    
        @Bean
        CustomExchange customExchange() {
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(CUSTOM_EXCHANGE_NAME, "x-delayed-message", true, false, args);
        }
    
        @Bean
        HeadersExchange headersExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("HeaderA", "aaa");
            args.put("HeaderB", "bbb");
            return (HeadersExchange) ExchangeBuilder.headersExchange(HEADERS_EXCHANGE_NAME).withArguments(args).build();
        }
    
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE_NAME);
        }
    
        @Bean
        Queue queueA() {
            return new Queue(QUEUEA_NAME);
        }
    
        @Bean
        Queue queueB() {
            return new Queue(QUEUEB_NAME);
        }
    
        @Bean
        Binding bindingAD(Queue queueA, DirectExchange directExchange) {
            return BindingBuilder.bind(queueA).to(directExchange).with(ROUTING_KEY_A_NAME);
        }
    
        @Bean
        Binding bindingAF(Queue queueA, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueA).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingAC(Queue queueA, CustomExchange customExchange) {
            return BindingBuilder.bind(queueA).to(customExchange).with(ROUTING_KEY_A_NAME).noargs();
        }
    
        @Bean
        Binding bindingAH(Queue queueA, HeadersExchange headersExchange) {
            //精确匹配
            // return BindingBuilder.bind(queueA).to(headersExchange).where("Header").matches("ccc");
            //部分匹配
            return BindingBuilder.bind(queueA).to(headersExchange).whereAny("HeaderA", "HeaderB").exist();
        }
    
        @Bean
        Binding bindingAT(Queue queueA, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueA).to(topicExchange).with("*." + ROUTING_KEY_A_NAME);
        }
    
        @Bean
        Binding bindingBD(Queue queueB, DirectExchange directExchange) {
            return BindingBuilder.bind(queueB).to(directExchange).with(ROUTING_KEY_B_NAME);
        }
    
        @Bean
        Binding bindingBF(Queue queueB, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueB).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingBC(Queue queueB, CustomExchange customExchange) {
            return BindingBuilder.bind(queueB).to(customExchange).with(ROUTING_KEY_B_NAME).noargs();
        }
    
        @Bean
        Binding bindingBH(Queue queueB, HeadersExchange headersExchange) {
            //全部匹配
            return BindingBuilder.bind(queueB).to(headersExchange).whereAll("HeaderA", "HeaderB").exist();
        }
    
        @Bean
        Binding bindingBT(Queue queueB, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueB).to(topicExchange).with("#." + ROUTING_KEY_B_NAME);
        }
    
        //先初始化队列
        @Bean
        @ConditionalOnBean(Queue.class)
        MQExchangeConsumer mqExchangeConsumer() {
            return new MQExchangeConsumer();
        }
    
    }

    //发送消息

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import static com.mhou.rabbitmq.exchange.MQExchangeConfig.*;
    
    @RestController
    @Slf4j
    public class MQExchangeSender {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/sendDiffExchange")
        public void sendDiffExchange(String exchangeType, String msg) {
            log.info("msg-{}", msg);
            switch (exchangeType) {
                case "f":
                    //routingkey的指定没有实际意义,可以为空
                    rabbitTemplate.convertSendAndReceive(FANOUT_EXCHANGE_NAME, "", msg + "a");
                    rabbitTemplate.convertSendAndReceive(FANOUT_EXCHANGE_NAME, "", msg + "b");
                    break;
                case "c":
                    rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_A_NAME, msg + "a", message -> {
                        message.getMessageProperties().setDelay(10000);
                        return message;
                    });
                    rabbitTemplate.convertSendAndReceive(CUSTOM_EXCHANGE_NAME, ROUTING_KEY_B_NAME, msg + "b", message -> {
                        message.getMessageProperties().setHeader("x-delay", 20000);
                        return message;
                    });
                    break;
                case "h":
                    rabbitTemplate.convertSendAndReceive(HEADERS_EXCHANGE_NAME, "", msg + "a", message -> {
                        message.getMessageProperties().setHeader("HeaderA", "aaa");
                        return message;
                    });
                    rabbitTemplate.convertSendAndReceive(HEADERS_EXCHANGE_NAME, "", msg + "b", message -> {
                        message.getMessageProperties().setHeader("HeaderA", "aaa");
                        message.getMessageProperties().setHeader("HeaderB", "bbb");
                        return message;
                    });
                    break;
                case "t":
                    rabbitTemplate.convertSendAndReceive(TOPIC_EXCHANGE_NAME, ROUTING_KEY_A_NAME + "." + ROUTING_KEY_B_NAME, msg + "a");
                    rabbitTemplate.convertSendAndReceive(TOPIC_EXCHANGE_NAME, ROUTING_KEY_A_NAME + "." + ROUTING_KEY_B_NAME, msg + "b");
                    break;
                default:
                    rabbitTemplate.convertSendAndReceive(DIRECT_EXCHANGE_NAME, ROUTING_KEY_A_NAME, msg + "a");
                    rabbitTemplate.convertSendAndReceive(DIRECT_EXCHANGE_NAME, ROUTING_KEY_B_NAME, msg + "b");
            }
        }
    }

    //结果

    1.FanoutExchange ,发送到交换机的消息可以同时被路由到AB队列

     2.HeadersExchange, 消息头部含有HeaderA或者HeaderB的消息会被路由到A队列,同时含有HeaderA和HeaderB的消息会被路由到B队列

     3.TopicExchange, 路由键routingKeyA.routingKeyB 匹配 #.routingKeyB,不匹配*.routingKeyA ,因此两条消息都会被路由到B队列

     4.DirectExchange,直接根据设置的路由键绑定队列

  • 相关阅读:
    项目源码--Android迷幻岛屿综合游戏
    实例源码--Android软件更新模块
    实例源码--Android小工具源码
    项目源码--Android3D影音播放器源码
    实例源码--Android时钟源码
    实例源码--Android简单音乐播放器源码
    项目源码--Android应用商店源码
    实例源码--Android理财工具源码
    实例源码--Android手机狗(防盗)源码
    谈事务的理解
  • 原文地址:https://www.cnblogs.com/Hleaves/p/13577178.html
Copyright © 2011-2022 走看看