zoukankan      html  css  js  c++  java
  • rabbitmq(三)- 交换机

    rabbitmq 交换机有4种

    1. direct-它会把消息路由到那些 BindingKey RoutingKey完全匹配的队列中。
    2. fanout-它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
    3. topic-将消息路由到 BindingKey RoutingKey 相匹配的队列中。
    4. head-交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中headers 属性进行匹配。

    实战使用。

    这里以springboot为例子
    pom文件

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    1、direct

    direct交换机,它会把消息路由到那些 BindingKey RoutingKey完全匹配的队列中。

    声明队列
    声明交换机
    声明绑定

    @Configuration
    public class RabbitMqConfig implements InitializingBean {
        public static final String DIRECT_EXCHANGE = "eujian.exchange.direct";
        public static final String DIRECT_QUEUE = "eujian.queue.direct";
        @Autowired
        private AmqpAdmin amqpAdmin;
        @Override
        public void afterPropertiesSet() throws Exception {
            DirectExchange directExchange = new DirectExchange(DIRECT_EXCHANGE);
            Queue queue = new Queue(DIRECT_QUEUE);
            Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
            amqpAdmin.declareExchange(directExchange);
            amqpAdmin.declareQueue(queue);
            amqpAdmin.declareBinding(binding);
        }
    }
    

    发送代码

    @RestController
    @RequestMapping
    public class Controller {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @GetMapping("sendDirectMq")
        public String sendDirectMq(String msg){
            rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE,RabbitMqConfig.DIRECT_QUEUE,msg);
            return msg;
        }
    }
    

    消费代码

    @Component
    public class DirectLister {
        @RabbitListener(queues = RabbitMqConfig.DIRECT_QUEUE)
        public void Lister(Message message){
            byte[] body = message.getBody();
            System.err.println("接受到的消息体:"+new String(body));
        }
    }
    

    执行命令
    curl localhost:8080/sendDirectMq?msg=directMsg

    ps:

    rabbitmq会默认给queue绑定到一个默认的direct交换机 '''。
    所以发送消息到''"driect交换机,队列也能收到消息。

    rabbitTemplate.convertAndSend("",RabbitMqConfig.DIRECT_QUEUE,msg);
    

    2、fanout

    fanout交换机,它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列

    配置

    @Configuration
    public class RabbitMqConfig implements InitializingBean {
        public static final String DIRECT_EXCHANGE = "eujian.exchange.direct";
        public static final String DIRECT_QUEUE = "eujian.queue.direct";
        public static final String FANOUT_EXCHANGE = "eujian.exchange.fanout";
        public static final String FANOUT_QUEUE1 = "eujian.queue.fanout1";
        public static final String FANOUT_QUEUE2 = "eujian.queue.fanout2";
        @Autowired
        private AmqpAdmin amqpAdmin;
        @Override
        public void afterPropertiesSet() throws Exception {
    
            //fanout
            FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE);
            Queue faoutqueue1 = new Queue(FANOUT_QUEUE1);
            Queue faoutqueue2 = new Queue(FANOUT_QUEUE2);
            Binding faoutBinding1 = BindingBuilder.bind(faoutqueue1).to(exchange);
            Binding faoutBinding2 = BindingBuilder.bind(faoutqueue2).to(exchange);
            amqpAdmin.declareExchange(exchange);
            amqpAdmin.declareQueue(faoutqueue1);
            amqpAdmin.declareQueue(faoutqueue2);
            amqpAdmin.declareBinding(faoutBinding1);
            amqpAdmin.declareBinding(faoutBinding2);
    
            //direct交换机
            DirectExchange directExchange = new DirectExchange(DIRECT_EXCHANGE);
            Queue queue = new Queue(DIRECT_QUEUE);
            Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
            amqpAdmin.declareExchange(directExchange);
            amqpAdmin.declareQueue(queue);
            amqpAdmin.declareBinding(binding);
        }
    }
    

    生产者

        @GetMapping("sendFanoutMq")
        public String sendFanoutMq(String msg){
            rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE,"",msg);
            return msg;
        }
    

    消费者

    @Component
    public class FanoutQuue1Lister {
        @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE1)
        public void Lister(Message message){
            byte[] body = message.getBody();
            System.err.println(RabbitMqConfig.FANOUT_QUEUE1+",接受到的消息体:"+new String(body));
        }
    }
    
    @Component
    public class FanoutQuue2Lister {
        @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE2)
        public void Lister(Message message){
            byte[] body = message.getBody();
            System.err.println(RabbitMqConfig.FANOUT_QUEUE2+",接受到的消息体:"+new String(body));
        }
    }
    

    执行命令
    curl localhost:8080/sendFanoutMq?msg=fanoutMq

    3、topic

    topic交换机,根据 BindingKey和RoutingKey 匹配,然后路由到匹配的队列中。

    RoutingKey 为一个点号“.”分隔的字符串。例如:“com.rabbitmq.client ”。
    BindingKey也是一个点号“.”分隔的字符串。
    BindingKey 中可以存在两种特殊字符串“*”“#”,用于做模糊匹配,其中“*”用于匹配 个单词,“#”用于匹配多规格单词(可以是零个)。

    @Configuration
    public class RabbitMqConfig implements InitializingBean {
        public static final String DIRECT_EXCHANGE = "eujian.exchange.direct";
        public static final String DIRECT_QUEUE = "eujian.queue.direct";
    
        public static final String FANOUT_EXCHANGE = "eujian.exchange.fanout";
        public static final String FANOUT_QUEUE1 = "eujian.queue.fanout1";
        public static final String FANOUT_QUEUE2 = "eujian.queue.fanout2";
    
    
        public static final String TOPIC_EXCHANGE = "eujian.exchange.topic";
        public static final String TOPIC_QUEUE1 = "eujian.queue.topic1";
        public static final String TOPIC_QUEUE2 = "eujian.queue.topic2";
    
    
        @Autowired
        private AmqpAdmin amqpAdmin;
        @Override
        public void afterPropertiesSet() throws Exception {
            //topic
            TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE);
            Queue topicqueue1 = new Queue(TOPIC_QUEUE1);
            Queue topicqueue2 = new Queue(TOPIC_QUEUE2);
            Binding topicBinding1 = BindingBuilder.bind(topicqueue1).to(topicExchange).with("sp.total.#");
            Binding topicBinding2 = BindingBuilder.bind(topicqueue2).to(topicExchange).with("sp.*.end");
            amqpAdmin.declareExchange(topicExchange);
            amqpAdmin.declareQueue(topicqueue1);
            amqpAdmin.declareQueue(topicqueue2);
            amqpAdmin.declareBinding(topicBinding1);
            amqpAdmin.declareBinding(topicBinding2);
    
    
            //fanout
            FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE);
            Queue faoutqueue1 = new Queue(FANOUT_QUEUE1);
            Queue faoutqueue2 = new Queue(FANOUT_QUEUE2);
            Binding faoutBinding1 = BindingBuilder.bind(faoutqueue1).to(exchange);
            Binding faoutBinding2 = BindingBuilder.bind(faoutqueue2).to(exchange);
            amqpAdmin.declareExchange(exchange);
            amqpAdmin.declareQueue(faoutqueue1);
            amqpAdmin.declareQueue(faoutqueue2);
            amqpAdmin.declareBinding(faoutBinding1);
            amqpAdmin.declareBinding(faoutBinding2);
    
            //direct交换机
            DirectExchange directExchange = new DirectExchange(DIRECT_EXCHANGE);
            Queue queue = new Queue(DIRECT_QUEUE);
            Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
            amqpAdmin.declareExchange(directExchange);
            amqpAdmin.declareQueue(queue);
            amqpAdmin.declareBinding(binding);
        }
    }
    

    生产者

        @GetMapping("sendTopicMq")
        public String sendTopicMq(String msg,String rouKey){
            rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE,rouKey,msg);
            return msg;
        }
    

    消费者

    @Component
    public class TopicQuue1Lister {
        @RabbitListener(queues = RabbitMqConfig.TOPIC_QUEUE1)
        public void Lister(Message message){
            byte[] body = message.getBody();
            MessageProperties messageProperties = message.getMessageProperties();
            String routingKey = messageProperties.getReceivedRoutingKey();
            System.err.println(RabbitMqConfig.TOPIC_QUEUE1+",接受到的消息体:"+new String(body)+",routingKey="+routingKey);
        }
    }
    
    @Component
    public class TopicQuue2Lister {
        @RabbitListener(queues = RabbitMqConfig.TOPIC_QUEUE2)
        public void Lister(Message message){
            byte[] body = message.getBody();
            MessageProperties messageProperties = message.getMessageProperties();
            String routingKey = messageProperties.getReceivedRoutingKey();
            System.err.println(RabbitMqConfig.TOPIC_QUEUE2+",接受到的消息体:"+new String(body)+",routingKey="+routingKey);
        }
    }
    

    执行命令
    curl localhost:8080/sendTopicMq?msg=topicMq&rouKey=sp.a.end
    curl localhost:8080/sendTopicMq?msg=topicMq&rouKey=sp.a.b.end
    curl localhost:8080/sendTopicMq?msg=topicMq&rouKey=sp.total.x.y.z
    curl localhost:8080/sendTopicMq?msg=topicMq&rouKey=sp.total.end


    其中
    sp.a.end与sp.*.end匹配
    sp.a.b.end没有匹配
    sp.total.x.y.z与sp.total.#匹配
    sp.total.end与sp.*.end、sp.total.#都匹配

    4、head

    交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中headers 属性进行匹配。

    headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

    x-match有any和all两种模式
    all要求head要完全k,v匹配才会转发。
    any要求head有一个k,v匹配就会转发。

    配置

    @Configuration
    public class RabbitMqConfig implements InitializingBean {
        public static final String DIRECT_EXCHANGE = "eujian.exchange.direct";
        public static final String DIRECT_QUEUE = "eujian.queue.direct";
    
        public static final String FANOUT_EXCHANGE = "eujian.exchange.fanout";
        public static final String FANOUT_QUEUE1 = "eujian.queue.fanout1";
        public static final String FANOUT_QUEUE2 = "eujian.queue.fanout2";
    
    
        public static final String TOPIC_EXCHANGE = "eujian.exchange.topic";
        public static final String TOPIC_QUEUE1 = "eujian.queue.topic1";
        public static final String TOPIC_QUEUE2 = "eujian.queue.topic2";
    
    
        public static final String HEAD_EXCHANGE = "eujian.exchange.head";
        public static final String HEAD_QUEUE1 = "eujian.queue.head1";
        public static final String HEAD_QUEUE2 = "eujian.queue.head2";
        @Autowired
        private AmqpAdmin amqpAdmin;
        @Override
        public void afterPropertiesSet() throws Exception {
            //head
            Map<String, Object> map = new HashMap<>();
            map.put("k1","a");
            map.put("k2","b");
            HeadersExchange headersExchange = new HeadersExchange(HEAD_EXCHANGE);
            Queue headqueue1 = new Queue(HEAD_QUEUE1);
            Queue headqueue2 = new Queue(HEAD_QUEUE2);
            Binding match = BindingBuilder.bind(headqueue1).to(headersExchange).whereAny(map).match();
            Binding all = BindingBuilder.bind(headqueue2).to(headersExchange).whereAll(map).match();
            amqpAdmin.declareExchange(headersExchange);
            amqpAdmin.declareQueue(headqueue1);
            amqpAdmin.declareQueue(headqueue2);
            amqpAdmin.declareBinding(match);
            amqpAdmin.declareBinding(all);
    
    
            //topic
            TopicExchange topicExchange = new TopicExchange(TOPIC_EXCHANGE);
            Queue topicqueue1 = new Queue(TOPIC_QUEUE1);
            Queue topicqueue2 = new Queue(TOPIC_QUEUE2);
            Binding topicBinding1 = BindingBuilder.bind(topicqueue1).to(topicExchange).with("sp.total.#");
            Binding topicBinding2 = BindingBuilder.bind(topicqueue2).to(topicExchange).with("sp.*.end");
            amqpAdmin.declareExchange(topicExchange);
            amqpAdmin.declareQueue(topicqueue1);
            amqpAdmin.declareQueue(topicqueue2);
            amqpAdmin.declareBinding(topicBinding1);
            amqpAdmin.declareBinding(topicBinding2);
    
    
            //fanout
            FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE);
            Queue faoutqueue1 = new Queue(FANOUT_QUEUE1);
            Queue faoutqueue2 = new Queue(FANOUT_QUEUE2);
            Binding faoutBinding1 = BindingBuilder.bind(faoutqueue1).to(exchange);
            Binding faoutBinding2 = BindingBuilder.bind(faoutqueue2).to(exchange);
            amqpAdmin.declareExchange(exchange);
            amqpAdmin.declareQueue(faoutqueue1);
            amqpAdmin.declareQueue(faoutqueue2);
            amqpAdmin.declareBinding(faoutBinding1);
            amqpAdmin.declareBinding(faoutBinding2);
    
            //direct交换机
            DirectExchange directExchange = new DirectExchange(DIRECT_EXCHANGE);
            Queue queue = new Queue(DIRECT_QUEUE);
            Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
            amqpAdmin.declareExchange(directExchange);
            amqpAdmin.declareQueue(queue);
            amqpAdmin.declareBinding(binding);
        }
    }
    

    生产者

        @GetMapping("sendHeadMq")
        public String sendHeadMq(String msg, String k1,String k2){
            MessageProperties messageProperties = new MessageProperties();
            if(null!=k1){
                messageProperties.setHeader("k1",k1);
            }
            if(null!=k2){
                messageProperties.setHeader("k2",k2);
            }
            Message message = new Message(msg.getBytes(), messageProperties);
            rabbitTemplate.convertAndSend(RabbitMqConfig.HEAD_EXCHANGE,"",message);
            return msg;
        }
    

    消费者

    @Component
    public class HeadQuue1Lister {
        @RabbitListener(queues = RabbitMqConfig.HEAD_QUEUE1)
        public void Lister(Message message){
            byte[] body = message.getBody();
            MessageProperties messageProperties = message.getMessageProperties();
            Map<String, Object> headers = messageProperties.getHeaders();
            System.err.println(RabbitMqConfig.HEAD_QUEUE1+",接受到的消息体:"+new String(body)+",headers="+headers);
        }
    }
    
    @Component
    public class HeadQuue2Lister {
        @RabbitListener(queues = RabbitMqConfig.HEAD_QUEUE2)
        public void Lister(Message message){
            byte[] body = message.getBody();
            MessageProperties messageProperties = message.getMessageProperties();
            Map<String, Object> headers = messageProperties.getHeaders();
            System.err.println(RabbitMqConfig.HEAD_QUEUE2+",接受到的消息体:"+new String(body)+",headers="+headers);
        }
    }
    

    管理台页面

    执行命令
    curl localhost:8080/sendHeadMq?msg=headMq&k1=a
    curl localhost:8080/sendHeadMq?msg=headMq&k1=a&k2=b
    curl localhost:8080/sendHeadMq?msg=headMq&k1=a&k2=c

    其中k1=a和eujian.queue.head1匹配
    其中k1=a,k2=b和eujian.queue.head1、eujian.queue.head2匹配
    其中k1=a,k2=c和eujian.queue.head1匹配

    代码地址:
    git clone -b exchange-teacher https://gitee.com/guoeryyj/rabbitmq-teacher.git
    rabbitmq(一)-基础入门
    rabbitmq(二)-概念介绍
    rabbitmq(三)-交换机

  • 相关阅读:
    poj 2676 Suduku (dfs)
    poj 1562 Oil Deposits (dfs)
    poj 2907 Collecting Beepers (dfs)
    poj 1655 Balancing Act (树形dfs)
    poj 3411 Paid Roads (dfs)
    hdu 2896 病毒侵袭 (AC)
    hdu 3065 病毒侵袭持续中 (AC)
    poj 2251 Dungeon Master (bfs)
    java中debug使用
    Swing入门级小项目总结
  • 原文地址:https://www.cnblogs.com/yeyongjian/p/14001494.html
Copyright © 2011-2022 走看看